This Asphalt framework component provides connectivity to InfluxDB database servers.
Configuration¶
The typical InfluxDB configuration using a single database at localhost on the default port would look like this:
components:
  influxdb:
    db: mydb
The above configuration creates an asphalt.influxdb.client.InfluxDBClient instance in the context, available as ctx.influxdb (resource name: default).
If you wanted to connect to influx.example.org on port 8886, you would do:
components:
  influxdb:
    base_urls: http://influx.example.org:8886
    db: mydb
To connect to an InfluxEnterprise cluster, list all the nodes under base_urls:
components:
  influxdb:
    base_urls:
      - http://influx1.example.org:8086
      - http://influx2.example.org:8086
      - http://influx3.example.org:8086
    db: mydb
To connect to two unrelated InfluxDB servers, you could use a configuration like:
components:
  influxdb:
    clients:
      influx1:
        base_urls: http://influx.example.org:8886
        db: mydb
      influx2:
        context_attr: influxalter
        base_urls: https://influxalter.example.org/influxdb
        db: anotherdb
        username: testuser
        password: testpass
This configuration creates two asphalt.influxdb.client.InfluxDBClient resources named influx1 and influx2, available in the context as ctx.influx1 and ctx.influxalter respectively.
Note
See the documentation of the asphalt.influxdb.client.InfluxDBClient class for a comprehensive listing of all connection options.
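As a rough illustration of how the multi-client configuration above maps resource names to context attributes, the following sketch mirrors the YAML as a Python dict and computes the attribute each client would be published under. The helper context_attrs is hypothetical and not part of the library; it only assumes, as the example above suggests, that context_attr falls back to the resource name for entries under clients.

```python
# Hypothetical helper, not part of asphalt-influxdb: illustrates the
# resource name -> context attribute mapping described above.
config = {
    "clients": {
        "influx1": {
            "base_urls": "http://influx.example.org:8886",
            "db": "mydb",
        },
        "influx2": {
            "context_attr": "influxalter",
            "base_urls": "https://influxalter.example.org/influxdb",
            "db": "anotherdb",
            "username": "testuser",
            "password": "testpass",
        },
    }
}


def context_attrs(component_config):
    """Return {resource_name: context_attribute} for each configured client."""
    return {
        # Assumption: without an explicit context_attr, the resource name is used
        name: options.get("context_attr", name)
        for name, options in component_config["clients"].items()
    }


print(context_attrs(config))
```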
Using the InfluxDB client¶
Getting the server version¶
You can find out which version of InfluxDB you’re running in the following manner:
async def handler(ctx):
    version = await ctx.influxdb.ping()
    print('Running InfluxDB version ' + version)
Writing data points¶
Each data point being written into an InfluxDB database contains the following information:
- name of the measurement (corresponds to a table in a relational database)
- zero or more tags (correspond to indexed string columns in an RDBMS)
- one or more fields (correspond to nullable columns in an RDBMS)
- a timestamp (supplied by the server if omitted)
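To make the anatomy of a data point concrete, here is a minimal sketch that renders one in InfluxDB's line protocol, the text format accepted by the InfluxDB 1.x write endpoint. The to_line_protocol function is an illustration written for this document, not the library's implementation, and it handles only the common value types.

```python
def _escape(value, chars):
    """Backslash-escape each of the given characters in value."""
    for char in chars:
        value = value.replace(char, "\\" + char)
    return value


def _format_field(value):
    # bool must be checked before int, since bool is a subclass of int
    if isinstance(value, bool):
        return "true" if value else "false"
    if isinstance(value, int):
        return f"{value}i"  # integers carry an "i" suffix
    if isinstance(value, float):
        return repr(value)
    # strings are double-quoted, with backslashes and quotes escaped
    return '"' + _escape(str(value), '\\"') + '"'


def to_line_protocol(measurement, tags, fields, timestamp=None):
    """Render a single data point as one line of InfluxDB line protocol."""
    line = _escape(measurement, ", ")
    for key, value in sorted(tags.items()):
        line += "," + _escape(key, ",= ") + "=" + _escape(str(value), ",= ")
    line += " " + ",".join(
        _escape(key, ",= ") + "=" + _format_field(value)
        for key, value in fields.items()
    )
    if timestamp is not None:
        line += f" {timestamp}"  # integer timestamp
    return line


print(to_line_protocol("cpu_load_short", {"host": "server02"}, {"value": 0.67}))
# cpu_load_short,host=server02 value=0.67
```

The measurement comes first, tags follow as comma-separated key=value pairs, fields come after a space, and the optional timestamp is last.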
Data points can be written one at a time (write()) or many at once (write_many()). The former is merely a wrapper for the latter.
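The wrapper relationship can be pictured with a toy client that records points instead of sending them. Everything here (SketchClient, Point and its field names) is a stand-in invented for illustration; the library's real classes and signatures may differ.

```python
import asyncio
from collections import namedtuple

# Stand-in for the library's DataPoint class; the field names are assumptions.
Point = namedtuple("Point", ["measurement", "tags", "fields"])


class SketchClient:
    """Toy client that records points instead of sending them to a server."""

    def __init__(self):
        self.written = []

    async def write_many(self, points):
        # A real client would presumably send the whole batch to the server here.
        self.written.extend(points)

    async def write(self, measurement, tags, fields):
        # Writing one point is just writing a batch of size one.
        await self.write_many([Point(measurement, tags, fields)])


client = SketchClient()
asyncio.run(client.write("cpu_load_short", {"host": "server02"}, {"value": 0.67}))
print(client.written)
```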
To write a single data point:
async def handler(ctx):
    # Arguments: measurement name, tags, fields
    await ctx.influxdb.write('cpu_load_short', dict(host='server02'), dict(value=0.67))
To write multiple data points, you need to use the DataPoint class:
async def handler(ctx):
    # DataPoint is assumed to have been imported from this library already
    await ctx.influxdb.write_many([
        DataPoint('cpu_load_short', dict(host='server02'), dict(value=0.67)),
        DataPoint('cpu_load_short', dict(host='server01'), dict(value=0.21))
    ])
The data points don’t have to be within the same measurement.
Querying the database¶
This library offers the ability to query data via both raw InfluxQL queries and a programmatic query builder. To learn how the query builder works, see the next section.
Sending raw queries is done using raw_query():
async def handler(ctx):
    series = await ctx.influxdb.raw_query('SHOW DATABASES')
    print([row[0] for row in series])
If you include more than one measurement in the FROM clause of a SELECT query, you will get a list of Series as a result:
async def handler(ctx):
    cpu_load, temperature = await ctx.influxdb.raw_query('SELECT * FROM cpu_load,temperature')
    # Each measurement comes back as its own Series
    print([row[0] for row in cpu_load])
    print([row[0] for row in temperature])
It is possible to send multiple queries by delimiting them with a semicolon (;). If you do that, the call will return a list containing one result per query, each of which may be a Series or a list thereof:
async def handler(ctx):
    db_series, m_series = await ctx.influxdb.raw_query('SHOW DATABASES; SHOW MEASUREMENTS')
    print('Databases:')
    print([row[0] for row in db_series])
    print('Measurements:')
    print([row[0] for row in m_series])
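For orientation, the sketch below shows the general shape of the JSON that the InfluxDB 1.x /query endpoint returns for two semicolon-separated statements, and one way to split it into one result per statement. The sample payload and the rows_per_statement helper are written for illustration only; how the library turns this JSON into Series objects is not shown here.

```python
import json

# Sample payload in the shape of an InfluxDB 1.x /query response
# (values made up for illustration).
payload = json.loads("""
{
  "results": [
    {"statement_id": 0,
     "series": [{"name": "databases", "columns": ["name"],
                 "values": [["_internal"], ["mydb"]]}]},
    {"statement_id": 1,
     "series": [{"name": "measurements", "columns": ["name"],
                 "values": [["cpu_load_short"]]}]}
  ]
}
""")


def rows_per_statement(response):
    """Return one list per statement, holding (name, columns, values) per series."""
    results = []
    for result in response["results"]:
        series = [
            (s.get("name"), s["columns"], s.get("values", []))
            for s in result.get("series", [])
        ]
        results.append(series)
    return results


db_result, m_result = rows_per_statement(payload)
print([row[0] for row in db_result[0][2]])
print([row[0] for row in m_result[0][2]])
```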
Warning
Due to a known bug, sending multiple queries in the same call does not currently work.
Note
If you want to send a query like SELECT ... INTO ..., you must call raw_query() with http_verb='POST'. The proper HTTP verb should be automatically detected from the query string for all other queries.
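One plausible way such detection could work is to look at the first keyword of the query, since InfluxDB 1.x expects POST for statements that modify data or schema and GET for plain SELECT and SHOW. The guess_http_verb function below is a hypothetical sketch, not the library's code. A check based only on the first keyword would classify SELECT ... INTO as GET, which would be consistent with the note above asking you to pass the verb explicitly.

```python
# Leading keywords that InfluxDB 1.x expects to be sent over POST.
_POST_KEYWORDS = {"ALTER", "CREATE", "DELETE", "DROP", "GRANT", "KILL", "REVOKE"}


def guess_http_verb(query):
    """Guess the HTTP verb from the first keyword of an InfluxQL query."""
    first_word = query.split(None, 1)[0].upper() if query.strip() else ""
    return "POST" if first_word in _POST_KEYWORDS else "GET"


print(guess_http_verb("SHOW DATABASES"))
print(guess_http_verb("DROP DATABASE mydb"))
# Only the leading SELECT is seen here, so INTO goes unnoticed
print(guess_http_verb("SELECT * INTO copy FROM cpu_load"))
```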
Using the query builder¶
The query builder lets you build queries dynamically without error-prone manual string concatenation. The query builder is immutable, so every one of its methods returns a new builder with the modification applied, just like SQLAlchemy ORM queries.
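The immutability pattern can be sketched with a frozen dataclass whose modifier methods return copies. SketchQuery is a toy written for this document, not the library's builder class, and its attribute names are assumptions.

```python
from dataclasses import dataclass, replace
from typing import Tuple


@dataclass(frozen=True)
class SketchQuery:
    """Toy immutable query; every modifier returns a new instance."""

    select: str
    from_: str
    conditions: Tuple[str, ...] = ()

    def where(self, *expressions):
        # replace() copies the frozen instance with the changed field
        return replace(self, conditions=self.conditions + expressions)

    def __str__(self):
        text = f"SELECT {self.select} FROM {self.from_}"
        if self.conditions:
            text += " WHERE " + " AND ".join(self.conditions)
        return text


base = SketchQuery("field1", '"m1"')
filtered = base.where("field1 > 3.5")
print(base)      # the original query is unchanged
print(filtered)  # the copy carries the extra condition
```

Because the original object never changes, a base query can be safely shared and specialized in several places.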
For example, to select field1 and field2 from measurements m1 and m2:
async def handler(ctx):
    query = ctx.influxdb.query(['field1', 'field2'], ['m1', 'm2'])
    return await query.execute()
This will produce a query like SELECT "field1","field2" FROM "m1","m2".
More complex expressions can be given, but field or tag names are not automatically quoted:
async def handler(ctx):
    query = ctx.influxdb.query('field1 + field2', 'm1')
    return await query.execute()
The query will look like SELECT field1 + field2 FROM "m1".
Filters can be added by using where() with positional and/or keyword arguments:
async def handler(ctx):
    query = ctx.influxdb.query(['field1', 'field2'], 'm1').\
        where('field1 > 3.5', 'field2 < 6.2', location='Helsinki')
    return await query.execute()
This will produce a query like SELECT field1,field2 FROM "m1" WHERE field1 > 3.5 AND field2 < 6.2 AND location='Helsinki'.
To use OR, you have to manually include it in one of the WHERE expressions.
Further calls to .where() will add conditions to the WHERE clause of the query.
A call to .where() with no arguments will clear the WHERE clause.
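The rules for where() described above (positional expressions kept verbatim, keyword arguments turned into equality checks, conditions accumulated across calls, an empty call clearing everything) can be sketched as a small pure function. Both add_conditions and quote_value are hypothetical helpers that illustrate the behavior; they are not the library's code.

```python
def quote_value(value):
    """Render a Python value as an InfluxQL literal (strings in single quotes)."""
    if isinstance(value, str):
        return "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'"
    return str(value)


def add_conditions(existing, *expressions, **equals):
    """Return the tuple of WHERE conditions after a where() call."""
    if not expressions and not equals:
        return ()  # a call without arguments clears the clause
    keyword_conditions = tuple(
        f"{key}={quote_value(value)}" for key, value in equals.items()
    )
    return existing + expressions + keyword_conditions


conditions = add_conditions((), "field1 > 3.5", "field2 < 6.2", location="Helsinki")
print(" AND ".join(conditions))
# field1 > 3.5 AND field2 < 6.2 AND location='Helsinki'

# OR has to be written inside a single expression
conditions = add_conditions(conditions, "field3 = 1 OR field3 = 2")
print(" AND ".join(conditions))
```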
Grouping by tags works largely the same way:
async def handler(ctx):
    query = ctx.influxdb.query(['field1', 'SUM(field2)'], 'm1').group_by('tag1')
    return await query.execute()
This will produce a query like SELECT field1,SUM(field2) FROM "m1" GROUP BY "tag1".
Version history¶
This library adheres to Semantic Versioning.
2.1.0 (2017-09-20)
- Exposed the Series.values attribute to enable faster processing of query results
2.0.1 (2017-06-04)
- Added compatibility with Asphalt 4.0
- Fixed "DeprecationWarning: ClientSession.close() is not coroutine" warnings
- Added Docker configuration for easier local testing
2.0.0 (2017-04-11)
- BACKWARD INCOMPATIBLE Migrated to Asphalt 3.0
- BACKWARD INCOMPATIBLE Migrated to aiohttp 2.0
1.1.1 (2017-02-09)
- Fixed handling of long responses (on InfluxDB 1.2+)
1.1.0 (2016-12-15)
- Added the KeyedTuple._asdict() method
- Fixed wrong quoting of string values (should use single quotes, not double quotes)
1.0.0 (2016-12-12)
- Initial release