README.md
Rendering markdown...
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
CVE-2021-30809 PoC Server
Server HTTP per testare un harness crash-inducing per UAF WebKit su PS4
"""
import http.server
import socketserver
import os
import sys
import datetime
import threading
import time
import socket
import urllib.parse
import ipaddress
# Server configuration: TCP port and bind address handed to the HTTP server.
PORT = 8080
HOST = '0.0.0.0'
# The three RFC1918 private IPv4 ranges, used to recognise LAN addresses.
RFC1918_NETS = [
    ipaddress.ip_network(cidr)
    for cidr in ('10.0.0.0/8', '172.16.0.0/12', '192.168.0.0/16')
]
def is_rfc1918_ipv4(ip: str) -> bool:
    """Return True when *ip* is an IPv4 address inside one of RFC1918_NETS.

    Anything that cannot be parsed as an IP address, and any IPv6 address,
    yields False. Errors are never propagated to the caller.
    """
    try:
        addr = ipaddress.ip_address(ip)
        if addr.version == 4:
            for private_net in RFC1918_NETS:
                if addr in private_net:
                    return True
        return False
    except Exception:
        return False
def is_valid_lan_candidate(ip: str) -> bool:
    """Return True when *ip* is an IPv4 address usable as a server address.

    Rejected: unparsable input, IPv6 addresses, loopback, link-local and
    unspecified addresses, and the CGNAT range 100.64.0.0/10.
    """
    try:
        addr = ipaddress.ip_address(ip)
        # The version test comes first so IPv6 input short-circuits to False.
        return addr.version == 4 and not (
            addr.is_loopback
            or addr.is_link_local
            or addr.is_unspecified
            or addr in ipaddress.ip_network('100.64.0.0/10')  # CGNAT
        )
    except Exception:
        return False
# Preference order: 192.168.x.x, then 10.x.x.x, then 172.16-31.x.x, then the rest.
def lan_priority(ip: str) -> int:
    """Return a sort key for *ip*; lower values are preferred.

    0 for strings starting with '192.168.', 1 for strings starting with
    '10.', 2 for addresses inside 172.16.0.0/12, and 3 for everything else
    (including strings that do not parse as an IP address).
    """
    for rank, prefix in enumerate(('192.168.', '10.')):
        if ip.startswith(prefix):
            return rank
    try:
        # Covers 172.16.0.0 - 172.31.255.255.
        in_172_block = ipaddress.ip_address(ip) in ipaddress.ip_network('172.16.0.0/12')
    except Exception:
        in_172_block = False
    return 2 if in_172_block else 3
# Enumeration of local IP addresses
def enumerate_local_ipv4():
    """Return the IPv4 addresses the local hostname resolves to, without duplicates.

    Two independent lookups are attempted (getaddrinfo and gethostbyname_ex).
    Each is best-effort: a failure in one is ignored and does not prevent the
    other from contributing. The order of the returned list is unspecified.
    """
    found = set()
    try:
        # Resolve the hostname and keep only the AF_INET results.
        infos = socket.getaddrinfo(socket.gethostname(), None)
        for family, _type, _proto, _canon, sockaddr in infos:
            if family == socket.AF_INET:
                found.add(sockaddr[0])
    except Exception:
        pass
    try:
        # The third field of gethostbyname_ex() is the list of IPv4 addresses.
        found.update(socket.gethostbyname_ex(socket.gethostname())[2])
    except Exception:
        pass
    return list(found)
# Dynamic detection of the local IP (prefer ONLY valid RFC1918 addresses).
# A UDP socket is connected to a public address purely so that getsockname()
# reports which local address would be used for outbound traffic.
try:
    # The context manager closes the socket on every path; if socket creation
    # itself fails there is nothing to close.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as _s:
        _s.connect(("1.1.1.1", 80))
        detected_ip = _s.getsockname()[0]
except OSError:
    detected_ip = '127.0.0.1'

# detected_ip goes first so that it wins ties in the priority sort below.
candidates_raw = [detected_ip] + enumerate_local_ipv4()
# Keep only usable candidates (no loopback / link-local / unspecified / CGNAT).
valid_candidates = [ip for ip in candidates_raw if is_valid_lan_candidate(ip)]
# Keep only RFC1918 addresses.
lan_candidates = [ip for ip in valid_candidates if is_rfc1918_ipv4(ip)]
# De-duplicate while preserving order (dict.fromkeys) so that the stable sort
# gives a deterministic result among addresses of equal priority.
lan_candidates = sorted(dict.fromkeys(lan_candidates), key=lan_priority)

# Final choice.
if lan_candidates:
    SERVER_IP = lan_candidates[0]
elif is_valid_lan_candidate(detected_ip):
    # Fallback: a non-RFC1918 address that is still usable
    # (is_valid_lan_candidate already rejects loopback/link-local/CGNAT).
    SERVER_IP = detected_ip
else:
    SERVER_IP = 'localhost'
SERVER_URL = f"http://{SERVER_IP}:{PORT}"
class CustomHTTPRequestHandler(http.server.SimpleHTTPRequestHandler):
    """HTTP handler for the PoC pages, with detailed request logging.

    GET routes (query string and fragment are ignored when matching):
      /, /index.html   -> contents of exploit.html
      /poc.js          -> contents of poc.js
      /server-info.js  -> JavaScript snippet defining window.SERVER_INFO
      /ip              -> SERVER_IP as plain text
      /favicon.ico     -> 204 No Content
      anything else    -> 404
    POST to any path logs the request body and answers "OK".
    Every response carries permissive CORS headers.
    """

    # Upper bound on the number of POST body bytes read and logged per request.
    MAX_POST_BYTES = 1024 * 1024

    # Routes served from files: request path -> (file name, Content-Type).
    _STATIC_ROUTES = {
        '/': ('exploit.html', 'text/html; charset=utf-8'),
        '/index.html': ('exploit.html', 'text/html; charset=utf-8'),
        '/poc.js': ('poc.js', 'application/javascript'),
    }

    def log_message(self, format, *args):
        """Log a request line with timestamp, client IP and User-Agent.

        Written defensively so that logging does not fail when
        ``self.headers`` or ``self.client_address`` is not available.
        """
        timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        try:
            client_ip = self.client_address[0]
        except Exception:
            client_ip = "?"
        try:
            hdrs = getattr(self, 'headers', None)
            ua = hdrs.get('User-Agent', 'Unknown') if hdrs is not None else 'Unknown'
            print(f"[{timestamp}] {client_ip} - {format % args}")
            print(f"[{timestamp}] User-Agent: {ua}")
            if ua and ('PlayStation' in ua or 'PS4' in ua):
                print(f"[{timestamp}] *** PS4 BROWSER DETECTED! ***")
                print(f"[{timestamp}] Full User-Agent: {ua}")
        except Exception:
            # Minimal fallback: at least emit the request line.
            print(f"[{timestamp}] {client_ip} - {format % args}")

    def _send_cors(self):
        """Queue the permissive CORS headers for the current response."""
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS')
        self.send_header('Access-Control-Allow-Headers', '*')

    def _send_body(self, status, content_type, body):
        """Send one complete response: status line, CORS, headers and *body* (bytes)."""
        self.send_response(status)
        self._send_cors()
        self.send_header('Content-Type', content_type)
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def do_OPTIONS(self):
        """Answer CORS preflight requests with 204 and the CORS headers."""
        self.send_response(204)
        self._send_cors()
        self.end_headers()

    def do_GET(self):
        """Route GET requests; the status line is sent only once the body is known."""
        print("\n=== NEW REQUEST ===")
        print(f"Path: {self.path}")
        print(f"Client: {self.client_address[0]}:{self.client_address[1]}")

        # Match on the path component only, so "/poc.js?v=2" still hits /poc.js.
        route = self.path.split('?', 1)[0].split('#', 1)[0]

        static = self._STATIC_ROUTES.get(route)
        if static is not None:
            filename, content_type = static
            # Read the file BEFORE sending any header: if it is missing the
            # client gets a single clean 404 instead of a 404 glued onto an
            # already-sent 200 response.
            try:
                with open(filename, 'rb') as f:
                    body = f.read()
            except FileNotFoundError:
                self._send_body(404, 'text/plain; charset=utf-8', b'File not found')
                return
            self._send_body(200, content_type, body)
            return

        if route == '/server-info.js':
            payload = (
                "window.SERVER_INFO = {"
                f"ip: '{SERVER_IP}', port: {PORT}, url: '{SERVER_URL}', ips: {lan_candidates}"
                "};"
            )
            self._send_body(200, 'application/javascript', payload.encode('utf-8'))
            return

        if route == '/ip':
            self._send_body(200, 'text/plain; charset=utf-8', SERVER_IP.encode('utf-8'))
            return

        if route == '/favicon.ico':
            # Avoid noisy 404s for the browser's automatic favicon request.
            self.send_response(204)
            self._send_cors()
            self.end_headers()
            return

        # Not found.
        self._send_body(404, 'text/plain; charset=utf-8', b'Not Found')

    def do_POST(self):
        """Log the POST body (crash reports) and reply with "OK".

        The Content-Length header is client-controlled: missing or
        non-numeric values count as 0, negative values are clamped to 0
        (a negative size would make ``rfile.read`` read until EOF), and at
        most MAX_POST_BYTES bytes are read.
        """
        try:
            declared_length = int(self.headers.get('Content-Length', 0) or 0)
        except (TypeError, ValueError):
            declared_length = 0
        content_length = max(0, min(declared_length, self.MAX_POST_BYTES))
        post_data = self.rfile.read(content_length) if content_length else b''

        timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        print(f"\n[{timestamp}] *** POST DATA RECEIVED ***")
        if declared_length > content_length:
            print(f"[WARN] POST body truncated to {content_length} bytes")
        # errors='ignore' drops undecodable bytes, so decoding cannot raise.
        print(f"Data: {post_data.decode('utf-8', errors='ignore')}")

        self._send_body(200, 'text/plain; charset=utf-8', b'OK')
def print_banner():
    """Print the startup banner: title, bind address, access URL and LAN candidates."""
    rule = "=" * 60
    lines = [
        rule,
        " CVE-2021-30809 WebKit UAF PoC Server",
        " Target: PlayStation 4 Browser (WebKit 605.1.15)",
        rule,
        f"Server starting on {HOST}:{PORT}",
        f"Access URL: {SERVER_URL}",
    ]
    if not lan_candidates:
        lines.append("No RFC1918 LAN IP detected. Using fallback.")
    else:
        lines.append("LAN candidates:")
        lines.extend(f" - http://{ip}:{PORT}" for ip in lan_candidates)
    lines.append("Waiting for PS4 connections...")
    lines.append(rule)
    # Emit the banner line by line, in order.
    for line in lines:
        print(line)
def monitor_connections():
    """Print a "still running" heartbeat line every 30 seconds, forever.

    Never returns; main() runs it in a daemon thread.
    """
    pause_seconds = 30
    while True:
        time.sleep(pause_seconds)
        now = f"{datetime.datetime.now():%Y-%m-%d %H:%M:%S}"
        print(f"\n[{now}] Server still running, waiting for connections... ({SERVER_URL})")
def main():
    """Print the banner, start the threaded HTTP server and serve until interrupted.

    Exits with status 0 on Ctrl-C and with status 1 when any other exception
    is raised while creating or running the server.
    """
    print_banner()
    try:
        bind_address = (HOST, PORT)
        with http.server.ThreadingHTTPServer(bind_address, CustomHTTPRequestHandler) as server:
            print(f"Server successfully started on {SERVER_URL}")
            # The heartbeat runs as a daemon thread so it never blocks exit.
            threading.Thread(target=monitor_connections, daemon=True).start()
            # Blocks here handling requests.
            server.serve_forever()
    except KeyboardInterrupt:
        print("\n\nServer shutting down...")
        sys.exit(0)
    except Exception as exc:
        print(f"Error starting server: {exc}")
        sys.exit(1)
# Start the server only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()