API reference

Components

class asphalt.core.Component

Bases: object

This is the base class for all Asphalt components.

add_component(alias, /, type=None, **config)

Add a child component.

This will store the type and configuration options of the named child component, to be later instantiated by start_component().

If the type argument is omitted, then the value of the alias argument is used to derive the type.

The locally given configuration can be overridden by component configuration parameters supplied to the constructor (via the components argument).

When configuration values are provided both as keyword arguments to this method and through the components constructor argument, the two are merged using merge_config(), with the values from the components argument overriding the keyword arguments to this method.

Parameters:
  • alias (str) – a name for the component instance, unique within this container

  • type (str | type[Component] | None) – name of an entry point in the asphalt.components namespace or a Component subclass

  • config (Any) – mapping of keyword arguments passed to the component’s initializer

Raises:

RuntimeError – if there is already a child component with the same alias

Return type:

None

async prepare()

Perform any necessary initialization before starting the component.

This method is called by start_component() before starting the child components of this component, so it can be used to add any resources required by the child components.

Return type:

None

async start()

Perform any necessary tasks to start the services provided by this component.

This method is called by start_component() after the child components of this component have been started, so any resources provided by the child components are available at this point.

Return type:

None

Note

Resources added within this method with the default name may be published under a different name if the component is deployed as a child component with an alias like componenttype/resourcename. In this case, resources added with the default name will be published under the name resourcename instead of default.

Warning

Do not call this method directly; use start_component() instead.

class asphalt.core.CLIApplicationComponent

Bases: Component

Specialized Component subclass for command line tools.

Command line tools and similar applications should use this as their root component and implement their main code in the run() method.

When all the subcomponents have been started, run() is started as a new task. When the task is finished, the application will exit using the return value as its exit code.

If run() raises an exception, a stack trace is printed and the exit code will be set to 1. If the returned exit code is out of range or of the wrong data type, it is set to 1 and a warning is emitted.

abstract async run()

Run the business logic of the command line tool.

Do not call this method yourself.

Return type:

int | None

Returns:

the application’s exit code (0-127; None = 0)
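The exit code rules described for CLIApplicationComponent can be illustrated with a small stand-alone sketch. The helper name normalize_exit_code is hypothetical and not part of asphalt.core; it only mirrors the documented behavior (None maps to 0, integers in the range 0-127 pass through, anything else becomes 1 with a warning).

```python
import warnings


def normalize_exit_code(retval: object) -> int:
    """Map a run() return value to a process exit code (illustrative only)."""
    if retval is None:
        return 0
    if isinstance(retval, int):
        if 0 <= retval <= 127:
            return int(retval)
        warnings.warn(f"exit code out of range: {retval}")
        return 1
    warnings.warn(f"exit code must be an integer or None, not {type(retval).__name__}")
    return 1
```

The exact warning category and message used by the library are not stated in this reference, so the ones above are placeholders.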

async asphalt.core.start_component(component_class, config=None, *, timeout=20)

Start a component and its subcomponents.

Parameters:
  • component_class – the root component class, an entry point name in the asphalt.components namespace or a modulename:varname reference

  • config – configuration for the root component (and its child components)

  • timeout – seconds to wait for all the components in the hierarchy to start (default: 20; set to None to disable timeout)

Raises:

ComponentStartError – if there is an error instantiating or starting a component

exception asphalt.core.ComponentStartError(phase, path, component_type)

Bases: Exception

Raised by start_component() when there’s an error instantiating or starting a component.

Note

The underlying exception can be retrieved from the __cause__ attribute.

Variables:
  • phase (Literal["creating", "preparing", "starting"]) – the phase of the component initialization in which the error occurred

  • path (str) – the path of the component in the configuration (an empty string if this is the root component)

  • component_type (type[Component]) – the component class
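The note about __cause__ relies on standard Python exception chaining. The following sketch uses a hypothetical stand-in class, DemoStartError, with the same three attributes; it does not import or reproduce the real ComponentStartError, and the create() helper is likewise an assumption made for illustration.

```python
class DemoStartError(Exception):
    """Hypothetical stand-in carrying the same three attributes."""

    def __init__(self, phase: str, path: str, component_type: type) -> None:
        super().__init__(f"error {phase} component at {path!r}")
        self.phase = phase
        self.path = path
        self.component_type = component_type


class BrokenComponent:
    def __init__(self) -> None:
        raise ValueError("bad configuration")


def create(component_type: type, path: str) -> object:
    try:
        return component_type()
    except Exception as exc:
        # "raise ... from" stores the original error in __cause__
        raise DemoStartError("creating", path, component_type) from exc


try:
    create(BrokenComponent, "db")
except DemoStartError as error:
    caught = error
```

After this runs, caught.__cause__ is the original ValueError, which is how a caller would get at the underlying failure.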

Concurrency

async asphalt.core.start_background_task_factory(*, exception_handler=None)

Start a service task that hosts ad-hoc background tasks.

Each of the tasks started by this factory is run in its own, separate Asphalt context, inherited from this context.

When the service task is torn down, it will wait for all the background tasks to finish before returning.

The task factory must be set up only after all the resources potentially needed by the ad-hoc tasks have been set up. Failing to do so risks those resources being removed from the context before all the tasks have finished.

Parameters:

exception_handler – a callback called to handle an exception raised from the task. Takes the exception (Exception) as the argument, and should return True if it successfully handled the exception.

Returns:

the task factory

async asphalt.core.start_service_task(func, name, *, teardown_action='cancel')

Start a background task that gets shut down when the context shuts down.

This function is meant to be used by components to run tasks, such as network services, that should be shut down with the application, because each call to this function registers a context teardown callback that waits for the service task to finish before allowing the context teardown to continue.

If you supply a teardown callback, and it raises an exception, then the task will be cancelled instead.

Parameters:
  • func (Callable[..., Coroutine[Any, Any, TypeVar(T_Retval)]]) – the coroutine function to run

  • name (str) – descriptive name (e.g. “HTTP server”) for the task, to which the prefix “Service task: ” will be added when the task is actually created in the backing asynchronous event loop implementation (e.g. asyncio)

  • teardown_action (Union[Callable[[], Any], Literal['cancel'], None]) –

    the action to take when the context is being shut down:

    • 'cancel': cancel the task

    • None: no action (the task must finish by itself)

    • (function, or any callable, can be asynchronous): run this callable to signal the task to finish

Return type:

Any

Returns:

any value passed to task_status.started() by the target callable if it supports that, otherwise None

class asphalt.core.TaskFactory(exception_handler=None)

Bases: object

all_task_handles()

Return task handles for all the tasks currently running on the factory’s task group.

Return type:

set[TaskHandle]

async start_task(func, name=None)

Start a background task in the factory’s task group.

The task runs in its own context, inherited from the root context. If the task raises an exception (inherited from Exception), it is logged with a descriptive message containing the task’s name.

To pass arguments to the target callable, pass them via lambda (e.g. lambda: yourfunc(arg1, arg2, kw=val)).

If func takes an argument named task_status, then this method will only return when the function has called task_status.started(). See anyio.abc.TaskGroup.start() for details. The value passed to task_status.started() will be available as the start_value property on the TaskHandle.

Parameters:
  • func (Callable[…, Coroutine[Any, Any, T_Retval]]) – the coroutine function to run

  • name (str | None) – descriptive name for the task

Return type:

TaskHandle

Returns:

a task handle that can be used to await on the result or cancel the task
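The lambda technique mentioned for start_task() works with any API that calls a coroutine function with no arguments. The sketch below demonstrates it with plain asyncio; run_zero_arg is a hypothetical stand-in for such an API and is not how TaskFactory is implemented.

```python
import asyncio
from collections.abc import Callable, Coroutine
from typing import Any


async def greet(greeting: str, name: str, *, punctuation: str = "!") -> str:
    return f"{greeting}, {name}{punctuation}"


async def run_zero_arg(func: Callable[[], Coroutine[Any, Any, str]]) -> str:
    """Hypothetical stand-in for an API that calls func() with no arguments."""
    return await func()


async def main() -> str:
    # The lambda captures the arguments; calling it returns the coroutine object
    return await run_zero_arg(lambda: greet("Hello", "world", punctuation="?"))


result = asyncio.run(main())
```

functools.partial(greet, "Hello", "world", punctuation="?") would serve the same purpose if a lambda feels too implicit.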

start_task_soon(func, name=None)

Start a background task in the factory’s task group.

This is similar to start_task(), but works from synchronous callbacks and doesn’t support TaskStatus.

Parameters:
  • func (Callable[…, Coroutine[Any, Any, T_Retval]]) – the coroutine function to run

  • name (str | None) – descriptive name for the task

Return type:

TaskHandle

Returns:

a task handle that can be used to await on the result or cancel the task

class asphalt.core.TaskHandle(name)

Bases: object

A representation of a task started from TaskFactory.

Variables:
  • name – the name of the task

  • start_value – the start value passed to task_status.started() if the target function supported that

cancel()

Schedule the task to be cancelled.

Return type:

None

async wait_finished()

Wait until the task is finished.

Return type:

None

Contexts and resources

class asphalt.core.Context(parent=None)

Bases: object

Contexts give request handlers and callbacks access to resources.

When a context is created, all resource factories and resources (except factory-generated resources, all of which are scoped to a specific context instance) are copied from the parent context.

Variables:

resource_added (Signal) – a signal (ResourceEvent) dispatched when a resource has been published in this context

add_resource(value, name='default', types=(), *, description=None, teardown_callback=None)

Add a resource to this context.

This will cause a resource_added event to be dispatched.

Parameters:
  • value (T_Resource) – the actual resource value

  • name (str) – name of this resource (unique among all its registered types within a single context)

  • types (type | Sequence[type]) – type(s) to register the resource as (omit to use the type of value)

  • description (str | None) – an optional free-form description, for introspection/debugging

  • teardown_callback (Callable[[], Any] | None) – callable that is called to perform cleanup on this resource when the context is being shut down

Raises:

ResourceConflict – if the resource conflicts with an existing one in any way

Return type:

None

add_resource_factory(factory_callback, name='default', *, types=(), description=None)

Add a resource factory to this context.

This will cause a resource_added event to be dispatched.

A resource factory is a callable that generates a “contextual” resource when it is requested by either get_resource() or get_resource_nowait().

The type(s) of the generated resources need to be specified, either by passing the types argument, or by adding a return type annotation to the factory function. If the generated resource needs to be registered as multiple types, you can use Union (e.g. Union[str, int]) or a union type (e.g. str | int; requires from __future__ import annotations on Python versions earlier than 3.10).

When a new resource is created in this manner, it is always bound to the context through which it was requested, regardless of where in the chain the factory itself was added.

Parameters:
  • factory_callback (FactoryCallback) – a (non-coroutine) callable that takes a context instance as argument and returns the created resource object

  • name (str) – name of the resource that will be created in the target context

  • types (type | Sequence[type]) – one or more types to register the generated resource as on the target context (can be omitted if the factory callable has a return type annotation)

  • description (str | None) – an optional free-form description, for introspection/debugging

Raises:

ResourceConflict – if there is an existing resource factory for the given type/name combinations or the given context variable

Return type:

None
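To make the return annotation rule for add_resource_factory() more concrete, here is a minimal sketch of how resource types could be derived from a factory's return annotation using only the typing module. The function name resource_types_of is hypothetical, and the library's actual logic may differ.

```python
import types
from typing import Union, get_args, get_origin, get_type_hints


def resource_types_of(factory) -> tuple:
    """Derive resource types from a factory's return annotation (sketch)."""
    return_type = get_type_hints(factory).get("return")
    if return_type is None:
        raise ValueError("factory needs a return type annotation or explicit types")
    # Union[str, int] reports typing.Union; "str | int" reports types.UnionType (3.10+)
    union_origins = (Union, getattr(types, "UnionType", Union))
    if get_origin(return_type) in union_origins:
        return get_args(return_type)
    return (return_type,)


def make_text(ctx: object) -> str:
    return "hello"


def make_text_or_number(ctx: object) -> Union[str, int]:
    return 42


def unannotated(ctx):
    return None
```

With these definitions, make_text yields a single type and make_text_or_number yields both members of the union, while unannotated is rejected because neither source of type information is present.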

add_teardown_callback(callback, pass_exception=False)

Add a callback to be called when this context closes.

This is intended for cleanup of resources, and the list of callbacks is processed in the reverse order in which they were added, so the last added callback will be called first.

The callback may return an awaitable. If it does, the awaitable is awaited on before calling any further callbacks.

Parameters:
  • callback (Union[Callable[[], Any], Callable[[Optional[BaseException]], Any]]) – a callable that is called with either no arguments or with the exception that ended this context, based on the value of pass_exception

  • pass_exception (bool) – True to pass the callback the exception that ended this context (or None if the context ended cleanly)

Return type:

None
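The ordering and awaiting rules for teardown callbacks can be sketched with a small stand-alone class. TeardownStack is a hypothetical stand-in written for this illustration, not the Context implementation; it only reproduces the three documented behaviors: reverse order, awaiting returned awaitables, and the optional exception argument.

```python
import asyncio
import inspect
from typing import Any, Callable, Optional


class TeardownStack:
    """Hypothetical stand-in that mimics the documented callback ordering."""

    def __init__(self) -> None:
        self._callbacks: list = []

    def add_teardown_callback(
        self, callback: Callable[..., Any], pass_exception: bool = False
    ) -> None:
        self._callbacks.append((callback, pass_exception))

    async def close(self, exception: Optional[BaseException] = None) -> None:
        # Last added, first called
        for callback, pass_exception in reversed(self._callbacks):
            retval = callback(exception) if pass_exception else callback()
            # Await the result before moving on to the next callback
            if inspect.isawaitable(retval):
                await retval
        self._callbacks.clear()


calls: list = []


async def close_pool() -> None:
    calls.append("pool closed")


def report(exception: Optional[BaseException]) -> None:
    calls.append(f"ended with {exception!r}")


async def main() -> None:
    stack = TeardownStack()
    stack.add_teardown_callback(close_pool)
    stack.add_teardown_callback(report, pass_exception=True)
    await stack.close()


asyncio.run(main())
```

Because report was added last, it runs first and receives None (a clean shutdown); close_pool runs afterwards and is awaited.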

property closed: bool

Return True if the teardown process has at least been initiated, False otherwise.

async get_resource(type, name='default', *, optional=False)

Look up a resource in this context.

Parameters:
  • type (type[T_Resource]) – type of the requested resource

  • name (str) – name of the requested resource

  • optional (Literal[False, True]) – if True, return None if the resource was not available

Return type:

T_Resource | None

Returns:

the requested resource, or None if none was available and optional was True

get_resource_nowait(type, name='default', *, optional=False)

Look up a resource in this context.

This method will not trigger async resource factories.

Parameters:
  • type (type[T_Resource]) – type of the requested resource

  • name (str) – name of the requested resource

  • optional (Literal[False, True]) – if True, return None if the resource was not available

Return type:

T_Resource | None

Returns:

the requested resource, or None if none was available and optional was True

get_resources(type)

Retrieve all the resources of the given type in this context.

This method does not trigger resource factories; it only retrieves resources already in the context chain.

Parameters:

type (type[TypeVar(T_Resource)]) – type of the resources to get

Return type:

Mapping[str, TypeVar(T_Resource)]

Returns:

a mapping of resource name to the resource

property parent: Context | None

Return the parent context, or None if there is no parent.

async start_background_task_factory(*, exception_handler=None)

Start a service task that hosts ad-hoc background tasks.

Each of the tasks started by this factory is run in its own, separate Asphalt context, inherited from this context.

When the service task is torn down, it will wait for all the background tasks to finish before returning.

The task factory must be set up only after all the resources potentially needed by the ad-hoc tasks have been set up. Failing to do so risks those resources being removed from the context before all the tasks have finished.

Parameters:

exception_handler (ExceptionHandler | None) – a callback called to handle an exception raised from the task. Takes the exception (Exception) as the argument, and should return True if it successfully handled the exception.

Return type:

TaskFactory

Returns:

the task factory

async start_service_task(func, name, *, teardown_action='cancel')

Start a background task that gets shut down when the context shuts down.

This function is meant to be used by components to run tasks, such as network services, that should be shut down with the application, because each call to this function registers a context teardown callback that waits for the service task to finish before allowing the context teardown to continue.

If you supply a teardown callback, and it raises an exception, then the task will be cancelled instead.

Parameters:
  • func (Callable[..., Coroutine[Any, Any, TypeVar(T_Retval)]]) – the coroutine function to run

  • name (str) – descriptive name (e.g. “HTTP server”) for the task, to which the prefix “Service task: ” will be added when the task is actually created in the backing asynchronous event loop implementation (e.g. asyncio)

  • teardown_action (Union[Callable[[], Any], Literal['cancel'], None]) –

    the action to take when the context is being shut down:

    • 'cancel': cancel the task

    • None: no action (the task must finish by itself)

    • (function, or any callable, can be asynchronous): run this callable to signal the task to finish

Return type:

Any

Returns:

any value passed to task_status.started() by the target callable if it supports that, otherwise None

class asphalt.core.ResourceEvent(resource_types, resource_name, resource_description, is_factory)

Bases: Event

Dispatched when a resource or resource factory has been added to a context.

Variables:
  • resource_types (tuple[type, ...]) – types the resource was registered under

  • resource_name (str) – name of the resource

  • resource_description – an optional human-readable description of the resource

  • is_factory (bool) – True if a resource factory was added, False if a regular resource was added

asphalt.core.current_context()

Return the currently active context.

Raises:

NoCurrentContext – if there is no active context

Return type:

Context

asphalt.core.context_teardown(func)

Wrap an async generator function to execute the rest of the function at context teardown.

This function returns an async function, which, when called, starts the wrapped async generator. The wrapped async generator is run until the first yield statement. When the context is being torn down, the exception that ended the context, if any, is sent to the generator.

For example:

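from asphalt.core import Component, add_resource, context_teardown

class SomeComponent(Component):
    @context_teardown
    async def start(self):
        service = SomeService()  # SomeService is a placeholder for your own class
        add_resource(service)
        # Execution resumes here when the context is torn down
        exception = yield
        service.stop()

Parameters: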

func – an async generator function

Returns:

an async function
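The mechanism behind context_teardown() can be approximated with a plain async generator and asyncio. This is a sketch of the idea only (run to the first yield, then send the exception at teardown); the manual driving code in main() is an assumption standing in for what the decorator and the context would do.

```python
import asyncio

log: list = []


async def start():
    log.append("service started")
    # Execution pauses here until teardown; the sent value is the exception or None
    exception = yield
    log.append(f"service stopped (exception={exception!r})")


async def main() -> None:
    generator = start()
    # Run the generator up to its first yield
    await generator.__anext__()
    log.append("context is running")
    try:
        # At teardown, send the exception (None here) to resume after the yield
        await generator.asend(None)
    except StopAsyncIteration:
        pass  # the generator finished without yielding again


asyncio.run(main())
```

The setup half and the cleanup half live in the same function body, so local variables such as a service object remain in scope for the cleanup code.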

asphalt.core.add_resource(value, name='default', types=(), *, description=None, teardown_callback=None)

Shortcut for current_context().add_resource(...).

asphalt.core.add_resource_factory(factory_callback, name='default', *, types=(), description=None)

Shortcut for current_context().add_resource_factory(...).

asphalt.core.add_teardown_callback(callback, pass_exception=False)

Shortcut for current_context().add_teardown_callback(...).

Return type:

None

async asphalt.core.get_resource(type, name='default', *, optional=False)

Shortcut for current_context().get_resource(...).

asphalt.core.get_resource_nowait(type, name='default', *, optional=False)

Shortcut for current_context().get_resource_nowait(...).

asphalt.core.inject(func)

Wrap the given coroutine function for use with dependency injection.

Parameters with dependencies need to be annotated and have resource() as the default value. When the wrapped function is called, values for such parameters will be automatically filled in by calling get_resource() using the parameter’s type annotation and the resource name passed to resource() (or "default") as the arguments.

Any forward references among the type annotations are resolved on the first call to the wrapped function.

Return type:

Callable[[ParamSpec(P)], Any]

asphalt.core.resource(name='default')

Marker for declaring a parameter for dependency injection via inject().

Parameters:

name (str) – the resource name (defaults to default)

Return type:

Any
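The way inject() and resource() cooperate can be sketched without the library. Everything below is a hypothetical stand-in: demo_inject, demo_resource and the REGISTRY dictionary are assumptions made for illustration, and the real implementation looks resources up through get_resource() on the current context rather than in a module-level dictionary.

```python
import asyncio
import functools
import inspect
from typing import Any, get_type_hints


class _Marker:
    def __init__(self, name: str) -> None:
        self.name = name


def demo_resource(name: str = "default") -> Any:
    """Hypothetical marker used as a parameter default."""
    return _Marker(name)


# Stand-in registry keyed by (type, name); not how Asphalt stores resources
REGISTRY: dict = {(str, "default"): "hello", (int, "port"): 8080}


def demo_inject(func):
    signature = inspect.signature(func)

    @functools.wraps(func)
    async def wrapper(*args: Any, **kwargs: Any) -> Any:
        # Resolve annotations at call time so forward references can be resolved
        hints = get_type_hints(func)
        bound = signature.bind_partial(*args, **kwargs)
        for name, param in signature.parameters.items():
            if name not in bound.arguments and isinstance(param.default, _Marker):
                kwargs[name] = REGISTRY[(hints[name], param.default.name)]
        return await func(*args, **kwargs)

    return wrapper


@demo_inject
async def describe(
    prefix: str, *, text: str = demo_resource(), port: int = demo_resource("port")
) -> str:
    return f"{prefix}: {text} on {port}"


result = asyncio.run(describe("service"))
overridden = asyncio.run(describe("service", text="hi"))
```

Parameters that the caller supplies explicitly are left alone; only the ones still holding the marker default are filled in from the registry.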

exception asphalt.core.AsyncResourceError

Bases: Exception

Raised when Context.get_resource_nowait() received a coroutine object from a resource factory (the factory was asynchronous).

exception asphalt.core.NoCurrentContext

Bases: Exception

Raised by current_context() when there is no active context.

exception asphalt.core.ResourceConflict

Bases: Exception

Raised when a new resource that is being published conflicts with an existing resource or context variable.

exception asphalt.core.ResourceNotFound(type, name)

Bases: LookupError

Raised when a resource request cannot be fulfilled within the allotted time.

Events

class asphalt.core.Event

Bases: object

The base class for all events.

Variables:
  • source – the object where this event originated from

  • topic (str) – the topic

  • time (float) – event creation time as seconds from the UNIX epoch

property utc_timestamp: datetime

Return a timezone aware datetime corresponding to the time variable, using the UTC timezone.
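The conversion described for utc_timestamp corresponds to a standard library call. The function below is a stand-alone sketch of that conversion, assuming time holds seconds since the UNIX epoch as documented; it is not the property's source code.

```python
from datetime import datetime, timezone


def utc_timestamp(time: float) -> datetime:
    """Convert seconds since the UNIX epoch to an aware UTC datetime."""
    return datetime.fromtimestamp(time, timezone.utc)


stamp = utc_timestamp(86400.0)
```

Passing timezone.utc explicitly is what makes the result timezone aware; without it, fromtimestamp() returns a naive datetime in local time.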

class asphalt.core.Signal(event_class)

Bases: Generic[T_Event]

Declaration of a signal that can be used to dispatch events.

This is a descriptor that returns itself on class level attribute access and a bound version of itself on instance level access. Dispatching and streaming events only works with these bound instances.

Each signal must be assigned to a class attribute, but only once. The Signal will not function correctly if the same Signal instance is assigned to multiple attributes.

Parameters:

event_class (type[TypeVar(T_Event, bound= Event)]) – an event class
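The descriptor behavior described for Signal (the declaration itself on class access, a bound copy on instance access) follows a common Python pattern. DemoSignal below is a hypothetical, heavily simplified illustration of that pattern; it does not deliver events to subscribers and is not the Signal implementation.

```python
class DemoSignal:
    """Hypothetical descriptor showing class-level vs. instance-level access."""

    def __init__(self, owner=None) -> None:
        self.owner = owner  # None means "unbound"
        self.attr_name = None

    def __set_name__(self, owner_class, name) -> None:
        # Called once when the signal is assigned to a class attribute
        self.attr_name = name

    def __get__(self, instance, owner_class=None):
        if instance is None:
            return self  # class-level access returns the declaration itself
        # Cache one bound copy per instance in the instance dictionary
        bound = DemoSignal(owner=instance)
        instance.__dict__[self.attr_name] = bound
        return bound

    def dispatch(self, event) -> str:
        if self.owner is None:
            raise RuntimeError("signal is not bound to an instance")
        return f"{type(self.owner).__name__} dispatched {event!r}"


class Sensor:
    changed = DemoSignal()


sensor = Sensor()
```

Since the sketch records a single attribute name in __set_name__, it also hints at why assigning one signal object to several attributes would cause trouble.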

dispatch(event)

Dispatch an event.

Raises:

UnboundSignal – if attempting to dispatch an event on a signal not bound to any instance of the containing class

Return type:

None

stream_events(filter=None, *, max_queue_size=50)

Shortcut for calling stream_events() with this signal in the first argument.

Return type:

AbstractAsyncContextManager[AsyncIterator[T_Event]]

async wait_event(filter=None)

Shortcut for calling wait_event() with this signal in the first argument.

Return type:

T_Event

class asphalt.core.SignalQueueFull

Bases: UserWarning

Warning about signal delivery failing due to a subscriber’s queue being full because the subscriber could not receive the events quickly enough.

asphalt.core.stream_events(signals, filter=None, *, max_queue_size=50)

Return an async generator that yields events from the given signals.

Only events that pass the filter callable (if one has been given) are returned. If no filter function was given, all events are yielded from the generator.

If another event is received from any of the signals while the previous one is still being yielded, it will stay in the queue. If the queue fills up, then dispatch() on all the signals will block until the yield has been processed.

The listening of events from the given signals starts when this function is called.

Parameters:
  • signals (Sequence[Signal[T_Event]]) – the signals to get events from

  • filter (Callable[[T_Event], bool] | None) – a callable that takes an event object as an argument and returns a truthy value if the event should pass

  • max_queue_size (int) – maximum number of unprocessed events in the queue

Return type:

AsyncIterator[AsyncIterator[T_Event]]

Returns:

an async generator yielding all events (that pass the filter, if any) from all the given signals

Raises:

UnboundSignal – if attempting to listen to events on a signal not bound to any instance of the containing class

async asphalt.core.wait_event(signals, filter=None)

Wait until any of the given signals dispatches an event that satisfies the filter (if any).

If no filter has been given, the first event dispatched from any of the signals is returned.

The listening of events from the given signals starts when this function is called.

Parameters:
  • signals – the signals to get events from

  • filter – a callable that takes an event object as an argument and returns a truthy value if the event should pass

Returns:

the first event (that passed the filter, if any) that was dispatched from any of the signals

Raises:

UnboundSignal – if attempting to listen to events on a signal not bound to any instance of the containing class

exception asphalt.core.UnboundSignal

Bases: Exception

Raised when attempting to dispatch or listen to events on a Signal on the class level, rather than on an instance of the class.

Application runner

asphalt.core.run_application(component_class, config=None, *, backend='asyncio', backend_options=None, max_threads=None, logging=20, start_timeout=10)

Configure logging and start the given component.

Assuming the root component was started successfully, the event loop will continue running until the process is terminated.

Initializes the logging system first based on the value of logging:

By default, the logging system is initialized using basicConfig() using the INFO logging level.

The default executor in the event loop is replaced with a new ThreadPoolExecutor where the maximum number of threads is set to the value of max_threads or, if omitted, the default value of ThreadPoolExecutor.

Parameters:
  • component_class – the root component class, an entry point name in the asphalt.components namespace or a modulename:varname reference

  • config – configuration options for the root component

  • backend – name of the AnyIO backend (e.g. asyncio or trio)

  • backend_options – options to pass to the AnyIO backend (see the AnyIO documentation for reference)

  • max_threads – the maximum number of worker threads in the default thread pool executor (the default value depends on the event loop implementation)

  • logging – a logging configuration dictionary, logging level or None

  • start_timeout – seconds to wait for the root component (and its subcomponents) to start up before giving up (None = wait forever)

Raises:

SystemExit – if the root component fails to start, or an exception is raised when the application is running

Utilities

class asphalt.core.PluginContainer(namespace, base_class=None)

Bases: object

A convenience class for loading and instantiating plugins through the use of entry points.

Parameters:
  • namespace (str) – a setuptools entry points namespace

  • base_class (type | None) – the base class for plugins of this type (or None if the entry points don’t point to classes)

all()

Load all entry points (if not already loaded) in this namespace and return the resulting objects as a list.

Return type:

list[Any]

create_object(type, **constructor_kwargs)

Instantiate a plugin.

The entry points in this namespace must point to subclasses of the base_class parameter passed to this container.

Parameters:
  • type (type | str) – an entry point identifier or an actual class object

  • constructor_kwargs (Any) – keyword arguments passed to the constructor of the plugin class

Return type:

Any

Returns:

the plugin instance

property names: list[str]

Return names of all entry points in this namespace.

resolve(obj)

Resolve a reference to an entry point.

If obj is a string, the named entry point is loaded from this container’s namespace. Otherwise, obj is returned as is.

Parameters:

obj (Any) – an entry point identifier, an object reference or an arbitrary object

Return type:

Any

Returns:

the loaded entry point, resolved object or the unchanged input value

Raises:

LookupError – if obj was a string but the named entry point was not found

asphalt.core.callable_name(func)

Return the qualified name (e.g. package.module.func) for the given callable.

Return type:

str

asphalt.core.merge_config(original, overrides)

Return a copy of the original configuration dictionary, with overrides from overrides applied.

This is similar to what dict.update() does, but when a dictionary is about to be replaced with another dictionary, it instead merges the contents.

Parameters:
  • original – a configuration dictionary (or None)

  • overrides – a dictionary containing overriding values to the configuration (or None)

Returns:

the merge result

Changed in version 5.0: Previously, if a key in overrides was a dotted path (i.e. foo.bar.baz: value), it was assumed to be a shorthand for foo: {bar: {baz: value}}. This feature was removed in v5.0 because it made it impossible to configure any logger that had a dot in its name (as is the case with most loggers).
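The merging rule for merge_config() can be sketched as a short recursive function. merge_sketch is a hypothetical re-implementation written from the description above, not the library's code; in line with the 5.0 change, dotted keys are treated as ordinary keys.

```python
from collections.abc import Mapping
from typing import Any, Optional


def merge_sketch(
    original: Optional[Mapping], overrides: Optional[Mapping]
) -> dict:
    """Recursively merge two configuration mappings without mutating the inputs."""
    merged: dict = dict(original or {})
    for key, value in (overrides or {}).items():
        if isinstance(value, Mapping) and isinstance(merged.get(key), Mapping):
            # Both sides are dictionaries: merge instead of replacing
            merged[key] = merge_sketch(merged[key], value)
        else:
            merged[key] = value
    return merged


defaults: dict = {"logging": {"version": 1, "root": {"level": "INFO"}}, "port": 80}
overrides: dict = {"logging": {"root": {"level": "DEBUG"}}, "asphalt.core": "one key"}
result: Any = merge_sketch(defaults, overrides)
```

Here the nested logging dictionary keeps its version entry while the root level is overridden, and the asphalt.core key is stored verbatim rather than being expanded into nested dictionaries.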

asphalt.core.qualified_name(obj)

Return the qualified name (e.g. package.module.Type) for the given object.

If obj is not a class, the returned name will match its type instead.

Return type:

str
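The class-versus-instance rule for qualified_name() can be illustrated as follows. qualified_name_sketch is a hypothetical approximation; how the real function formats built-in types is not stated in this reference, so the sketch does not special-case them.

```python
from fractions import Fraction


def qualified_name_sketch(obj: object) -> str:
    """Return "module.QualName" for a class, or for the class of an instance."""
    cls = obj if isinstance(obj, type) else type(obj)
    return f"{cls.__module__}.{cls.__qualname__}"


name_of_class = qualified_name_sketch(Fraction)
name_of_instance = qualified_name_sketch(Fraction(1, 2))
```

Both calls produce the same string, because an instance is described by the name of its type.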

asphalt.core.resolve_reference(ref)

Return the object pointed to by ref.

If ref is not a string or does not contain :, it is returned as is. References must be in the form <modulename>:<varname> where <modulename> is the fully qualified module name and <varname> is the path to the variable inside that module. For example, “concurrent.futures:Future” would give you the Future class.

Raises:

LookupError – if the reference could not be resolved

Return type:

Any
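The reference format accepted by resolve_reference() can be resolved with importlib and getattr(). The function below is a hypothetical sketch based on the description above, not the library's implementation; in particular, which underlying errors the real function converts to LookupError is an assumption.

```python
from importlib import import_module
from typing import Any


def resolve_reference_sketch(ref: Any) -> Any:
    """Resolve "module:attr.path" strings; return anything else unchanged."""
    if not isinstance(ref, str) or ":" not in ref:
        return ref
    module_name, _, attr_path = ref.partition(":")
    try:
        target = import_module(module_name)
        # Walk the dotted path inside the module one attribute at a time
        for part in attr_path.split("."):
            target = getattr(target, part)
    except (ImportError, AttributeError, ValueError) as exc:
        raise LookupError(f"could not resolve {ref!r}") from exc
    return target
```

For example, resolve_reference_sketch("concurrent.futures:Future") returns the Future class, while a non-string value or a string without a colon comes back untouched.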