TL;DR InfluxDB技术提示:使用Flight SQL和AWS Lambda进行降采样
作者:Anais Dotis-Georgiou / 开发者,产品
2023年2月2日
导航至
本教程介绍如何在InfluxDB Cloud中(从2月1日起在AWS us-east-1和AWS eu-central-1上可用)使用AWS Lambda,通过新的InfluxDB存储引擎InfluxDB IOx进行降采样。本教程描述了如何
-
使用Flight SQL Python库查询InfluxDB Cloud实例。
-
使用SQL进行数据降采样。
-
将查询输出转换为Pandas DataFrame。
-
使用InfluxDB v2 Python客户端库将数据写回InfluxDB。
-
创建和测试AWS Lambda函数。
-
使用CloudWatch或EventBridge按计划触发Lambda函数。
InfluxDB IOx解决了关键用户需求,包括但不限于
-
无限基数。
-
存储日志和跟踪数据的能力。
-
与ML和商业分析工具的互操作性。
-
SQL支持和快速分析。
我们通过在Apache生态系统(Apache Parquet、Apache DataFusion、Apache Arrow和Apache Flight SQL)上构建InfluxDB IOx)来实现这些目标。
要了解更多关于我们为什么选择这些技术以及这些决策如何帮助InfluxDB实现这些目标的信息,请考虑阅读这篇文章。
要求
本教程假设您满足以下要求
- AWS账户。按照此文档创建一个。
- Python 3.6
- InfluxDB v2 Python客户端库。使用
pip3 install 'influxdb-client[ciso]`
安装它。 - Flight SQL Python库。使用
pip install git+https://github.com/influxdata/flightsql-dbapi.git
或bash pip3 install flightsql-dbapi
- Pandas库。使用
bash pip3 install flightsql-dbapi
您还可以使用以下requirements.txt安装所有以下依赖项
pandas==0.23.4
influxdb_client==1.30.0
flightsql-dbapi==0.0.1
Arrow Flight SQL的优势
Arrow Flight SQL 是一个“新的通用客户端-服务器框架,用于简化大数据集通过网络接口的高性能传输”。换句话说,Arrow Flight SQL 是一种协议,用于加速利用 Apache Arrow 的数据库的 SQL 数据库访问。Apache Arrow 是一个定义内存中列式数据的框架。InfluxDB 使用 Apache Arrow 定义其内存数据,并使用 Flight SQL 来传输这些大数据集。这意味着 InfluxDB 用户现在可以轻松查询大数据集。Flight SQL 和 Apache Arrow 的性能优势使得在 InfluxDB 外进行降采样、数据分析和数据准备等任务变得可行且高效。这意味着用户可以使用他们最熟悉的工具和语言(如 Python 和 Pandas),而不是必须使用 Flux 和 InfluxDB 任务系统)来可靠且高效地执行分析和管理任务。Flight SQL 还支持 JDBC/ODBC 驱动程序,这意味着 InfluxDB 用户将能够将 InfluxDB 连接到商业分析工具和可视化工具,如 Superset、PowerBI 和 Tableau。敬请期待更多相关博客文章!
Python 脚本
以下脚本是使用 InfluxDB 新数据库引擎进行降采样的最基本示例。此脚本遵循以下步骤
- 导入我们的依赖。
- 收集身份验证凭据,包括
- 所有访问令牌。 注意:如果您使用 AWS Lambda 来执行降采样任务,您可能希望使用 AWS Secrets Manager,而不是简单地将令牌存储在脚本中。
- URL
- 组织 ID (org ID).
- 桶.
- 实例化 Flight SQL 客户端
- 执行 SQL 查询。在这里,我们使用
Date_Bin()
函数进行降采样。我们将过去一小时的高精度原始数据转换为每分钟的汇总值。 - 创建一个读取器对象以消耗结果。
- 将所有数据读取到
pyarrow.Table
对象中。 - 将数据转换为 Pandas DataFrame。 注意:此脚本使用 SQL 进行降采样以展示最简单的示例。然而,这种方法使开发人员能够利用 Pandas 在此步骤之后进行任何数据操作和分析。
- 实例化 InfluxDB v2 Python 客户端库并将 Pandas DataFrame 写回 InfluxDB。
# 1. Import dependencies.
from flightsql import FlightSQLClient
import pandas as pd
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS
# 2. Gather authentication credentials.
token = "-_KiZFxxx"
url = "https://us-east-1-1.aws.cloud2.influxdata.com/"
org = "28d1f2f565460a6c"
bucket = "anais-iox"
# 3. Instantiate the FlightSQL Client
client = FlightSQLClient(host='us-east-1-1.aws.cloud2.influxdata.com',
token=token,
metadata={'bucket-name': 'anais-iox'},
features={'metadata-reflection': 'true'})
# 4. Execute a query against InfluxDB's Flight SQL endpoint
query = client.execute("SELECT DATE_BIN(INTERVAL '1 minute', time, '2019-09-18T00:00:00Z'::timestamp) as time, SUM(\"co\") as 'sum_co', SUM(\"temperature\") as 'sum_temp', SUM(\"humidity\") as 'sum_hum' FROM \"airSensors\" WHERE time >= now() - interval '1 hour' GROUP BY time")
# 5. Create reader to consume result
reader = client.do_get(query.endpoints[0].ticket)
# 6. Read all data into a pyarrow.Table
Table = reader.read_all()
print(Table)
# 7. Convert to Pandas DataFrame
df = Table.to_pandas()
df = df.sort_values(by="time")
print(df)
data_frame = df
# 8. Write the Pandas DataFrame back to InfluxDB
with InfluxDBClient(url=url, token=token, org=org) as client:
client.write_api(write_options=SYNCHRONOUS).write(bucket=bucket,
record=data_frame,
data_frame_measurement_name="downsampled",
data_frame_timestamp_column="time")
创建 AWS Lambda 函数
要创建 AWS Lambda 函数,首先登录到您的控制台。然后,搜索 AWS Lambda 并选择该服务。然后,点击 创建函数。
现在您可以命名您的函数,指定您想要使用的语言、架构以及任何特定的权限。完成设置后,点击右下角的 创建函数。
现在您可以包含您的Python脚本。点击 测试 以确保它按预期工作。通过查询以验证您是否成功将下采样数据写入InfluxDB。还建议您使用AWS Secrets Manager而不是简单地将令牌存储在脚本中。
最后,选择 部署 以部署您的脚本。如果您想要执行下采样任务,您需要按计划运行您的Lambda脚本。您可以使用CloudWatch或EventBridge创建规则,并将您的AWS Lambda函数目标为在用户定义的计划上运行。
根据您的喜好服务使用以下文档
使用AWS Lambda函数执行下采样任务后,我们验证我们的数据正在被下采样,并写入InfluxDB的新测量值,“downsampled”。
结论
要充分利用InfluxDB IOx的所有进步,请在此处注册此处。如果您想联系InfluxDB IOx开发者,请加入InfluxData Community Slack并查找#influxdb_iox频道。
我希望这篇博客文章能激发您探索InfluxDB云并利用Flight SQL将大型数据集从InfluxDB传输到您选择的数据处理工具。如果您需要任何帮助,请通过我们的社区网站或Slack频道联系。我很乐意了解您试图实现什么以及您希望InfluxDB的任务系统具有哪些功能。