Working with tasks and threads
Asphalt, as of version 5, leverages AnyIO to provide both structured concurrency and support for multiple asynchronous event loop implementations (asyncio and Trio, as of this writing). It is therefore highly recommended that you use AnyIO, rather than asyncio, for your task management, synchronization, concurrency and file I/O needs whenever possible. This ensures compatibility with AnyIO’s “level cancellation” model, and potentially enables you to switch to a different event loop implementation, should the need arise.
Working with asynchronous tasks
The main idea behind structured concurrency is that each task must have a parent task
watching over it, and if a task raises an exception that is not handled, then the
exception propagates to the parent task, taking all the other tasks down with it until
the exception is finally handled somewhere, or the entire process crashes. This ensures
that tasks never crash quietly, as is often the case with asyncio applications using
create_task()
to spawn “fire-and-forget” tasks.
To enable Asphalt users to work with tasks while respecting structured concurrency, two ways of launching tasks are provided: service tasks and background task factories.
Service tasks
Service tasks are started to last throughout the lifespan of the context. They’re typically used to run network services like web apps.
When the context is torn down, service tasks will also be shut down. By default, they
will be cancelled, but you can control this behavior by passing a different value as
teardown_action
:
cancel
(the default): cancel the task and wait for it to finisha callable: run the given callable at teardown to trigger the task to shut itself down (and wait for the task to finish on its own)
None
: do nothing and just wait for the task to finish on its own
If a service task crashes, it will take down the whole application with it. This is part of the structured concurrency design that is intended to ensure that failures don’t go unnoticed. You’re responsible for catching and handling any exceptions raised in service tasks.
Background task factories
Background task factories are meant to be used for dynamically spawning background tasks on demand. A typical example would be a web app request handler that needs to send an email, but wants to return a response to the end user straight away. The background task will thus outlive the task that was spawned to handle the request.
In contrast to service tasks, any running background task will block the teardown of the task factory from which they were spawned.
By default, an unhandled exception raised in a task spawned from a background task
factory will propagate and then crash the entire application. You can, however, control
this behavior by passing a callable as exception_handler
to
start_background_task_factory()
. If a background task crashes, the exception
handler is called with the exception as the sole argument. If the handler returns a
truthy value, the exception is ignored. In all other cases it is reraised.
Working with threads
Threads are usually used when calling functions that take a long time (more than tens of
milliseconds) to execute, either because they use significant amounts of CPU time, or
they access external devices. The recommended way to use threads from asynchronous code
is to use anyio.to_thread.run_sync()
:
from functools import partial
from anyio import to_thread
def my_synchronous_func(a: int, b: int, *, kwarg: str = "") -> int:
...
async def my_async_func() -> None:
retval = await to_thread.run_sync(my_synchronous_func, 2, 5)
# Use partial() if you need to pass keyword arguments
retval = await to_thread.run_sync(partial(my_synchronous_func, 3, 6, kwarg="foo"))
See also
To learn more about working with threads using AnyIO’s APIs, see the Working with threads section in AnyIO’s documentation.
Configuring the maximum amount of worker threads
The configuration option max_threads
sets the limit on how many worker threads will
at most be used with anyio.to_thread.run_sync()
if no explicit capacity limiter
is passed.
For example, this YAML configuration will set the thread limit to 60 in the default capacity limiter:
max_threads: 60
component:
...
Note
This will not affect backend-specific thread APIs like
asyncio.to_thread()
or asyncio.loop.run_in_executor()
!