This is the core Asphalt library. If you’re looking for documentation for some specific component project, you will the appropriate link from the project’s Github page.
If you’re a new user, it’s a good idea to start from the tutorials. Pick a tutorial that suits your current level of knowledge.
Table of contents¶
Tutorials¶
The following tutorials will help you get acquainted with Asphalt application development. It is expected that the reader have at least basic understanding of the Python language.
Code for all tutorials can be found in the examples
directory in the source distribution or in
the
Github repository.
Tutorial 1: Getting your feet wet – a simple echo server and client¶
This tutorial will get you started with Asphalt development from the ground up.
You will be learn how to build a simple network server that echoes back messages sent to it, along
with a matching client application. It will however not yet touch more advanced concepts like
using the asphalt
command to run an application with a configuration file.
Prerequisites¶
Asphalt requires Python 3.5.0 or later. You will also need to have the venv
module installed
for your Python version of choice. It should come with most Python installations, but if it does
not, you can usually install it with your operating system’s package manager (python3-venv
is a
good guess).
Setting up the virtual environment¶
Now that you have your base tools installed, it’s time to create a virtual environment (referred
to as simply virtualenv
later). Installing Python libraries in a virtual environment isolates
them from other projects, which may require different versions of the same libraries.
Now, create a project directory and a virtualenv:
mkdir tutorial1
cd tutorial1
python3.5 -m venv tutorialenv
source tutorialenv/bin/activate
On Windows, the last line should be:
tutorialenv\Scripts\activate
The last command activates the virtualenv, meaning the shell will first look for commands in
its bin
directory (Scripts
on Windows) before searching elsewhere. Also, Python will
now only import third party libraries from the virtualenv and not anywhere else. To exit the
virtualenv, you can use the deactivate
command (but don’t do that now!).
You can now proceed with installing Asphalt itself:
pip install asphalt
Creating the project structure¶
Every project should have a top level package, so create one now:
mkdir echo
touch echo/__init__.py
On Windows, the last line should be:
copy NUL echo\__init__.py
Creating the first component¶
Now, let’s write some code! Create a file named server.py
in the echo
package directory:
from asphalt.core import Component, run_application
class ServerComponent(Component):
async def start(self, ctx):
print('Hello, world!')
if __name__ == '__main__':
component = ServerComponent()
run_application(component)
The ServerComponent
class is the root component (and in this case, the only component) of
this application. Its start()
method is called by run_application
when it has
set up the event loop. Finally, the if __name__ == '__main__':
block is not strictly necessary
but is good, common practice that prevents run_application()
from being called again if this
module is ever imported from another module.
You can now try running the above application. With the project directory (tutorial
) as your
current directory, do:
python -m echo.server
This should print “Hello, world!” on the console. The event loop continues to run until you press Ctrl+C (Ctrl+Break on Windows).
Making the server listen for connections¶
The next step is to make the server actually accept incoming connections.
For this purpose, the asyncio.start_server()
function is a logical choice:
from asyncio import start_server
from asphalt.core import Component, run_application
async def client_connected(reader, writer):
message = await reader.readline()
writer.write(message)
writer.close()
print('Message from client:', message.decode().rstrip())
class ServerComponent(Component):
async def start(self, ctx):
await start_server(client_connected, 'localhost', 64100)
if __name__ == '__main__':
component = ServerComponent()
run_application(component)
Here, asyncio.start_server()
is used to listen to incoming TCP connections on the
localhost
interface on port 64100. The port number is totally arbitrary and can be changed to
any other legal value you want to use.
Whenever a new connection is established, the event loop launches client_connected()
as a new
Task
. Tasks work much like green threads in that they’re adjourned when
waiting for something to happen and then resumed when the result is available. The main difference
is that a coroutine running in a task needs to use the await
statement (or async for
or
async with
) to yield control back to the event loop. In client_connected()
, the await
on the first line will cause the task to be adjourned until a line of text has been read from the
network socket.
The client_connected()
function receives two arguments: a StreamReader
and
a StreamWriter
. In the callback we read a line from the client, write it back to
the client and then close the connection. To get at least some output from the application, the
function was made to print the received message on the console (decoding it from bytes
to
str
and stripping the trailing newline character first). In production applications, you will
want to use the logging
module for this instead.
If you have the netcat
utility or similar, you can already test the server like this:
echo Hello | nc localhost 64100
This command, if available, should print “Hello” on the console, as echoed by the server.
Creating the client¶
No server is very useful without a client to access it, so we’ll need to add a client module in this project. And to make things a bit more interesting, we’ll make the client accept a message to be sent as a command line argument.
Create the file client.py
file in the echo
package directory as follows:
import sys
from asyncio import open_connection
from asphalt.core import CLIApplicationComponent, run_application
class ClientComponent(CLIApplicationComponent):
def __init__(self, message: str):
super().__init__()
self.message = message
async def run(self, ctx):
reader, writer = await open_connection('localhost', 64100)
writer.write(self.message.encode() + b'\n')
response = await reader.readline()
writer.close()
print('Server responded:', response.decode().rstrip())
if __name__ == '__main__':
component = ClientComponent(sys.argv[1])
run_application(component)
You may have noticed that ClientComponent
inherits from
CLIApplicationComponent
instead of
Component
and that instead of overriding the
start()
method,
run()
is overridden instead.
This is standard practice for Asphalt applications that just do one specific thing and then exit.
The script instantiates ClientComponent
using the first command line argument as the
message
argument to the component’s constructor. Doing this instead of directly accessing
sys.argv
from the run()
method makes this component easier to test and allows you to
specify the message in a configuration file (covered in the next tutorial).
When the client component runs, it grabs the message to be sent from the list of command line
arguments (sys.argv
), converts it from a unicode string to a bytestring and adds a newline
character (so the server can use readline()
). Then, it connects to localhost
on port 64100
and sends the bytestring to the other end. Next, it reads a response line from the server, closes
the connection and prints the (decoded) response. When the run()
method returns, the
application exits.
To send the “Hello” message to the server, run this in the project directory:
python -m echo.client Hello
Conclusion¶
This covers the basics of setting up a minimal Asphalt application. You’ve now learned to:
- Create a virtual environment to isolate your application’s dependencies from other applications
- Create a package structure for your application
- Start your application using
run_application()
- Use asyncio streams to create a basic client-server protocol
This tutorial only scratches the surface of what’s possible with Asphalt, however. The second tutorial will build on the knowledge you gained here and teach you how to work with components, resources and configuration files to build more useful applications.
Tutorial 2: Something a little more practical – a web page change detector¶
Now that you’ve gone through the basics of creating an Asphalt application, it’s time to expand your horizons a little. In this tutorial you will learn to use a container component to create a multi-component application and how to set up a configuration file for that.
The application you will build this time will periodically load a web page and see if it has changed since the last check. When changes are detected, it will then present the user with the computed differences between the old and the new versions.
Setting up the project structure¶
As in the previous tutorial, you will need a project directory and a virtual environment. Create a
directory named tutorial2
and make a new virtual environment inside it. Then activate it and
use pip
to install the asphalt-mailer
and aiohttp
libraries:
pip install asphalt-mailer aiohttp
This will also pull in the core Asphalt library as a dependency.
Next, create a package directory named webnotifier
and a module named app
(app.py
).
The code in the following sections should be put in the app
module (unless explicitly stated
otherwise).
Detecting changes in a web page¶
The first task is to set up a loop that periodically retrieves the web page. For that, you can adapt code from the aiohttp HTTP client tutorial:
import asyncio
import logging
import aiohttp
from asphalt.core import CLIApplicationComponent, run_application
logger = logging.getLogger(__name__)
class ApplicationComponent(CLIApplicationComponent):
async def run(self, ctx):
with aiohttp.ClientSession() as session:
while True:
async with session.get('http://imgur.com') as resp:
await resp.text()
await asyncio.sleep(10)
if __name__ == '__main__':
run_application(ApplicationComponent(), logging=logging.DEBUG)
Great, so now the code fetches the contents of http://imgur.com
at 10 second intervals.
But this isn’t very useful yet – you need something that compares the old and new versions of the
contents somehow. Furthermore, constantly loading the contents of a page exerts unnecessary strain
on the hosting provider. We want our application to be as polite and efficient as reasonably
possible.
To that end, you can use the if-modified-since
header in the request. If the requests after the
initial one specify the last modified date value in the request headers, the remote server will
respond with a 304 Not Modified
if the contents have not changed since that moment.
So, modify the code as follows:
class ApplicationComponent(CLIApplicationComponent):
async def start(self, ctx):
last_modified = None
with aiohttp.ClientSession() as session:
while True:
headers = {'if-modified-since': last_modified} if last_modified else {}
async with session.get('http://imgur.com', headers=headers) as resp:
logger.debug('Response status: %d', resp.status)
if resp.status == 200:
last_modified = resp.headers['date']
await resp.text()
logger.info('Contents changed')
await asyncio.sleep(10)
The code here stores the date
header from the first response and uses it in the
if-modified-since
header of the next request. A 200
response indicates that the web page
has changed so the last modified date is updated and the contents are retrieved from the response.
Some logging calls were also sprinkled in the code to give you an idea of what’s happening.
Computing the changes between old and new versions¶
Now you have code that actually detects when the page has been modified between the requests.
But it doesn’t yet show what in its contents has changed. The next step will then be to use the
standard library difflib
module to calculate the difference between the contents and send it
to the logger:
from difflib import unified_diff
class ApplicationComponent(CLIApplicationComponent):
async def start(self, ctx):
with aiohttp.ClientSession() as session:
last_modified, old_lines = None, None
while True:
logger.debug('Fetching webpage')
headers = {'if-modified-since': last_modified} if last_modified else {}
async with session.get('http://imgur.com', headers=headers) as resp:
logger.debug('Response status: %d', resp.status)
if resp.status == 200:
last_modified = resp.headers['date']
new_lines = (await resp.text()).split('\n')
if old_lines is not None and old_lines != new_lines:
difference = '\n'.join(unified_diff(old_lines, new_lines))
logger.info('Contents changed:\n%s', difference)
old_lines = new_lines
await asyncio.sleep(10)
This modified code now stores the old and new contents in different variables to enable them to be
compared. The .split('\n')
is needed because unified_diff()
requires the input
to be iterables of strings. Likewise, the '\n'.join(...)
is necessary because the output is
also an iterable of strings.
Sending changes via email¶
While an application that logs the changes on the console could be useful on its own, it’d be much
better if it actually notified the user by means of some communication medium, wouldn’t it?
For this specific purpose you need the asphalt-mailer
library you installed in the beginning.
The next modification will send the HTML formatted differences to you by email.
But, you only have a single component in your app now. To use asphalt-mailer
, you will need to
add its component to your application somehow. Enter
ContainerComponent
. With that, you can create a hierarchy of
components where the mailer
component is a child component of your own container component.
And to make the the results look nicer in an email message, you can switch to using
difflib.HtmlDiff
to produce the delta output:
from difflib import HtmlDiff
class ApplicationComponent(CLIApplicationComponent):
async def start(self, ctx):
self.add_component(
'mailer', backend='smtp', host='your.smtp.server.here',
message_defaults={'sender': 'your@email.here', 'to': 'your@email.here'})
await super().start(ctx)
async def run(self, ctx):
with aiohttp.ClientSession() as session:
last_modified, old_lines = None, None
diff = HtmlDiff()
while True:
logger.debug('Fetching webpage')
headers = {'if-modified-since': last_modified} if last_modified else {}
async with session.get('http://imgur.com', headers=headers) as resp:
logger.debug('Response status: %d', resp.status)
if resp.status == 200:
last_modified = resp.headers['date']
new_lines = (await resp.text()).split('\n')
if old_lines is not None and old_lines != new_lines:
difference = diff.make_file(old_lines, new_lines, context=True)
logger.info('Sent notification email')
old_lines = new_lines
await asyncio.sleep(10)
You’ll need to replace the host
, sender
and to
arguments for the mailer component and
possibly add the ssl
, username
and password
arguments if your SMTP server requires
authentication.
With these changes, you’ll get a new HTML formatted email each time the code detects changes in the target web page.
Separating the change detection logic¶
While the application now works as intended, you’re left with two small problems. First off, the target URL and checking frequency are hard coded. That is, they can only be changed by modifying the program code. It is not reasonable to expect non-technical users to modify the code when they want to simply change the target website or the frequency of checks. Second, the change detection logic is hardwired to the notification code. A well designed application should maintain proper separation of concerns. One way to do this is to separate the change detection logic to its own class.
Create a new module named detector
in the webnotifier
package. Then, add the change event
class to it:
import asyncio
import logging
import aiohttp
from async_generator import yield_
from asphalt.core import Component, Event, Signal, context_finisher
logger = logging.getLogger(__name__)
class WebPageChangeEvent(Event):
def __init__(self, source, topic, old_lines, new_lines):
super().__init__(source, topic)
self.old_lines = old_lines
self.new_lines = new_lines
This class defines the type of event that the notifier will emit when the target web page changes. The old and new content are stored in the event instance to allow the event listener to generate the output any way it wants.
Next, add another class in the same module that will do the HTTP requests and change detection:
class Detector:
changed = Signal(WebPageChangeEvent)
def __init__(self, url, delay):
self.url = url
self.delay = delay
async def run(self):
with aiohttp.ClientSession() as session:
last_modified, old_lines = None, None
while True:
logger.debug('Fetching contents of %s', self.url)
headers = {'if-modified-since': last_modified} if last_modified else {}
async with session.get(self.url, headers=headers) as resp:
logger.debug('Response status: %d', resp.status)
if resp.status == 200:
last_modified = resp.headers['date']
new_lines = (await resp.text()).split('\n')
if old_lines is not None and old_lines != new_lines:
self.changed.dispatch(old_lines, new_lines)
old_lines = new_lines
await asyncio.sleep(self.delay)
The constructor arguments allow you to freely specify the parameters for the detection process.
The class includes a signal named change
that uses the previously created
WebPageChangeEvent
class. The code dispatches such an event when a change in the target web
page is detected.
Finally, add the component class which will allow you to integrate this functionality into any Asphalt application:
class ChangeDetectorComponent(Component):
def __init__(self, url, delay=10):
self.url = url
self.delay = delay
@context_finisher
async def start(self, ctx):
detector = Detector(self.url, self.delay)
ctx.publish_resource(detector, context_attr='detector')
task = asyncio.get_event_loop().create_task(detector.run())
logging.info('Started web page change detector for url "%s" with a delay of %d seconds',
self.url, self.delay)
# Can be replaced with plain "yield" on Python 3.6+
await yield_()
# This part is run when the context is finished
task.cancel()
logging.info('Shut down web page change detector')
The component’s start()
method starts the detector’s run()
method as a new task, publishes
the detector object as resource and installs an event listener that will shut down the detector
when the context finishes.
Now that you’ve moved the change detection code to its own module, ApplicationComponent
will
become somewhat lighter:
class ApplicationComponent(CLIApplicationComponent):
async def start(self, ctx):
self.add_component('detector', ChangeDetectorComponent, url='http://imgur.com')
self.add_component(
'mailer', backend='smtp', host='your.smtp.server.here',
message_defaults={'sender': 'your@email.here', 'to': 'your@email.here'})
await super().start(ctx)
async def run(self, ctx):
diff = HtmlDiff()
async for event in ctx.detector.changed.stream_events():
difference = diff.make_file(event.old_lines, event.new_lines, context=True)
await ctx.mailer.create_and_deliver(
subject='Change detected in %s' % event.source.url, html_body=difference)
logger.info('Sent notification email')
The main application component will now use the detector resource published by
ChangeDetectorComponent
. It adds one event listener which reacts to change events by creating
an HTML formatted difference and sending it to the default recipient.
Once the start()
method here has run to completion, the event loop finally has a chance to run
the task created for Detector.run()
. This will allow the detector to do its work and dispatch
those changed
events that the page_changed()
listener callback expects.
Setting up the configuration file¶
Now that your application code is in good shape, you will need to give the user an easy way to configure it. This is where YAML configuration files come in handy. They’re clearly structured and are far less intimidating to end users than program code. And you can also have more than one of them, in case you want to run the program with a different configuration.
In your project directory (tutorial2
), create a file named config.yaml
with the following
contents:
---
component:
type: webnotifier.app:ApplicationComponent
components:
detector:
url: http://imgur.com/
delay: 15
mailer:
host: your.smtp.server.here
message_defaults:
sender: your@email.here
to: your@email.here
logging:
version: 1
disable_existing_loggers: false
formatters:
default:
format: '[%(asctime)s %(levelname)s] %(message)s'
handlers:
console:
class: logging.StreamHandler
formatter: default
loggers:
root:
handlers: [console]
level: INFO
webnotifier:
handlers: [console]
level: DEBUG
propagate: false
The component
section defines parameters for the root component. Aside from the special
type
key which tells the runner where to find the component class, all the keys in this section
are passed to the constructor of ApplicationComponent
as keyword arguments. Keys under
components
will match the alias of each child component, which is given as the first argument
to add_component()
. Any component parameters given
here can now be removed from the add_component()
call in ApplicationComponent
‘s code.
The logging configuration here sets up two loggers, one for webnotifier
and its descendants
and another (root
) as a catch-all for everything else. It specifies one handler that just
writes all log entries to the standard output. To learn more about what you can do with the logging
configuration, consult the Configuration dictionary schema section in the standard library
documentation.
You can now run your app with the asphalt run
command, provided that the project directory is
on Python’s search path. When your application is properly packaged and installed in
site-packages
, this won’t be a problem. But for the purposes of this tutorial, you can
temporarily add it to the search path by setting the PYTHONPATH
environment variable:
PYTHONPATH=. asphalt run config.yaml
On Windows:
set PYTHONPATH=%CD%
asphalt run config.yaml
Note
The if __name__ == '__main__':
block is no longer needed since asphalt run
is now used
as the entry point for the application.
Conclusion¶
You now know how to take advantage of Asphalt’s component system to add structure to your application. You’ve learned how to build reusable components and how to make the components work together through the use of resources. Last, but not least, you’ve learned to set up a YAML configuration file for your application and to set up a fine grained logging configuration in it.
You now possess enough knowledge to leverage Asphalt to create practical applications. You are now encouraged to find out what Asphalt component projects exist to aid your application development. Happy coding ☺
User guide¶
This is the reference documentation. If you’re looking to learn Asphalt from scratch, you should take a look at the Tutorials first.
Application architecture¶
Asphalt applications are built by assembling a hierarchy of components. Each component typically
provides some specific functionality for the application, like a network server or client, a
database connection or a myriad of other things. A component’s lifecycle is usually very short:
it’s instantiated and its start()
method is run and the
component is then discarded. A common exception to this are command line tools, where the root
component’s start()
call typically lasts for the entire run time of the tool.
Components work together through a shared Context
. Every application
has at least a top level context which is passed to the root component’s
start()
method. A context is essentially a container for
resources and a namespace for arbitrary data attributes. Resources can be objects of any type
like data or services.
Contexts can have subcontexts. How and if subcontexts are used depends on the components using them. For example, a component serving network requests may want to create a subcontext for each request it handles to store request specific information and other state. While the subcontext will have its own independent state, it also has full access the resources of its parent context.
An Asphalt application is normally started by calling run_application()
with the root component as the argument. This function takes care of logging and and starting the
root component in the event loop. The application will then run until Ctrl+C is pressed, the
process is terminated from outside or the application code stops the event loop.
The runner is further extended by the asphalt
command line tool which reads the application
configuration from a YAML formatted configuration file, instantiates the root component and calls
run_application()
. The settings from the configuration file are merged
with hard coded defaults so the config file only needs to override settings where necessary.
Components¶
Components are the basic building blocks of an Asphalt application. They have a narrowly defined set of responsibilities:
- Take in configuration through the constructor
- Validate the configuration
- Publish resources (in
start()
) - Close/shut down/cleanup resources when the context is finished (by adding a callback on the
finished
signal of the context, or by usingcontext_finisher()
)
In the start()
method, the component receives a
Context
as its only argument. The component can use the context to
publish resources for other components and the application business logic to use. It can also
request resources provided by other components to provide some complex service that builds on those
resources.
The start()
method of a component is only called once,
during application startup. When all components have been started, they are disposed of.
If any of the components raises an exception, the application startup process fails and the context
is finished.
In order to speed up the startup process and to prevent any deadlocks, components should try to publish any resources as soon as possible before requesting any. If two or more components end up waiting on each others’ resources, the application will fail to start due to timeout errors. Also, if a component needs to perform lengthy operations like connection validation on network clients, it should publish all its resources first to avoid said timeouts.
Hint
It is a good idea to use type hints with typeguard checks
(assert check_argument_types()
) in the component’s __init__
method to ensure that the
received configuration values are of the expected type, but this is of course not required.
Container components¶
A container component is component that can contain other Asphalt components. The root component of virtually any nontrivial Asphalt application is a container component. Container components can of course contain other container components and so on.
When the container component starts its child components, each
start()
call is launched in its own task. Therefore all the
child components start concurrently and cannot rely on the start order. This is by design.
The only way components should be relying on each other is by the publishing and requesting of
resources in their shared context.
Context hierarchies¶
As mentioned previously, every application has at least one context. Component code and application business logic can create new contexts at any time, and a new context can be linked to a parent context to take advantage of its resources. Such subcontexts have access to all the resources of the parent context, but parent contexts cannot access resources from their subcontexts. Sometimes it may also be beneficial to create completely isolated contexts to ensure consistent behavior when some reusable code is plugged in an application.
A common use case for creating subcontexts is when a network server handles an incoming request.
Such servers typically want to create a separate subcontext for each request, usually using
specialized subclass of Context
.
Resources¶
The resource system in Asphalt exists for two principal reasons:
- To avoid having to duplicate configuration
- To enable sharing of pooled resources, like database connection pools
Here are a few examples of services that will likely benefit from resource sharing:
- Database connections
- Remote service handles
- Serializers
- Template renderers
- SSL contexts
When you publish a resource, you should make sure that the resource is discoverable using any
abstract interface or base class that it implements. This is so that consumers of the service don’t
have to care if you switch the implementation of another. For example, consider a mailer service,
provided by asphalt-mailer. The library has an abstract base class for all mailers,
asphalt.mailer.api.Mailer
. To facilitate this loose coupling of services, it publishes all
mailers as Mailers.
Lazy resources¶
Resources can also be published lazily. That means they’re created on demand, that is, either when their context attribute is accessed or when the resource is being requested for the first time. Unlike with normal resources, the resource values are not inherited by subcontexts, but every time the resource is requested in a new context, a new value is created specifically for that context.
There are at least a couple plausible reasons for publishing resources this way:
- The resource needs access to the resources or data specific to the local context (example: template renderers)
- The life cycle of the resource needs to be tied to the life cycle of the context (example: database transactions)
Lazy resources are published using publish_lazy_resource()
.
Instead of passing a static value to it, you give it a callable that takes the local context
object (whatever that may be) as the argument and returns the created resource object. The creator
callable will only be called at most once per context.
The creator callable can be a coroutine function or return an awaitable, in which case the
coroutine or other awaitable is resolved before returning the resource object to the caller. This
approach has the unfortunate limitation that the awaitable cannot be automatically resolved on
attribute access so something like await ctx.resourcename
is required when such resources are
accessed through their context attributes.
Working with coroutines and threads¶
Asphalt was designed as a network oriented framework capable of high concurrency. This means that
it can efficiently work with hundreds or even thousands of connections at once. This is achieved by
utilizing co-operative multitasking, using an event loop provided by the asyncio
module.
The event loop can only work on one task at a time, so whenever the currently running task needs to
wait for something to happen, it will need to explicitly yield control back to the event loop
(using await
and similar statements) to let the event loop run other tasks while this task
waits for the result. Once the result is available, the event loop will resume the task.
There is another concurrency mechanism called threads. Threads are an implementation of preemptive multitasking, which means that the CPU may run your program at more than one location at once and your code will not have to worry about yielding control to another task. There are some big downsides to using threads, however. First off, threaded code is much more prone to race conditions and programs often need to use locks to share state in a predictable manner. Second, threads don’t scale. When you have more threads than CPU cores, the cores need to do context switching, that is, juggle between the threads. With a large number of threads, the overhead from context switching becomes very significant up to the point where the system stops responding altogether.
While Asphalt was designed to avoid the use of threads, they are sometimes necessary. Most third party libraries at the moment don’t support the asynchronous concurrency model, and as such, they sometimes need to be used with threads in order to avoid blocking the event loop. Also, file operations cannot, at this time, be executed asynchronously and need to be wrapped in threads. Finally, your application might just need to do some CPU heavy processing that would otherwise block the event loop for long periods of time.
To help with this, the asyncio_extras library was created as a byproduct of Asphalt. It provides several conveniences you can use to easily use threads when the need arises.
Examples¶
Consider a coroutine function that reads the contents of a certain file and then sends them over a network connection. While you might get away with reading the file in the event loop thread, consider what happens if the disk has to spin up from idle state or the file is located on a slow (or temporarily inaccessible) network drive. The whole event loop will then be blocked for who knows how long.
The easiest way is probably to use open_async()
:
from asyncio_extras import open_async
async def read_and_send_file(connection):
async with open_async('file.txt', 'rb') as f:
contents = await f.read()
await connection.send(contents)
The following snippet achieves the same goal:
from asyncio_extras import threadpool
async def read_and_send_file(connection):
async with threadpool():
with open('file.txt', 'rb') as f:
contents = f.read()
await connection.send(contents)
As does the next one:
from asyncio_extras import call_in_executor
async def read_and_send_file(connection):
f = await call_in_executor(open, 'file.txt', 'rb')
with f:
contents = await call_in_executor(f.read)
await connection.send(contents)
Alternatively, you can run the whole function in the thread pool. You will need to make it a regular function instead of a coroutine function and you must explicitly pass in the event loop object:
from asyncio_extras import threadpool, call_async
@threadpool
def read_and_send_file(connection, loop):
with open('file.txt', 'rb') as f:
contents = f.read()
call_async(loop, connection.send, contents)
Using alternate thread pools¶
In more advanced applications, you may find it useful to set up specialized thread pools for certain tasks in order to avoid the default thread pool from being overburdened:
from concurrent.futures import ThreadPoolExecutor
from asyncio_extras import threadpool
file_ops = ThreadPoolExecutor(5) # max 5 threads for file operations
async def read_and_send_file(connection):
async with threadpool(file_ops):
with open('file.txt', 'rb') as f:
contents = f.read()
await connection.send(contents)
All the thread related utilities in asyncio_extras have a way to specify the executor to use. Refer to its documentation for the specifics.
Events¶
Events are a handy way to make your code react to changes in another part of the application.
To dispatch and listen to events, you first need to have one or more
Signal
instances as attributes of some class. Each signal needs to be
associated with some Event
class. Then, when you dispatch a new event
by calling dispatch()
, a new instance of this event class will be
constructed and passed to all listener callbacks.
To listen to events dispatched from a signal, you need to have a function or any other callable
that accepts a single positional argument. You then pass this callable to
connect()
. That’s it!
To disconnect the callback, simply call disconnect()
with whatever
you passed to connect()
as argument.
Here’s how it works:
from asphalt.core import Event, Signal
class CustomEvent(Event):
def __init__(source, topic, extra_argument):
super().__init__(source, topic)
self.extra_argument = extra_argument
class MyEventSource:
somesignal = Signal(Event)
customsignal = Signal(CustomEvent)
def plain_listener(event):
print('received event: %s' % event)
async def coro_listener(event):
print('coroutine listeners are fine too: %s' % event)
async def some_handler():
source = MyEventSource()
source.somesignal.connect(plain_listener)
source.customsignal.connect(coro_listener)
# Dispatches an Event instance
source.somesignal.dispatch()
# Dispatches a CustomEvent instance (the extra argument is passed to its constructor)
source.customsignal.dispatch('extra argument here')
Exception handling¶
By default, all exceptions raised by listener callbacks are just sent to the logger
(asphalt.core.event
). If the dispatcher needs to know about any exceptions raised by listeners,
it can call dispatch()
with return_future=True
. This will
cause a Future
to be returned and, when awaited, will raise a
EventDispatchError
if any listener raised an exception. This exception
will contain every exception that was raised, along with the information regarding which callback
raised which exception.
Waiting for a single event¶
To wait for the next event dispatched from a given signal, you can use the
wait_event()
method:
async def print_next_event(source):
event = await source.somesignal.wait_event()
print(event)
You can even wait for the next event dispatched from any of several signals using the
wait_event()
function:
from asphalt.core import wait_event
async def print_next_event(source1, source2, source3):
event = await wait_event(source1.some_signal, source2.another_signal, source3.some_signal)
print(event)
Receiving events iteratively¶
With stream_events()
, you can even asynchronously iterate over
events dispatched from a signal:
async def listen_to_events(source):
async for event in source.somesignal.stream_events():
print(event)
Using stream_events()
, you can stream events from multiple signals:
from asphalt.core import stream_events
async def listen_to_events(source1, source2, source3):
async for event in stream_events(source1.some_signal, source2.another_signal,
source3.some_signal):
print(event)
Testing Asphalt components¶
Testing Asphalt components and component hierarchies is a relatively simple procedure:
- Create an instance of your
Component
- Create a
Context
instance - Run the component’s
start()
method with the context as the argument - Run the tests
- Dispatch the
finished
event on the context to release any resources
With Asphalt projects, it is recommended to use the py.test testing framework because it is already being used with Asphalt core and it provides easy testing of asynchronous code (via the pytest-asyncio plugin).
Example¶
Let’s build a test suite for the Echo Tutorial.
The client and server components could be tested separately, but to make things easier, we’ll test them against each other.
Create a tests
directory at the root of the project directory and create a module named
test_client_server
there (the test_
prefix is important):
import asyncio
import pytest
from asphalt.core import Context
from echo.client import ClientComponent
from echo.server import ServerComponent
@pytest.fixture
def event_loop():
# Required on pytest-asyncio v0.4.0 and newer since the event_loop fixture provided by the
# plugin no longer sets the global event loop
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
yield loop
loop.close()
@pytest.fixture
def context(event_loop):
ctx = Context()
yield ctx
event_loop.run_until_complete(ctx.finished.dispatch(None, return_future=True))
@pytest.fixture
def server_component(event_loop, context):
component = ServerComponent()
event_loop.run_until_complete(component.start(context))
def test_client(event_loop, server_component, context, capsys):
client = ClientComponent('Hello!')
event_loop.run_until_complete(client.start(context))
exc = pytest.raises(SystemExit, event_loop.run_forever)
assert exc.value.code == 0
# Grab the captured output of sys.stdout and sys.stderr from the capsys fixture
out, err = capsys.readouterr()
assert out == 'Message from client: Hello!\nServer responded: Hello!\n'
The test module above contains one test function (test_client
) and three fixtures:
event_loop
: provides an asyncio event loop and closes it after the testcontext
provides the root context and runs finish callbacks after the testserver_component
: creates and starts the server component
The client component is not provided as a fixture because, as always with
CLIApplicationComponent
, starting it would run the logic we want
to test, so we defer that to the actual test code.
In the test function (test_client
), the client component is instantiated and started. Since the
component’s start()
function only kicks off the task that runs the client’s business logic (the
run()
method), we have to wait until the task is complete by running the event loop (using
run_forever()
) until run()
finishes and its callback code
attempts to terminate the application. For that purpose, we catch the resulting SystemExit
exception and verify that the application indeed completed successfully, as indicated by the return
code of 0.
Finally, we check that the server and the client printed the messages they were supposed to.
When the server receives a line from the client, it prints a message to standard output using
print()
. Likewise, when the client gets a response from the server, it too prints out its
own message. By using pytest’s built-in capsys
fixture, we can capture the output and verify it
against the expected lines.
To run the test suite, make sure you’re in the project directory and then do:
pytest tests
For more elaborate examples, please see the test suites of various Asphalt subprojects.
Configuration and deployment¶
As your application grows more complex, you may find that you need to have different settings for your development environment and your production environment. You may even have multiple deployments that all need their own custom configuration.
For this purpose, Asphalt provides a command line interface that will read a YAML formatted configuration file and run the application it describes.
Running the Asphalt launcher¶
Running the launcher is very straightfoward:
asphalt run yourconfig.yaml [your-overrides.yml...]
Or alternatively:
python -m asphalt run yourconfig.yaml [your-overrides.yml...]
What this will do is:
- read all the given configuration files, starting from
yourconfig.yaml
- merge the configuration files’ contents into a single configuration dictionary using
merge_config()
- call
run_application()
using the configuration dictionary as keyword - arguments
- call
Writing a configuration file¶
A production-ready configuration file should contain at least the following options:
component
: a dictionary containing the class name and keyword arguments for its constructorlogging
: a dictionary to be passed tologging.config.dictConfig()
Suppose you had the following component class as your root component:
class MyRootComponent(ContainerComponent):
def __init__(self, components, data_directory: str):
super().__init__(components)
self.data_directory = data_directory
async def start(ctx):
self.add_component('mailer', backend='smtp')
self.add_component('sqlalchemy')
await super().start(ctx)
You could then write a configuration file like this:
---
component:
type: myproject:MyRootComponent
data_directory: /some/file/somewhere
components:
mailer:
host: smtp.mycompany.com
ssl: true
sqlalchemy:
url: postgresql:///mydatabase
max_threads: 20
logging:
version: 1
disable_existing_loggers: false
handlers:
console:
class: logging.StreamHandler
formatter: generic
formatters:
generic:
format: "%(asctime)s:%(levelname)s:%(name)s:%(message)s"
root:
handlers: [console]
level: INFO
In the above configuration you have three top level configuration keys: component
,
max_threads
and logging
, all of which are directly passed to
run_application()
as keyword arguments.
The component
section defines the type of the root component using the specially processed
type
option. You can either specify a setuptools entry point name (from the
asphalt.components
namespace) or a text reference like module:class
(see
resolve_reference()
for details). The rest of the keys in this section are
passed directly to the constructor of the MyRootComponent
class.
The components
section within component
is processed in a similar fashion.
Each subsection here is a component type alias and its keys and values are the constructor
arguments to the relevant component class. The per-component configuration values are merged with
those provided in the start()
method of MyRootComponent
. See the next section for a more
elaborate explanation.
With max_threads: 20
, the maximum number of threads in the event loop’s default thread pool
executor is set to 20.
The logging
configuration tree here sets up a root logger that prints all log entries of at
least INFO
level to the console. You may want to set up more granular logging in your own
configuration file. See the
Python standard library documentation for details.
Configuration overlays¶
Component configuration can be specified on several levels:
- Hard-coded arguments to
add_component()
- First configuration file argument to
asphalt run
- Second configuration file argument to
asphalt run
- ...
Any options you specify on each level override or augment any options given on previous levels. To minimize the effort required to build a working configuration file for your application, it is suggested that you pass as many of the options directly in the component initialization code and leave only deployment specific options like API keys, access credentials and such to the configuration file.
With the configuration presented in the earlier paragraphs, the mailer
component’s constructor
gets passed three keyword arguments:
backend='smtp'
host='smtp.mycompany.com'
ssl=True
The first one is provided in the root component code while the other two options come from the YAML
file. You could also override the mailer backend in the configuration file if you wanted. The same
effect can be achieved programmatically by supplying the override configuration to the container
component via its components
constructor argument. This is very useful when writing tests
against your application. For example, you might want to use the mock
mailer in your test suite
configuration to test that the application correctly sends out emails (and to prevent them from
actually being sent to recipients!).
There is another neat trick that lets you easily modify a specific key in the configuration. By using dotted notation in a configuration key, you can target a specific key arbitrarily deep in the configuration structure. For example, to override the logging level for the root logger in the configuration above, you could use an override configuration such as:
---
logging.root.level: DEBUG
The keys don’t need to be on the top level either, so the following has the same effect:
---
logging:
root.level: DEBUG
Performance tuning¶
Asphalt’s core code and many third part components employ a number of potentially expensive validation steps in its code. The performance hit of these checks is not a concern in development and testing, but in a production environment you will probably want to maximize the performance.
To do this, you will want to disable Python’s debugging mode by either setting the environment
variable PYTHONOPTIMIZE
to 1
or (if applicable) running Python with the -O
switch.
This has the effect of completely eliminating all assert
statements and blocks starting with
if __debug__:
from the compiled bytecode.
When you want maximum performance, you’ll also want to use the fastest available event loop
implementation. This can be done by specifying the event_loop_policy
option in the
configuration file or by using the -l
or --loop
switch. The core library has built-in
support for the uvloop event loop implementation, which should provide a nice performance boost
over the standard library implementation.
Version history¶
This library adheres to Semantic Versioning.
2.1.1 (2017-02-01)
- Fixed memory leak which prevented objects containing Signals from being garbage collected
- Log a message on startup that indicates whether optimizations (
-O
orPYTHONOPTIMIZE
) are enabled
2.1.0 (2016-09-26)
- Added the possibility to specify more than one configuration file on the command line
- Added the possibility to use the command line interface via
python -m asphalt ...
- Added the
CLIApplicationComponent
class to facilitate the creation of Asphalt based command line tools - Root component construction is now done after installing any alternate event loop policy provider
- Switched YAML library from PyYAML to ruamel.yaml
- Fixed a corner case where in
wait_event()
the future’s result would be set twice, causing an exception in the listener - Fixed coroutine-based lazy resource returning a CoroWrapper instead of a Future when asyncio’s debug mode has been enabled
- Fixed a bug where a lazy resource would not be created separately for a context if a parent context contained an instance of the same resource
2.0.0 (2016-05-09)
- BACKWARD INCOMPATIBLE Dropped Python 3.4 support in order to make the code fully rely on the
new
async
/await
,async for
andasync with
language additions - BACKWARD INCOMPATIBLE De-emphasized the ability to implicitly run code in worker threads.
As such, Asphalt components are no longer required to transparently work outside of the event
loop thread. Instead, use
asyncio_extras.threads.call_async()
to call asynchronous code from worker threads if absolutely necessary. As a direct consequence of this policy shift, theasphalt.core.concurrency
module was dropped in favor of theasyncio_extras
library. - BACKWARD INCOMPATIBLE The event system was completely rewritten:
- instead of inheriting from
EventSource
, event source classes now simply assignSignal
instances to attributes and useobject.signalname.connect()
to listen to events - all event listeners are now called independently of each other and coroutine listeners are run concurrently
- added the ability to stream events
- added the ability to wait for a single event to be dispatched
- instead of inheriting from
- BACKWARD INCOMPATIBLE Removed the
asphalt.command
module from the public API - BACKWARD INCOMPATIBLE Removed the
asphalt quickstart
command - BACKWARD INCOMPATIBLE Removed the
asphalt.core.connectors
module - BACKWARD INCOMPATIBLE Removed the
optional
argument ofContext.request_resource()
- BACKWARD INCOMPATIBLE Removed the
asphalt.core.runners
entry point namespace - BACKWARD INCOMPATIBLE
Component.start()
is now required to be a coroutine method - BACKWARD INCOMPATIBLE Removed regular context manager support from the
Context
class (asynchronous context manager support still remains) - BACKWARD INCOMPATIBLE The
Context.publish_resource()
,Context.publish_lazy_resource()
andContext.remove_resource()
methods are no longer coroutine methods - BACKWARD INCOMPATIBLE Restricted resource names to alphanumeric characters and underscores
- Added the possibility to specify a custom event loop policy
- Added support for uvloop
- Added support for aiogevent
- Added the ability to use coroutine functions as lazy resource creators (though that just makes
them return a
Future
instead) - Added the ability to get a list of all the resources in a Context
- Changed the
asphalt.core.util.resolve_reference()
function to return invalid reference strings as-is - Switched from argparse to click for the command line interface
- All of Asphalt core’s public API is now importable directly from
asphalt.core
1.2.0 (2016-01-02)
- Moved the
@asynchronous
and@blocking
decorators to theasphalt.core.concurrency
package along with related code (they’re still importable fromasphalt.core.util
until v2.0) - Added typeguard checks to fail early if arguments of wrong types are passed to functions
1.1.0 (2015-11-19)
- Decorated
ContainerComponent.start
with@asynchronous
so that it can be called by a blocking subclass implementation - Added the
stop_event_loop
function to enable blocking callables to shut down Asphalt’s event loop
1.0.0 (2015-10-18)
- Initial release
Acknowledgements¶
Many thanks to following people for the time spent helping with Asphalt’s development:
- Alice Bevan-McGregor (brainstorming and documentation QA)
- Guillaume “Cman” Brun (brainstorming)
- Darin Gordon (brainstorming and documentation QA)
- Antti Haapala (brainstorming)
- Olli Paloheimo (Asphalt logo design).
- Cody Scott (tutorials QA)