Working with signals and events

Events are a handy way to make your code react to changes in another part of the application. To dispatch and listen to events, you first need to have one or more Signal instances as attributes of some class. Each signal needs to be associated with some Event class. Then, when you dispatch a new event by calling Signal.dispatch(), the given event will be passed to all tasks currently subscribed to that signal.

For listening to events dispatched from a signal, you have two options:

  1. wait_event() (for returning after receiving one event)

  2. stream_events() (for asynchronously iterating over events as they come)

If you only intend to listen to a single signal at once, you can use Signal.wait_event() or Signal.stream_events() as shortcuts.

Receiving events iteratively

Here’s an example of an event source containing two signals (somesignal and customsignal) and code that subscribes to said signals, dispatches an event on both signals and then prints them out as they are received:

from dataclasses import dataclass

from asphalt.core import Event, Signal, stream_events


@dataclass
class CustomEvent(Event):
    extra_argument: str


class MyEventSource:
    somesignal = Signal(Event)
    customsignal = Signal(CustomEvent)


async def some_handler():
    source = MyEventSource()
    async with stream_events([source.somesignal, source.customsignal]) as events:
        # Dispatch a basic Event
        source.somesignal.dispatch(Event())

        # Dispatch a CustomEvent
        source.customsignal.dispatch(CustomEvent("extra argument here"))

        async for event in events:
            print(f"received event: {event}")

Waiting for a single event

To wait for the next event dispatched from a given signal, you can use the Signal.wait_event() method:

async def print_next_event(source: MyEventSource) -> None:
    event = await source.somesignal.wait_event()
    print(event)

You can even wait for the next event dispatched from any of several signals using the wait_event() function:

from asphalt.core import wait_event


async def print_next_event(
    source1: MyEventSource,
    source2: MyEventSource,
    source3: MyEventSource,
) -> None:
    event = await wait_event(
        [source1.some_signal, source2.custom_signal, source3.some_signal]
    )
    print(event)

Filtering received events

You can provide a filter callback that will take an event as the sole argument. Only if the callback returns True, will the event be received by the listener:

async def print_next_matching_event(source: MyEventSource) -> None:
    event = await source.customsignal.wait_event(
        lambda event: event.extra_argument == "foo"
    )
    print("Got an event with 'foo' as extra_argument")

The same works for event streams too:

async def print_matching_events(source: MyEventSource) -> None:
    async with source.customsignal.stream_events(
        lambda event: event.extra_argument == "foo"
    ) as events:
        async for event in events:
            print(event)