使用 Jupyter 和 InfluxDB 流式处理时间序列

导航至

Jupyter Notebooks 非常棒,因为它们提供了一种在同一位置共享代码、解释和可视化的方式。Notebooks 为计算添加了叙述。单元格分隔步骤,减少了与编辑代码相关的恐惧或犹豫。通过这种方式,notebooks 充当了实验的邀请。

今天,我想扩展这种邀请并将其应用于 InfluxDB。在这篇文章中,我们将学习如何使用 FluxInfluxDB v2.0 查询我们的系统统计数据。然后,我们将在 Jupyter Notebook 中实时显示结果。

Streaming Time Series with Jupyter and InfluxDB<figcaption> 我们将通过 hvPlotStreamzRxPY 执行的可视化类型。本教程的 notebook 可以在此处找到。</figcaption>

导入和连接

首先,我们必须导入我们的依赖项

from datetime import timedelta
from typing import List

import hvplot.streamz
import pandas as pd
import rx
from rx import operators as ops

from streamz.dataframe import Random, DataFrame
from streamz import Stream
from influxdb_client import InfluxDBClient

接下来,我们必须建立连接。如果您在 AWS Oregon 上运行 InfluxDB Cloud,以下是如何操作

client = InfluxDBClient(url='https://us-west-2-1.aws.cloud2.influxdata.com', token='my-token', org='my-org')

对于 AWS Frankfurt,它是

client = InfluxDBClient(url=https://eu-central-1-1.aws.cloud2.influxdata.com', token='my-token', org='my-org')

对于 Google Cloud Iowa,它是

client = InfluxDBClient(url='https://us-central1-1.gcp.cloud2.influxdata.com', token='my-token', org='my-org')

最后,如果您在本地运行 InfluxDB 开源版本,以下是如何连接

client = InfluxDBClient(url='http://localhost:9999', token='my-token', org='my-org')

源数据函数

此函数是此 notebook 的核心。它使用 Pandas 功能 query_data_frame(),客户端的功能将数据作为 dataframe 返回,auto_refresh 由用户指定。该函数的工作原理是将用户的 Flux 查询,即 tail Flux 查询,附加到 source_data() 指定的起始 Flux 查询。

def source_data(auto_refresh: int, query: str, sink: Stream):
    rx \
        .interval(period=timedelta(seconds=auto_refresh)) \
        .pipe(ops.map(lambda start: f'from(bucket: "my-bucket") '
                                    f'|> range(start: -{auto_refresh}s, stop: now()) '
                                    f'{query}')) \
        .pipe(ops.map(lambda query: client.query_api().query_data_frame(query, data_frame_index=['_time']))) \
        .pipe(ops.map(lambda data_frame: data_frame.drop(columns=['result', 'table']))) \
        .subscribe(observer=lambda data_frame: sink.emit(data_frame), on_error=lambda error: print(error))
    pass

生成自动刷新 DataFrame

创建 tail Flux 查询

cpu_query = '|> filter(fn: (r) => r._measurement == "cpu") ' \
            '|> filter(fn: (r) => r._field == "usage_user") ' \
            '|> filter(fn: (r) => r.cpu == "cpu-total") ' \
            '|> keep(columns: ["_time", "_value"])'

使用 source_data() 生成自动刷新 DataFrame。这将每五秒刷新一次,但您可以根据需要更改它

cpu_sink = Stream()
cpu_example = pd.DataFrame({'_value': []}, columns=['_value'])
cpu_df = DataFrame(cpu_sink, example=cpu_example)

source_data(auto_refresh=5, sink=cpu_sink, query=cpu_query)

使用 hvPlot 和 Streamz 创建可视化

现在是创建图表的时候了。为此,导入 bokeh DatetimeTick formatter,以便在各种尺度上漂亮地显示日期时间值,并指定每个尺度属性

from bokeh.models.formatters import DatetimeTickFormatter

# Time formatter
formatter = DatetimeTickFormatter(
    microseconds = ["%H:%M:%S"],
    milliseconds = ["%H:%M:%S"],
    seconds = ["%H:%M:%S"],
    minsec = ["%H:%M:%S"],
    minutes = ["%H:%M:%S"],
    hourmin = ["%H:%M:%S"],
    hours=["%H:%M:%S"],
    days=["%H:%M:%S"],
    months=["%H:%M:%S"],
    years=["%H:%M:%S"],
)

接下来,生成 hvPlot

cpu_df.hvplot(width=450, backlog=50, title='CPU % usage', xlabel='Time', ylabel='%', xformatter=formatter)

使用 InfluxDB v2.0 Python 客户端进行 Jupyter 可视化、TensorFlow 预测等

使用 TensorFlow 进行预测使用 Prophet 进行预测,以及 多进程写入 只是最近 InfluxDB v2.0 Python 客户端发生的一些有趣的事情。我希望本教程和实现与其余部分相提并论!与往常一样,如果您遇到障碍,请在我们的 社区网站Slack 频道上分享它们。我们很乐意获得您的反馈并帮助您解决遇到的任何问题。