4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / cve-2025-20393.py PY
#!/usr/bin/env python3
"""
CVE-2025-20393 Scanner
Cisco Secure Email Gateway & Email and Web Manager RCE Vulnerability Scanner
Author: thesystemowner
Version: 1.0
Date: December 2025

Description:
    This scanner detects Cisco SEG/SEWM appliances vulnerable to CVE-2025-20393,
    a critical unauthenticated RCE vulnerability in Cisco AsyncOS affecting
    devices with Spam Quarantine feature exposed to the internet.
    
    CVSS Score: 10.0 (Critical)
    CWE-20: Improper Input Validation
    Active exploitation in the wild by UAT-9686 (Chinese APT)
"""

import argparse
import requests
import socket
import sys
import json
import re
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urlparse
import warnings
warnings.filterwarnings('ignore')

class Colors:
    """ANSI escape sequences used to colorize the scanner's terminal output."""

    # Reset and text attributes.
    ENDC = '\033[0m'
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'

    # Foreground colors, listed in ascending escape-code order (91-96).
    FAIL = '\033[91m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    OKBLUE = '\033[94m'
    HEADER = '\033[95m'
    OKCYAN = '\033[96m'

class CVE202520393Scanner:
    """Heuristic exposure scanner for CVE-2025-20393.

    The scanner only sends ordinary HEAD/GET requests and string-matches the
    responses. A target is flagged as vulnerable when one of the probed ports
    both looks like a Cisco email appliance and serves a page that matches the
    Spam Quarantine keywords. No exploitation is attempted.
    """

    # Ports probed when the caller does not pass ``ports`` to the constructor.
    DEFAULT_PORTS = (6025, 82, 443, 8443)

    # Lower-case substrings searched for in the landing page body and headers.
    PRODUCT_INDICATORS = (
        'cisco',
        'ironport',
        'asyncos',
        'secure email gateway',
        'secure email and web manager',
        'spam quarantine',
    )

    # Paths requested, in order, when looking for the quarantine interface.
    QUARANTINE_PATHS = (
        '/quarantine',
        '/spamquarantine',
        '/spam',
        '/sma-login',
        '/login',
    )

    # Lower-case substrings that mark a response as a quarantine page.
    QUARANTINE_KEYWORDS = ('quarantine', 'spam', 'cisco', 'ironport')

    # Version disclosure patterns, compiled once; group 1 is the version.
    VERSION_PATTERNS = tuple(
        re.compile(pattern, re.IGNORECASE)
        for pattern in (
            r'AsyncOS\s+(\d+\.\d+\.\d+)',
            r'Version:\s*(\d+\.\d+\.\d+)',
            r'asyncos[_-](\d+\.\d+\.\d+)',
        )
    )

    def __init__(self, timeout=10, user_agent=None, ports=None):
        """Create a scanner.

        Args:
            timeout: Timeout in seconds applied to every socket and HTTP call.
            user_agent: User-Agent header value; a browser-like default is
                used when omitted or empty.
            ports: Optional iterable of TCP ports to probe. When omitted or
                empty, ``DEFAULT_PORTS`` is used.
        """
        self.timeout = timeout
        self.user_agent = user_agent or "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
        self.session = requests.Session()
        # Certificate validation is disabled so hosts presenting self-signed
        # or mismatched certificates can still be probed.
        self.session.verify = False
        self.session.headers.update({'User-Agent': self.user_agent})

        # Known Spam Quarantine ports (default and common)
        requested_ports = list(ports) if ports is not None else []
        self.spam_quarantine_ports = requested_ports or list(self.DEFAULT_PORTS)

        # IOCs from Cisco Talos UAT-9686 analysis.
        # NOTE: no method of this class consults this table; it is kept so
        # code that reads ``scanner.iocs`` keeps working.
        self.iocs = {
            'aquashell_signatures': [
                b'aquashell',
                b'AquaShell',
                b'python.*backdoor'
            ],
            'aquatunnel_signatures': [
                b'aquatunnel',
                b'reverse.*ssh',
                b'chisel'
            ],
            'aquapurge_signatures': [
                b'aquapurge',
                b'log.*clear'
            ]
        }

    def print_banner(self):
        """Print the tool banner to stdout."""
        banner = f"""
{Colors.OKCYAN}╔═══════════════════════════════════════════════════════════════╗
║          CVE-2025-20393 Vulnerability Scanner                 ║
║     Cisco Secure Email Gateway & Email and Web Manager        ║
║                  Unauthenticated RCE Scanner                  ║
║                                                               ║
║  CVSS: 10.0 (Critical) | CWE-20: Improper Input Validation   ║
║  Active Exploitation: UAT-9686 (Chinese APT)                 ║
╚═══════════════════════════════════════════════════════════════╝{Colors.ENDC}
"""
        print(banner)

    @staticmethod
    def _build_url(scheme, target, port, path=''):
        """Return ``scheme://target:port/path``, bracketing IPv6 literals."""
        host = target
        # A bare IPv6 literal contains ':' and must be wrapped in brackets,
        # otherwise the port separator is ambiguous and the URL is invalid.
        if ':' in host and not host.startswith('['):
            host = f"[{host}]"
        return f"{scheme}://{host}:{port}{path}"

    def _get(self, target, port, path='', allow_redirects=True):
        """GET ``path`` on ``target:port`` and return ``(url, response)``.

        HTTPS is tried first. If the TLS handshake fails, the request is
        retried once over plain HTTP so that listeners which do not speak TLS
        on the probed port are still examined. Other request errors propagate
        to the caller.
        """
        url = self._build_url('https', target, port, path)
        try:
            response = self.session.get(
                url, timeout=self.timeout, allow_redirects=allow_redirects
            )
        except requests.exceptions.SSLError:
            url = self._build_url('http', target, port, path)
            response = self.session.get(
                url, timeout=self.timeout, allow_redirects=allow_redirects
            )
        return url, response

    def banner_grab(self, target, port):
        """Send a plain-text HEAD request and return the first 1024 bytes.

        Returns:
            The decoded response prefix (possibly empty), or ``None`` when the
            connection, send or receive fails.
        """
        try:
            request = b"HEAD / HTTP/1.1\r\nHost: " + target.encode() + b"\r\n\r\n"
            # create_connection resolves the name and tries every returned
            # address family, so IPv6-only targets work as well.
            with socket.create_connection((target, port), timeout=self.timeout) as sock:
                sock.sendall(request)
                return sock.recv(1024).decode('utf-8', errors='ignore')
        except (OSError, ValueError, OverflowError):
            # Best effort: an unreachable or misbehaving port is not an error.
            return None

    def check_cisco_product(self, target, port):
        """Check whether the landing page looks like a Cisco SEG/SEWM appliance.

        Returns:
            ``{'is_cisco': False}`` when nothing matched or the request
            failed; otherwise a dict with ``is_cisco``, ``indicators`` (the
            matched substrings), ``status_code`` and ``server_header``.
        """
        try:
            _, response = self._get(target, port, allow_redirects=True)
            body = response.text.lower()
            header_blob = str(response.headers).lower()
            matches = [
                indicator
                for indicator in self.PRODUCT_INDICATORS
                if indicator in body or indicator in header_blob
            ]
            if matches:
                return {
                    'is_cisco': True,
                    'indicators': matches,
                    'status_code': response.status_code,
                    'server_header': response.headers.get('Server', 'Unknown')
                }
        except (requests.RequestException, ValueError):
            # Unreachable host, TLS/HTTP failure or malformed target.
            pass

        return {'is_cisco': False}

    def check_spam_quarantine(self, target, port):
        """Check whether a Spam Quarantine style page is reachable.

        Each entry of ``QUARANTINE_PATHS`` is requested without following
        redirects; the first 200/301/302 response whose body contains one of
        ``QUARANTINE_KEYWORDS`` wins.

        Returns:
            ``{'exposed': False}`` or a dict with ``exposed``, ``path``,
            ``status_code`` and ``url``.
        """
        for path in self.QUARANTINE_PATHS:
            try:
                url, response = self._get(target, port, path, allow_redirects=False)
                if response.status_code not in (200, 301, 302):
                    continue
                body = response.text.lower()
                if any(keyword in body for keyword in self.QUARANTINE_KEYWORDS):
                    return {
                        'exposed': True,
                        'path': path,
                        'status_code': response.status_code,
                        'url': url
                    }
            except (requests.RequestException, ValueError):
                # A failing path must not stop the remaining paths.
                continue

        return {'exposed': False}

    def check_version_disclosure(self, target, port):
        """Try to read an AsyncOS version string from the landing page.

        Returns:
            ``{'version_found': True, 'version': 'x.y.z'}`` for the first
            matching pattern, otherwise ``{'version_found': False}``.
        """
        try:
            _, response = self._get(target, port)
            for pattern in self.VERSION_PATTERNS:
                match = pattern.search(response.text)
                if match:
                    return {
                        'version_found': True,
                        'version': match.group(1)
                    }
        except (requests.RequestException, ValueError):
            pass

        return {'version_found': False}

    def scan_target(self, target):
        """Run every check against each configured port of one target.

        Progress is printed as the scan runs.

        Returns:
            A dict with ``target``, ``timestamp``, ``vulnerable``,
            ``risk_level`` and ``findings`` (a list of strings).
        """
        print(f"\n{Colors.OKBLUE}[*] Scanning: {target}{Colors.ENDC}")

        results = {
            'target': target,
            'timestamp': datetime.now().isoformat(),
            'vulnerable': False,
            'risk_level': 'Unknown',
            'findings': []
        }
        findings = results['findings']

        for port in self.spam_quarantine_ports:
            print(f"  {Colors.OKCYAN}[+] Checking port {port}...{Colors.ENDC}")

            banner = self.banner_grab(target, port)
            if banner and 'cisco' in banner.lower():
                findings.append(f"Cisco banner detected on port {port}")

            # The remaining checks only make sense on a port that already
            # looks like a Cisco appliance.
            cisco_check = self.check_cisco_product(target, port)
            if not cisco_check['is_cisco']:
                continue
            print(f"    {Colors.OKGREEN}[✓] Cisco AsyncOS product detected!{Colors.ENDC}")
            print(f"    {Colors.WARNING}    Indicators: {', '.join(cisco_check['indicators'])}{Colors.ENDC}")
            findings.append(f"Cisco product identified on port {port}: {cisco_check['indicators']}")

            spam_check = self.check_spam_quarantine(target, port)
            if not spam_check['exposed']:
                continue
            print(f"    {Colors.FAIL}[!] SPAM QUARANTINE EXPOSED: {spam_check['url']}{Colors.ENDC}")
            results['vulnerable'] = True
            results['risk_level'] = 'CRITICAL'
            findings.append(f"Spam Quarantine exposed at {spam_check['url']}")

            version_check = self.check_version_disclosure(target, port)
            if version_check['version_found']:
                print(f"    {Colors.WARNING}[!] Version detected: {version_check['version']}{Colors.ENDC}")
                findings.append(f"AsyncOS version: {version_check['version']}")

        return results

    def generate_report(self, results, output_file=None):
        """Print a scan summary and optionally save the report as JSON.

        Args:
            results: List of dicts as returned by ``scan_target``.
            output_file: Path of the JSON file to write; nothing is written
                when omitted.

        Returns:
            The report dict (``scan_info`` plus ``results``).
        """
        report = {
            'scan_info': {
                'cve': 'CVE-2025-20393',
                'severity': 'CRITICAL',
                'cvss_score': 10.0,
                'description': 'Cisco Secure Email Gateway & Email and Web Manager Unauthenticated RCE',
                'threat_actor': 'UAT-9686 (Chinese APT)',
                'exploitation_status': 'Active in the wild',
                'scan_date': datetime.now().isoformat()
            },
            'results': results
        }

        vulnerable_targets = [r for r in results if r['vulnerable']]
        rule = f"{Colors.BOLD}{'='*60}{Colors.ENDC}"
        print(f"\n{rule}")
        print(f"{Colors.HEADER}[SCAN SUMMARY]{Colors.ENDC}")
        print(rule)
        print(f"Total targets scanned: {len(results)}")
        print(f"{Colors.FAIL}Vulnerable targets: {len(vulnerable_targets)}{Colors.ENDC}")
        print(f"{Colors.OKGREEN}Secure targets: {len(results) - len(vulnerable_targets)}{Colors.ENDC}")

        if vulnerable_targets:
            print(f"\n{Colors.FAIL}{Colors.BOLD}[!] CRITICAL VULNERABILITIES FOUND:{Colors.ENDC}")
            for target in vulnerable_targets:
                print(f"  • {target['target']} - Risk Level: {target['risk_level']}")
                for finding in target['findings']:
                    print(f"    - {finding}")

        if output_file:
            with open(output_file, 'w', encoding='utf-8') as f:
                json.dump(report, f, indent=4)
            print(f"\n{Colors.OKGREEN}[+] Report saved to: {output_file}{Colors.ENDC}")

        return report

    def scan_targets(self, targets, threads=10):
        """Scan several targets concurrently.

        Args:
            targets: Iterable of hostnames or IP addresses.
            threads: Worker thread count; values below 1 are treated as 1
                because ``ThreadPoolExecutor`` rejects them.

        Returns:
            The per-target result dicts in completion order. Targets whose
            scan raised are reported on stdout and omitted from the list.
        """
        results = []

        with ThreadPoolExecutor(max_workers=max(1, threads)) as executor:
            futures = {executor.submit(self.scan_target, target): target for target in targets}

            for future in as_completed(futures):
                try:
                    results.append(future.result())
                except Exception as e:
                    # Boundary handler: one failing target must not abort the
                    # whole run, so the error is reported and scanning goes on.
                    target = futures[future]
                    print(f"{Colors.FAIL}[!] Error scanning {target}: {str(e)}{Colors.ENDC}")

        return results

def parse_targets(target_input):
    """Expand the ``--target`` argument into a list of hosts to scan.

    The input is interpreted, in this order, as:

    1. a URL (anything containing ``://``) - only its hostname is kept,
       because the scanner appends its own ports;
    2. a CIDR range - expanded to its usable host addresses;
    3. a path to a file with one target per line (blank lines skipped);
    4. a single hostname or IP address.

    URLs are recognised before the CIDR test because every URL contains
    ``/``. A value containing ``/`` that is not a valid network is still
    tried as a file path before being rejected.

    Args:
        target_input: Raw value supplied by the user.

    Returns:
        A list of host strings (possibly empty for an empty file).

    Exits:
        With status 1 when the URL has no hostname, or when the input
        contains ``/`` but is neither a valid network nor a readable file.
    """
    if '://' in target_input:
        host = urlparse(target_input).hostname
        if not host:
            print(f"{Colors.FAIL}[!] Invalid target URL: {target_input}{Colors.ENDC}")
            sys.exit(1)
        return [host]

    cidr_error = None
    if '/' in target_input:
        import ipaddress
        try:
            network = ipaddress.ip_network(target_input, strict=False)
        except ValueError as exc:
            # Not a network; it may still be a file path such as ./targets.txt.
            cidr_error = exc
        else:
            return [str(ip) for ip in network.hosts()]

    try:
        with open(target_input, 'r') as f:
            return [line.strip() for line in f if line.strip()]
    except FileNotFoundError:
        pass
    except OSError:
        # For inputs without '/', keep the original behaviour of surfacing
        # unexpected I/O errors; otherwise fall through to the CIDR error.
        if cidr_error is None:
            raise

    if cidr_error is not None:
        print(f"{Colors.FAIL}[!] Invalid CIDR notation: {str(cidr_error)}{Colors.ENDC}")
        sys.exit(1)

    return [target_input]

def main():
    """Command-line entry point: parse arguments, confirm, scan and report.

    Exits with status 2 (via argparse) for non-positive ``--timeout`` or
    ``--threads`` values, status 1 when no targets could be loaded, and
    status 0 when the user declines the confirmation prompt.
    """
    parser = argparse.ArgumentParser(
        description='CVE-2025-20393 Scanner - Cisco AsyncOS Spam Quarantine RCE',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s -t 192.168.1.100
  %(prog)s -t targets.txt -o report.json
  %(prog)s -t 10.0.0.0/24 --threads 20
  %(prog)s -t mail.company.com --timeout 15

IOCs for CVE-2025-20393:
  - AquaShell (Python backdoor)
  - AquaTunnel (Reverse SSH tunnel)
  - Chisel (Tunneling tool)
  - AquaPurge (Log clearing utility)
        """
    )

    parser.add_argument('-t', '--target', required=True,
                        help='Target IP, hostname, file, or CIDR range')
    parser.add_argument('-o', '--output',
                        help='Output JSON report file')
    parser.add_argument('--timeout', type=int, default=10,
                        help='Connection timeout in seconds (default: 10)')
    parser.add_argument('--threads', type=int, default=10,
                        help='Number of concurrent threads (default: 10)')
    parser.add_argument('--user-agent',
                        help='Custom User-Agent string')

    args = parser.parse_args()

    # Reject values that would otherwise fail deep inside the scan: the
    # thread pool refuses a worker count below 1, and a non-positive timeout
    # is not a usable socket/HTTP timeout.
    if args.timeout <= 0:
        parser.error('--timeout must be a positive integer')
    if args.threads <= 0:
        parser.error('--threads must be a positive integer')

    scanner = CVE202520393Scanner(
        timeout=args.timeout,
        user_agent=args.user_agent
    )

    scanner.print_banner()

    targets = parse_targets(args.target)
    if not targets:
        # e.g. an empty target file; scanning nothing would print a
        # misleading all-clear summary.
        print(f"{Colors.FAIL}[!] No targets to scan{Colors.ENDC}")
        sys.exit(1)
    print(f"\n{Colors.OKBLUE}[*] Loaded {len(targets)} target(s){Colors.ENDC}")

    print(f"\n{Colors.WARNING}[!] Disclaimer: This tool is for authorized security testing only.{Colors.ENDC}")
    print(f"{Colors.WARNING}[!] Unauthorized access to computer systems is illegal.{Colors.ENDC}\n")

    try:
        input(f"{Colors.OKGREEN}Press ENTER to start scanning...{Colors.ENDC}")
    except (KeyboardInterrupt, EOFError):
        # EOFError: stdin is closed or not interactive, so no confirmation
        # was given; treat it like a cancel instead of crashing.
        print(f"\n{Colors.FAIL}[!] Scan cancelled by user{Colors.ENDC}")
        sys.exit(0)

    results = scanner.scan_targets(targets, threads=args.threads)
    scanner.generate_report(results, args.output)

    print(f"\n{Colors.OKCYAN}[*] Scan completed!{Colors.ENDC}")

if __name__ == '__main__':
    # Script entry point: run the CLI and turn a Ctrl+C raised anywhere
    # during the run into a short notice and exit status 0.
    try:
        main()
    except KeyboardInterrupt:
        print(f"\n{Colors.FAIL}[!] Interrupted by user{Colors.ENDC}")
        raise SystemExit(0)