import logging
import target
import struct
from xnu import *
from core.operating_system import Armv8_RegisterSet, Armv7_RegisterSet, I386_RegisterSet, X86_64RegisterSet
# CPU type identifiers. The 64-bit variants are the 32-bit value with the
# 0x01000000 bit set (the same bit UserThreadObject tests for is64Bit).
# NOTE(review): presumably these mirror the Mach-O cpu_type_t / cpu_subtype_t
# values from <mach/machine.h> -- confirm against the header.
CPU_TYPE_I386 = 0x07
CPU_TYPE_ARM = 0x0c
CPU_TYPE_X86_64 = CPU_TYPE_I386 | 0x01000000
CPU_TYPE_ARM64 = CPU_TYPE_ARM | 0x01000000

# x86_64 subtypes.
CPU_SUBTYPE_X86_64_ALL = 3
CPU_SUBTYPE_X86_64_H = 8

# ARM subtypes.
CPU_SUBTYPE_ARM_V7 = 9
CPU_SUBTYPE_ARM_V7S = 11
CPU_SUBTYPE_ARM_V7K = 12
CPU_SUBTYPE_ARMV8 = 13
def GetRegisterSetForCPU(cputype, subtype):
    """ Pick the register description list for a cpu type / subtype pair.
        params:
            cputype - int : one of the CPU_TYPE_* values.
            subtype - int : a CPU_SUBTYPE_* value; only consulted for
                            CPU_TYPE_ARM.
        returns:
            the 'registers' entry of the chosen register set's register_info.
            Any cputype that is not ARM, ARM64 or I386 gets the x86_64 set.
    """
    if cputype == CPU_TYPE_ARM64:
        # A 64-bit ARM target always uses the Armv8 layout. Comparing its
        # subtype against CPU_SUBTYPE_ARMV8 (a 32-bit ARM subtype number)
        # made arm64 processes with any other subtype fall back to the Armv7
        # register names, which do not exist in the arm64 saved state.
        retval = Armv8_RegisterSet
    elif cputype == CPU_TYPE_ARM:
        if subtype == CPU_SUBTYPE_ARMV8:
            retval = Armv8_RegisterSet
        else:
            retval = Armv7_RegisterSet
    elif cputype == CPU_TYPE_I386:
        retval = I386_RegisterSet
    else:
        retval = X86_64RegisterSet
    return retval.register_info['registers']
class UserThreadObject(object):
    """representation of userspace thread"""

    def __init__(self, thr_obj, cputype, cpusubtype, kern_cputype):
        """ Capture the saved user register state of one thread.
            params:
                thr_obj      - thread value whose machine state is read.
                cputype      - int : cpu type of the owning process.
                cpusubtype   - int : cpu subtype of the owning process.
                kern_cputype - int : cpu type of the kernel; only used to pick
                               the 32-bit ARM saved-state location.
            Note: reg_type / saved_state are only assigned for the four
            recognised cputypes; anything else fails at the debug log below.
        """
        super(UserThreadObject, self).__init__()
        self.thread = thr_obj
        self.registerset = GetRegisterSetForCPU(cputype, cpusubtype)
        self.thread_id = unsigned(self.thread.thread_id)
        self.is64Bit = bool(cputype & 0x01000000)
        kernel_is_64bit = bool(kern_cputype & 0x01000000)

        if self.is64Bit and cputype == CPU_TYPE_X86_64:
            self.reg_type = "x86_64"
            x86_state = Cast(self.thread.machine.iss, 'x86_saved_state_t *')
            self.saved_state = x86_state.uss.ss_64
        elif self.is64Bit and cputype == CPU_TYPE_ARM64:
            self.reg_type = "arm64"
            self.saved_state = self.thread.machine.upcb.uss.ss_64
        elif (not self.is64Bit) and cputype == CPU_TYPE_I386:
            self.reg_type = "i386"
            x86_state = Cast(self.thread.machine.iss, 'x86_saved_state_t *')
            self.saved_state = x86_state.uss.ss_32
        elif (not self.is64Bit) and cputype == CPU_TYPE_ARM:
            self.reg_type = "arm"
            # The 32-bit state lives in a different field depending on
            # whether the kernel itself is 64-bit.
            if kernel_is_64bit:
                self.saved_state = self.thread.machine.contextData.ss.uss.ss_32
            else:
                self.saved_state = self.thread.machine.PcbData

        summary = "created thread id 0x%x of type %s, kern_cputype 0x%x cputype 0x%x" % (
            self.thread_id, self.reg_type, kern_cputype, cputype)
        logging.debug(summary)

    def getRegisterValueByName(self, name):
        """ Read one register from the saved state by its name.
            returns: unsigned value for arm64 / x86_64 / arm threads;
                     None for any other reg_type (including i386).
        """
        kind = self.reg_type

        if kind == 'arm64':
            # x0..x28 are stored in the indexed array 'x'; the rest are fields.
            if name in tuple('x%d' % idx for idx in range(29)):
                return unsigned(self.saved_state.x[int(name[1:])])
            return unsigned(getattr(self.saved_state, name))

        if kind == "x86_64":
            # These registers are kept in the nested 'isf' member.
            holder = self.saved_state
            if name in ('rip', 'rflags', 'cs', 'rsp', 'cpu'):
                holder = self.saved_state.isf
            return unsigned(getattr(holder, name))

        if kind == "arm":
            # r0..r12 are stored in the indexed array 'r'.
            if name in tuple('r%d' % idx for idx in range(13)):
                return unsigned(self.saved_state.r[int(name[1:])])
            return unsigned(getattr(self.saved_state, name))

        return None

    def getName(self):
        """ returns the thread id rendered as a decimal string """
        return str(self.thread_id)

    def getRegisterData(self, reg_num):
        """ returns None if there is error """
        if not (0 <= reg_num < len(self.registerset)):
            logging.warning("regnum %d is not defined for thread_id 0x%x" % (reg_num, self.thread_id))
            return None
        reg_name = self.registerset[reg_num]['name']
        return self.getRegisterValueByName(reg_name)
class UserProcess(target.Process):
    """ Represent a user process and thread states """

    def __init__(self, task):
        """ Build the process view for a task and snapshot all of its threads.
            params:
                task - task value; its bsd_info, t_flags and threads are read.
        """
        self.task = task
        self.proc = Cast(task.bsd_info, 'proc_t')
        dataregisters64bit = False
        ptrsize = 4

        # NOTE(review): bits 0x1 / 0x2 of t_flags presumably mean 64-bit
        # addresses / 64-bit data registers -- confirm against the task flags.
        if task.t_flags & 0x1:
            ptrsize = 8
        if task.t_flags & 0x2:
            dataregisters64bit = True

        # Kernel cpu type, derived from the kernel architecture string.
        cputype = CPU_TYPE_X86_64
        cpusubtype = CPU_SUBTYPE_X86_64_ALL
        # Exact comparison: `in ('arm')` was a substring test on a plain string.
        if kern.arch == 'arm':
            cputype = CPU_TYPE_ARM
            cpusubtype = CPU_SUBTYPE_ARM_V7
        elif kern.arch in ('armv8', 'arm64'):
            cputype = CPU_TYPE_ARM64
            cpusubtype = CPU_SUBTYPE_ARMV8

        super(UserProcess, self).__init__(cputype, cpusubtype, ptrsize)

        self.hinfo['ostype'] = 'macosx'
        if cputype != CPU_TYPE_X86_64:
            self.hinfo['ostype'] = 'ios'

        # From here on self.cputype / self.cpusubtype describe the process,
        # while the local cputype / cpusubtype keep describing the kernel.
        self.cputype = unsigned(self.proc.p_cputype)
        self.cpusubtype = unsigned(self.proc.p_cpusubtype)
        self.registerset = GetRegisterSetForCPU(cputype, cpusubtype)
        logging.debug("process %s is64bit: %d ptrsize: %d cputype: %d cpusubtype:%d",
                      hex(self.proc), int(dataregisters64bit), ptrsize,
                      self.cputype, self.cpusubtype
                      )
        self.threads = {}
        self.threads_ids_list = []
        logging.debug("iterating over threads in process")
        for thval in IterateQueue(task.threads, 'thread *', 'task_threads'):
            thread_id = unsigned(thval.thread_id)
            self.threads[thread_id] = UserThreadObject(thval, self.cputype, self.cpusubtype, cputype)
            self.threads_ids_list.append(thread_id)

    def getRegisterDataForThread(self, th_id, reg_num):
        """ Encoded value of register number reg_num for thread th_id.
            returns: '' when the thread id or register number is unknown.
        """
        if th_id not in self.threads:
            logging.critical("0x%x thread id is not found in this task" % th_id)
            return ''
        if reg_num < 0 or reg_num >= len(self.registerset):
            logging.warning("regnum %d is not defined for thread_id 0x%x" % (reg_num, th_id))
            return ''
        value = self.threads[th_id].getRegisterData(reg_num)
        # Floor division keeps bytesize an int on Python 3 as well.
        return self.encodeRegisterData(value, bytesize=self.registerset[reg_num]['bitsize'] // 8)

    def getRegisterCombinedDataForThread(self, th_id):
        """ 'thread:<id>;name:<name>;' followed by one '<index>:<value>;' entry
            per register in self.registerset.
            returns: '' when the thread id is unknown.
        """
        if th_id not in self.threads:
            logging.critical("0x%x thread id is not found in this task" % th_id)
            return ''
        cur_thread = self.threads[th_id]
        pieces = ['thread:%s;name:%s;' % (self.encodeThreadID(th_id), cur_thread.getName())]
        for pos, rinfo in enumerate(self.registerset):
            value = cur_thread.getRegisterValueByName(rinfo['name'])
            value_endian_correct_str = self.encodeRegisterData(value, bytesize=(rinfo['bitsize'] // 8))
            pieces.append("%02x:%s;" % (pos, value_endian_correct_str))
        return ''.join(pieces)

    def getThreadStopInfo(self, th_id):
        """ 'T02' stop reply: combined register data plus the threads list.
            returns: '' when the thread id is unknown.
        """
        if th_id not in self.threads:
            logging.critical("0x%x thread id is not found in this task" % th_id)
            return ''
        return 'T02' + self.getRegisterCombinedDataForThread(th_id) + 'threads:' + self.getThreadsInfo() + ';'

    def getRegisterInfo(self, regnum):
        """ 'key:value;' description of register number regnum.
            returns: 'E45' when regnum is outside self.registerset.
        """
        # regnum == len(...) is already past the end; negative numbers are
        # rejected like in getRegisterDataForThread.
        if regnum < 0 or regnum >= len(self.registerset):
            logging.debug("No register_info for number %d." % regnum)
            return 'E45'

        rinfo = self.registerset[regnum]
        retval = ''
        for key in rinfo.keys():
            key_val = str(rinfo[key])
            if key == 'set':
                key_val = 'General Purpose Registers'
            retval += '%s:%s;' % (str(key), key_val)
        return retval

    def getProcessInfo(self):
        """ 'key:value;' description of the process (ids, cpu type, os type). """
        retval = ''
        # effective-uid / effective-gid are fixed placeholder strings here.
        pinfo = {'effective-uid': 'ecf', 'effective-gid': 'b', 'endian': 'little', 'vendor': 'apple'}
        pinfo['pid'] = "%x" % (GetProcPIDForTask(self.task))
        pinfo['parent-pid'] = "%x" % (unsigned(self.proc.p_ppid))
        pinfo['ptrsize'] = str(self.ptrsize)
        pinfo['ostype'] = 'macosx'
        pinfo['cputype'] = "%x" % self.cputype
        pinfo['cpusubtype'] = "%x" % self.cpusubtype
        pinfo['real-uid'] = "%x" % (unsigned(self.proc.p_ruid))
        pinfo['real-gid'] = "%x" % (unsigned(self.proc.p_rgid))
        if str(kern.arch).find('arm') >= 0:
            pinfo['ostype'] = 'ios'
        for key in pinfo.keys():
            retval += '%s:%s;' % (str(key), str(pinfo[key]))
        return retval

    def readMemory(self, address, size):
        """ Read size bytes of user memory at address and return them encoded.
            A failed read is logged; the (empty) result is still encoded.
        """
        data = GetUserDataAsString(self.task, address, size)
        if not data:
            logging.error("Failed to read memory task:{: <#018x} {: <#018x} {:d}".format(self.task, address, size))
        return self.encodeByteString(data)

    def getSharedLibInfoAddress(self):
        """ returns the task's all_image_info_addr as an unsigned value """
        return unsigned(self.task.all_image_info_addr)