userprocess.py   [plain text]


import logging
import target
import struct

from xnu import *
from core.operating_system import Armv8_RegisterSet, Armv7_RegisterSet, I386_RegisterSet, X86_64RegisterSet

CPU_TYPE_I386 = 0x00000007
CPU_TYPE_X86_64 = 0x01000007
CPU_TYPE_ARM = 0x0000000c
CPU_TYPE_ARM64 = 0x0100000c


CPU_SUBTYPE_X86_64_ALL = 3
CPU_SUBTYPE_X86_64_H = 8
CPU_SUBTYPE_ARMV8 = 13
CPU_SUBTYPE_ARM_V7 = 9
CPU_SUBTYPE_ARM_V7S = 11
CPU_SUBTYPE_ARM_V7K = 12


def GetRegisterSetForCPU(cputype, subtype):
    retval = X86_64RegisterSet
    if cputype in (CPU_TYPE_ARM, CPU_TYPE_ARM64):
        if subtype == CPU_SUBTYPE_ARMV8:
            retval = Armv8_RegisterSet
        else:
            retval = Armv7_RegisterSet
    elif cputype == CPU_TYPE_I386:
        retval = I386_RegisterSet

    return retval.register_info['registers']


class UserThreadObject(object):
    """representation of userspace thread"""
    def __init__(self, thr_obj, cputype, cpusubtype, kern_cputype):
        super(UserThreadObject, self).__init__()
        self.thread = thr_obj
        self.registerset = GetRegisterSetForCPU(cputype, cpusubtype)
        self.thread_id = unsigned(self.thread.thread_id)
        self.is64Bit = bool(cputype & 0x01000000)
        isKern64Bit = bool(kern_cputype & 0x01000000)

        if self.is64Bit:
            if cputype == CPU_TYPE_X86_64:
                self.reg_type = "x86_64"
                self.saved_state = Cast(self.thread.machine.iss, 'x86_saved_state_t *').uss.ss_64
            if cputype == CPU_TYPE_ARM64:
                self.reg_type = "arm64"
                self.saved_state = self.thread.machine.upcb.uss.ss_64
        else:
            if cputype == CPU_TYPE_I386:
                self.reg_type = "i386"
                self.saved_state = Cast(self.thread.machine.iss, 'x86_saved_state_t *').uss.ss_32
            if cputype == CPU_TYPE_ARM:
                self.reg_type = "arm"
                if not isKern64Bit:
                    self.saved_state = self.thread.machine.PcbData
                else:
                    self.saved_state = self.thread.machine.contextData.ss.uss.ss_32
        logging.debug("created thread id 0x%x of type %s, kern_cputype 0x%x cputype 0x%x"
                      % (self.thread_id, self.reg_type, kern_cputype, cputype))

    def getRegisterValueByName(self, name):
        if self.reg_type == 'arm64':
            if name in ('x0', 'x1', 'x2', 'x3', 'x4', 'x5', 'x6', 'x7', 'x8', 'x9', 'x10', 'x11', 'x12', 'x13', 'x14', 'x15', 'x16', 'x17', 'x18', 'x19', 'x20', 'x21', 'x22', 'x23', 'x24', 'x25', 'x26', 'x27', 'x28'):
                return unsigned(getattr(self.saved_state, 'x')[int(name.strip('x'))])

            return unsigned(getattr(self.saved_state, name))

        if self.reg_type == "x86_64":
            if name in ('rip', 'rflags', 'cs', 'rsp', 'cpu'):
                return unsigned(getattr(self.saved_state.isf, name))
            return unsigned(getattr(self.saved_state, name))

        if self.reg_type == "arm":
            if name in ('r0', 'r1', 'r2', 'r3', 'r4', 'r5', 'r6', 'r7', 'r8', 'r9', 'r10', 'r11', 'r12'):
                retval = unsigned(getattr(self.saved_state, 'r')[int(name.strip('r'))])
            else:
                retval = unsigned(getattr(self.saved_state, name))
            return retval

        #TODO for i386

    def getName(self):
        return str(self.thread_id)

    def getRegisterData(self, reg_num):
        """ returns None if there is error """
        if reg_num < 0 or reg_num >= len(self.registerset):
            logging.warning("regnum %d is not defined for thread_id 0x%x" % (reg_num, self.thread_id))
            return None
        return self.getRegisterValueByName(self.registerset[reg_num]['name'])


class UserProcess(target.Process):
    """ Represent a user process and thread states """
    def __init__(self, task):
        self.task = task
        self.proc = Cast(task.bsd_info, 'proc_t')
        dataregisters64bit = False
        ptrsize = 4

        if task.t_flags & 0x1:
            ptrsize = 8
        if task.t_flags & 0x2:
            dataregisters64bit = 8

        cputype = CPU_TYPE_X86_64
        cpusubtype = CPU_SUBTYPE_X86_64_ALL

        if kern.arch in ('arm'):
            cputype = CPU_TYPE_ARM
            cpusubtype = CPU_SUBTYPE_ARM_V7
        elif kern.arch in ('armv8', 'arm64'):
            cputype = CPU_TYPE_ARM64
            cpusubtype = CPU_SUBTYPE_ARMV8

        super(UserProcess, self).__init__(cputype, cpusubtype, ptrsize)

        self.hinfo['ostype'] = 'macosx'
        if cputype != CPU_TYPE_X86_64:
            self.hinfo['ostype'] = 'ios'

        self.cputype = unsigned(self.proc.p_cputype)
        self.cpusubtype = unsigned(self.proc.p_cpusubtype)
        self.registerset = GetRegisterSetForCPU(cputype, cpusubtype)
        logging.debug("process %s is64bit: %d ptrsize: %d cputype: %d  cpusubtype:%d",
                      hex(self.proc), int(dataregisters64bit), ptrsize,
                      self.cputype, self.cpusubtype
                      )
        self.threads = {}
        self.threads_ids_list = []
        logging.debug("iterating over threads in process")
        for thval in IterateQueue(task.threads, 'thread *', 'task_threads'):
            self.threads[unsigned(thval.thread_id)] = UserThreadObject(thval, self.cputype, self.cpusubtype, cputype)
            self.threads_ids_list.append(unsigned(thval.thread_id))

    def getRegisterDataForThread(self, th_id, reg_num):
        if th_id not in self.threads:
            logging.critical("0x%x thread id is not found in this task")
            return ''
        if reg_num < 0 or reg_num >= len(self.registerset):
            logging.warning("regnum %d is not defined for thread_id 0x%x" % (reg_num, th_id))
            return ''
        value = self.threads[th_id].getRegisterData(reg_num)
        return self.encodeRegisterData(value, bytesize=self.registerset[reg_num]['bitsize']/8)

    def getRegisterCombinedDataForThread(self, th_id):
        if th_id not in self.threads:
            logging.critical("0x%x thread id is not found in this task" % th_id)
            return ''
        cur_thread = self.threads[th_id]
        retval = 'thread:%s;name:%s;' % (self.encodeThreadID(th_id), cur_thread.getName())
        pos = 0
        for rinfo in self.registerset:
            name = rinfo['name']
            format = "%02x:%s;"
            value = cur_thread.getRegisterValueByName(name)
            value_endian_correct_str = self.encodeRegisterData(value, bytesize=(rinfo['bitsize']/8))
            retval += format % (pos, value_endian_correct_str)
            pos += 1
        return retval

    def getThreadStopInfo(self, th_id):
        if th_id not in self.threads:
            logging.critical("0x%x thread id is not found in this task")
            return ''
        return 'T02' + self.getRegisterCombinedDataForThread(th_id) + 'threads:' + self.getThreadsInfo()+';'

    def getRegisterInfo(self, regnum):
        #something similar to
        #"name:x1;bitsize:64;offset:8;encoding:uint;format:hex;gcc:1;dwarf:1;set:General Purpose Registers;"
        if regnum > len(self.registerset):
            logging.debug("No register_info for number %d." % regnum)
            return 'E45'

        rinfo = self.registerset[regnum]
        retval = ''
        for i in rinfo.keys():
            i_val = str(rinfo[i])
            if i == 'set':
                i_val = 'General Purpose Registers'
            retval += '%s:%s;' % (str(i), i_val)

        return retval

    def getProcessInfo(self):
        retval = ''
        #pid:d22c;parent-pid:d34d;real-uid:ecf;real-gid:b;effective-uid:ecf;effective-gid:b;cputype:1000007;cpusubtype:3;
        #ostype:macosx;vendor:apple;endian:little;ptrsize:8;
        pinfo = {'effective-uid': 'ecf', 'effective-gid': 'b', 'endian': 'little', 'vendor': 'apple'}
        pinfo['pid'] = "%x" % (GetProcPIDForTask(self.task))
        pinfo['parent-pid'] = "%x" % (unsigned(self.proc.p_ppid))
        pinfo['ptrsize'] = str(self.ptrsize)
        pinfo['ostype'] = 'macosx'
        pinfo['cputype'] = "%x" % self.cputype
        pinfo['cpusubtype'] = "%x" % self.cpusubtype
        pinfo['real-uid'] = "%x" % (unsigned(self.proc.p_ruid))
        pinfo['real-gid'] = "%x" % (unsigned(self.proc.p_rgid))
        if str(kern.arch).find('arm') >= 0:
            pinfo['ostype'] = 'ios'
        for i in pinfo.keys():
            i_val = str(pinfo[i])
            retval += '%s:%s;' % (str(i), i_val)
        return retval

    def readMemory(self, address, size):
        data = GetUserDataAsString(self.task, address, size)
        if not data:
            logging.error("Failed to read memory task:{: <#018x} {: <#018x} {:d}".format(self.task, address, size))
        return self.encodeByteString(data)

    def getSharedLibInfoAddress(self):
        return unsigned(self.task.all_image_info_addr)