使用 Jupyter 和 InfluxDB 进行流时序处理

导航至

Jupyter Notebooks 是非常棒的,因为它们提供了一种在同一位置分享代码、解释和可视化的方式。Notebooks 为计算增添了叙述。单元格将步骤隔开,减少了与编辑代码相关的恐惧或犹豫。以这种方式,notebooks 成为了实验的邀请。

今天,我想将这份邀请扩展到 InfluxDB。在这篇文章中,我们将学习如何使用 InfluxDB v2.0 中的 Flux 查询我们的系统统计数据。然后,我们将在 Jupyter Notebook 中实时显示结果。

使用 Jupyter 和 InfluxDB 进行流时序处理<figcaption> 我们将通过 hvPlotStreamzRxPY 进行可视化。本教程所附的笔记本可在此找到:这里。</figcaption>

导入和连接

首先,我们必须导入依赖项

from datetime import timedelta
from typing import List

import hvplot.streamz
import pandas as pd
import rx
from rx import operators as ops

from streamz.dataframe import Random, DataFrame
from streamz import Stream
from influxdb_client import InfluxDBClient

接下来,我们必须建立连接。如果您在 AWS Oregon 上运行 InfluxDB Cloud,以下是建立连接的方法

client = InfluxDBClient(url='https://us-west-2-1.aws.cloud2.influxdata.com', token='my-token', org='my-org')

对于 AWS Frankfurt,如下

client = InfluxDBClient(url=https://eu-central-1-1.aws.cloud2.influxdata.com', token='my-token', org='my-org')

对于 Google Cloud Iowa,如下

client = InfluxDBClient(url='https://us-central1-1.gcp.cloud2.influxdata.com', token='my-token', org='my-org')

最后,如果您在本地上运行 InfluxDB 开源版本,以下是连接方法

client = InfluxDBClient(url='https://127.0.0.1:9999', token='my-token', org='my-org')

源数据函数

此函数是本笔记本的核心。它使用客户端的 Pandas 功能 query_data_frame(),根据用户指定的 auto_refresh 返回数据作为 DataFrame。该函数通过将用户指定的 Flux 查询、尾部 Flux 查询附加到源数据()指定的起始 Flux 查询上来工作。

def source_data(auto_refresh: int, query: str, sink: Stream):
    rx \
        .interval(period=timedelta(seconds=auto_refresh)) \
        .pipe(ops.map(lambda start: f'from(bucket: "my-bucket") '
                                    f'|> range(start: -{auto_refresh}s, stop: now()) '
                                    f'{query}')) \
        .pipe(ops.map(lambda query: client.query_api().query_data_frame(query, data_frame_index=['_time']))) \
        .pipe(ops.map(lambda data_frame: data_frame.drop(columns=['result', 'table']))) \
        .subscribe(observer=lambda data_frame: sink.emit(data_frame), on_error=lambda error: print(error))
    pass

生成自动刷新的 DataFrame

创建尾部 Flux 查询

cpu_query = '|> filter(fn: (r) => r._measurement == "cpu") ' \
            '|> filter(fn: (r) => r._field == "usage_user") ' \
            '|> filter(fn: (r) => r.cpu == "cpu-total") ' \
            '|> keep(columns: ["_time", "_value"])'

使用 source_data() 生成自动刷新的 DataFrame。这将每五秒刷新一次,但您可以将其更改为满足您的需求

cpu_sink = Stream()
cpu_example = pd.DataFrame({'_value': []}, columns=['_value'])
cpu_df = DataFrame(cpu_sink, example=cpu_example)

source_data(auto_refresh=5, sink=cpu_sink, query=cpu_query)

使用 hvPlot 和 Streamz 创建可视化

现在是时候创建一些图表了。为此,导入 bokehDatetimeTick formatter,以在多个刻度上优雅地显示日期时间值,并指定每个刻度属性

from bokeh.models.formatters import DatetimeTickFormatter

# Time formatter
formatter = DatetimeTickFormatter(
    microseconds = ["%H:%M:%S"],
    milliseconds = ["%H:%M:%S"],
    seconds = ["%H:%M:%S"],
    minsec = ["%H:%M:%S"],
    minutes = ["%H:%M:%S"],
    hourmin = ["%H:%M:%S"],
    hours=["%H:%M:%S"],
    days=["%H:%M:%S"],
    months=["%H:%M:%S"],
    years=["%H:%M:%S"],
)

接下来,生成 hvPlot

cpu_df.hvplot(width=450, backlog=50, title='CPU % usage', xlabel='Time', ylabel='%', xformatter=formatter)

使用 InfluxDB v2.0 Python 客户端进行 Jupyter 可视化、TensorFlow 预测等

使用 TensorFlow 进行预测使用 Prophet 进行预测以及使用 2.0 版本 Python 客户端进行多进程写入这些都是近期 InfluxDB v2.0 Python 客户端中发生的一些有趣的事情。我希望这个教程和实现与其它内容相当!一如既往,如果您遇到任何难题,请在我们社区网站 Slack 频道上分享。我们非常乐意得到您的反馈,并帮助您解决遇到的问题。