kcboot   [plain text]


#!/usr/bin/python

import argparse
import sys
import os
import subprocess
import re
import time

parser = argparse.ArgumentParser()
parser.add_argument("--probe", "-p")
parser.add_argument("--device", "-d", metavar="NAME")
parser.add_argument("--kernelcache", "-k")
parser.add_argument("--location", "-l")
parser.add_argument("--udid", "-u")
parser.add_argument("--verbose", "-v", action='store_true')
parser.add_argument("--quiet", "-q", action='store_true')
parser.add_argument("--list", action='store_true')
args = parser.parse_args()

if not args.list and not args.kernelcache:
    parser.error("Please specify --kernelcache or --list")

def no0x(x):
    m = re.match(r'(0x)?([a-fA-F0-9]+)$', x)
    if not m:
        raise Exception, "invalid location: " + x
    return m.group(2).lower()


def probe_serial(probe):
    m = re.search('([a-f0-9]+)$', probe, re.IGNORECASE)
    if not m:
        raise Exception, "bad probe name: " + probe
    return m.group(1).lower()


def find_probes():

    proc = subprocess.Popen(["astrisctl", "list"], stdout=subprocess.PIPE);
    (out, err) = proc.communicate()
    if proc.wait() != 0:
        raise Exception, "astrisctl failed"

    i = iter(out.splitlines())
    if not i.next().startswith("astrisctl"):
        raise Exception, "bad output from astrisctl"

    return [x.strip() for x in i if x.strip() != '' and 'astris_gcd_test_serial_queue' not in x]


def find_locations_under_probe(serial):

    proc = subprocess.Popen(["ioreg", "-l"], stdout=subprocess.PIPE);
    i = iter(proc.stdout)
    for line in i:
        m = re.match(r'([ \|]+) \+-o \s* (Kanzi|Chimp|Gorilla|Kong)\@([a-fA-F0-9]+)', line, re.VERBOSE)
        if not m:
            continue

        indent = r'[ \|]{%d,}' % (len(m.group(1)) + 1)

        line = i.next()
        if not re.match(indent + r'\s* \{ \s* $', line, re.VERBOSE):
            raise Exception

        probe_serial = None
        for line in i:
            if not re.match(indent, line):
                raise Exception
            if re.match(indent + r'\s* \} \s* $', line, re.VERBOSE):
                break
            m = re.match(indent + r'\s*"USB Serial Number" = "([a-fA-F0-9]+)"\s*$', line)
            if m:
                probe_serial = m.group(1).lower()

        if probe_serial != serial:
            continue

        for line in i:
            if not re.match(indent, line):
                break
            m = re.match(indent + r'\s* \+-o .*? @([a-fA-F0-9]+)', line, re.VERBOSE)
            if m:
                yield m.group(1).lower()

    if proc.wait() != 0:
        raise Exception, "ioreg failed"


def find_usbterm_locations():

    proc = subprocess.Popen(["usbterm", "-list"], stdout=subprocess.PIPE)
    for line in proc.stdout:
        m = re.match(r' (0x) ([a-fA-F0-9]+) ,? $ ', line.split()[2], re.VERBOSE)
        if not m:
            raise Exception
        yield m.group(2).lower()

    if proc.wait() != 0:
        raise Exception, "usbterm failed"


def mobdev_list():
    proc = subprocess.Popen(["mobdev", "list"], stdout=subprocess.PIPE)

    regex = ( r'Attach (?P<model>.*) device "(?P<name>.*)"' +
              r' \(AMDevice 0x[a-fA-F0-9]+ \{UDID = (?P<udid>[a-fA-F0-9]+),' +
              r' device ID = (?P<id>\d+),' +
              r' location ID = 0x(?P<location>[0-9a-fA-F]+),' +
              r' product ID = 0x(?P<product>[0-9a-fA-F]+)' +
              r'\s*\}\)\s*$')
    regex = regex.replace(' ', '\s+')

    for line in proc.stdout:
        m = re.search(regex, line)
        if m:
            d = m.groupdict()
            for key in ('udid', 'location', 'product'):
                d[key] = d[key].lower()
            yield d

    if proc.wait() != 0:
        raise Exception, "mobdev failed"


if args.list:

    devs = list(mobdev_list())

    for probe in find_probes():

        locations = set(find_locations_under_probe(probe_serial(probe))).intersection(set(find_usbterm_locations()))
        if len(locations) != 1:
            print 'Probe = {probe}'.format(probe=probe)
            continue

        location = locations.pop()
        for dev in devs:
            if dev['location'] == location:
                dev['probe'] = probe
                print 'Probe = {probe}, UDID = {udid}, Location = {location}, Name = "{name}"'.format(**dev)
                break
        else:
            print 'Probe = {probe}, Location = {location}'.format(probe=probe, location=location)

    sys.exit(0)



if args.location:
    args.location = no0x(args.location)


if args.location is None and args.device is not None:
    proc = subprocess.Popen(["mobdev", "list"], stdout=subprocess.PIPE)
    for line in proc.stdout:
        regex = r'device "{name}".*location ID = (0x[a-fA-F0-9]+)'.format(
            name=re.escape(args.device))
        m = re.search(regex, line)
        if m:
            args.location = no0x(m.group(1))
    if args.location is None:
        raise Exception, "could not find device with name: " + args.device
    if proc.wait() != 0:
        raise Exception, "mobdev failed"


if args.location is None and args.udid is not None:
    proc = subprocess.Popen(["mobdev", "list"], stdout=subprocess.PIPE)
    for line in proc.stdout:
        regex = r'UDID = {udid}.*location ID = (0x[a-fA-F0-9]+)'.format(
            udid=re.escape(args.udid))
        m = re.search(regex, line, re.IGNORECASE)
        if m:
            args.location = no0x(m.group(1))
    if args.location is None:
        raise Exception, "could not find device with udid: " + args.udid
    if proc.wait() != 0:
        raise Exception, "mobdev failed"



if args.probe is None and args.location is not None:

    for probe in find_probes():
        if args.location in find_locations_under_probe(probe_serial(probe)):
            args.probe = probe
            break
    else:
        raise Exception, "couldn't find probe for location: " + args.location


if args.probe is None:

    probes = find_probes()

    if len(probes) > 1:
        raise Exception, "multiple probes found, please pick one"

    if len(probes) == 0:
        raise Exception, "no probe found"

    args.probe = probes.pop()


serial = probe_serial(args.probe)


def find_serial_port(serial):
    ports = [x for x in os.listdir("/dev/") if x.startswith("cu.") and re.search(serial[:-1], x, re.IGNORECASE)]
    if len(ports) != 1:
        raise Exception, "couldn't find serial port for probe"
    return "/dev/" + ports.pop()

port = find_serial_port(serial)


if args.location is None:

    locations = set(find_locations_under_probe(serial)).intersection(set(find_usbterm_locations()))

    if len(locations) > 1:
        raise Exception, "multiple usbterms under one kanzi?"

    if len(locations) == 1:
        args.location = locations.pop()

    else:
        print "Can't find usbterm. Rebooting..."

        expect = os.path.join(os.path.dirname(sys.argv[0]), "_kcboot_expect")
        cmd = ["expect", expect, args.probe, port]
        if args.verbose:
            proc = subprocess.Popen(cmd)
        else:
            proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        out,err = proc.communicate()
        proc.wait()

        for i in range(20):
            locations = set(find_locations_under_probe(serial)).intersection(set(find_usbterm_locations()))
            if len(locations) == 0:
                time.sleep(0.1)
            else:
                break

        if len(locations) > 1:
            raise Exception, "multiple usbterms under one kanzi?"

        if len(locations) < 1:
            raise Exception, "still couldn't find usbterm"

        args.location = locations.pop()


expect = os.path.join(os.path.dirname(sys.argv[0]), "_kcboot_expect")

cmd = ["expect", expect, args.probe, port, args.kernelcache, '0x' + args.location]

if not args.quiet:
    print "Booting kernelcache at:", args.kernelcache
    print "             via probe:", args.probe
    print "        at location id: 0x" + args.location
    print

if args.verbose:
    proc = subprocess.Popen(cmd)
else:
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

out,err = proc.communicate()
status = proc.wait()

if not args.quiet:
    if args.verbose:
        print

    print "Done." if status==0 else "Failed!"

sys.exit(status)