import logging
import target
import struct
from xnu import *
from core.operating_system import Armv8_RegisterSet, Armv7_RegisterSet, I386_RegisterSet, X86_64RegisterSet
""" these defines should come from an authoritative header file """
CPU_TYPE_I386 = 0x00000007
CPU_TYPE_X86_64 = 0x01000007
CPU_TYPE_ARM = 0x0000000c
CPU_TYPE_ARM64 = 0x0100000c
CPU_TYPE_ARM64_32 = 0x0200000c
def GetRegisterSetForCPU(cputype, subtype):
if cputype == CPU_TYPE_ARM64:
retval = Armv8_RegisterSet
elif cputype == CPU_TYPE_ARM64_32:
retval = Armv8_RegisterSet
elif cputype == CPU_TYPE_ARM:
retval = Armv7_RegisterSet
elif cputype == CPU_TYPE_I386:
retval = I386_RegisterSet
elif cputype == CPU_TYPE_X86_64:
retval = X86_64RegisterSet
""" crash if unknown cputype """
return retval.register_info['registers']
class UserThreadObject(object):
"""representation of userspace thread"""
def __init__(self, thr_obj, cputype, cpusubtype, is_kern_64bit):
super(UserThreadObject, self).__init__()
self.thread = thr_obj
self.registerset = GetRegisterSetForCPU(cputype, cpusubtype)
self.thread_id = unsigned(self.thread.thread_id)
self.is64Bit = bool(cputype & 0x01000000)
if self.is64Bit:
if cputype == CPU_TYPE_X86_64:
self.reg_type = "x86_64"
self.saved_state = Cast(self.thread.machine.iss, 'x86_saved_state_t *').uss.ss_64
if cputype == CPU_TYPE_ARM64:
self.reg_type = "arm64"
self.saved_state = self.thread.machine.upcb.uss.ss_64
else:
if cputype == CPU_TYPE_I386:
self.reg_type = "i386"
self.saved_state = Cast(self.thread.machine.iss, 'x86_saved_state_t *').uss.ss_32
if cputype == CPU_TYPE_ARM:
self.reg_type = "arm"
if not is_kern_64bit:
self.saved_state = self.thread.machine.PcbData
else:
self.saved_state = self.thread.machine.contextData.ss.uss.ss_32
if cputype == CPU_TYPE_ARM64_32:
self.reg_type = "arm64"
self.saved_state = self.thread.machine.upcb.uss.ss_64
logging.debug("created thread id 0x%x of type %s, is_kern_64bit 0x%x cputype 0x%x"
% (self.thread_id, self.reg_type, is_kern_64bit, cputype))
def getRegisterValueByName(self, name):
if self.reg_type == 'arm64':
if name in ('x0', 'x1', 'x2', 'x3', 'x4', 'x5', 'x6', 'x7', 'x8', 'x9', 'x10', 'x11', 'x12', 'x13', 'x14', 'x15', 'x16', 'x17', 'x18', 'x19', 'x20', 'x21', 'x22', 'x23', 'x24', 'x25', 'x26', 'x27', 'x28'):
return unsigned(getattr(self.saved_state, 'x')[int(name.strip('x'))])
return unsigned(getattr(self.saved_state, name))
if self.reg_type == "x86_64":
if name in ('rip', 'rflags', 'cs', 'rsp', 'cpu'):
return unsigned(getattr(self.saved_state.isf, name))
return unsigned(getattr(self.saved_state, name))
if self.reg_type == "arm":
if name in ('r0', 'r1', 'r2', 'r3', 'r4', 'r5', 'r6', 'r7', 'r8', 'r9', 'r10', 'r11', 'r12'):
retval = unsigned(getattr(self.saved_state, 'r')[int(name.strip('r'))])
else:
retval = unsigned(getattr(self.saved_state, name))
return retval
def getName(self):
return str(self.thread_id)
def getRegisterData(self, reg_num):
""" returns None if there is error """
if reg_num < 0 or reg_num >= len(self.registerset):
logging.warning("regnum %d is not defined for thread_id 0x%x" % (reg_num, self.thread_id))
return None
return self.getRegisterValueByName(self.registerset[reg_num]['name'])
class UserProcess(target.Process):
""" Represent a user process and thread states """
def __init__(self, task):
self.task = task
self.proc = Cast(task.bsd_info, 'proc_t')
dataregisters64bit = False
ptrsize = 4
if task.t_flags & 0x1:
ptrsize = 8
if task.t_flags & 0x2:
dataregisters64bit = True
is_kern_64bit = kern.arch in ['x86_64', 'x86_64h', 'arm64', 'arm64e']
self.cputype = unsigned(self.proc.p_cputype)
self.cpusubtype = unsigned(self.proc.p_cpusubtype)
super(UserProcess, self).__init__(self.cputype, self.cpusubtype, ptrsize)
self.hinfo['ostype'] = 'macosx'
if self.cputype != CPU_TYPE_X86_64 and self.cputype != CPU_TYPE_I386:
self.hinfo['ostype'] = 'ios'
self.registerset = GetRegisterSetForCPU(self.cputype, self.cpusubtype)
logging.debug("process %s is64bit: %d ptrsize: %d cputype: %d cpusubtype:%d",
hex(self.proc), int(dataregisters64bit), ptrsize,
self.cputype, self.cpusubtype
)
self.threads = {}
self.threads_ids_list = []
logging.debug("iterating over threads in process")
for thval in IterateQueue(task.threads, 'thread *', 'task_threads'):
self.threads[unsigned(thval.thread_id)] = UserThreadObject(thval, self.cputype, self.cpusubtype, is_kern_64bit)
self.threads_ids_list.append(unsigned(thval.thread_id))
def getRegisterDataForThread(self, th_id, reg_num):
if th_id not in self.threads:
logging.critical("0x%x thread id is not found in this task")
return ''
if reg_num < 0 or reg_num >= len(self.registerset):
logging.warning("regnum %d is not defined for thread_id 0x%x" % (reg_num, th_id))
return ''
value = self.threads[th_id].getRegisterData(reg_num)
return self.encodeRegisterData(value, bytesize=self.registerset[reg_num]['bitsize']/8)
def getRegisterCombinedDataForThread(self, th_id):
if th_id not in self.threads:
logging.critical("0x%x thread id is not found in this task" % th_id)
return ''
cur_thread = self.threads[th_id]
retval = 'thread:%s;name:%s;' % (self.encodeThreadID(th_id), cur_thread.getName())
pos = 0
for rinfo in self.registerset:
name = rinfo['name']
format = "%02x:%s;"
value = cur_thread.getRegisterValueByName(name)
value_endian_correct_str = self.encodeRegisterData(value, bytesize=(rinfo['bitsize']/8))
retval += format % (pos, value_endian_correct_str)
pos += 1
return retval
def getThreadStopInfo(self, th_id):
if th_id not in self.threads:
logging.critical("0x%x thread id is not found in this task")
return ''
return 'T02' + self.getRegisterCombinedDataForThread(th_id) + 'threads:' + self.getThreadsInfo()+';'
def getRegisterInfo(self, regnum):
if regnum > len(self.registerset):
logging.debug("No register_info for number %d." % regnum)
return 'E45'
rinfo = self.registerset[regnum]
retval = ''
for i in rinfo.keys():
i_val = str(rinfo[i])
if i == 'set':
i_val = 'General Purpose Registers'
retval += '%s:%s;' % (str(i), i_val)
return retval
def getProcessInfo(self):
retval = ''
pinfo = {'effective-uid': 'ecf', 'effective-gid': 'b', 'endian': 'little', 'vendor': 'apple'}
pinfo['pid'] = "%x" % (GetProcPIDForTask(self.task))
pinfo['parent-pid'] = "%x" % (unsigned(self.proc.p_ppid))
pinfo['ptrsize'] = str(self.ptrsize)
pinfo['ostype'] = 'macosx'
pinfo['cputype'] = "%x" % self.cputype
pinfo['cpusubtype'] = "%x" % self.cpusubtype
pinfo['real-uid'] = "%x" % (unsigned(self.proc.p_ruid))
pinfo['real-gid'] = "%x" % (unsigned(self.proc.p_rgid))
if str(kern.arch).find('arm') >= 0:
pinfo['ostype'] = 'ios'
for i in pinfo.keys():
i_val = str(pinfo[i])
retval += '%s:%s;' % (str(i), i_val)
return retval
def readMemory(self, address, size):
data = GetUserDataAsString(self.task, address, size)
if not data:
logging.error("Failed to read memory task:{: <#018x} {: <#018x} {:d}".format(self.task, address, size))
return self.encodeByteString(data)
def getSharedLibInfoAddress(self):
return unsigned(self.task.all_image_info_addr)