# 4837 Total CVEs
# 26 Years
# GitHub
# README.md
# Rendering markdown...
# POC / http_server.py PY
"""
Simple HTTP Server to exfil JuiceBox files via POST command.

This module is a python3 version of the file_upload_server.py found on:
https://docs.silabs.com/gecko-os/4/standard/4.2/cmd/apps/http-upload.

The direct reference is:
https://docs.silabs.com/resources/gecko-os/examples/http_upload/file_upload_server.py
"""

import cgi
import email.message
import http.server
import os
import posixpath
import shutil
import socket
import socketserver
import sys
import urllib.parse
from io import BytesIO

# Listening address and port. Command-line forms:
#   <script> PORT
#   <script> DOMAIN PORT
DOMAIN = "0.0.0.0"
PORT = 8000
_cli_args = sys.argv[1:]
if len(_cli_args) >= 2:
    # Two or more arguments: the first is the domain, the second the port.
    DOMAIN, PORT = _cli_args[0], int(_cli_args[1])
elif _cli_args:
    # A single argument is the port.
    PORT = int(_cli_args[0])

# When True, every line read while parsing an upload is printed.
DEBUG = False


class SimpleHTTPRequestHandler(http.server.BaseHTTPRequestHandler):
    """Simple HTTP request handler with POST command.

    GET returns a static information page.  POST accepts one file sent as
    ``multipart/form-data`` and stores it under the current working
    directory, in the sub-directory named by the request path.

    Header parsing uses ``email.message`` rather than ``cgi``, because the
    ``cgi`` module was removed from the standard library in Python 3.13.
    """

    server_version = "SimpleHTTPWithUpload/"

    def do_GET(self):
        """Serve a GET request with a fixed HTML information page."""
        infoPage = '''<!DOCTYPE html>
<html>
<head>
<title>File Upload Server</title>
</head>
<style>body {font-family:Calibri,Arial,Helvetica,Sans-Serif;} a { text-decoration: none;}</style>
<body>
<h1>File Upload Server</h1>
</body></html>
'''
        self._send_body("text/html", infoPage.encode('utf-8'))

    def do_POST(self):
        """Serve a POST request: store the upload and report the outcome.

        The response status is always 200; the plain-text body says
        whether the upload succeeded.
        """
        r, info = self.handle_post_data()
        print("{}\nfrom host:{} port:{}".format(info, self.client_address[0], self.client_address[1]))
        status = "Success" if r else "Failure"
        body = 'POST result\n' + status + info
        self._send_body("text/plain", body.encode('utf-8'))

    def _send_body(self, content_type, body):
        """Send a 200 response whose payload is the bytes object ``body``."""
        self.send_response(200)
        self.send_header("Content-type", content_type)
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        with BytesIO(body) as source:
            self.copyfile(source, self.wfile)

    def _read_line(self, remaining):
        """Read one line of the request body.

        At most ``remaining`` bytes are requested, so the read never goes
        past the length announced in Content-Length (which could otherwise
        block on an open connection).

        Returns ``(line, new_remaining)``; ``line`` is ``b''`` when nothing
        is left to read or the client closed the connection.
        """
        if remaining <= 0:
            return b'', remaining
        line = self.rfile.readline(remaining)
        if DEBUG:
            print('line:{}'.format(line))
        return line, remaining - len(line)

    def handle_post_data(self):
        """Parse a multipart/form-data upload and write the file to disk.

        Only the first part of the body is processed.  Its headers are
        read up to the blank line, so the part may carry any number of
        headers (with or without its own Content-Type).

        Returns:
            A ``(success, message)`` tuple.  On success the message names
            the file that was written.
        """
        content_type_header = self.headers.get('Content-Type')
        if not content_type_header:
            return (False, "Content-Type header missing.")

        request_type = email.message.Message()
        request_type['Content-Type'] = content_type_header
        if request_type.get_content_type() != 'multipart/form-data':
            return (False, "Content-Type must be multipart/form-data.")

        boundary = request_type.get_boundary()
        if not boundary:
            return (False, "Boundary not found in Content-Type header.")
        # A delimiter line is the boundary parameter prefixed with "--".
        delimiter = b'--' + boundary.encode('utf-8')

        try:
            remaining = int(self.headers.get('Content-Length'))
        except (TypeError, ValueError):
            return (False, "Content-Length header missing or invalid.")

        # The body has to open with a delimiter line.
        line, remaining = self._read_line(remaining)
        if not line.startswith(delimiter):
            return (False, "Content does NOT begin with boundary")

        # Collect the part headers; a blank line (or end of data) ends them.
        header_lines = []
        while True:
            line, remaining = self._read_line(remaining)
            if line in (b'', b'\r\n', b'\n'):
                break
            header_lines.append(line)

        try:
            header_text = b''.join(header_lines).decode('utf-8')
        except UnicodeDecodeError as e:
            return (False, f"Error parsing Content-Disposition header: {e}")

        # get_filename() reads the filename parameter of Content-Disposition
        # and falls back to the name parameter of the part's Content-Type.
        filename_str = email.message_from_string(header_text).get_filename()
        if not filename_str:
            return (False, "Unable to determine file name from Content-Disposition header.")

        # The filename might contain path separators, so keep the basename only.
        fn = os.path.join(self.translate_path(self.path),
                          os.path.basename(filename_str))

        try:
            out = open(fn, 'wb')
        except IOError:
            return (False, "Unable to create file {} for write.".format(fn))

        # The with-block closes the file on every exit path.
        with out:
            # Writing runs one line behind reading: the line break just
            # before the closing delimiter belongs to the delimiter, not
            # to the file, and must be dropped.
            pending = None
            while remaining > 0:
                line, remaining = self._read_line(remaining)
                if not line:
                    # Client stopped sending before Content-Length was reached.
                    break
                if line.startswith(delimiter):
                    if pending is not None:
                        if pending.endswith(b'\r\n'):
                            pending = pending[:-2]
                        elif pending.endswith(b'\n'):
                            pending = pending[:-1]
                        out.write(pending)
                    return (True, "Successful Upload:\nFile:{}".format(fn))
                if pending is not None:
                    out.write(pending)
                pending = line
        return (False, "Unexpected end of data.")

    def translate_path(self, path):
        """Translate a /-separated PATH to the local filename syntax.

        Query string and fragment are dropped, the path is URL-decoded and
        normalised, and the remaining components are joined onto the
        current working directory.  Drive prefixes, directory parts and
        "." / ".." components are discarded.
        """
        # abandon query parameters and fragment
        path = path.split('?', 1)[0]
        path = path.split('#', 1)[0]
        path = posixpath.normpath(urllib.parse.unquote(path))
        words = [word for word in path.split('/') if word]
        path = os.getcwd()
        for word in words:
            _, word = os.path.splitdrive(word)
            _, word = os.path.split(word)
            if word in (os.curdir, os.pardir):
                continue
            path = os.path.join(path, word)
        return path

    def copyfile(self, source, outputfile):
        """Copy all data from file object ``source`` to ``outputfile``."""
        shutil.copyfileobj(source, outputfile)


def main():
    """Start the upload server on DOMAIN:PORT and serve until interrupted.

    The listening socket is bound to DOMAIN, so an address given on the
    command line is the one actually used; the default "0.0.0.0" listens
    on all interfaces.  The socket is closed when serving stops.
    """
    Handler = SimpleHTTPRequestHandler
    with socketserver.TCPServer((DOMAIN, PORT), Handler) as httpd:
        print("file_upload_server.py")
        print("You can specify server port as an argument. Default port:8000")
        print("Serving at: http://{domain}:{port}".format(domain=DOMAIN or "localhost", port=PORT))
        # Resolving the local host name is informational only; a failed
        # lookup must not stop the server from starting.
        try:
            server_ip = socket.gethostbyname(socket.gethostname())
        except OSError:
            server_ip = "unknown"
        print("<Server IP>: {}".format(server_ip))
        httpd.serve_forever()


# Start the server only when this file is executed as a script.
if __name__ == '__main__':
    main()