"""
Test lldb Python event APIs.
"""
import os, time
import re
import unittest2
import lldb, lldbutil
from lldbtest import *
class EventAPITestCase(TestBase):
    """Exercise the SBEvent, SBListener and SBBroadcaster Python APIs.

    Every scenario has two entry points: a ``*_with_dsym`` variant, which is
    skipped unless running on Darwin, and a ``*_with_dwarf`` variant.  The
    ``test_*`` methods only call the matching build helper and then delegate
    to a ``do_*`` method that contains the actual checks.
    """

    mydir = os.path.join("python_api", "event")

    @unittest2.skipUnless(sys.platform.startswith("darwin"), "requires Darwin")
    @python_api_test
    @dsym_test
    def test_listen_for_and_print_event_with_dsym(self):
        """Exercise SBEvent API."""
        self.buildDsym()
        self.do_listen_for_and_print_event()

    @python_api_test
    @dwarf_test
    def test_listen_for_and_print_event_with_dwarf(self):
        """Exercise SBEvent API."""
        self.buildDwarf()
        self.do_listen_for_and_print_event()

    @unittest2.skipUnless(sys.platform.startswith("darwin"), "requires Darwin")
    @python_api_test
    @dsym_test
    def test_wait_for_event_with_dsym(self):
        """Exercise SBListener.WaitForEvent() API."""
        self.buildDsym()
        self.do_wait_for_event()

    @python_api_test
    @dwarf_test
    def test_wait_for_event_with_dwarf(self):
        """Exercise SBListener.WaitForEvent() API."""
        self.buildDwarf()
        self.do_wait_for_event()

    @unittest2.skipUnless(sys.platform.startswith("darwin"), "requires Darwin")
    @python_api_test
    @dsym_test
    def test_add_listener_to_broadcaster_with_dsym(self):
        """Exercise some SBBroadcaster APIs."""
        self.buildDsym()
        self.do_add_listener_to_broadcaster()

    @python_api_test
    @dwarf_test
    def test_add_listener_to_broadcaster_with_dwarf(self):
        """Exercise some SBBroadcaster APIs."""
        self.buildDwarf()
        self.do_add_listener_to_broadcaster()

    def setUp(self):
        """Run the base-class setup and look up the marker line in main.c."""
        # Call super's setUp().
        TestBase.setUp(self)
        # Result of looking up the marker comment for function "c" in main.c.
        # NOTE(review): no method of this class reads self.line; the tests
        # set their breakpoint by name instead.
        self.line = line_number('main.c', '// Find the line number of function "c" here.')

    def do_listen_for_and_print_event(self):
        """Create a listener and use SBEvent API to print the events received.

        A background thread makes four attempts to pull a state-changed event
        from the process broadcaster while the main thread continues and then
        kills the process.  Events are only printed when tracing is enabled;
        nothing is asserted about the events themselves.
        """
        exe = os.path.join(os.getcwd(), "a.out")

        # Create a target by the debugger.
        target = self.dbg.CreateTarget(exe)
        self.assertTrue(target, VALID_TARGET)

        # Set a breakpoint by name on function 'c' and validate it the same
        # way the other do_* helpers in this class do.
        breakpoint = target.BreakpointCreateByName('c', 'a.out')
        self.assertTrue(breakpoint and
                        breakpoint.GetNumLocations() == 1,
                        VALID_BREAKPOINT)

        # Launch the process; it is expected to be stopped afterwards.
        process = target.LaunchSimple(None, None, os.getcwd())
        self.assertTrue(process.GetState() == lldb.eStateStopped,
                        PROCESS_STOPPED)

        # Get a handle on the process's broadcaster.
        broadcaster = process.GetBroadcaster()

        # Create an empty event object; the listening thread fills it in.
        event = lldb.SBEvent()

        # Create a listener object and register it with the broadcaster.
        listener = lldb.SBListener("my listener")
        rc = broadcaster.AddListener(listener, lldb.SBProcess.eBroadcastBitStateChanged)
        self.assertTrue(rc, "AddListener successfully returns")

        traceOn = self.TraceOn()
        if traceOn:
            lldbutil.print_stacktraces(process)

        import threading

        class MyListeningThread(threading.Thread):
            """Poll the broadcaster for state-changed events and trace them."""

            def run(self):
                # Make exactly four attempts, whether or not an event arrives.
                for _ in range(4):
                    if traceOn:
                        print("Try wait for event...")
                    if listener.WaitForEventForBroadcasterWithType(5,
                                                                   broadcaster,
                                                                   lldb.SBProcess.eBroadcastBitStateChanged,
                                                                   event):
                        if traceOn:
                            desc = lldbutil.get_description(event)
                            print("Event description: %s" % (desc,))
                            print("Event data flavor: %s" % (event.GetDataFlavor(),))
                            print("Process state: %s" % (lldbutil.state_type_to_str(process.GetState()),))
                            print("")
                    else:
                        if traceOn:
                            print("timeout occurred waiting for event...")

        # Start the listening thread before generating any state changes.
        my_thread = MyListeningThread()
        my_thread.start()

        # Continue the process, then kill it, while the listening thread
        # polls for the resulting state-changed events.
        process.Continue()
        process.Kill()

        # Wait until the listening thread terminates.
        my_thread.join()

    def do_wait_for_event(self):
        """Get the listener associated with the debugger and exercise WaitForEvent API.

        The process is launched with the debugger's own listener and killed
        right away; a background thread then makes up to four attempts to
        fetch an event from that listener.  The test passes if the shared
        event object is valid afterwards.
        """
        exe = os.path.join(os.getcwd(), "a.out")

        # Create a target by the debugger.
        target = self.dbg.CreateTarget(exe)
        self.assertTrue(target, VALID_TARGET)

        # Set a breakpoint by name on function 'c'.
        breakpoint = target.BreakpointCreateByName('c', 'a.out')
        self.assertTrue(breakpoint and
                        breakpoint.GetNumLocations() == 1,
                        VALID_BREAKPOINT)

        # Get the debugger listener.
        listener = self.dbg.GetListener()

        # Launch the process with that listener attached.
        error = lldb.SBError()
        process = target.Launch (listener, None, None, None, None, None, None, 0, False, error)
        self.assertTrue(error.Success() and process, PROCESS_IS_VALID)

        # Get a handle on the process's broadcaster.
        broadcaster = process.GetBroadcaster()
        self.assertTrue(broadcaster, "Process with valid broadcaster")

        # Create an empty event object; the listening thread fills it in.
        event = lldb.SBEvent()
        self.assertFalse(event, "Event should not be valid initially")

        import threading

        class MyListeningThread(threading.Thread):
            """Wait for any event on the debugger listener."""

            def run(self):
                # Up to four attempts; stop as soon as one event is received.
                for _ in range(4):
                    if listener.WaitForEvent(5, event):
                        return
                    print("Timeout: listener.WaitForEvent")

        # Kill the process before the listening thread starts; the thread is
        # expected to pick up an event from the listener afterwards.
        process.Kill()

        # Start the listening thread and wait for it to finish.
        my_thread = MyListeningThread()
        my_thread.start()
        my_thread.join()

        self.assertTrue(event,
                        "My listening thread successfully received an event")

    def do_add_listener_to_broadcaster(self):
        """Get the broadcaster associated with the process and wait for broadcaster events.

        After the process is continued, a background thread expects to see a
        'running' state-changed event followed by a 'stopped' one.  Progress
        is tracked in ``self.state``: 0 = nothing seen, 1 = 'running' seen,
        2 = 'running' then 'stopped' seen (success).
        """
        exe = os.path.join(os.getcwd(), "a.out")

        # Create a target by the debugger.
        target = self.dbg.CreateTarget(exe)
        self.assertTrue(target, VALID_TARGET)

        # Set a breakpoint by name on function 'c'.
        breakpoint = target.BreakpointCreateByName('c', 'a.out')
        self.assertTrue(breakpoint and
                        breakpoint.GetNumLocations() == 1,
                        VALID_BREAKPOINT)

        # Launch the process; it is expected to be stopped afterwards.
        process = target.LaunchSimple(None, None, os.getcwd())
        self.assertTrue(process.GetState() == lldb.eStateStopped,
                        PROCESS_STOPPED)

        # Get a handle on the process's broadcaster.
        broadcaster = process.GetBroadcaster()
        self.assertTrue(broadcaster, "Process with valid broadcaster")

        # Create an empty event object; the listening thread fills it in.
        event = lldb.SBEvent()
        self.assertFalse(event, "Event should not be valid initially")

        # Create a listener object and register it with the broadcaster.
        listener = lldb.SBListener("TestEvents.listener")
        rc = broadcaster.AddListener(listener, lldb.SBProcess.eBroadcastBitStateChanged)
        self.assertTrue(rc, "AddListener successfully returns")

        # The finite state machine driven by the listening thread.
        self.state = 0

        import threading

        class MyListeningThread(threading.Thread):
            """Advance ``context.state`` as the expected events arrive.

            ``context`` is assigned by the creator before ``start()`` and
            refers to the enclosing test case.
            """

            def run(self):
                # Extracts the state name from the event description.
                pattern = re.compile("data = {.*, state = (.*)}$")

                # Only timeouts are counted; give up once there have been
                # more than six of them.
                count = 0
                while True:
                    received = listener.WaitForEventForBroadcasterWithType(5,
                                                                           broadcaster,
                                                                           lldb.SBProcess.eBroadcastBitStateChanged,
                                                                           event)
                    if not received:
                        print("Timeout: listener.WaitForEvent")
                        count = count + 1
                        if count > 6:
                            break
                        continue

                    desc = lldbutil.get_description(event)
                    match = pattern.search(desc)
                    if not match:
                        # Unrecognised description: stop, leaving state as is.
                        break
                    state_name = match.group(1)
                    if self.context.state == 0 and state_name == 'running':
                        # First expected event; keep waiting for 'stopped'.
                        self.context.state = 1
                        continue
                    if self.context.state == 1 and state_name == 'stopped':
                        # Both expected events have been received.
                        self.context.state = 2
                    # Either success or an out-of-order event: stop here.
                    break

        # Continue the process so that it produces the state changes the
        # listening thread is waiting for.
        process.Continue()

        # Start the listening thread, giving it access to self.state.
        my_thread = MyListeningThread()
        my_thread.context = self
        my_thread.start()

        # Wait until the listening thread terminates.
        my_thread.join()

        # Unregister the listener now that the thread is done with it.
        broadcaster.RemoveListener(listener, lldb.SBProcess.eBroadcastBitStateChanged)

        # The final judgement.
        self.assertTrue(self.state == 2,
                        "Both expected state changed events received")
if __name__ == '__main__':
    import atexit

    def _terminate_debugger():
        # Looked up at exit time, exactly as the equivalent lambda would be.
        lldb.SBDebugger.Terminate()

    # Script entry point: initialize the debugger, make sure it is terminated
    # when the interpreter exits, then hand control to the unittest2 runner.
    lldb.SBDebugger.Initialize()
    atexit.register(_terminate_debugger)
    unittest2.main()