README.md
Rendering markdown...
#!/usr/bin/env python3
"""
CVE-2025-20393 Scanner
Cisco Secure Email Gateway & Email and Web Manager RCE Vulnerability Scanner
Author: thesystemowner
Version: 1.0
Date: December 2025
Description:
This scanner detects Cisco SEG/SEWM appliances vulnerable to CVE-2025-20393,
a critical unauthenticated RCE vulnerability in Cisco AsyncOS affecting
devices with Spam Quarantine feature exposed to the internet.
CVSS Score: 10.0 (Critical)
CWE-20: Improper Input Validation
Active exploitation in the wild by UAT-9686 (Chinese APT)
"""
import argparse
import requests
import socket
import sys
import json
import re
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urlparse
import warnings
warnings.filterwarnings('ignore')
class Colors:
    """ANSI escape sequences used to colour and style the console output.

    Each attribute is a raw escape string meant to be interpolated into a
    printed message; ``ENDC`` resets the terminal back to its default style.
    """

    # Reset sequence: emit after any styled text.
    ENDC = '\033[0m'

    # Text attributes.
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'

    # Foreground colours, named after the role they play in the output.
    FAIL = '\033[91m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    OKBLUE = '\033[94m'
    HEADER = '\033[95m'
    OKCYAN = '\033[96m'
class CVE202520393Scanner:
    """Heuristic exposure scanner for CVE-2025-20393 on Cisco AsyncOS appliances.

    The scanner never sends an exploit. For each candidate port it looks for
    Cisco branding in the HTTP response, probes a handful of login/quarantine
    paths and tries to read an AsyncOS version string. A target is reported
    as vulnerable when a Cisco product is identified and one of the probed
    paths answers with quarantine-related keywords.
    """

    # Lower-case substrings searched for in the response body and headers.
    _PRODUCT_INDICATORS = (
        'cisco',
        'ironport',
        'asyncos',
        'secure email gateway',
        'secure email and web manager',
        'spam quarantine',
    )

    # Paths probed when looking for an exposed Spam Quarantine interface.
    _QUARANTINE_PATHS = (
        '/quarantine',
        '/spamquarantine',
        '/spam',
        '/sma-login',
        '/login',
    )

    # Lower-case substrings that mark a probed path as quarantine-related.
    _QUARANTINE_KEYWORDS = ('quarantine', 'spam', 'cisco', 'ironport')

    # Version disclosure patterns, compiled once instead of on every call.
    _VERSION_PATTERNS = tuple(
        re.compile(pattern, re.IGNORECASE)
        for pattern in (
            r'AsyncOS\s+(\d+\.\d+\.\d+)',
            r'Version:\s*(\d+\.\d+\.\d+)',
            r'asyncos[_-](\d+\.\d+\.\d+)',
        )
    )

    def __init__(self, timeout=10, user_agent=None):
        """Create a scanner.

        Args:
            timeout: Per-connection timeout in seconds, used for both the raw
                socket probe and every HTTP request.
            user_agent: User-Agent header value; a browser-like default is
                used when omitted or empty.
        """
        self.timeout = timeout
        self.user_agent = user_agent or "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
        self.session = requests.Session()
        # Certificate verification is disabled for every request made through
        # this session.
        self.session.verify = False
        self.session.headers.update({'User-Agent': self.user_agent})
        # Known Spam Quarantine ports (default and common)
        self.spam_quarantine_ports = [6025, 82, 443, 8443]
        # IOCs from Cisco Talos UAT-9686 analysis.
        # NOTE: these patterns are stored for reference only; no method of
        # this class matches against them.
        self.iocs = {
            'aquashell_signatures': [
                b'aquashell',
                b'AquaShell',
                b'python.*backdoor'
            ],
            'aquatunnel_signatures': [
                b'aquatunnel',
                b'reverse.*ssh',
                b'chisel'
            ],
            'aquapurge_signatures': [
                b'aquapurge',
                b'log.*clear'
            ]
        }

    def print_banner(self):
        """Print the coloured banner identifying the tool and the CVE."""
        banner = f"""
{Colors.OKCYAN}╔═══════════════════════════════════════════════════════════════╗
║ CVE-2025-20393 Vulnerability Scanner ║
║ Cisco Secure Email Gateway & Email and Web Manager ║
║ Unauthenticated RCE Scanner ║
║ ║
║ CVSS: 10.0 (Critical) | CWE-20: Improper Input Validation ║
║ Active Exploitation: UAT-9686 (Chinese APT) ║
╚═══════════════════════════════════════════════════════════════╝{Colors.ENDC}
"""
        print(banner)

    def _probe_port(self, target, port):
        """Connect to ``target:port`` and read the first bytes of a HEAD reply.

        Returns:
            A ``(reachable, banner)`` tuple. ``reachable`` is True when the
            TCP connection was established; ``banner`` is the decoded reply
            (possibly empty) or None when nothing could be read.
        """
        try:
            sock = socket.create_connection((target, port), timeout=self.timeout)
        except (OSError, UnicodeError, ValueError, OverflowError):
            # Refused, timed out, unresolvable or malformed address.
            return False, None
        with sock:
            try:
                # sendall() retries partial writes, unlike send().
                sock.sendall(b"HEAD / HTTP/1.1\r\nHost: " + target.encode() + b"\r\n\r\n")
                banner = sock.recv(1024).decode('utf-8', errors='ignore')
            except (OSError, UnicodeError):
                return True, None
        return True, banner

    def banner_grab(self, target, port):
        """Grab server banner to identify Cisco AsyncOS.

        Returns:
            Up to 1024 bytes of the reply to a plain-text ``HEAD /`` request,
            decoded as UTF-8 with undecodable bytes dropped, or None when the
            connection or the exchange failed.
        """
        return self._probe_port(target, port)[1]

    def _get(self, target, port, path='', allow_redirects=True):
        """GET ``path`` from ``target:port``, preferring HTTPS.

        HTTPS is tried first. Only when the TLS handshake itself fails is the
        same request repeated over plain HTTP, so that listeners which do not
        speak TLS are still examined. Timeouts and refused connections are
        not retried, to avoid doubling the wait on dead ports.
        # NOTE(review): some of the listed ports (e.g. 82) presumably serve
        # plain HTTP by default -- confirm against the appliance defaults.

        Returns:
            A ``(url, response)`` tuple, or ``(None, None)`` when no request
            succeeded.
        """
        for scheme in ('https', 'http'):
            url = f"{scheme}://{target}:{port}{path}"
            try:
                response = self.session.get(
                    url, timeout=self.timeout, allow_redirects=allow_redirects
                )
            except requests.exceptions.SSLError:
                continue
            except (requests.exceptions.RequestException, UnicodeError, ValueError):
                return None, None
            return url, response
        return None, None

    def check_cisco_product(self, target, port):
        """Check if target is a Cisco SEG/SEWM appliance.

        Fetches the root page (following redirects) and searches the body and
        headers, case-insensitively, for known product strings.

        Returns:
            ``{'is_cisco': False}`` when the request failed or nothing
            matched; otherwise a dict with ``is_cisco`` True plus
            ``indicators`` (matched strings), ``status_code`` and
            ``server_header`` (``'Unknown'`` when the header is absent).
        """
        _, response = self._get(target, port)
        if response is None:
            return {'is_cisco': False}

        content = response.text.lower()
        headers = str(response.headers).lower()
        matches = [
            indicator for indicator in self._PRODUCT_INDICATORS
            if indicator in content or indicator in headers
        ]
        if not matches:
            return {'is_cisco': False}
        return {
            'is_cisco': True,
            'indicators': matches,
            'status_code': response.status_code,
            'server_header': response.headers.get('Server', 'Unknown')
        }

    def check_spam_quarantine(self, target, port):
        """Check if Spam Quarantine interface is accessible.

        Each candidate path is requested without following redirects. A path
        counts as exposed when it answers 200/301/302 and its body contains
        one of the quarantine keywords; the first such path wins.

        Returns:
            ``{'exposed': False}`` or a dict with ``exposed`` True, the
            matching ``path``, its ``status_code`` and the full ``url``.
        """
        for path in self._QUARANTINE_PATHS:
            url, response = self._get(target, port, path, allow_redirects=False)
            if response is None or response.status_code not in (200, 301, 302):
                continue
            content = response.text.lower()
            if any(keyword in content for keyword in self._QUARANTINE_KEYWORDS):
                return {
                    'exposed': True,
                    'path': path,
                    'status_code': response.status_code,
                    'url': url
                }
        return {'exposed': False}

    def check_version_disclosure(self, target, port):
        """Attempt to identify AsyncOS version.

        Returns:
            ``{'version_found': True, 'version': 'X.Y.Z'}`` for the first
            pattern that matches the root page body, otherwise
            ``{'version_found': False}``.
        """
        _, response = self._get(target, port)
        if response is not None:
            for pattern in self._VERSION_PATTERNS:
                match = pattern.search(response.text)
                if match:
                    return {
                        'version_found': True,
                        'version': match.group(1)
                    }
        return {'version_found': False}

    def scan_target(self, target):
        """Comprehensive scan of a single target.

        Walks every port in ``self.spam_quarantine_ports``. Ports that do not
        accept a TCP connection are skipped so that the HTTP checks do not
        each wait for a timeout on them.

        Returns:
            A dict with ``target``, ``timestamp`` (ISO format), ``vulnerable``
            (True once a Cisco product with a reachable quarantine path is
            seen), ``risk_level`` (``'CRITICAL'`` or ``'Unknown'``) and a
            list of human-readable ``findings``.
        """
        print(f"\n{Colors.OKBLUE}[*] Scanning: {target}{Colors.ENDC}")
        results = {
            'target': target,
            'timestamp': datetime.now().isoformat(),
            'vulnerable': False,
            'risk_level': 'Unknown',
            'findings': []
        }
        # Check common ports
        for port in self.spam_quarantine_ports:
            print(f" {Colors.OKCYAN}[+] Checking port {port}...{Colors.ENDC}")
            # Banner grabbing doubles as the reachability test for the port.
            reachable, banner = self._probe_port(target, port)
            if not reachable:
                continue
            if banner and 'cisco' in banner.lower():
                results['findings'].append(f"Cisco banner detected on port {port}")

            # Check if it's a Cisco product
            cisco_check = self.check_cisco_product(target, port)
            if not cisco_check['is_cisco']:
                continue
            print(f" {Colors.OKGREEN}[✓] Cisco AsyncOS product detected!{Colors.ENDC}")
            print(f" {Colors.WARNING} Indicators: {', '.join(cisco_check['indicators'])}{Colors.ENDC}")
            results['findings'].append(f"Cisco product identified on port {port}: {cisco_check['indicators']}")

            # Check for exposed Spam Quarantine
            spam_check = self.check_spam_quarantine(target, port)
            if spam_check['exposed']:
                print(f" {Colors.FAIL}[!] SPAM QUARANTINE EXPOSED: {spam_check['url']}{Colors.ENDC}")
                results['vulnerable'] = True
                results['risk_level'] = 'CRITICAL'
                results['findings'].append(f"Spam Quarantine exposed at {spam_check['url']}")

            # Check version
            version_check = self.check_version_disclosure(target, port)
            if version_check['version_found']:
                print(f" {Colors.WARNING}[!] Version detected: {version_check['version']}{Colors.ENDC}")
                results['findings'].append(f"AsyncOS version: {version_check['version']}")
        return results

    def generate_report(self, results, output_file=None):
        """Generate vulnerability report.

        Prints a summary to stdout and, when ``output_file`` is given, writes
        the report as indented JSON. A failure to write the file is reported
        on stdout instead of raising, so the in-memory report is still
        returned.

        Args:
            results: List of per-target dicts as returned by ``scan_target``.
            output_file: Optional path of the JSON file to write.

        Returns:
            The report dict (``scan_info`` metadata plus ``results``).
        """
        report = {
            'scan_info': {
                'cve': 'CVE-2025-20393',
                'severity': 'CRITICAL',
                'cvss_score': 10.0,
                'description': 'Cisco Secure Email Gateway & Email and Web Manager Unauthenticated RCE',
                'threat_actor': 'UAT-9686 (Chinese APT)',
                'exploitation_status': 'Active in the wild',
                'scan_date': datetime.now().isoformat()
            },
            'results': results
        }
        # Print summary
        vulnerable_targets = [r for r in results if r['vulnerable']]
        print(f"\n{Colors.BOLD}{'='*60}{Colors.ENDC}")
        print(f"{Colors.HEADER}[SCAN SUMMARY]{Colors.ENDC}")
        print(f"{Colors.BOLD}{'='*60}{Colors.ENDC}")
        print(f"Total targets scanned: {len(results)}")
        print(f"{Colors.FAIL}Vulnerable targets: {len(vulnerable_targets)}{Colors.ENDC}")
        print(f"{Colors.OKGREEN}Secure targets: {len(results) - len(vulnerable_targets)}{Colors.ENDC}")
        if vulnerable_targets:
            print(f"\n{Colors.FAIL}{Colors.BOLD}[!] CRITICAL VULNERABILITIES FOUND:{Colors.ENDC}")
            for target in vulnerable_targets:
                print(f" • {target['target']} - Risk Level: {target['risk_level']}")
                for finding in target['findings']:
                    print(f" - {finding}")
        # Save to file
        if output_file:
            try:
                with open(output_file, 'w', encoding='utf-8') as f:
                    json.dump(report, f, indent=4)
            except OSError as e:
                print(f"\n{Colors.FAIL}[!] Could not write report to {output_file}: {e}{Colors.ENDC}")
            else:
                print(f"\n{Colors.OKGREEN}[+] Report saved to: {output_file}{Colors.ENDC}")
        return report

    def scan_targets(self, targets, threads=10):
        """Scan multiple targets concurrently.

        Args:
            targets: Iterable of host names or IP addresses.
            threads: Requested worker count; clamped to the range
                ``1..len(targets)`` so a zero or negative value cannot make
                the thread pool raise.

        Returns:
            List of ``scan_target`` results in completion order. Targets
            whose scan raised are reported on stdout and omitted.
        """
        targets = list(targets)
        if not targets:
            return []

        worker_count = max(1, min(threads, len(targets)))
        results = []
        with ThreadPoolExecutor(max_workers=worker_count) as executor:
            futures = {executor.submit(self.scan_target, target): target for target in targets}
            for future in as_completed(futures):
                try:
                    results.append(future.result())
                except Exception as e:
                    # Worker boundary: one failing target must not abort the
                    # remaining scans.
                    target = futures[future]
                    print(f"{Colors.FAIL}[!] Error scanning {target}: {str(e)}{Colors.ENDC}")
        return results
def _expand_target(entry):
    """Expand one target entry into a list of bare hosts.

    ``entry`` may be a URL (anything containing ``://``), a CIDR range or a
    plain host name / IP address. Raises ValueError for a malformed URL or
    CIDR range.
    """
    import ipaddress

    if '://' in entry:
        # .hostname drops the scheme, credentials, port and path; the scan
        # supplies its own ports.
        host = urlparse(entry).hostname
        return [host] if host else []
    if '/' in entry:
        network = ipaddress.ip_network(entry, strict=False)
        return [str(ip) for ip in network.hosts()]
    return [entry]


def parse_targets(target_input):
    """Parse target input (single target, URL, file, or CIDR).

    Resolution order:
      1. If ``target_input`` names an existing file, every non-blank line
         that does not start with ``#`` is treated as one entry.
      2. Otherwise ``target_input`` itself is the only entry.
    Each entry is then expanded: URLs are reduced to their host name, CIDR
    ranges to their host addresses, anything else is kept verbatim.

    Checking for a file and for ``://`` before looking for ``/`` matters:
    URLs and file paths both contain slashes and must not be mistaken for
    CIDR notation.

    Args:
        target_input: The raw ``--target`` value.

    Returns:
        List of host strings (empty for blank input).

    Raises:
        SystemExit: With status 1 after printing a message when an entry is
            a malformed URL or CIDR range.
    """
    import os

    target_input = target_input.strip()
    if not target_input:
        return []

    if os.path.isfile(target_input):
        with open(target_input, 'r') as f:
            stripped_lines = [line.strip() for line in f]
        entries = [line for line in stripped_lines if line and not line.startswith('#')]
    else:
        entries = [target_input]

    targets = []
    for entry in entries:
        try:
            targets.extend(_expand_target(entry))
        except ValueError as e:
            print(f"{Colors.FAIL}[!] Invalid target {entry!r}: {str(e)}{Colors.ENDC}")
            sys.exit(1)
    return targets
def main():
    """Command-line entry point.

    Parses the arguments, prints the banner and the legal disclaimer, waits
    for the operator to confirm with ENTER, then scans all targets and
    prints/writes the report.

    Exits with status 2 (via argparse) on invalid ``--timeout``/``--threads``
    values, status 1 when there is nothing to scan or no interactive input
    is available for the confirmation prompt, and status 0 when the operator
    cancels with Ctrl-C.
    """
    parser = argparse.ArgumentParser(
        description='CVE-2025-20393 Scanner - Cisco AsyncOS Spam Quarantine RCE',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
%(prog)s -t 192.168.1.100
%(prog)s -t targets.txt -o report.json
%(prog)s -t 10.0.0.0/24 --threads 20
%(prog)s -t mail.company.com --timeout 15
IOCs for CVE-2025-20393:
- AquaShell (Python backdoor)
- AquaTunnel (Reverse SSH tunnel)
- Chisel (Tunneling tool)
- AquaPurge (Log clearing utility)
"""
    )
    parser.add_argument('-t', '--target', required=True,
                        help='Target IP, hostname, file, or CIDR range')
    parser.add_argument('-o', '--output',
                        help='Output JSON report file')
    parser.add_argument('--timeout', type=int, default=10,
                        help='Connection timeout in seconds (default: 10)')
    parser.add_argument('--threads', type=int, default=10,
                        help='Number of concurrent threads (default: 10)')
    parser.add_argument('--user-agent',
                        help='Custom User-Agent string')
    args = parser.parse_args()

    # Reject values that would otherwise surface later as tracebacks from
    # the socket layer or the thread pool.
    if args.timeout <= 0:
        parser.error('--timeout must be a positive number of seconds')
    if args.threads < 1:
        parser.error('--threads must be at least 1')

    scanner = CVE202520393Scanner(
        timeout=args.timeout,
        user_agent=args.user_agent
    )
    scanner.print_banner()
    targets = parse_targets(args.target)
    print(f"\n{Colors.OKBLUE}[*] Loaded {len(targets)} target(s){Colors.ENDC}")
    if not targets:
        print(f"{Colors.FAIL}[!] No targets to scan{Colors.ENDC}")
        sys.exit(1)

    print(f"\n{Colors.WARNING}[!] Disclaimer: This tool is for authorized security testing only.{Colors.ENDC}")
    print(f"{Colors.WARNING}[!] Unauthorized access to computer systems is illegal.{Colors.ENDC}\n")
    try:
        input(f"{Colors.OKGREEN}Press ENTER to start scanning...{Colors.ENDC}")
    except KeyboardInterrupt:
        print(f"\n{Colors.FAIL}[!] Scan cancelled by user{Colors.ENDC}")
        sys.exit(0)
    except EOFError:
        # stdin is closed or exhausted (e.g. run from a pipe or cron): the
        # disclaimer cannot be acknowledged, so do not start scanning.
        print(f"\n{Colors.FAIL}[!] No interactive input available; scan cancelled{Colors.ENDC}")
        sys.exit(1)

    results = scanner.scan_targets(targets, threads=args.threads)
    scanner.generate_report(results, args.output)
    print(f"\n{Colors.OKCYAN}[*] Scan completed!{Colors.ENDC}")
# Script entry point: run the CLI and turn Ctrl-C at any stage into a short
# message and a clean exit instead of a traceback.
if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        interrupted_notice = f"\n{Colors.FAIL}[!] Interrupted by user{Colors.ENDC}"
        print(interrupted_notice)
        sys.exit(0)