#!/usr/bin/python import argparse import sys import os import subprocess import re import time parser = argparse.ArgumentParser() parser.add_argument("--probe", "-p") parser.add_argument("--device", "-d", metavar="NAME") parser.add_argument("--kernelcache", "-k") parser.add_argument("--location", "-l") parser.add_argument("--udid", "-u") parser.add_argument("--verbose", "-v", action='store_true') parser.add_argument("--quiet", "-q", action='store_true') parser.add_argument("--list", action='store_true') args = parser.parse_args() if not args.list and not args.kernelcache: parser.error("Please specify --kernelcache or --list") def no0x(x): m = re.match(r'(0x)?([a-fA-F0-9]+)$', x) if not m: raise Exception, "invalid location: " + x return m.group(2).lower() def probe_serial(probe): m = re.search('([a-f0-9]+)$', probe, re.IGNORECASE) if not m: raise Exception, "bad probe name: " + probe return m.group(1).lower() def find_probes(): proc = subprocess.Popen(["astrisctl", "list"], stdout=subprocess.PIPE); (out, err) = proc.communicate() if proc.wait() != 0: raise Exception, "astrisctl failed" i = iter(out.splitlines()) if not i.next().startswith("astrisctl"): raise Exception, "bad output from astrisctl" return [x.strip() for x in i if x.strip() != '' and 'astris_gcd_test_serial_queue' not in x] def find_locations_under_probe(serial): proc = subprocess.Popen(["ioreg", "-l"], stdout=subprocess.PIPE); i = iter(proc.stdout) for line in i: m = re.match(r'([ \|]+) \+-o \s* (Kanzi|Chimp|Gorilla|Kong)\@([a-fA-F0-9]+)', line, re.VERBOSE) if not m: continue indent = r'[ \|]{%d,}' % (len(m.group(1)) + 1) line = i.next() if not re.match(indent + r'\s* \{ \s* $', line, re.VERBOSE): raise Exception probe_serial = None for line in i: if not re.match(indent, line): raise Exception if re.match(indent + r'\s* \} \s* $', line, re.VERBOSE): break m = re.match(indent + r'\s*"USB Serial Number" = "([a-fA-F0-9]+)"\s*$', line) if m: probe_serial = m.group(1).lower() if probe_serial != serial: continue for line in i: if not re.match(indent, line): break m = re.match(indent + r'\s* \+-o .*? @([a-fA-F0-9]+)', line, re.VERBOSE) if m: yield m.group(1).lower() if proc.wait() != 0: raise Exception, "ioreg failed" def find_usbterm_locations(): proc = subprocess.Popen(["usbterm", "-list"], stdout=subprocess.PIPE) for line in proc.stdout: m = re.match(r' (0x) ([a-fA-F0-9]+) ,? $ ', line.split()[2], re.VERBOSE) if not m: raise Exception yield m.group(2).lower() if proc.wait() != 0: raise Exception, "usbterm failed" def mobdev_list(): proc = subprocess.Popen(["mobdev", "list"], stdout=subprocess.PIPE) regex = ( r'Attach (?P<model>.*) device "(?P<name>.*)"' + r' \(AMDevice 0x[a-fA-F0-9]+ \{UDID = (?P<udid>[a-fA-F0-9]+),' + r' device ID = (?P<id>\d+),' + r' location ID = 0x(?P<location>[0-9a-fA-F]+),' + r' product ID = 0x(?P<product>[0-9a-fA-F]+)' + r'\s*\}\)\s*$') regex = regex.replace(' ', '\s+') for line in proc.stdout: m = re.search(regex, line) if m: d = m.groupdict() for key in ('udid', 'location', 'product'): d[key] = d[key].lower() yield d if proc.wait() != 0: raise Exception, "mobdev failed" if args.list: devs = list(mobdev_list()) for probe in find_probes(): locations = set(find_locations_under_probe(probe_serial(probe))).intersection(set(find_usbterm_locations())) if len(locations) != 1: print 'Probe = {probe}'.format(probe=probe) continue location = locations.pop() for dev in devs: if dev['location'] == location: dev['probe'] = probe print 'Probe = {probe}, UDID = {udid}, Location = {location}, Name = "{name}"'.format(**dev) break else: print 'Probe = {probe}, Location = {location}'.format(probe=probe, location=location) sys.exit(0) if args.location: args.location = no0x(args.location) if args.location is None and args.device is not None: proc = subprocess.Popen(["mobdev", "list"], stdout=subprocess.PIPE) for line in proc.stdout: regex = r'device "{name}".*location ID = (0x[a-fA-F0-9]+)'.format( name=re.escape(args.device)) m = re.search(regex, line) if m: args.location = no0x(m.group(1)) if args.location is None: raise Exception, "could not find device with name: " + args.device if proc.wait() != 0: raise Exception, "mobdev failed" if args.location is None and args.udid is not None: proc = subprocess.Popen(["mobdev", "list"], stdout=subprocess.PIPE) for line in proc.stdout: regex = r'UDID = {udid}.*location ID = (0x[a-fA-F0-9]+)'.format( udid=re.escape(args.udid)) m = re.search(regex, line, re.IGNORECASE) if m: args.location = no0x(m.group(1)) if args.location is None: raise Exception, "could not find device with udid: " + args.udid if proc.wait() != 0: raise Exception, "mobdev failed" if args.probe is None and args.location is not None: for probe in find_probes(): if args.location in find_locations_under_probe(probe_serial(probe)): args.probe = probe break else: raise Exception, "couldn't find probe for location: " + args.location if args.probe is None: probes = find_probes() if len(probes) > 1: raise Exception, "multiple probes found, please pick one" if len(probes) == 0: raise Exception, "no probe found" args.probe = probes.pop() serial = probe_serial(args.probe) def find_serial_port(serial): ports = [x for x in os.listdir("/dev/") if x.startswith("cu.") and re.search(serial[:-1], x, re.IGNORECASE)] if len(ports) != 1: raise Exception, "couldn't find serial port for probe" return "/dev/" + ports.pop() port = find_serial_port(serial) if args.location is None: locations = set(find_locations_under_probe(serial)).intersection(set(find_usbterm_locations())) if len(locations) > 1: raise Exception, "multiple usbterms under one kanzi?" if len(locations) == 1: args.location = locations.pop() else: print "Can't find usbterm. Rebooting..." expect = os.path.join(os.path.dirname(sys.argv[0]), "_kcboot_expect") cmd = ["expect", expect, args.probe, port] if args.verbose: proc = subprocess.Popen(cmd) else: proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) out,err = proc.communicate() proc.wait() for i in range(20): locations = set(find_locations_under_probe(serial)).intersection(set(find_usbterm_locations())) if len(locations) == 0: time.sleep(0.1) else: break if len(locations) > 1: raise Exception, "multiple usbterms under one kanzi?" if len(locations) < 1: raise Exception, "still couldn't find usbterm" args.location = locations.pop() expect = os.path.join(os.path.dirname(sys.argv[0]), "_kcboot_expect") cmd = ["expect", expect, args.probe, port, args.kernelcache, '0x' + args.location] if not args.quiet: print "Booting kernelcache at:", args.kernelcache print " via probe:", args.probe print " at location id: 0x" + args.location print if args.verbose: proc = subprocess.Popen(cmd) else: proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) out,err = proc.communicate() status = proc.wait() if not args.quiet: if args.verbose: print print "Done." if status==0 else "Failed!" sys.exit(status)