使用Python和InfluxDB入门
作者:Noah Crowley / 产品,用例,开发者,入门
2018年3月30日
导航到
如果您正在寻找监控基础设施或第三方应用程序的方法,那么Telegraf内置插件是一个不错的选择,无论是查看系统资源如磁盘和网络利用率,还是MySQL数据库的性能。InfluxDB包括各种工具,以帮助开发者摄取、存储和查询时间序列数据。
那么,如果您正在构建一个需要将用户数据存储在时间序列数据库中的应用程序,会怎样呢?也许它是一个物联网或智能家居应用程序,每个用户都需要访问他们的智能牙刷的读数。您想要存储每次刷牙的时间长度,发送提醒孩子们刷牙的警报,并跟踪电池健康和当前刷头使用时间等信息。
收集自定义数据,无论是面向用户的程序还是Telegraf插件尚未覆盖的基础设施需求,可能都需要编写新的代码。
以智能牙刷为例,你可能有一个基站,它运行嵌入式Linux,并通过蓝牙与牙刷通信。你已经编写了监听传入数据的代码,并且看起来工作得很好;现在你需要将其数据存入InfluxDB。
一种方法是在你的应用程序旁边运行Telegraf,并通过Unix、UDP或TCP套接字发送数据,让Telegraf处理与InfluxDB的连接、批处理和点写入。
如果你只需要收集数据,这很好,但如果你需要查询并检索用户的数据,你可能会想利用各种语言中可用的InfluxDB库,在应用程序内部本身处理与InfluxDB的交互。
有许多语言已经有了InfluxDB库,其中许多都是由社区维护的。在这篇文章中,我们将更详细地探讨使用influxdb-python库,但如果Python不适合你,你可以在InfluxDB API客户端库页面找到库列表。
InfluxDB Python客户端库
influxdb-python库由InfluxDB的GitHub账户托管,但它由三位社区志愿者维护:@aviau、@xginn8和@sebito91。非常感谢他们的辛勤工作和为社区做出的贡献。
你需要什么
以下示例是在安装了Python 3的MacOS系统上测试的,Python 3是通过Homebrew安装的(安装说明在此),以及使用默认Python 3安装的Ubuntu 16.04系统。
Python的安装可能会有些棘手;不同版本的语言以及需要安装不同版本库的项目可能会迅速导致冲突。虽然我们不会在这里详细介绍Python的安装,但了解各种版本是如何安装和相互交互的,以及了解像virtualenv或pyenv这样的额外工具可能是有用的。
你可以在《Python的流浪者指南》(Mac、Linux)中找到有关安装Python和附加工具的一些额外文章。
我们还将向本地InfluxDB实例发送数据。如果你还没有,你可以按照我们文档页面上的安装说明操作,或者使用sandbox脚本在Docker中启动完整的TICK堆栈。
安装库
与许多Python库一样,启动和运行的最简单方法是使用pip
安装库。
我们将使用Python命令的-m
参数运行pip
,以确保确定安装目标是哪个Python(如Raymond Hettinger提供的此提示)。
$ python3 -m pip install influxdb
你应该会看到一些表示成功的输出。
我们将使用REPL(交互式解释器)来演示Python库的一些功能,以便我们可以输入命令并立即看到它们的输出。现在让我们启动REPL,并从python-influxdb
库中导入InfluxDBClient
,以确保它已安装。
$ python3
Python 3.6.4 (default, Mar 9 2018, 23:15:03)
[GCC 4.2.1 Compatible Apple LLVM 9.0.0 (clang-900.0.39.2)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> from influxdb import InfluxDBClient
>>>
没有错误 - 看起来我们已准备好开始了!
建立连接
下一步是创建一个InfluxDBClient的新实例(API文档),其中包含我们想要访问的服务器信息。在您的REPL中输入以下命令,用适当的URL/IP地址和端口替换host
和port
的值。在本例中,我们正在本地默认端口上运行。
>>> client = InfluxDBClient(host='localhost', port=8086)
InfluxDBClient
构造函数有一些额外的参数可用,包括用户名和密码、要连接的数据库、是否使用SSL、超时和UDP参数。
如果您想要连接到mydomain.com
上的远程主机,端口为8086
,用户名为myuser
,密码为mypass
,并使用SSL,则可以使用以下命令代替,该命令启用SSL和SSL验证,并带有两个额外的参数ssl=True
和ssl_verify=True
。
>>> client = InfluxDBClient(host='mydomain.com', port=8086, username='myuser', password='mypass' ssl=True, verify_ssl=True)
现在,让我们创建一个名为pyexample
的新数据库来存储我们的数据
>>> client.create_database('pyexample')
我们可以通过使用客户端的get_list_database()
函数来检查数据库是否存在。
>>> client.get_list_database()
[{'name': 'telegraf'}, {'name': '_internal'}, {'name': 'pyexample'}]
就在那里,除了我在安装上有的telegraf
和_internal
数据库外。最后,我们将客户端设置为此数据库
>>> client.switch_database('pyexample')
插入数据
现在我们已经有了可以写入数据的数据库,并且客户端已经正确配置,是时候插入一些数据了!我们将使用客户端的write_points()
方法来实现这一点(API文档)。此方法接受一系列点和一些额外的参数,包括“批量大小”,它允许我们批量插入数据,而不是一次性插入。如果您正在插入大量数据,这可能很有用。
write_points()
方法有一个名为points
的参数,它是一个字典列表,包含要写入数据库的点。现在让我们创建一些示例数据并将其插入。首先,让我们将三个JSON格式的点添加到一个名为json_body
的变量中。
>>> json_body = [
{
"measurement": "brushEvents",
"tags": {
"user": "Carol",
"brushId": "6c89f539-71c6-490d-a28d-6c5d84c0ee2f"
},
"time": "2018-03-28T8:01:00Z",
"fields": {
"duration": 127
}
},
{
"measurement": "brushEvents",
"tags": {
"user": "Carol",
"brushId": "6c89f539-71c6-490d-a28d-6c5d84c0ee2f"
},
"time": "2018-03-29T8:04:00Z",
"fields": {
"duration": 132
}
},
{
"measurement": "brushEvents",
"tags": {
"user": "Carol",
"brushId": "6c89f539-71c6-490d-a28d-6c5d84c0ee2f"
},
"time": "2018-03-30T8:02:00Z",
"fields": {
"duration": 129
}
}
]
这些表示我们的智能牙刷的“刷牙事件”;每个事件都在早上8点左右发生,带有使用牙刷的人的用户名和牙刷本身的ID(因此我们可以跟踪每个刷头使用的时间),并且有一个字段包含用户刷牙的时间,以秒为单位。
由于我们已经设置了数据库,并且write_points()
的默认输入是JSON,我们可以使用我们的json_body
变量作为唯一参数调用该方法,如下所示
>>> client.write_points(json_body)
True
如果您看到函数返回了响应 True
,则表示写入操作已成功。如果您正在构建应用程序,您可能希望此数据集自动添加,每次用户与电动牙刷交互时都在数据库中添加分数。
查询数据
现在我们在数据库中已经有了一些数据,让我们尝试运行一些查询来获取它。我们将使用与写入数据相同的客户端对象,但这次我们将在InfluxDB上执行查询,并使用客户端的 query()
函数获取结果(API 文档)。
>>> client.query('SELECT "duration" FROM "pyexample"."autogen"."brushEvents" WHERE time > now() - 4d GROUP BY "user"')
>>>
query()
函数返回一个 ResultSet
对象(API 文档),其中包含结果的所有数据以及一些便捷方法。我们的查询请求我们 pyexample
数据库中的所有测量值,按用户分组。您可以使用 .raw
参数访问InfluxDB的原始JSON响应
>>> results.raw
{'statement_id': 0, 'series': [{'name': 'brushEvents', 'tags': {'user': 'Carol'}, 'columns': ['time', 'duration'], 'values': [['2018-03-28T08:01:00Z', 127], ['2018-03-29T08:04:00Z', 132], ['2018-03-30T08:02:00Z', 129]]}]}
然而,在大多数情况下,您不需要直接访问JSON。相反,您可以使用 ResultSet
的 get_points()
方法从请求中获取测量值,通过标签或字段进行过滤。如果您想迭代Carol的所有刷牙时间;您可以使用此命令获取所有分组在标签“user”下,值为“Carol”的点
>>> points = results.get_points(tags={'user':'Carol'})
在这个例子中,points
是一个Python生成器,它是一个类似于迭代器的函数;您可以使用 for x in y
循环迭代它,如下所示
>>> points = results.get_points(tags={'user': 'Carol'})
>>> for point in points:
... print("Time: %s, Duration: %i" % (point['time'], point['duration']))
...
Time: 2018-03-28T08:01:00Z, Duration: 127
Time: 2018-03-29T08:04:00Z, Duration: 132
Time: 2018-03-30T08:02:00Z, Duration: 129
根据您的应用程序,您可能需要迭代这些点来计算用户的平均刷牙时间,或者只是验证每天有X次刷牙事件。
如果您想跟踪单个刷头使用的时间量,您可以用一个新的查询替换,该查询根据 brushId
对点进行分组,然后添加每个点的持续时间。在某个时候,您可以提醒用户是时候更换他们的刷头了
>>> results = client.query('SELECT "duration" FROM "pyexample"."autogen"."brushEvents" WHERE time > now() - 4d GROUP BY "brushId"')
>>> points = results.get_points(tags={'brushId': '6c89f539-71c6-490d-a28d-6c5d84c0ee2f'})
>>> brush_usage_total = 0
>>> for point in points:
... brush_usage_total = brush_usage_total + point['duration']
...
>>> if brush_usage_total > 350:
... print("You've used your brush head for %s seconds, more than the recommended amount! Time to replace your brush head!" % brush_usage_total)
...
You've used your brush head for 388 seconds, more than the recommended amount! Time to replace your brush head!
>>>
附加文档和功能
influx-python
库包含了一些我们上面文章中没有涉及到的附加功能。客户端中还有额外的管理功能,例如添加用户、管理数据库和删除测量值,以及额外的对象,如 SeriesHelper
,它提供了批量写入点的便捷功能,以及 DataFrameClient
,它简化了与 PANDAS 和数据框的集成。
如果您有兴趣在项目中使用此库,那么花些时间研究 API 文档 和 源代码 是有意义的,理解不仅提供了哪些功能,还有它们背后的工作方式。
如果您用InfluxDB构建了很酷的东西,我们很乐意在博客上展示它,所以请在Twitter上与我们分享 @InfluxDB!