使用 Jupyter 和 InfluxDB 流式处理时间序列
作者:Anais Dotis-Georgiou / 产品, 用例, 开发者
2020 年 2 月 7 日
导航至
Jupyter Notebooks 非常棒,因为它们提供了一种在同一位置共享代码、解释和可视化的方式。Notebooks 为计算添加了叙述。单元格分隔步骤,减少了与编辑代码相关的恐惧或犹豫。通过这种方式,notebooks 充当了实验的邀请。
今天,我想扩展这种邀请并将其应用于 InfluxDB。在这篇文章中,我们将学习如何使用 Flux 从 InfluxDB v2.0 查询我们的系统统计数据。然后,我们将在 Jupyter Notebook 中实时显示结果。
<figcaption> 我们将通过 hvPlot、Streamz、RxPY 执行的可视化类型。本教程的 notebook 可以在此处找到。</figcaption>
导入和连接
首先,我们必须导入我们的依赖项
from datetime import timedelta
from typing import List
import hvplot.streamz
import pandas as pd
import rx
from rx import operators as ops
from streamz.dataframe import Random, DataFrame
from streamz import Stream
from influxdb_client import InfluxDBClient
接下来,我们必须建立连接。如果您在 AWS Oregon 上运行 InfluxDB Cloud,以下是如何操作
client = InfluxDBClient(url='https://us-west-2-1.aws.cloud2.influxdata.com', token='my-token', org='my-org')
对于 AWS Frankfurt,它是
client = InfluxDBClient(url=https://eu-central-1-1.aws.cloud2.influxdata.com', token='my-token', org='my-org')
对于 Google Cloud Iowa,它是
client = InfluxDBClient(url='https://us-central1-1.gcp.cloud2.influxdata.com', token='my-token', org='my-org')
最后,如果您在本地运行 InfluxDB 开源版本,以下是如何连接
client = InfluxDBClient(url='http://localhost:9999', token='my-token', org='my-org')
源数据函数
此函数是此 notebook 的核心。它使用 Pandas 功能 query_data_frame()
,客户端的功能将数据作为 dataframe 返回,auto_refresh
由用户指定。该函数的工作原理是将用户的 Flux 查询,即 tail Flux 查询,附加到 source_data() 指定的起始 Flux 查询。
def source_data(auto_refresh: int, query: str, sink: Stream):
rx \
.interval(period=timedelta(seconds=auto_refresh)) \
.pipe(ops.map(lambda start: f'from(bucket: "my-bucket") '
f'|> range(start: -{auto_refresh}s, stop: now()) '
f'{query}')) \
.pipe(ops.map(lambda query: client.query_api().query_data_frame(query, data_frame_index=['_time']))) \
.pipe(ops.map(lambda data_frame: data_frame.drop(columns=['result', 'table']))) \
.subscribe(observer=lambda data_frame: sink.emit(data_frame), on_error=lambda error: print(error))
pass
生成自动刷新 DataFrame
创建 tail Flux 查询
cpu_query = '|> filter(fn: (r) => r._measurement == "cpu") ' \
'|> filter(fn: (r) => r._field == "usage_user") ' \
'|> filter(fn: (r) => r.cpu == "cpu-total") ' \
'|> keep(columns: ["_time", "_value"])'
使用 source_data() 生成自动刷新 DataFrame。这将每五秒刷新一次,但您可以根据需要更改它
cpu_sink = Stream()
cpu_example = pd.DataFrame({'_value': []}, columns=['_value'])
cpu_df = DataFrame(cpu_sink, example=cpu_example)
source_data(auto_refresh=5, sink=cpu_sink, query=cpu_query)
使用 hvPlot 和 Streamz 创建可视化
现在是创建图表的时候了。为此,导入 bokeh DatetimeTick formatter,以便在各种尺度上漂亮地显示日期时间值,并指定每个尺度属性
from bokeh.models.formatters import DatetimeTickFormatter
# Time formatter
formatter = DatetimeTickFormatter(
microseconds = ["%H:%M:%S"],
milliseconds = ["%H:%M:%S"],
seconds = ["%H:%M:%S"],
minsec = ["%H:%M:%S"],
minutes = ["%H:%M:%S"],
hourmin = ["%H:%M:%S"],
hours=["%H:%M:%S"],
days=["%H:%M:%S"],
months=["%H:%M:%S"],
years=["%H:%M:%S"],
)
接下来,生成 hvPlot
cpu_df.hvplot(width=450, backlog=50, title='CPU % usage', xlabel='Time', ylabel='%', xformatter=formatter)
使用 InfluxDB v2.0 Python 客户端进行 Jupyter 可视化、TensorFlow 预测等
使用 TensorFlow 进行预测、使用 Prophet 进行预测,以及 多进程写入 只是最近 InfluxDB v2.0 Python 客户端发生的一些有趣的事情。我希望本教程和实现与其余部分相提并论!与往常一样,如果您遇到障碍,请在我们的 社区网站或 Slack 频道上分享它们。我们很乐意获得您的反馈并帮助您解决遇到的任何问题。