API reference
Components
- class asphalt.core.Component
Bases:
object
This is the base class for all Asphalt components.
- add_component(alias, /, type=None, **config)
Add a child component.
This will store the type and configuration options of the named child component, to be later instantiated by
start_component()
.If the
type
argument is omitted, then the value of thealias
argument is used to derive the type.The locally given configuration can be overridden by component configuration parameters supplied to the constructor (via the
components
argument).When configuration values are provided both as keyword arguments to this method and component configuration through the
components
constructor argument, the configurations are merged together usingmerge_config()
in a way that the configuration values from thecomponents
argument override the keyword arguments to this method.- Parameters:
alias (str) – a name for the component instance, unique within this container
type (str | type[Component] | None) – name of and entry point in the
asphalt.components
namespace or aComponent
subclassconfig (Any) – mapping of keyword arguments passed to the component’s initializer
- Raises:
RuntimeError – if there is already a child component with the same alias
- Return type:
None
- async prepare()
Perform any necessary initialization before starting the component.
This method is called by
start_component()
before starting the child components of this component, so it can be used to add any resources required by the child components.- Return type:
- async start()
Perform any necessary tasks to start the services provided by this component.
This method is called by
start_component()
after the child components of this component have been started, so any resources provided by the child components are available at this point. :rtype:None
Note
Resources added within this method with the default name may be published under a different name if the component is deployed as a child component with an alias like
componenttype/resourcename
. In this case, resources added with the default name will be published under the nameresourcename
instead ofdefault
.Warning
Do not call this method directly; use
start_component()
instead.
- class asphalt.core.CLIApplicationComponent
Bases:
Component
Specialized
Component
subclass for command line tools.Command line tools and similar applications should use this as their root component and implement their main code in the
run()
method.When all the subcomponents have been started,
run()
is started as a new task. When the task is finished, the application will exit using the return value as its exit code.If
run()
raises an exception, a stack trace is printed and the exit code will be set to 1. If the returned exit code is out of range or of the wrong data type, it is set to 1 and a warning is emitted.- abstract async run()
Run the business logic of the command line tool.
Do not call this method yourself.
- Return type:
int | None
- Returns:
the application’s exit code (0-127;
None
= 0)
- async asphalt.core.start_component(component_class, config=None, *, timeout=20)
Start a component and its subcomponents.
- Parameters:
component_class – the root component class, an entry point name in the
asphalt.components
namespace or amodulename:varname
referenceconfig – configuration for the root component (and its child components)
timeout – seconds to wait for all the components in the hierarchy to start (default:
20
; set toNone
to disable timeout)
- Raises:
RuntimeError – if this function is called without an active
Context
TimeoutError – if the startup of the component hierarchy takes more than
timeout
secondsTypeError – if
component_class
is neither aComponent
subclass or a string
- exception asphalt.core.ComponentStartError(phase, path, component_type)
Bases:
Exception
Raised by
start_component()
when there’s an error instantiating or starting a component.Note
The underlying exception can be retrieved from the
__cause__
attribute.
Concurrency
- async asphalt.core.start_background_task_factory(*, exception_handler=None)
Start a service task that hosts ad-hoc background tasks.
Each of the tasks started by this factory is run in its own, separate Asphalt context, inherited from this context.
When the service task is torn down, it will wait for all the background tasks to finish before returning.
It is imperative to ensure that the task factory is set up after any of the resources potentially needed by the ad-hoc tasks are set up first. Failing to do so risks those resources being removed from the context before all the tasks have finished.
- Parameters:
exception_handler – a callback called to handle an exception raised from the task. Takes the exception (
Exception
) as the argument, and should returnTrue
if it successfully handled the exception.- Returns:
the task factory
See also
- async asphalt.core.start_service_task(func, name, *, teardown_action='cancel')
Start a background task that gets shut down when the context shuts down.
This function is meant to be used by components to run their tasks like network services that should be shut down with the application, because each call to this functions registers a context teardown callback that waits for the service task to finish before allowing the context teardown to continue.
If you supply a teardown callback, and it raises an exception, then the task will be cancelled instead.
- Parameters:
func (
Callable
[...
,Coroutine
[Any
,Any
,TypeVar
(T_Retval
)]]) – the coroutine function to runname (
str
) – descriptive name (e.g. “HTTP server”) for the task, to which the prefix “Service task: “ will be added when the task is actually created in the backing asynchronous event loop implementation (e.g. asyncio)teardown_action (
Union
[Callable
[[],Any
],Literal
['cancel'
],None
]) –the action to take when the context is being shut down:
'cancel'
: cancel the taskNone
: no action (the task must finish by itself)- (function, or any callable, can be asynchronous): run this callable to signal
the task to finish
- Return type:
- Returns:
any value passed to
task_status.started()
by the target callable if it supports that, otherwiseNone
- class asphalt.core.TaskFactory(exception_handler=None)
Bases:
object
- all_task_handles()
Return task handles for all the tasks currently running on the factory’s task group.
- Return type:
- async start_task(func, name=None)
Start a background task in the factory’s task group.
The task runs in its own context, inherited from the root context. If the task raises an exception (inherited from
Exception
), it is logged with a descriptive message containing the task’s name.To pass arguments to the target callable, pass them via lambda (e.g.
lambda: yourfunc(arg1, arg2, kw=val)
)If
func
takes an argument namedtask_status
, then this method will only return when the function has calledtask_status.started()
. Seeanyio.abc.TaskGroup.start()
for details. The value passed totask_status.started()
will be available as thestart_value
property on theTaskHandle
.- Parameters:
func (Callable[…, Coroutine[Any, Any, T_Retval]]) – the coroutine function to run
name (str | None) – descriptive name for the task
- Return type:
TaskHandle
- Returns:
a task handle that can be used to await on the result or cancel the task
- start_task_soon(func, name=None)
Start a background task in the factory’s task group.
This is similar to
start_task()
, but works from synchronous callbacks and doesn’t supportTaskStatus
.- Parameters:
func (Callable[…, Coroutine[Any, Any, T_Retval]]) – the coroutine function to run
name (str | None) – descriptive name for the task
- Return type:
TaskHandle
- Returns:
a task handle that can be used to await on the result or cancel the task
- class asphalt.core.TaskHandle(name)
Bases:
object
A representation of a task started from
TaskFactory
.- Variables:
name – the name of the task
start_value – the start value passed to
task_status.started()
if the target function supported that
Contexts and resources
- class asphalt.core.Context(parent=None)
Bases:
object
Contexts give request handlers and callbacks access to resources.
When a context is created, all resource factories and resources (except factory-generated resources, all of which are scoped to a specific context instance) are copied from the parent context.
- Variables:
resource_added (Signal) – a signal (
ResourceEvent
) dispatched when a resource has been published in this context
- add_resource(value, name='default', types=(), *, description=None, teardown_callback=None)
Add a resource to this context.
This will cause a
resource_added
event to be dispatched.- Parameters:
value (T_Resource) – the actual resource value
name (str) – name of this resource (unique among all its registered types within a single context)
types (type | Sequence[type]) – type(s) to register the resource as (omit to use the type of
value
)description (str | None) – an optional free-form description, for introspection/debugging
teardown_callback (Callable[[], Any] | None) – callable that is called to perform cleanup on this resource when the context is being shut down
- Raises:
ResourceConflict – if the resource conflicts with an existing one in any way
- Return type:
None
- add_resource_factory(factory_callback, name='default', *, types=(), description=None)
Add a resource factory to this context.
This will cause a
resource_added
event to be dispatched.A resource factory is a callable that generates a “contextual” resource when it is requested by either
get_resource()
orget_resource_nowait()
.The type(s) of the generated resources need to be specified, either by passing the
types
argument, or by adding a return type annotation to the factory function. If the generated resource needs to be registered as multiple types, you can useUnion
(e.g.Union[str, int]
) or a union type (e.g.str | int
; requiresfrom __future__ import annotations
on earlier than Python 3.10).When a new resource is created in this manner, it is always bound to the context through it was requested, regardless of where in the chain the factory itself was added to.
- Parameters:
factory_callback (FactoryCallback) – a (non-coroutine) callable that takes a context instance as argument and returns the created resource object
name (str) – name of the resource that will be created in the target context
types (type | Sequence[type]) – one or more types to register the generated resource as on the target context (can be omitted if the factory callable has a return type annotation)
description (str | None) – an optional free-form description, for introspection/debugging
- Raises:
ResourceConflict – if there is an existing resource factory for the given type/name combinations or the given context variable
- Return type:
None
- add_teardown_callback(callback, pass_exception=False)
Add a callback to be called when this context closes.
This is intended for cleanup of resources, and the list of callbacks is processed in the reverse order in which they were added, so the last added callback will be called first.
The callback may return an awaitable. If it does, the awaitable is awaited on before calling any further callbacks.
- Parameters:
callback (
Union
[Callable
[[],Any
],Callable
[[Optional
[BaseException
]],Any
]]) – a callable that is called with either no arguments or with the exception that ended this context, based on the value ofpass_exception
pass_exception (
bool
) –True
to pass the callback the exception that ended this context (orNone
if the context ended cleanly)
- Return type:
- property closed: bool
Return
True
if the teardown process has at least been initiated,False
otherwise.
- async get_resource(type, name='default', *, optional=False)
Look up a resource in this context.
- Parameters:
type (type[T_Resource]) – type of the requested resource
name (str) – name of the requested resource
optional (Literal[False, True]) – if
True
, returnNone
if the resource was not available
- Return type:
T_Resource | None
- Returns:
the requested resource, or
None
if none was available andoptional
wasFalse
- get_resource_nowait(type, name='default', *, optional=False)
Look up a resource in this context.
This method will not trigger async resource factories.
- Parameters:
type (type[T_Resource]) – type of the requested resource
name (str) – name of the requested resource
optional (Literal[False, True]) – if
True
, returnNone
if the resource was not available
- Return type:
T_Resource | None
- Returns:
the requested resource, or
None
if none was available andoptional
wasFalse
- get_resources(type)
Retrieve all the resources of the given type in this context.
This method does not trigger resource factories; it only retrieves resources already in the context chain.
- async start_background_task_factory(*, exception_handler=None)
Start a service task that hosts ad-hoc background tasks.
Each of the tasks started by this factory is run in its own, separate Asphalt context, inherited from this context.
When the service task is torn down, it will wait for all the background tasks to finish before returning.
It is imperative to ensure that the task factory is set up after any of the resources potentially needed by the ad-hoc tasks are set up first. Failing to do so risks those resources being removed from the context before all the tasks have finished.
- Parameters:
exception_handler (ExceptionHandler | None) – a callback called to handle an exception raised from the task. Takes the exception (
Exception
) as the argument, and should returnTrue
if it successfully handled the exception.- Return type:
TaskFactory
- Returns:
the task factory
See also
- async start_service_task(func, name, *, teardown_action='cancel')
Start a background task that gets shut down when the context shuts down.
This function is meant to be used by components to run their tasks like network services that should be shut down with the application, because each call to this functions registers a context teardown callback that waits for the service task to finish before allowing the context teardown to continue.
If you supply a teardown callback, and it raises an exception, then the task will be cancelled instead.
- Parameters:
func (
Callable
[...
,Coroutine
[Any
,Any
,TypeVar
(T_Retval
)]]) – the coroutine function to runname (
str
) – descriptive name (e.g. “HTTP server”) for the task, to which the prefix “Service task: “ will be added when the task is actually created in the backing asynchronous event loop implementation (e.g. asyncio)teardown_action (
Union
[Callable
[[],Any
],Literal
['cancel'
],None
]) –the action to take when the context is being shut down:
'cancel'
: cancel the taskNone
: no action (the task must finish by itself)- (function, or any callable, can be asynchronous): run this callable to signal
the task to finish
- Return type:
- Returns:
any value passed to
task_status.started()
by the target callable if it supports that, otherwiseNone
- class asphalt.core.ResourceEvent(resource_types, resource_name, resource_description, is_factory)
Bases:
Event
Dispatched when a resource or resource factory has been added to a context.
- asphalt.core.current_context()
Return the currently active context.
- Raises:
NoCurrentContext – if there is no active context
- Return type:
- asphalt.core.context_teardown(func)
Wrap an async generator function to execute the rest of the function at context teardown.
This function returns an async function, which, when called, starts the wrapped async generator. The wrapped async function is run until the first
yield
statement When the context is being torn down, the exception that ended the context, if any, is sent to the generator.For example:
class SomeComponent(Component): @context_teardown async def start(self, ctx: ComponentContext): service = SomeService() add_resource(service) exception = yield service.stop()
- Parameters:
func – an async generator function
- Returns:
an async function
- asphalt.core.add_resource(value, name='default', types=(), *, description=None, teardown_callback=None)
Shortcut for
current_context().add_resource(...)
.See also
- asphalt.core.add_resource_factory(factory_callback, name='default', *, types=(), description=None)
Shortcut for
current_context().add_resource_factory(...)
.See also
- asphalt.core.add_teardown_callback(callback, pass_exception=False)
Shortcut for
current_context().add_teardown_callback(...)
. :rtype:None
See also
- async asphalt.core.get_resource(type, name='default', *, optional=False)
Shortcut for
current_context().get_resource(...)
.See also
- asphalt.core.get_resource_nowait(type, name='default', *, optional=False)
Shortcut for
current_context().get_resource_nowait(...)
.See also
- asphalt.core.inject(func)
Wrap the given coroutine function for use with dependency injection.
Parameters with dependencies need to be annotated and have
resource()
as the default value. When the wrapped function is called, values for such parameters will be automatically filled in by callingget_resource()
using the parameter’s type annotation and the resource name passed toresource()
(or"default"
) as the arguments.Any forward references among the type annotations are resolved on the first call to the wrapped function.
- asphalt.core.resource(name='default')
Marker for declaring a parameter for dependency injection via
inject()
.
- exception asphalt.core.AsyncResourceError
Bases:
Exception
Raised when
Context.get_resource_nowait()
received a coroutine object from a resource factory (the factory was asynchronous).
- exception asphalt.core.NoCurrentContext
Bases:
Exception
Raised by :func: current_context when there is no active context.
- exception asphalt.core.ResourceConflict
Bases:
Exception
Raised when a new resource that is being published conflicts with an existing resource or context variable.
- exception asphalt.core.ResourceNotFound(type, name)
Bases:
LookupError
Raised when a resource request cannot be fulfilled within the allotted time.
Events
- class asphalt.core.Signal(event_class)
Bases:
Generic
[T_Event
]Declaration of a signal that can be used to dispatch events.
This is a descriptor that returns itself on class level attribute access and a bound version of itself on instance level access. Dispatching and streaming events only works with these bound instances.
Each signal must be assigned to a class attribute, but only once. The Signal will not function correctly if the same Signal instance is assigned to multiple attributes.
- dispatch(event)
Dispatch an event.
- Raises:
UnboundSignal – if attempting to dispatch an event on a signal not bound to any instance of the containing class
- Return type:
- stream_events(filter=None, *, max_queue_size=50)
Shortcut for calling
stream_events()
with this signal in the first argument.- Return type:
AbstractAsyncContextManager[AsyncIterator[T_Event]]
- async wait_event(filter=None)
Shortcut for calling
wait_event()
with this signal in the first argument.- Return type:
T_Event
- class asphalt.core.SignalQueueFull
Bases:
UserWarning
Warning about signal delivery failing due to a subscriber’s queue being full because the subscriber could not receive the events quickly enough.
- asphalt.core.stream_events(signals, filter=None, *, max_queue_size=50)
Return an async generator that yields events from the given signals.
Only events that pass the filter callable (if one has been given) are returned. If no filter function was given, all events are yielded from the generator.
If another event is received from any of the signals while the previous one is still being yielded, it will stay in the queue. If the queue fills up, then
dispatch()
on all the signals will block until the yield has been processed.The listening of events from the given signals starts when this function is called.
- Parameters:
signals (Sequence[Signal[T_Event]]) – the signals to get events from
filter (Callable[[T_Event], bool] | None) – a callable that takes an event object as an argument and returns a truthy value if the event should pass
max_queue_size (int) – maximum number of unprocessed events in the queue
- Return type:
AsyncIterator[AsyncIterator[T_Event]]
- Returns:
an async generator yielding all events (that pass the filter, if any) from all the given signals
- Raises:
UnboundSignal – if attempting to listen to events on a signal not bound to any instance of the containing class
- async asphalt.core.wait_event(signals, filter=None)
Wait until any of the given signals dispatches an event that satisfies the filter (if any).
If no filter has been given, the first event dispatched from any of the signals is returned.
The listening of events from the given signals starts when this function is called.
- Parameters:
signals – the signals to get events from
filter – a callable that takes an event object as an argument and returns a truthy value if the event should pass
- Returns:
the first event (that passed the filter, if any) that was dispatched from any of the signals
- Raises:
UnboundSignal – if attempting to listen to events on a signal not bound to any instance of the containing class
Application runner
- asphalt.core.run_application(component_class, config=None, *, backend='asyncio', backend_options=None, max_threads=None, logging=20, start_timeout=10)
Configure logging and start the given component.
Assuming the root component was started successfully, the event loop will continue running until the process is terminated.
- Initializes the logging system first based on the value of
logging
: If the value is a dictionary, it is passed to
logging.config.dictConfig()
as argument.If the value is an integer, it is passed to
logging.basicConfig()
as the logging level.If the value is
None
, logging setup is skipped entirely.
By default, the logging system is initialized using
basicConfig()
using theINFO
logging level.The default executor in the event loop is replaced with a new
ThreadPoolExecutor
where the maximum number of threads is set to the value ofmax_threads
or, if omitted, the default value ofThreadPoolExecutor
.- Parameters:
component_class – the root component class, an entry point name in the
asphalt.components
namespace or amodulename:varname
referenceconfig – configuration options for the root component
backend – name of the AnyIO backend (e.g.
asyncio
ortrio
)backend_options – options to pass to the AnyIO backend (see the AnyIO documentation for reference)
max_threads – the maximum number of worker threads in the default thread pool executor (the default value depends on the event loop implementation)
logging – a logging configuration dictionary, logging level or`None``
start_timeout – seconds to wait for the root component (and its subcomponents) to start up before giving up (
None
= wait forever)
- Raises:
SystemExit – if the root component fails to start, or an exception is raised when the application is running
- Initializes the logging system first based on the value of
Utilities
- class asphalt.core.PluginContainer(namespace, base_class=None)
Bases:
object
A convenience class for loading and instantiating plugins through the use of entry points.
- Parameters:
namespace (str) – a setuptools entry points namespace
base_class (type | None) – the base class for plugins of this type (or
None
if the entry points don’t point to classes)
- all()
Load all entry points (if not already loaded) in this namespace and return the resulting objects as a list.
- create_object(type, **constructor_kwargs)
Instantiate a plugin.
The entry points in this namespace must point to subclasses of the
base_class
parameter passed to this container.- Parameters:
type (type | str) – an entry point identifier or an actual class object
constructor_kwargs (Any) – keyword arguments passed to the constructor of the plugin class
- Return type:
Any
- Returns:
the plugin instance
- resolve(obj)
Resolve a reference to an entry point.
If
obj
is a string, the named entry point is loaded from this container’s namespace. Otherwise,obj
is returned as is.- Parameters:
obj (
Any
) – an entry point identifier, an object reference or an arbitrary object- Return type:
- Returns:
the loaded entry point, resolved object or the unchanged input value
- Raises:
LookupError – if
obj
was a string but the named entry point was not found
- asphalt.core.callable_name(func)
Return the qualified name (e.g. package.module.func) for the given callable.
- Return type:
- asphalt.core.merge_config(original, overrides)
Return a copy of the
original
configuration dictionary, with overrides fromoverrides
applied.This similar to what
dict.update()
does, but when a dictionary is about to be replaced with another dictionary, it instead merges the contents.- Parameters:
original – a configuration dictionary (or
None
)overrides – a dictionary containing overriding values to the configuration (or
None
)
- Returns:
the merge result
Changed in version 5.0: Previously, if a key in
overrides
was a dotted path (ie.foo.bar.baz: value
), it was assumed to be a shorthand forfoo: {bar: {baz: value}}
. In v5.0, this feature was removed, as it turned out to be a problem with logging configuration, as it was not possible to configure any logging that had a dot in its name (as is the case with most loggers).
- asphalt.core.qualified_name(obj)
Return the qualified name (e.g. package.module.Type) for the given object.
If
obj
is not a class, the returned name will match its type instead.- Return type:
- asphalt.core.resolve_reference(ref)
Return the object pointed to by
ref
. Ifref
is not a string or does not contain:
, it is returned as is. References must be in the form <modulename>:<varname> where <modulename> is the fully qualified module name and varname is the path to the variable inside that module. For example, “concurrent.futures:Future” would give you theFuture
class.- Raises:
LookupError – if the reference could not be resolved
- Return type: