4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / server.py PY
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
CVE-2021-30809 PoC Server
Server HTTP per testare un harness crash-inducing per UAF WebKit su PS4
"""

import http.server
import socketserver
import os
import sys
import datetime
import threading
import time
import socket
import urllib.parse
import ipaddress

# Configurazione del server
PORT = 8080
HOST = '0.0.0.0'  
# Utility: verifica RFC1918
RFC1918_NETS = [
    ipaddress.ip_network('10.0.0.0/8'),
    ipaddress.ip_network('172.16.0.0/12'),
    ipaddress.ip_network('192.168.0.0/16'),
]

def is_rfc1918_ipv4(ip: str) -> bool:
    try:
        ip_obj = ipaddress.ip_address(ip)
        if ip_obj.version != 4:
            return False
        return any(ip_obj in net for net in RFC1918_NETS)
    except Exception:
        return False

def is_valid_lan_candidate(ip: str) -> bool:
    try:
        ip_obj = ipaddress.ip_address(ip)
        if ip_obj.version != 4:
            return False
        if ip_obj.is_loopback or ip_obj.is_link_local or ip_obj.is_unspecified:
            return False
        # Escludi CGNAT 100.64.0.0/10
        if ip_obj in ipaddress.ip_network('100.64.0.0/10'):
            return False
        return True
    except Exception:
        return False

# Ordine di preferenza: 192.168.x.x, 10.x.x.x, 172.16-31.x.x
def lan_priority(ip: str) -> int:
    if ip.startswith('192.168.'):
        return 0
    if ip.startswith('10.'):
        return 1
    # 172.16.0.0 - 172.31.255.255
    try:
        ip_obj = ipaddress.ip_address(ip)
        if ip_obj in ipaddress.ip_network('172.16.0.0/12'):
            return 2
    except Exception:
        pass
    return 3

# Enumerazione IP locali
def enumerate_local_ipv4():
    addrs = set()
    try:
        # getaddrinfo del hostname
        for fam, _, _, _, sockaddr in socket.getaddrinfo(socket.gethostname(), None):
            if fam == socket.AF_INET:
                addrs.add(sockaddr[0])
    except Exception:
        pass
    try:
        # Altri nomi eventuali
        hn = socket.gethostname()
        for ip in socket.gethostbyname_ex(hn)[2]:
            addrs.add(ip)
    except Exception:
        pass
    return list(addrs)

# Calcolo dinamico IP locale (preferisci SOLO RFC1918 validi)
try:
    _s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    _s.connect(("1.1.1.1", 80))
    detected_ip = _s.getsockname()[0]
except Exception:
    detected_ip = '127.0.0.1'
finally:
    try:
        _s.close()
    except Exception:
        pass

candidates_raw = [detected_ip] + enumerate_local_ipv4()
# Filtra candidati validi
valid_candidates = [ip for ip in candidates_raw if is_valid_lan_candidate(ip)]
# Solo RFC1918
lan_candidates = [ip for ip in valid_candidates if is_rfc1918_ipv4(ip)]
lan_candidates = sorted(set(lan_candidates), key=lan_priority)

# Scelta finale
if lan_candidates:
    SERVER_IP = lan_candidates[0]
else:
    # Fallback: se detected_ip è valido e non link-local/loopback/CGNAT, usalo, altrimenti localhost
    SERVER_IP = detected_ip if is_valid_lan_candidate(detected_ip) and not ipaddress.ip_address(detected_ip).is_loopback else 'localhost'

SERVER_URL = f"http://{SERVER_IP}:{PORT}"

class CustomHTTPRequestHandler(http.server.SimpleHTTPRequestHandler):
    """Custom HTTP handler con logging dettagliato"""
    
    def log_message(self, format, *args):
        """Override del logging per aggiungere timestamp e dettagli e non crashare se headers non è disponibile"""
        timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        client_ip = None
        try:
            client_ip = self.client_address[0]
        except Exception:
            client_ip = "?"
        try:
            ua = None
            hdrs = getattr(self, 'headers', None)
            if hdrs is not None:
                ua = hdrs.get('User-Agent', 'Unknown')
            else:
                ua = 'Unknown'
            print(f"[{timestamp}] {client_ip} - {format % args}")
            print(f"[{timestamp}] User-Agent: {ua}")
            if ua and ('PlayStation' in ua or 'PS4' in ua):
                print(f"[{timestamp}] *** PS4 BROWSER DETECTED! ***")
                print(f"[{timestamp}] Full User-Agent: {ua}")
        except Exception:
            # fallback minimale
            print(f"[{timestamp}] {client_ip} - {format % args}")
    
    def _send_cors(self):
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS')
        self.send_header('Access-Control-Allow-Headers', '*')
    
    def do_OPTIONS(self):
        self.send_response(204)
        self._send_cors()
        self.end_headers()
    
    def do_GET(self):
        """Routing GET con headers corretti e senza inviare 200 prima del percorso"""
        print(f"\n=== NEW REQUEST ===")
        print(f"Path: {self.path}")
        print(f"Client: {self.client_address[0]}:{self.client_address[1]}")
        
        try:
            if self.path == '/' or self.path == '/index.html':
                self.send_response(200)
                self._send_cors()
                self.send_header('Content-Type', 'text/html; charset=utf-8')
                self.end_headers()
                with open('exploit.html', 'rb') as f:
                    self.wfile.write(f.read())
                return
            
            if self.path == '/poc.js':
                self.send_response(200)
                self._send_cors()
                self.send_header('Content-Type', 'application/javascript')
                self.end_headers()
                with open('poc.js', 'rb') as f:
                    self.wfile.write(f.read())
                return
            
            if self.path == '/server-info.js':
                self.send_response(200)
                self._send_cors()
                self.send_header('Content-Type', 'application/javascript')
                self.end_headers()
                payload = (
                    "window.SERVER_INFO = {"
                    f"ip: '{SERVER_IP}', port: {PORT}, url: '{SERVER_URL}', ips: {lan_candidates}"
                    "};"
                )
                self.wfile.write(payload.encode('utf-8'))
                return
            
            if self.path == '/ip':
                self.send_response(200)
                self._send_cors()
                self.send_header('Content-Type', 'text/plain; charset=utf-8')
                self.end_headers()
                self.wfile.write(SERVER_IP.encode('utf-8'))
                return
            
            if self.path == '/favicon.ico':
                # Evita 404 rumorosi
                self.send_response(204)
                self._send_cors()
                self.end_headers()
                return
            
            # Non trovato
            self.send_response(404)
            self._send_cors()
            self.send_header('Content-Type', 'text/plain; charset=utf-8')
            self.end_headers()
            self.wfile.write(b'Not Found')
        except FileNotFoundError:
            self.send_response(404)
            self._send_cors()
            self.send_header('Content-Type', 'text/plain; charset=utf-8')
            self.end_headers()
            self.wfile.write(b'File not found')
    
    def do_POST(self):
        """Handler per POST requests (log dei crash)"""
        try:
            content_length = int(self.headers.get('Content-Length', 0) or 0)
        except Exception:
            content_length = 0
        post_data = self.rfile.read(content_length) if content_length else b''
        
        timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        print(f"\n[{timestamp}] *** POST DATA RECEIVED ***")
        try:
            print(f"Data: {post_data.decode('utf-8', errors='ignore')}")
        except Exception:
            print("[WARN] Unable to decode POST data")
        
        self.send_response(200)
        self._send_cors()
        self.send_header('Content-Type', 'text/plain; charset=utf-8')
        self.end_headers()
        self.wfile.write(b'OK')

def print_banner():
    """Stampa il banner di avvio"""
    print("=" * 60)
    print("  CVE-2021-30809 WebKit UAF PoC Server")
    print("  Target: PlayStation 4 Browser (WebKit 605.1.15)")
    print("=" * 60)
    print(f"Server starting on {HOST}:{PORT}")
    print(f"Access URL: {SERVER_URL}")
    if lan_candidates:
        print("LAN candidates:")
        for ip in lan_candidates:
            print(f" - http://{ip}:{PORT}")
    else:
        print("No RFC1918 LAN IP detected. Using fallback.")
    print("Waiting for PS4 connections...")
    print("=" * 60)


def monitor_connections():
    """Thread per monitorare le connessioni"""
    while True:
        time.sleep(30)
        timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        print(f"\n[{timestamp}] Server still running, waiting for connections... ({SERVER_URL})")


def main():
    """Funzione principale"""
    print_banner()
    
    try:
        # Crea il server
        with http.server.ThreadingHTTPServer((HOST, PORT), CustomHTTPRequestHandler) as httpd:
            print(f"Server successfully started on {SERVER_URL}")
            
            # Avvia thread di monitoraggio
            monitor_thread = threading.Thread(target=monitor_connections, daemon=True)
            monitor_thread.start()
            
            # Avvia il server
            httpd.serve_forever()
            
    except KeyboardInterrupt:
        print("\n\nServer shutting down...")
        sys.exit(0)
    except Exception as e:
        print(f"Error starting server: {e}")
        sys.exit(1)

if __name__ == '__main__':
    main()