Working with signals and events
Events are a handy way to make your code react to changes in another part of the application.
To dispatch and listen to events, you first need to have one or more Signal instances as
attributes of some class. Each signal needs to be associated with some Event class. Then, when
you dispatch a new event by calling dispatch(), a new instance of this event class will be
constructed and passed to all listener callbacks.
To listen to events dispatched from a signal, you need to have a function or any other callable
that accepts a single positional argument. You then pass this callable to
connect(). That’s it!
To disconnect the callback, simply call disconnect() with whatever you passed to connect() as
argument.
Here’s how it works:

    from asphalt.core import Event, Signal


    class CustomEvent(Event):
        def __init__(self, source, topic, extra_argument):
            super().__init__(source, topic)
            self.extra_argument = extra_argument


    class MyEventSource:
        somesignal = Signal(Event)
        customsignal = Signal(CustomEvent)


    def plain_listener(event):
        print('received event: %s' % event)


    async def coro_listener(event):
        print('coroutine listeners are fine too: %s' % event)


    async def some_handler():
        source = MyEventSource()
        source.somesignal.connect(plain_listener)
        source.customsignal.connect(coro_listener)

        # Dispatches an Event instance
        source.somesignal.dispatch()

        # Dispatches a CustomEvent instance (the extra argument is passed to its constructor)
        source.customsignal.dispatch('extra argument here')

Any exceptions raised by the listener callbacks are logged. Additionally, the future returned by
dispatch() resolves to True if no exceptions were raised during the processing of listeners.
This is meant as a convenience for tests, where you can simply write
assert await thing.some_signal.dispatch('foo').
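To make the connect, disconnect and dispatch semantics described above concrete, here is a minimal standard-library sketch. MiniSignal, the "mini_signal" logger name and demo() are hypothetical names invented for this illustration; this is not Asphalt's implementation. In particular, the toy passes the dispatched value straight to the listeners instead of constructing an Event instance.

```python
import asyncio
import inspect
import logging

logger = logging.getLogger("mini_signal")  # hypothetical logger name


class MiniSignal:
    """Toy stand-in for a signal (assumption: not Asphalt's implementation)."""

    def __init__(self):
        self.listeners = []

    def connect(self, callback):
        self.listeners.append(callback)
        return callback

    def disconnect(self, callback):
        # Pass the same object that was given to connect()
        if callback in self.listeners:
            self.listeners.remove(callback)

    async def dispatch(self, event):
        """Call every listener; return True only if none of them raised."""
        success = True
        for callback in list(self.listeners):
            try:
                result = callback(event)
                if inspect.isawaitable(result):
                    await result  # coroutine listeners are awaited
            except Exception:
                # The exception is logged instead of reaching the dispatcher
                logger.exception("Listener %r raised an exception", callback)
                success = False
        return success


async def demo():
    received = []
    signal = MiniSignal()

    def good_listener(event):
        received.append(event)

    async def bad_listener(event):
        raise RuntimeError("listener failed")

    signal.connect(good_listener)
    signal.connect(bad_listener)
    first = await signal.dispatch("foo")  # bad_listener raises

    signal.disconnect(bad_listener)
    second = await signal.dispatch("bar")  # only good_listener remains
    return first, second, received


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this sketch the first dispatch resolves to False because one listener raised, while the second resolves to True once that listener has been disconnected. This is the property that makes the assert await ... pattern useful in tests.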
Waiting for a single event
To wait for the next event dispatched from a given signal, you can use the wait_event() method
of that signal:

    async def print_next_event(source):
        event = await source.somesignal.wait_event()
        print(event)

You can even wait for the next event dispatched from any of several signals using the
wait_event() function:

    from asphalt.core import wait_event


    async def print_next_event(source1, source2, source3):
        event = await wait_event(source1.some_signal, source2.another_signal,
                                 source3.some_signal)
        print(event)

As a convenience, you can provide a filter callback that will cause the call to only return when
the callback returns a true value for the event:

    async def print_next_matching_event(source1, source2, source3):
        event = await wait_event(source1.some_signal, source2.another_signal,
                                 source3.some_signal,
                                 lambda event: event.myrandomproperty == 'foo')
        print(event)

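One way to picture waiting for the first matching event is a temporary listener that completes a future. The following standard-library sketch assumes a toy MiniSignal class and a hypothetical helper wait_first_matching(); neither is part of Asphalt, and the library's own mechanism may differ.

```python
import asyncio


class MiniSignal:
    """Toy stand-in for a signal (assumption: not Asphalt's implementation)."""

    def __init__(self):
        self.listeners = []

    def connect(self, callback):
        self.listeners.append(callback)

    def disconnect(self, callback):
        self.listeners.remove(callback)

    def dispatch(self, event):
        for callback in list(self.listeners):
            callback(event)


async def wait_first_matching(signals, predicate=None):
    """Return the first event from any signal that passes the predicate."""
    future = asyncio.get_running_loop().create_future()

    def listener(event):
        # Ignore events that arrive after completion or fail the filter
        if not future.done() and (predicate is None or predicate(event)):
            future.set_result(event)

    for signal in signals:
        signal.connect(listener)
    try:
        return await future
    finally:
        # Always remove the temporary listener, even on cancellation
        for signal in signals:
            signal.disconnect(listener)


async def demo():
    first, second = MiniSignal(), MiniSignal()
    waiter = asyncio.ensure_future(
        wait_first_matching([first, second], lambda event: event["kind"] == "foo")
    )
    await asyncio.sleep(0)  # let the waiter connect its listener
    first.dispatch({"kind": "bar"})   # rejected by the filter
    second.dispatch({"kind": "foo"})  # completes the wait
    event = await waiter
    return event, len(first.listeners), len(second.listeners)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Note that the waiter must already be listening when the event is dispatched. In this sketch, an event dispatched before the listener is connected is simply missed.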
Receiving events iteratively
With a signal's stream_events() method, you can even asynchronously iterate over events
dispatched from that signal:

    from async_generator import aclosing


    async def listen_to_events(source):
        async with aclosing(source.somesignal.stream_events()) as stream:
            async for event in stream:
                print(event)

Using the stream_events() function, you can stream events from multiple signals:

    from asphalt.core import stream_events
    from async_generator import aclosing


    async def listen_to_events(source1, source2, source3):
        stream = stream_events(source1.some_signal, source2.another_signal,
                               source3.some_signal)
        async with aclosing(stream):
            async for event in stream:
                print(event)

The filtering capability of wait_event() works here too:

    async def listen_to_events(source1, source2, source3):
        stream = stream_events(source1.some_signal, source2.another_signal,
                               source3.some_signal,
                               lambda event: event.randomproperty == 'foo')
        async with aclosing(stream):
            async for event in stream:
                print(event)

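The streaming pattern, and the reason the examples wrap the stream in aclosing(), can be illustrated with a queue-backed async generator. This is a sketch using only the standard library; MiniSignal and stream_matching() are hypothetical names for illustration and do not reflect how Asphalt implements stream_events().

```python
import asyncio


class MiniSignal:
    """Toy stand-in for a signal (assumption: not Asphalt's implementation)."""

    def __init__(self):
        self.listeners = []

    def connect(self, callback):
        self.listeners.append(callback)

    def disconnect(self, callback):
        self.listeners.remove(callback)

    def dispatch(self, event):
        for callback in list(self.listeners):
            callback(event)


def stream_matching(signals, predicate=None):
    """Connect to the signals right away and return an async generator."""
    queue = asyncio.Queue()

    def listener(event):
        # Only events that pass the filter are buffered
        if predicate is None or predicate(event):
            queue.put_nowait(event)

    async def streamer():
        try:
            while True:
                yield await queue.get()
        finally:
            # Runs when the generator is closed, e.g. through aclose()
            for signal in signals:
                signal.disconnect(listener)

    for signal in signals:
        signal.connect(listener)
    return streamer()


async def demo():
    source = MiniSignal()
    stream = stream_matching([source], lambda event: event % 2 == 0)
    for number in range(6):
        source.dispatch(number)  # 0, 2 and 4 pass the filter

    collected = []
    try:
        async for event in stream:
            collected.append(event)
            if len(collected) == 3:
                break  # leaving the loop does not close the generator
    finally:
        await stream.aclose()  # the cleanup that aclosing() performs on exit
    return collected, len(source.listeners)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Breaking out of an async for loop does not close an async generator by itself, so without the explicit aclose() call the listener in this sketch would stay connected until the generator is finalized. Wrapping the stream in aclosing() makes that cleanup deterministic.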