Working with signals and events¶
Events are a handy way to make your code react to changes in another part of the application.
To dispatch and listen to events, you first need to have one or more
Signal
instances as attributes of some class. Each signal needs to be
associated with some Event
class. Then, when you dispatch a new event
by calling dispatch()
, a new instance of this event class will be
constructed and passed to all listener callbacks.
To listen to events dispatched from a signal, you need to have a function or any other callable
that accepts a single positional argument. You then pass this callable to
connect()
. That’s it!
To disconnect the callback, simply call disconnect()
with whatever
you passed to connect()
as argument.
Here’s how it works:
from asphalt.core import Event, Signal
class CustomEvent(Event):
def __init__(self, source, topic, extra_argument):
super().__init__(source, topic)
self.extra_argument = extra_argument
class MyEventSource:
somesignal = Signal(Event)
customsignal = Signal(CustomEvent)
def plain_listener(event):
print('received event: %s' % event)
async def coro_listener(event):
print('coroutine listeners are fine too: %s' % event)
async def some_handler():
source = MyEventSource()
source.somesignal.connect(plain_listener)
source.customsignal.connect(coro_listener)
# Dispatches an Event instance
source.somesignal.dispatch()
# Dispatches a CustomEvent instance (the extra argument is passed to its constructor)
source.customsignal.dispatch('extra argument here')
Exception handling¶
Any exceptions raised by the listener callbacks are logged to the asphalt.core.event
logger.
Additionally, the future returned by dispatch()
resolves to
True
if no exceptions were raised during the processing of listeners. This was meant as a
convenience for use with tests where you can just do
assert await thing.some_signal.dispatch('foo')
.
Waiting for a single event¶
To wait for the next event dispatched from a given signal, you can use the
wait_event()
method:
async def print_next_event(source):
event = await source.somesignal.wait_event()
print(event)
You can even wait for the next event dispatched from any of several signals using the
wait_event()
function:
from asphalt.core import wait_event
async def print_next_event(source1, source2, source3):
event = await wait_event(source1.some_signal, source2.another_signal, source3.some_signal)
print(event)
As a convenience, you can provide a filter callback that will cause the call to only return when
the callback returns True
:
async def print_next_matching_event(source1, source2, source3):
event = await wait_event(source1.some_signal, source2.another_signal, source3.some_signal,
lambda event: event.myrandomproperty == 'foo')
print(event)
Receiving events iteratively¶
With stream_events()
, you can even asynchronously iterate over
events dispatched from a signal:
from contextlib import aclosing # on Python < 3.10, import from async_generator or contextlib2
async def listen_to_events(source):
async with aclosing(source.somesignal.stream_events()) as stream:
async for event in stream:
print(event)
Using stream_events()
, you can stream events from multiple signals:
from asphalt.core import stream_events
async def listen_to_events(source1, source2, source3):
stream = stream_events(source1.some_signal, source2.another_signal, source3.some_signal)
async with aclosing(stream):
async for event in stream:
print(event)
The filtering capability of wait_event()
works here too:
async def listen_to_events(source1, source2, source3):
stream = stream_events(source1.some_signal, source2.another_signal, source3.some_signal,
lambda event: event.randomproperty == 'foo')
async with aclosing(stream):
async for event in stream:
print(event)