TL;DR InfluxDB技术提示:使用Flight SQL和AWS Lambda进行降采样

导航至

本教程介绍如何在InfluxDB Cloud中(从2月1日起在AWS us-east-1和AWS eu-central-1上可用)使用AWS Lambda,通过新的InfluxDB存储引擎InfluxDB IOx进行降采样。本教程描述了如何

  1. 使用Flight SQL Python库查询InfluxDB Cloud实例。

  2. 使用SQL进行数据降采样。

  3. 将查询输出转换为Pandas DataFrame。

  4. 使用InfluxDB v2 Python客户端库将数据写回InfluxDB。

  5. 创建和测试AWS Lambda函数。

  6. 使用CloudWatch或EventBridge按计划触发Lambda函数。

InfluxDB IOx解决了关键用户需求,包括但不限于

  • 无限基数

  • 存储日志和跟踪数据的能力。

  • 与ML和商业分析工具的互操作性。

  • SQL支持和快速分析。

我们通过在Apache生态系统(Apache ParquetApache DataFusionApache ArrowApache Flight SQL)上构建InfluxDB IOx)来实现这些目标。

要了解更多关于我们为什么选择这些技术以及这些决策如何帮助InfluxDB实现这些目标的信息,请考虑阅读这篇文章

要求

本教程假设您满足以下要求

  • AWS账户。按照此文档创建一个。
  • Python 3.6
  • InfluxDB v2 Python客户端库。使用pip3 install 'influxdb-client[ciso]`安装它。
  • Flight SQL Python库。使用pip install git+https://github.com/influxdata/flightsql-dbapi.gitbash pip3 install flightsql-dbapi
  • Pandas库。使用bash pip3 install flightsql-dbapi

您还可以使用以下requirements.txt安装所有以下依赖项

pandas==0.23.4

influxdb_client==1.30.0
flightsql-dbapi==0.0.1

Arrow Flight SQL的优势

Arrow Flight SQL 是一个“新的通用客户端-服务器框架,用于简化大数据集通过网络接口的高性能传输”。换句话说,Arrow Flight SQL 是一种协议,用于加速利用 Apache Arrow 的数据库的 SQL 数据库访问。Apache Arrow 是一个定义内存中列式数据的框架。InfluxDB 使用 Apache Arrow 定义其内存数据,并使用 Flight SQL 来传输这些大数据集。这意味着 InfluxDB 用户现在可以轻松查询大数据集。Flight SQL 和 Apache Arrow 的性能优势使得在 InfluxDB 外进行降采样、数据分析和数据准备等任务变得可行且高效。这意味着用户可以使用他们最熟悉的工具和语言(如 Python 和 Pandas),而不是必须使用 Flux 和 InfluxDB 任务系统)来可靠且高效地执行分析和管理任务。Flight SQL 还支持 JDBC/ODBC 驱动程序,这意味着 InfluxDB 用户将能够将 InfluxDB 连接到商业分析工具和可视化工具,如 SupersetPowerBITableau。敬请期待更多相关博客文章!

Python 脚本

以下脚本是使用 InfluxDB 新数据库引擎进行降采样的最基本示例。此脚本遵循以下步骤

  1. 导入我们的依赖。
  2. 收集身份验证凭据,包括
  3. 实例化 Flight SQL 客户端
  4. 执行 SQL 查询。在这里,我们使用 Date_Bin() 函数进行降采样。我们将过去一小时的高精度原始数据转换为每分钟的汇总值。
  5. 创建一个读取器对象以消耗结果。
  6. 将所有数据读取到 pyarrow.Table 对象中。
  7. 将数据转换为 Pandas DataFrame。 注意:此脚本使用 SQL 进行降采样以展示最简单的示例。然而,这种方法使开发人员能够利用 Pandas 在此步骤之后进行任何数据操作和分析。
  8. 实例化 InfluxDB v2 Python 客户端库并将 Pandas DataFrame 写回 InfluxDB。
# 1. Import dependencies. 
from flightsql import FlightSQLClient
import pandas as pd
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS

# 2. Gather authentication credentials. 
token = "-_KiZFxxx"
url = "https://us-east-1-1.aws.cloud2.influxdata.com/"
org = "28d1f2f565460a6c"
bucket = "anais-iox"

# 3. Instantiate the FlightSQL Client
client = FlightSQLClient(host='us-east-1-1.aws.cloud2.influxdata.com',
                        token=token,
                        metadata={'bucket-name': 'anais-iox'},
                        features={'metadata-reflection': 'true'})

# 4. Execute a query against InfluxDB's Flight SQL endpoint                        
query = client.execute("SELECT DATE_BIN(INTERVAL '1 minute', time, '2019-09-18T00:00:00Z'::timestamp) as time, SUM(\"co\") as 'sum_co', SUM(\"temperature\") as 'sum_temp', SUM(\"humidity\") as 'sum_hum' FROM \"airSensors\" WHERE time >= now() - interval '1 hour' GROUP BY time")

# 5. Create reader to consume result
reader = client.do_get(query.endpoints[0].ticket)

# 6. Read all data into a pyarrow.Table
Table = reader.read_all()
print(Table)

# 7. Convert to Pandas DataFrame
df = Table.to_pandas()
df = df.sort_values(by="time")
print(df)
data_frame = df

# 8. Write the Pandas DataFrame back to InfluxDB
with InfluxDBClient(url=url, token=token, org=org) as client:

    client.write_api(write_options=SYNCHRONOUS).write(bucket=bucket,
    record=data_frame,
    data_frame_measurement_name="downsampled",
    data_frame_timestamp_column="time")

创建 AWS Lambda 函数

要创建 AWS Lambda 函数,首先登录到您的控制台。然后,搜索 AWS Lambda 并选择该服务。然后,点击 创建函数

create function

现在您可以命名您的函数,指定您想要使用的语言、架构以及任何特定的权限。完成设置后,点击右下角的 创建函数

create function -2

现在您可以包含您的Python脚本。点击 测试 以确保它按预期工作。通过查询以验证您是否成功将下采样数据写入InfluxDB。还建议您使用AWS Secrets Manager而不是简单地将令牌存储在脚本中。

code source

最后,选择 部署 以部署您的脚本。如果您想要执行下采样任务,您需要按计划运行您的Lambda脚本。您可以使用CloudWatch或EventBridge创建规则,并将您的AWS Lambda函数目标为在用户定义的计划上运行。

根据您的喜好服务使用以下文档

使用AWS Lambda函数执行下采样任务后,我们验证我们的数据正在被下采样,并写入InfluxDB的新测量值,“downsampled”。

downsampling-influxdb-aws-lambda

结论

要充分利用InfluxDB IOx的所有进步,请在此处注册此处。如果您想联系InfluxDB IOx开发者,请加入InfluxData Community Slack并查找#influxdb_iox频道。

我希望这篇博客文章能激发您探索InfluxDB云并利用Flight SQL将大型数据集从InfluxDB传输到您选择的数据处理工具。如果您需要任何帮助,请通过我们的社区网站Slack频道联系。我很乐意了解您试图实现什么以及您希望InfluxDB的任务系统具有哪些功能。