TL;DR InfluxDB 技术提示:使用 Flight SQL 和 AWS Lambda 进行降采样

导航至

本教程介绍了如何使用 AWS Lambda 在 InfluxDB Cloud (从 2 月 1 日起在 AWS us-east-1 和 AWS eu-central-1 上可用) 中使用新的 InfluxDB 存储引擎 InfluxDB IOx 执行降采样。本教程介绍了如何:

  1. 使用 Flight SQL Python 库查询 InfluxDB Cloud 实例。

  2. 使用 SQL 对数据进行降采样。

  3. 将查询输出转换为 Pandas DataFrame。

  4. 使用 InfluxDB v2 Python 客户端库 将数据写回 InfluxDB。

  5. 创建并测试 AWS Lambda 函数。

  6. 使用 CloudWatch 或 EventBridge 按计划触发 Lambda 函数。

InfluxDB IOx 解决了关键用户需求,包括(但不限于):

  • 无限 基数

  • 存储日志和跟踪数据的能力。

  • 与机器学习和商业分析工具的互操作性。

  • SQL 支持和快速分析。

我们通过在 Apache 生态系统(Apache ParquetApache DataFusionApache ArrowApache Flight SQL)上构建 InfluxDB IOx 来实现这些目标。

要了解更多关于我们为何选择这些技术以及实施这些技术的决定如何帮助 InfluxDB 实现这些目标的信息,请考虑阅读这篇博文

要求

本教程假设您满足以下要求:

  • 一个 AWS 账户。按照本文档创建一个。
  • Python 3.6
  • InfluxDB v2 Python 客户端库。使用 pip3 install 'influxdb-client[ciso]` 安装它。
  • Flight SQL Python 库。使用 pip install git+https://github.com/influxdata/flightsql-dbapi.gitbash pip3 install flightsql-dbapi
  • Pandas 库。使用 bash pip3 install flightsql-dbapi

您还可以使用以下 requirements.txt 安装所有以下依赖项:

pandas==0.23.4

influxdb_client==1.30.0
flightsql-dbapi==0.0.1

Arrow Flight SQL 的优势

Arrow Flight SQL 是“一种新的通用客户端-服务器框架,用于简化通过网络接口进行大型数据集的高性能传输。” 换句话说,Arrow Flight SQL 是一种协议,用于加速利用 Apache Arrow 的数据库的 SQL 数据库访问,Apache Arrow 是一个用于定义内存中列式数据的框架。 InfluxDB 使用 Apache Arrow 定义其内存数据,并使用 Flight SQL 传输这些大型数据集。这意味着 InfluxDB 用户现在可以轻松查询大型数据集。与 Flight SQL 和 Apache Arrow 相关的性能优势使得降采样、数据分析和 InfluxDB 外部的数据准备等任务变得可行和高效。这意味着用户可以使用他们最熟悉的工具和语言(如 Python 和 Pandas),可靠且高效地执行分析和数据管理任务(而无需使用 Flux 和 InfluxDB Task 系统)。Flight SQL 还支持 JDBC/ODBC 驱动程序,这意味着 InfluxDB 用户将能够将 InfluxDB 连接到商业分析和可视化工具,如 SupersetPowerBITableau。敬请期待更多关于这些主题的博文!

Python 脚本

以下脚本是使用 InfluxDB 的新数据库引擎进行数据降采样的最基本示例。此脚本遵循以下步骤:

  1. 导入我们的依赖项。
  2. 收集身份验证凭据,包括:
  3. 实例化 Flight SQL 客户端
  4. 执行 SQL 查询。这里我们使用 Date_Bin() 函数来执行降采样。我们将过去一小时的原始高精度数据转换为 1 分钟的求和值。
  5. 创建一个 reader 对象来消费结果。
  6. 将所有数据读取到 pyarrow.Table 对象中。
  7. 将数据转换为 Pandas DataFrame。注意:此脚本使用 SQL 执行降采样,以展示最简单的示例。但是,这种方法使开发人员能够利用 Pandas 在此步骤之后执行他们需要的任何数据操作和分析。
  8. 实例化 InfluxDB v2 Python 客户端库,并将 Pandas DataFrame 写回 InfluxDB。
# 1. Import dependencies. 
from flightsql import FlightSQLClient
import pandas as pd
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS

# 2. Gather authentication credentials. 
token = "-_KiZFxxx"
url = "https://us-east-1-1.aws.cloud2.influxdata.com/"
org = "28d1f2f565460a6c"
bucket = "anais-iox"

# 3. Instantiate the FlightSQL Client
client = FlightSQLClient(host='us-east-1-1.aws.cloud2.influxdata.com',
                        token=token,
                        metadata={'bucket-name': 'anais-iox'},
                        features={'metadata-reflection': 'true'})

# 4. Execute a query against InfluxDB's Flight SQL endpoint                        
query = client.execute("SELECT DATE_BIN(INTERVAL '1 minute', time, '2019-09-18T00:00:00Z'::timestamp) as time, SUM(\"co\") as 'sum_co', SUM(\"temperature\") as 'sum_temp', SUM(\"humidity\") as 'sum_hum' FROM \"airSensors\" WHERE time >= now() - interval '1 hour' GROUP BY time")

# 5. Create reader to consume result
reader = client.do_get(query.endpoints[0].ticket)

# 6. Read all data into a pyarrow.Table
Table = reader.read_all()
print(Table)

# 7. Convert to Pandas DataFrame
df = Table.to_pandas()
df = df.sort_values(by="time")
print(df)
data_frame = df

# 8. Write the Pandas DataFrame back to InfluxDB
with InfluxDBClient(url=url, token=token, org=org) as client:

    client.write_api(write_options=SYNCHRONOUS).write(bucket=bucket,
    record=data_frame,
    data_frame_measurement_name="downsampled",
    data_frame_timestamp_column="time")

创建 AWS Lambda 函数

要创建 AWS Lambda 函数,首先登录您的控制台。接下来,搜索 AWS Lambda 并选择该服务。然后,单击“创建函数”。

create function

现在您可以命名您的函数,指定您想要使用的语言、架构和任何特定权限。完成后,单击右下角的“创建函数”。

create function -2

现在您可以包含您的 Python 脚本。点击“测试”以确保它按预期工作。通过查询来验证您是否成功将降采样数据写入 InfluxDB。还建议您使用 AWS Secrets Manager,而不是简单地将令牌存储在脚本中。

code source

最后,选择“部署”以部署您的脚本。如果您希望执行降采样任务,则需要按计划运行 Lambda 脚本。您可以使用 CloudWatch 或 EventBridge 创建规则并将您的 AWS Lambda 函数作为目标,以按用户定义的计划运行。

根据您喜欢的服务使用以下文档:

在使用 AWS Lambda 函数执行降采样任务后,我们验证我们的数据是否正在被降采样并写入 InfluxDB 中的新 measurement “downsampled”。

downsampling-influxdb-aws-lambda

结论

要充分利用 InfluxDB IOx 的所有进步,请在此处注册。如果您想联系 InfluxDB IOx 开发人员,请加入 InfluxData 社区 Slack 并查找 #influxdb_iox 频道。

我希望这篇博文能够启发您探索 InfluxDB Cloud,并利用 Flight SQL 从 InfluxDB 传输大型数据集,以便使用您选择的工具进行数据处理。如果您需要任何帮助,请使用我们的社区站点Slack 频道联系我们。我很乐意了解您尝试实现的目标以及您希望 InfluxDB 中的任务系统具有哪些功能。