Python 和 InfluxDB 入门
作者:Noah Crowley / 产品, 用例, 开发者, 入门
2018 年 3 月 30 日
导航至
如果您正在寻找监控您的基础设施或第三方应用程序,那么 Telegraf 的内置插件是一个不错的选择,无论您是在查看磁盘和网络利用率等系统资源还是 MySQL 数据库的性能。InfluxDB 包括 各种工具,供开发人员摄取、存储和查询时间序列数据。
但是,如果您正在构建一个应用程序,您想在 时间序列数据库 中存储用户数据,该怎么办?也许它是一个物联网或智能家居应用程序,每个用户都需要访问来自他们的智能牙刷的读数。您想存储每次刷牙会话的时间和持续时间,发送警报以提醒孩子们刷牙,并跟踪电池健康状况和当前刷头已使用多长时间等信息。
收集自定义数据,无论是用于面向用户的应用程序还是用于 Telegraf 插件尚未涵盖的基础设施需求,都可能需要编写新代码。
对于智能牙刷示例,也许您有一个运行嵌入式 Linux 的基站,并通过蓝牙与牙刷通信。您已经编写了代码来监听传入的数据,并且它似乎运行良好;现在您需要将其放入 InfluxDB。
一种方法是与您的应用程序一起运行 Telegraf,并通过 Unix、UDP 或 TCP 套接字将您的数据发送给它,让 Telegraf 处理与 InfluxDB 的连接以及点的批处理和写入。
如果您只需要收集数据,这很棒,但是如果您需要查询和检索用户的数据,您可能需要利用各种语言提供的 InfluxDB 库之一来处理应用程序本身内与 InfluxDB 的交互。
现在已经有很多语言拥有 InfluxDB 库,其中许多库由社区维护。我们将在本文中仔细研究使用 influxdb-python 库,但如果 Python 不是您的风格,您可以在 InfluxDB API 客户端库页面 上找到库列表。
InfluxDB Python 客户端库
虽然 influxdb-python 库 由 InfluxDB 的 GitHub 帐户托管,但它由三位社区志愿者 @aviau、@xginn8 和 @sebito91 维护。非常感谢他们的辛勤工作以及对社区的贡献。
您需要准备什么
以下示例在 MacOS 系统(通过 Homebrew 安装了 Python 3(此处提供说明))和使用默认 Python 3 安装的 Ubuntu 16.04 系统上进行了测试。
Python 的安装可能会变得有点棘手;语言的不同版本,以及需要不同版本已安装库的项目,可能会很快导致冲突。虽然我们不会在此处详细介绍 Python 安装,但了解各种版本是如何安装和相互交互的,以及研究 virtualenv 或 pyenv 等其他工具可能会很有用。
您可以在 The Hitchhiker’s Guide to Python (Mac, Linux) 中找到有关安装 Python 和其他工具的其他文章。
我们还将数据发送到 InfluxDB 的本地实例。如果您还没有,您可以按照 我们文档页面上的安装说明,或使用 sandbox 脚本 来启动 Docker 中的完整 TICK Stack。
安装库
与许多 Python 库一样,启动和运行的最简单方法是使用 pip
安装库。
我们将使用 Python 命令的 -m
参数运行 pip
,以确保哪个 Python 是安装目标(根据 Raymond Hettinger 的 提示)。
$ python3 -m pip install influxdb
您应该看到一些输出指示成功。
我们将使用 REPL 逐步完成 Python 库的一些功能,以便我们可以输入命令并立即查看其输出。现在启动 REPL,并从 python-influxdb
库导入 InfluxDBClient
,以确保已安装它
$ python3
Python 3.6.4 (default, Mar 9 2018, 23:15:03)
[GCC 4.2.1 Compatible Apple LLVM 9.0.0 (clang-900.0.39.2)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> from influxdb import InfluxDBClient
>>>
没有错误 - 看起来我们准备好了!
建立连接
下一步将是创建一个新的 InfluxDBClient 实例(API 文档),其中包含有关我们要访问的服务器的信息。在您的 REPL 中输入以下命令,将 host
和 port
的值替换为您 InfluxDB 主机的相应 URL/IP 地址和端口。在本例中,我们在默认端口上本地运行
>>> client = InfluxDBClient(host='localhost', port=8086)
除了用户名和密码、要连接的数据库、是否使用 SSL、超时和 UDP 参数之外,InfluxDBClient
构造函数还有一些其他参数可用。
如果您想使用用户名 myuser
和密码 mypass
并使用 SSL 连接到端口 8086
上的远程主机 mydomain.com
,则可以使用以下命令,该命令使用两个附加参数 ssl=True
和 ssl_verify=True
启用 SSL 和 SSL 验证
>>> client = InfluxDBClient(host='mydomain.com', port=8086, username='myuser', password='mypass' ssl=True, verify_ssl=True)
现在,让我们创建一个名为 pyexample
的新数据库来存储我们的数据
>>> client.create_database('pyexample')
我们可以使用客户端的 get_list_database()
函数检查数据库是否存在
>>> client.get_list_database()
[{'name': 'telegraf'}, {'name': '_internal'}, {'name': 'pyexample'}]
它在那里,除了我在安装中拥有的 telegraf
和 _internal
数据库。最后,我们将客户端设置为使用此数据库
>>> client.switch_database('pyexample')
插入数据
现在我们有了一个可以写入数据的数据库,并且我们的客户端已正确配置,是时候插入一些数据了!我们将使用客户端的 write_points()
方法来执行此操作(API 文档)。此方法接受一个点列表和一些其他参数,包括“批处理大小”,这使我们能够批量插入数据,而不是一次性插入所有数据。如果您要插入大量数据,这可能很有用。
write_points()
方法有一个名为 points
的参数,它是一个字典列表,其中包含要写入数据库的点。现在让我们创建一些示例数据并插入它。首先,让我们以 JSON 格式向名为 json_body
的变量添加三个点
>>> json_body = [
{
"measurement": "brushEvents",
"tags": {
"user": "Carol",
"brushId": "6c89f539-71c6-490d-a28d-6c5d84c0ee2f"
},
"time": "2018-03-28T8:01:00Z",
"fields": {
"duration": 127
}
},
{
"measurement": "brushEvents",
"tags": {
"user": "Carol",
"brushId": "6c89f539-71c6-490d-a28d-6c5d84c0ee2f"
},
"time": "2018-03-29T8:04:00Z",
"fields": {
"duration": 132
}
},
{
"measurement": "brushEvents",
"tags": {
"user": "Carol",
"brushId": "6c89f539-71c6-490d-a28d-6c5d84c0ee2f"
},
"time": "2018-03-30T8:02:00Z",
"fields": {
"duration": 129
}
}
]
这些指示我们的智能牙刷的“刷牙事件”;每个事件都发生在早上 8 点左右,使用牙刷的人的用户名和牙刷本身的 ID 进行标记(以便我们可以跟踪每个刷头已使用多长时间),并且有一个字段包含用户刷牙的时长(以秒为单位)。
由于我们已经设置了数据库,并且 write_points()
的默认输入是 JSON,因此我们可以使用 json_body
变量作为唯一参数来调用该方法,如下所示
>>> client.write_points(json_body)
True
如果写入操作成功,您应该看到函数返回的响应 True
。如果您正在构建一个应用程序,您希望这种数据收集是自动的,每次用户与牙刷交互时都向数据库添加点。
查询数据
现在我们数据库中有了一些数据,让我们尝试运行一些查询以将其取回。我们将使用与写入数据相同的客户端对象,但这次我们将在 InfluxDB 上执行查询,并使用客户端的 query()
函数(API 文档)获取结果。
>>> client.query('SELECT "duration" FROM "pyexample"."autogen"."brushEvents" WHERE time > now() - 4d GROUP BY "user"')
>>>
query()
函数返回一个 ResultSet
对象(API 文档),其中包含结果的所有数据以及一些便捷方法。我们的查询请求我们 pyexample
数据库中的所有测量值,按用户分组。您可以使用 .raw
参数访问来自 InfluxDB 的原始 JSON 响应
>>> results.raw
{'statement_id': 0, 'series': [{'name': 'brushEvents', 'tags': {'user': 'Carol'}, 'columns': ['time', 'duration'], 'values': [['2018-03-28T08:01:00Z', 127], ['2018-03-29T08:04:00Z', 132], ['2018-03-30T08:02:00Z', 129]]}]}
但是,在大多数情况下,您不需要直接访问 JSON。相反,您可以使用 ResultSet
的 get_points()
方法从请求中获取测量值,按标签或字段进行筛选。如果您想遍历 Carol 的所有刷牙会话;您可以使用以下命令获取在标签“user”下分组的所有点,值为“Carol”
>>> points = results.get_points(tags={'user':'Carol'})
points
在这种情况下是一个 Python 生成器,它是一个类似于迭代器工作的函数;您可以使用 for x in y
循环对其进行迭代,如下所示
>>> points = results.get_points(tags={'user': 'Carol'})
>>> for point in points:
... print("Time: %s, Duration: %i" % (point['time'], point['duration']))
...
Time: 2018-03-28T08:01:00Z, Duration: 127
Time: 2018-03-29T08:04:00Z, Duration: 132
Time: 2018-03-30T08:02:00Z, Duration: 129
根据您的应用程序,您可能会迭代这些点来计算用户的平均刷牙时间,或者只是验证每天发生了 X 次刷牙事件。
如果您有兴趣跟踪单个刷头已使用的时间,您可以替换一个新的 查询,该查询根据 brushId
对点进行分组,然后获取每个点的持续时间并将其添加到总和中。在某个时候,您可以提醒您的用户是时候更换刷头了
>>> results = client.query('SELECT "duration" FROM "pyexample"."autogen"."brushEvents" WHERE time > now() - 4d GROUP BY "brushId"')
>>> points = results.get_points(tags={'brushId': '6c89f539-71c6-490d-a28d-6c5d84c0ee2f'})
>>> brush_usage_total = 0
>>> for point in points:
... brush_usage_total = brush_usage_total + point['duration']
...
>>> if brush_usage_total > 350:
... print("You've used your brush head for %s seconds, more than the recommended amount! Time to replace your brush head!" % brush_usage_total)
...
You've used your brush head for 388 seconds, more than the recommended amount! Time to replace your brush head!
>>>
其他文档和功能
influx-python
库包含我们在上面文章中未涵盖的相当多的附加功能。客户端中还有其他管理功能,例如添加用户、管理数据库和删除测量值,以及其他对象,例如 SeriesHelper
,它为批量写入点提供了一些便捷功能,以及 DataFrameClient
,它简化了与 PANDAS 和 DataFrames 的集成。
如果您有兴趣在您的项目中使用此库,那么花一些时间阅读 API 文档 和 源代码,不仅要了解提供的功能,还要了解其幕后工作方式,这是有意义的。
如果您使用 InfluxDB 构建了一些很酷的东西,我们很乐意在我们的博客上进行专题报道,因此请在 Twitter 上与我们分享 @InfluxDB!