4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / mock_server.py PY
#!/usr/bin/env python3
"""
Mock FastGPT Sandbox Server for POC Testing
============================================
This simulates a vulnerable FastGPT sandbox to test the POC.
"""

from http.server import HTTPServer, BaseHTTPRequestHandler
import json
import sys
import io
from contextlib import redirect_stdout, redirect_stderr

class VulnerableSandboxHandler(BaseHTTPRequestHandler):
    """Simulates a vulnerable FastGPT sandbox"""
    
    def log_message(self, format, *args):
        print(f"[SERVER] {args[0]}")
    
    def do_GET(self):
        if self.path == '/health':
            self.send_response(200)
            self.send_header('Content-type', 'application/json')
            self.end_headers()
            self.wfile.write(json.dumps({"status": "ok", "version": "4.9.0"}).encode())
        else:
            self.send_response(404)
            self.end_headers()
    
    def do_POST(self):
        if self.path == '/api/sandbox/run':
            content_length = int(self.headers['Content-Length'])
            post_data = self.rfile.read(content_length)
            
            try:
                data = json.loads(post_data.decode())
                code = data.get('code', '')
                
                print(f"[SANDBOX] Executing code: {code[:100]}...")
                
                # VULNERABLE: Execute code without proper sandboxing
                # This simulates the vulnerability in fastgpt-sandbox < 4.9.11
                result = self._execute_vulnerable(code)
                
                self.send_response(200)
                self.send_header('Content-type', 'application/json')
                self.end_headers()
                self.wfile.write(json.dumps({
                    "success": True,
                    "result": str(result),
                    "output": str(result)
                }).encode())
                
            except Exception as e:
                self.send_response(500)
                self.send_header('Content-type', 'application/json')
                self.end_headers()
                self.wfile.write(json.dumps({
                    "success": False,
                    "error": str(e)
                }).encode())
        else:
            self.send_response(404)
            self.end_headers()
    
    def _execute_vulnerable(self, code: str):
        """
        VULNERABLE code execution - simulates CVE-2025-49131
        In a real vulnerable sandbox, restrictions are insufficient
        """
        # Capture stdout
        stdout_capture = io.StringIO()
        
        try:
            # Create execution environment (VULNERABLE - no restrictions)
            exec_globals = {
                '__builtins__': __builtins__,
                '__name__': '__sandbox__',
            }
            exec_locals = {}
            
            # Try to evaluate as expression first
            try:
                with redirect_stdout(stdout_capture):
                    result = eval(code, exec_globals, exec_locals)
                return result
            except SyntaxError:
                # If not an expression, execute as statements
                with redirect_stdout(stdout_capture):
                    exec(code, exec_globals, exec_locals)
                
                # Check for result variable
                if 'result' in exec_locals:
                    return exec_locals['result']
                
                output = stdout_capture.getvalue()
                return output if output else "Code executed successfully"
                
        except Exception as e:
            return f"Error: {str(e)}"


def run_server(port=3001):
    server_address = ('', port)
    httpd = HTTPServer(server_address, VulnerableSandboxHandler)
    print(f"[*] Starting vulnerable mock sandbox on port {port}")
    print(f"[*] Simulating CVE-2025-49131 (fastgpt-sandbox < 4.9.11)")
    print(f"[*] Server ready for POC testing...")
    print("-" * 50)
    httpd.serve_forever()


if __name__ == '__main__':
    port = int(sys.argv[1]) if len(sys.argv) > 1 else 3001
    run_server(port)