4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / frida_util.py PY
#!/usr/bin/env python3

__author__ = 'Chariton Karamitas <[email protected]>'


import sys
import os
import collections
import random
import time
import multiprocessing
import re
import xml.dom.minidom


try:
    import frida
except ImportError:
    sys.exit('Please install frida-python and run again')

import common
import logger
import adb


import getopt


FridaInstance = collections.namedtuple('FrindaInstance', ['pid', 'address', 'port'])


class FridaServer(object):

    def __init__(self, serial=None, frida_path=None):
        super(FridaServer, self).__init__()
        self._adb = adb.ADBAbstractions(serial)
        self._logger = logger.get_logger(self.__class__.__name__)

        if frida_path is None:
            self._frida_path = self._get_frida_path()
        else:
            self._frida_path = frida_path

        self._frida_instance = None
        self._frida_process = None



    def __str__(self):
        return '<%s serial=%s>' % (self.__class__.__name__, self._adb.serial)



    def _get_listen_address(self, cmdline):

        options, _ = getopt.gnu_getopt(cmdline[1:], 'd:hl:vD',
            ['directory=', 'help', 'listen=', 'verbose', 'daemonize', 'version'])

        address = '0.0.0.0'
        port = '27042'

        for o, v in options:
            if o in ['-l', '--listen']:
                if ':' in v:
                    address, port = v.split(':')
                else:
                    address = v

        return address, port



    def _get_running_instance(self):

        instance = None

        basename = os.path.basename(self._frida_path)

        #
        # Enumerate all system processes and look for Frida servers running as
        # root and using the same ABI.
        #
        for line in self._adb.su(['ps', '-A', '-w', '-o', 'PID,UID,GID,BIT,NAME,CMDLINE']).splitlines()[1:]:

            pid, uid, gid, bits, name, cmdline = re.split('\s+', line.strip(), 5)

            if name == basename and uid == '0' and gid == '0' and bits == str(self.bits):
                address, port = self._get_listen_address(cmdline)

                self._logger.debug('Found Frida server instance pid=%s uid=%s, gid=%s, bits=%s at %s:%s',
                    pid, uid, gid, bits, address, port)

                instance = FridaInstance(pid=pid, address=address, port=port)

        return instance



    def start(self):

        if self._frida_instance is None:

            #
            # Enumerate Frida server instances.
            #
            instance = self._get_running_instance()
            process = None

            #
            # If no Frida server is currently running, attempt to spawn a new
            # instance.
            #
            if instance is None:
                self._logger.info('Attempting to spawn a new Frida server instance')
                process = multiprocessing.Process(target=self._adb.su, args=([self._frida_path],))
                process.start()

                time.sleep(2)
                instance = self._get_running_instance()

            #
            # Still no running instance? Means we failed to start the server.
            #
            if instance is None:
                raise RuntimeError('Frida server is not running and failed to spawn a new instance')

            self._frida_instance = instance
            self._frida_process = process



    def stop(self):

        if self._frida_process is not None:
            self._frida_process.terminate()
            self._frida_process.join()



class FridaServer64(FridaServer):

    def __init__(self, serial=None, frida_path=None):
        super(FridaServer64, self).__init__(serial=serial, frida_path=frida_path)
        self.bits = 64

    def _get_frida_path(self):

        frida_path = None
        for abi in self._adb.getprop('ro.product.cpu.abilist64').split(','):
            if abi.startswith('x86_64'):
                frida_path = '%s/frida-server-*-android-x86_64' % common.TMPDIR
            elif abi.startswith('arm64'):
                frida_path = '%s/frida-server-*-android-arm64' % common.TMPDIR

        if frida_path is None:
            raise RuntimeError('Unknown 64-bit ABIs')

        frida_path = self._adb.shell(['echo', '-n', frida_path])

        if '*' in frida_path:
            raise RuntimeError('Frida server for 64-bit not found at "%s"' % frida_path)

        self._logger.info('Auto detected Frida server for 64-bit at "%s"', frida_path)

        return frida_path



class FridaServer32(FridaServer):

    def __init__(self, serial=None, frida_path=None):
        super(FridaServer32, self).__init__(serial=serial, frida_path=frida_path)
        self.bits = 32

    def _get_frida_path(self):

        frida_path = None
        for abi in self._adb.getprop('ro.product.cpu.abilist32').split(','):
            if abi.startswith('x86'):
                frida_path = '%s/frida-server-*-android-x86' % common.TMPDIR
            elif abi.startswith('arm'):
                frida_path = '%s/frida-server-*-android-arm' % common.TMPDIR

        if frida_path is None:
            raise RuntimeError('Unknown 32-bit ABIs')

        frida_path = self._adb.shell(['echo', '-n', frida_path])

        if '*' in frida_path:
            raise RuntimeError('Frida server for 32-bit not found at "%s"' % frida_path)

        self._logger.info('Auto detected Frida server for 32-bit at "%s"', frida_path)

        return frida_path



#
# A class implementing the factory design pattern for returning both 32-bit and
# 64-bit instances of Frida server, based on the ABI of the process that needs
# to be instrumented.
#
class FridaServerFactory(object):

    def __init__(self, serial=None):
        super(FridaServerFactory, self).__init__()
        self._logger = logger.get_logger(self.__class__.__name__)
        self._adb = adb.ADBAbstractions(serial)
        self._serial = serial
        self._frida_server32 = None
        self._frida_server64 = None

    def __str__(self):
        return '<FridaServerFactory serial=%s>' % self._serial


    def _get_frida_server64(self):

        if self._frida_server64 is None:
            self._logger.debug('Creating 64-bit Frida server instance')
            self._frida_server64 = FridaServer64(serial=self._serial)
        else:
            self._logger.debug('Returning existing 64-bit Frida server instance')

        return self._frida_server64


    def _get_frida_server32(self):

        if self._frida_server32 is None:
            self._logger.debug('Creating 32-bit Frida server instance')
            self._frida_server32 = FridaServer32(serial=self._serial)
        else:
            self._logger.debug('Returning existing 32-bit Frida server instance')

        return self._frida_server32


    def get_frida_server(self, process):

        server = None

        #
        # Enumerate all system processes and match `process' against the PID or
        # name of each item. We are only interested in determining the ABI, so,
        # we break on the first match.
        #
        for line in self._adb.su(['ps', '-A', '-w', '-o', 'PID,BIT,NAME']).splitlines()[1:]:
            pid, bits, name = re.split('\s+', line.strip(), 2)
            if pid == process or name == process:
                self._logger.debug('Matched %s-bit process "%s" (%s)', bits, name, pid)
                if bits == '64':
                    server = self._get_frida_server64()
                elif bits == '32':
                    server = self._get_frida_server32()
                break

        #
        # If `process' does not match the PID or name of a currently running
        # process, assume it's a package name and look it up in the package
        # database.
        #
        if server is None:
            data = self._adb.su(['cat', '/data/system/packages.xml'])

            dom = xml.dom.minidom.parseString(data)

            for package in dom.getElementsByTagName('package'):

                name = None
                if package.hasAttribute('name'):
                    name = package.attributes['name'].value

                if name == process and package.hasAttribute('primaryCpuAbi'):
                    abi = package.attributes['primaryCpuAbi'].value
                    self._logger.debug('Matched package "%s" ABI "%s"', name, abi)
                    if abi.startswith('arm64') or abi.startswith('x86_64'):
                        server = self._get_frida_server64()
                    elif abi.startswith('arm') or abi.startswith('x86'):
                        server = self._get_frida_server32()
                    break

        #
        # Looks like `process' matches neither a running process nor a package
        # name. We can do nothing.
        #
        if server is None:
            raise RuntimeError('No such process or package "%s"' % process)

        return server



class Frida(object):

    def __init__(self, serial=None):
        super(Frida, self).__init__()
        self._logger = logger.get_logger(self.__class__.__name__)
        self._frida_server_factory = FridaServerFactory(serial=serial)
        self._serial = serial
        self._frida_server = None
        self._device = None
        self._session = None

        found = False
        while not found:
            try:
                if serial is not None:
                    self._logger.info('Frida using device with serial "%s"', serial)
                    device = frida.get_device_manager().get_device(serial)
                else:
                    self._logger.info('No device serial given; using default USB device')
                    device = frida.get_device_manager().get_usb_device()

                found = True

            except frida.InvalidArgumentError as exception:
                self._logger.info('Frida exception "%s"; retrying in 2 secs.', str(exception))
                time.sleep(2)

        self._logger.info('Found device "%s" (%s) over %s' , device.name, device.id, device.type.upper())
        self._device = device


    def _receive_message(self, message, data):

        message_type = message['type']
        if message_type == 'error':
            self._logger.error('Frida error: %s (%s:%d)' % \
                (message['description'], message['fileName'], message['lineNumber']))
        elif message_type == 'send':
            self._logger.info(message['payload'])


    def load_script_file(self, filename):

        self._logger.info('Loading script %s', filename)

        script = None
        with open(filename) as fp:
            script = self._session.create_script(fp.read())
            script.on('message', self._receive_message)
            script.load()

        return script


    def load_script(self, script):

        self._logger.info('Loading script')
        script = self._session.create_script(script)
        script.on('message', self._receive_message)
        script.load()

        return script


    def unload_script(self, script):
        self._logger.info('Unloading script %s', str(script))
        script.unload()


    def attach(self, process):
        self._logger.debug('Preparing Frida server')
        self._frida_server = self._frida_server_factory.get_frida_server(process)
        self._frida_server.start()

        self._logger.debug('Attaching to process %s', str(process))
        self._session = self._device.attach(process)

        try:
            self._pid = int(process)
        except ValueError:
            self._pid = self._device.get_process(process).pid

        self._logger.debug('Attached to PID %d', self._pid)


    def spawn(self, name):
        self._logger.debug('Preparing Frida server')
        self._frida_server = self._frida_server_factory.get_frida_server(name)
        self._frida_server.start()

        self._logger.debug('Killing any previous instances of %s', name)
        try:
            self._device.kill(name)
        except frida.ProcessNotFoundError:
            self._logger.debug('No previous instances found')

        self._logger.debug('Spawning %s', name)
        self._pid = self._device.spawn(name)

        self._logger.debug('Attaching to PID %d', self._pid)
        self._session = self._device.attach(self._pid)


    def resume(self):
        self._logger.debug('Resuming PID %d', self._pid)
        self._device.resume(self._pid)


    def detach(self):
        self._logger.debug('Detaching from session %s', str(self._session))
        self._session.detach()
        self._session = None
        self._frida_server.stop()
        self._frida_server = None
        self._pid = None