使用 InfluxDB 进行数据管道传输
作者:Anais Dotis-Georgiou / 开发者
2024 年 11 月 21 日
导航至
在本博文中,我们将探讨如何使用 Kafka、Faust 和 InfluxDB 构建数据管道,以有效地摄取、转换和存储数据。我们将从概述 Kafka(一种高性能消息传递平台)和 Faust(一个为流处理而设计的 Python 库,现在由社区维护为 Faust-streaming)开始。在建立这些工具的基础之后,我们将演示如何使用 Telegraf Kafka Consumer Input Plugin 从 Kafka 主题读取数据并将其写入 InfluxDB。最后,我将分享一个示例管道,将所有内容联系在一起。在此处查找与此博文配套的存储库:此处。scratchpad.md 包含本教程中使用的所有命令,以便于遵循。
使用 InfluxDB 与 Kafka、Faust 和 Telegraf 的示意图。数据发布到 Kafka 主题;Faust 读取并转换数据,然后将其发布到新主题;Telegraf 跟踪新主题并将数据写入 InfluxDB。
关于 Faust-streaming 的说明
Faust-streaming 是一个 Python 原生的流处理库,与 Kafka 集成良好,非常适合构建实时、事件驱动型应用程序。但是,它可能无法与基于 Java 的替代方案的性能或可扩展性相媲美,并且作为一个社区维护的项目,它可能缺乏在更成熟的框架中看到的强大支持和更新。
要求
本教程假定您满足以下要求
- InfluxDB Cloud 免费试用版 或其他版本的 InfluxDB
- Telegraf 本地安装
- Docker
- Python 包管理器,如 Pipenv
您还需要以下 InfluxDB 资源
- 存储桶
- 令牌
- 组织 ID(查找组织 ID 的最快方法是从 InfluxDB 的主机 URL 中找到,例如,https://us-east-1-1.aws.cloud2.influxdata.com/orgs/<您的组织 ID 在这里>)
我建议使用 UI 创建这些资源,因为这是最快的方法。您也可以使用 InfluxDB CLI 进行资源创建和管理,但您必须先配置 CLI。
要运行以下任何示例,您需要按照以下步骤操作
- 克隆存储库
git clone https://github.com/InfluxCommunity/kafka_faust_examples
- 更改目录
cd first
- 启动运行 Kafka 的容器
docker compose up -d
- 安装依赖项并激活虚拟环境
pipenv install pipenv shell
Kafka 和 Faust 的“Hello World”示例
“Hello World”示例演示了 Faust 流应用程序的基础知识。它设置了一个简单的 Faust 应用程序,该应用程序从 Kafka 主题读取事件并将每个事件打印到控制台。此示例介绍了 Faust 的结构,帮助您了解如何在最少的设置下创建、配置和运行 Faust 应用程序。
让我们看一下 Faust 代码
import faust
app = faust.App(
'hello_world',
broker='kafka://localhost:9092',
# Be explicit about using in-memory Table storage
store='memory://',
value_serializer='raw',
)
greetings_topic = app.topic('greetings')
@app.agent(greetings_topic)
async def greet(greetings):
async for greeting in greetings:
print(greeting)
此 Faust 应用程序首先创建一个名为“hello_world”的实例。该应用程序连接到在端口 9092 上本地运行的 Kafka 代理,这是发送和接收消息的端点。它使用内存表进行临时存储,这意味着数据在会话之间不会持久保存,这对于测试或短生命周期应用程序很有用。消息以原始形式处理,无需额外的序列化。
该应用程序定义了一个名为“greetings”的 Kafka 主题,该主题充当本示例的消息通道。设置了一个代理(Faust 中的一种特殊类型的协程)来持续监听此“greetings”主题。此代理(名为“greet”)异步处理来自“greetings”主题的每个传入消息。从“greetings”主题接收的每条消息都打印到控制台。
当此应用程序运行时,它会连接到 Kafka,监听指定主题上的消息,并在消息到达时处理它们。要查看其运行情况,请启动此应用程序的 Faust 工作程序
faust -A hello_world worker -l info
然后将测试消息发送到“greetings”主题
faust -A hello_world send greetings "Hello Kafka"
每条测试消息都会显示在控制台中,演示了该应用程序使用 Kafka 和 Faust 进行实时消息处理的能力。我们还可以打开 Kafka Docker 容器内的交互式 shell
docker exec -it first-kafka-1 /bin/sh
启动 Kafka 控制台消费者以从头开始读取主题中的所有消息,包括历史消息
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic greetings --from-beginning
带有 InfluxDB 和 Telegraf 的页面浏览量示例
“页面浏览量”示例展示了使用 Faust 实时统计页面浏览量。它从 Kafka 主题读取页面浏览事件,聚合每个页面的浏览量,并将这些计数存储在内存表中。对于每个页面浏览量,更新后的计数都会发布到另一个主题,从而允许消费者实时访问每个页面的最新浏览量。此示例突出了 Faust 处理有状态流处理和执行实时聚合的能力。telegraf.conf 配置为从 Kafka 主题使用 JSON 消息并将它们写入 InfluxDB。
import faust
# Initialize the Faust app
app = faust.App(
'page_views',
broker='kafka://localhost:9092',
topic_partitions=4,
)
# Define the structure of a page view event
class PageView(faust.Record):
id: str
user: str
# Topics
page_view_topic = app.topic('page_views', value_type=PageView)
page_view_count_topic = app.topic('page_view_counts')
# Table to maintain the count of views per page
page_views = app.Table('page_views', default=int)
# Agent to consume and process page views
@app.agent(page_view_topic)
async def count_page_views(views):
async for view in views.group_by(PageView.id):
# Increment the count for the specific page ID
page_views[view.id] += 1
# Create a JSON-like dictionary for the count message
count_message = {
"id": view.id,
"count": page_views[view.id]
}
# Send the JSON message to the page_view_count_topic
await page_view_count_topic.send(value=count_message)
if __name__ == '__main__':
app.main()
此 Faust 应用程序名为“page_views”,也连接到 localhost:9092 上运行的 Kafka 代理,但侧重于跟踪和计数每个页面的用户浏览量。它定义了一个 PageView 记录,该记录表示每个事件,并包含页面 ID 和用户的字段。
为此应用程序创建了两个 Kafka 主题:page_views 用于传入浏览事件,page_view_counts 用于发布聚合计数。该应用程序在临时内存表中维护这些计数,在内存表中,它按 ID 存储每个页面的总浏览量。
一个代理监听 page_views 主题,并通过递增特定页面 ID 的计数来处理每条消息。更新后,它会构造一个包含页面 ID 和计数的 JSON 消息,然后将其异步发送到 page_view_counts 主题。
启动此应用程序的 Faust 工作程序,它将实时使用以下命令来使用和聚合页面浏览量
faust -A page_views worker -l info
向 Kafka 中的 page_views 主题发送测试消息,page_views Faust 应用程序正在使用以下命令监听该主题
faust -A page_views send page_views '{"id": "foo", "user": "bar"}'
现在,我们准备好开始使用 Telegraf 将聚合页面浏览量计数写入 InfluxDB。要配置 Telegraf,我们需要向配置添加一些身份验证凭据,并指定我们希望如何解析来自主题的 JSON
[[inputs.kafka_consumer]]
brokers = ["localhost:9092"]
topics = ["page_view_counts"]
data_format = "json_v2"
[[inputs.kafka_consumer.json_v2]]
measurement_name = "view_count"
[[inputs.kafka_consumer.json_v2.field]]
path = "id"
type = "string"
[[inputs.kafka_consumer.json_v2.field]]
path = "count"
type = "int"
[[outputs.influxdb_v2]]
urls = ["your host url i.e. https://us-east-1-1.aws.cloud2.influxdata.com/"]
# place your influxdb token and org ID here
token = "your token"
organization = "your org ID"
bucket = "views"
此 Telegraf 配置定义了一个 kafka_consumer 输入插件,用于使用来自 Kafka 主题 page_view_counts 的 JSON 消息,并提取两个字段:ID 作为字符串,count 作为整数,并将它们存储在 InfluxDB 测量值 view_count 中。influxdb_v2 输出插件指定了目标 InfluxDB 存储桶(views)以及必要的连接详细信息,包括 URL、令牌和组织。完整的 config 还会将行协议值写入 stdout 以进行验证。 页面浏览量示例中使用的 Telegraf 插件示意图。
现在,您可以继续向主题发布新消息,观看计数聚合,并在 InfluxDB 中可视化结果。
工业物联网的数据管道传输——机器学习是必要的吗?
在工业物联网 (IIoT) 中,数据管道传输对于实时监控和控制复杂的工业过程至关重要。一个典型的例子是连续搅拌釜式反应器 (CSTR),这是一种化学反应器,广泛用于各种行业——包括化学制造、制药、食品和饮料生产、生物技术、能源和环境工程。CSTR 有助于连续反应,其中反应物稳定地送入反应器,产品不断地移除,使其对于聚合物生产、发酵、生物柴油合成和废水处理等过程至关重要。
在 PID 控制器和 InfluxDB 第 1 部分 和 第 2 部分 中,我们创建了一个与比例-积分-微分 (PID) 控制器集成的 CSTR 数字孪生体。PID 控制器充当反馈机制,持续调整输入以维持所需的输出水平,确保过程的稳定性和精度。我们模拟了一级反应,该反应将化学物质 A 转化为化学物质 B——类似于现实世界中的反应,如酯的水解或乙醇的生产。
在实际应用中,传感器跟踪这些值并调整它们以维持最佳运行条件。此示例强调,通常可以通过微分方程和精确的模拟来实现工业物联网中有效的建模和维护,而无需总是求助于机器学习或深度学习解决方案。鉴于工业过程的明确性质,传统的建模技术通常提供必要的精度和洞察力,突出了基础工程原理在数字转型时代的持久价值。
最后的想法和资源
由于 Faust-streaming 由社区维护,您可能会发现它缺少功能,或者更喜欢具有更强大支持和安全保证的解决方案。我推荐 流处理框架比较第 1 部分 和 第 2 部分,以获得各种选项及其优缺点的良好概述。您可能还会喜欢以下资源,这些资源重点介绍了如何将 InfluxDB 与其他流处理工具一起使用
与往常一样,请在此处开始使用 InfluxDB v3 Cloud。如果您需要帮助,请在我们的 社区站点 或 Slack 频道 上联系我们。如果您也在使用 InfluxDB 进行数据处理项目,我很乐意收到您的来信!