MQTT 与 Kafka:物联网倡导者的视角(第 3 部分——天作之合)
作者:Jay Clifford / 产品,用例
2022 年 6 月 28 日
导航至
所以,我们就在这里……最后一章。在本系列的第二部分中,我们开始深入探讨使 Kafka 变得出色的概念。我们得出结论,尽管 MQTT 和 Kafka 之间的术语相似(例如主题),但在底层它们的行为却截然不同。我们还简要概述了 Kafka Connect 以及我们如何使用一些企业级连接器将数据流式传输到其他平台。
然而,我们确实了解到 Kafka 存在一些不足
-
Kafka 是为稳定网络和良好基础设施部署而构建的。
-
它没有部署关键数据交付功能,如 Keep-Alive 和 Last Will。
此时 MQTT 出现来拯救!
正如在最后一篇博客文章中所承诺的,我们将创建一个 MQTT、Kafka 和 InfluxDB 之间的完整混合架构。
在我们开始之前,这里有一些准备工作
所以,这是我的计划:
在这个架构中,我们将
-
将我们的原始生成器数据直接发送到 MQTT 代理(Kafka MQTT Proxy)。我们将继续使用第二部分中的主题分层。
-
Kafka MQTT 代理将数据桥接到我们的 Kafka 集群。
-
然后,我们将使用 InfluxDB 同步连接器将数据发送到InfluxDB Cloud(如果您托管,则为 InfluxDB OSS)。
-
从那里,我们将使用 InfluxDB 源连接器和 Flux 的组合来下采样和聚合我们的数据(在将转换后的数据发送回新的 Kafka 主题以供进一步消费之前)。
有很多事情要做,但谁不喜欢挑战呢!让我们开始吧。
生成器模拟器 + MQTT 代理
在 InfluxDB Roadshow 之后,我对生成器模拟器做了一些酷的改动
-
对输出的结果添加了更好的逻辑
-
添加了一些新字段:负载 + 功率
-
添加了 Docker 支持
您可以在此处找到第 3 部分的存储库。
现在,让我们来看看一个大变化……我们已经从架构中移除了我们的 Mosquitto 代理。我知道您在想什么。您在哪里连接您的 MQTT 客户端?让我向您介绍MQTT 代理。
MQTT代理到底是什么呢?我必须承认,文档有点简略(我花了一个小时来摸索,才真正理解它是如何工作的)。所以我觉得解释一下我的想法和它实际的工作方式会很有趣。
我以为它的工作方式
它实际的工作方式
很酷,对吧?所以我以为MQTT代理本质上是一个秘密的MQTT客户端,它会连接到你的当前代理。然后你指定你想拉取的主题,然后就会发生一些魔法般的事情,将负载映射到Kafka主题。但实际上,MQTT代理本质上是一个带有Kafka功能的秘密MQTT代理。这允许我们的MQTT客户端直接连接到它。
配置
现在我们已经有了点背景知识,让我们来谈谈配置。在我的无限智慧中,我决定用Docker容器来设置这个架构。如果我能回到过去,我就会直接安装本地的Confluent Kafka,然后就可以了。通过一线希望,我现在可以和你分享我的配置!
version: '3.9'
services:
generators:
image: emergency-generator:latest
environment:
- GENERATORS=3
- BROKER=kafka-mqtt-proxy
networks:
- mqtt
kafka-mqtt-proxy:
image: confluentinc/cp-kafka-mqtt
networks:
- mqtt
environment:
- KAFKA_MQTT_BOOTSTRAP_SERVERS=pkc-l6wr6.europe-west2.gcp.confluent.cloud:9092
- KAFKA_MQTT_KEY_CONVERTER=org.apache.kafka.connect.storage.StringConverter
- KAFKA_MQTT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
- KAFKA_MQTT_VALUE_CONVERTER_SCHEMAS_ENABLE=false
- KAFKA_MQTT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=https
- KAFKA_MQTT_SECURITY_PROTOCOL=SASL_SSL
- KAFKA_MQTT_SASL_MECHANISM=PLAIN
- KAFKA_MQTT_SASL_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username="######" password="######";
- KAFKA_MQTT_REQUEST_TIMEOUT_MS=20000
- KAFKA_MQTT_RETRY_BACKOFF_MS=500
- KAFKA_MQTT_PRODUCER_BOOTSTRAP_SERVERS=pkc-l6wr6.europe-west2.gcp.confluent.cloud:9092
- KAFKA_MQTT_PRODUCER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=https
- KAFKA_MQTT_PRODUCER_SECURITY_PROTOCOL=SASL_SSL
- KAFKA_MQTT_PRODUCER_SASL_MECHANISM=PLAIN
- KAFKA_MQTT_PRODUCER_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username="######" password="######";
- KAFKA_MQTT_PRODUCER_REQUEST_TIMEOUT_MS=20000
- KAFKA_MQTT_PRODUCER_RETRY_BACKOFF_MS=500
- KAFKA_MQTT_OFFSET_FLUSH_INTERVAL_MS=10000
- KAFKA_MQTT_OFFSET_STORAGE_FILE_FILENAME=/tmp/connect.offsets
- KAFKA_MQTT_LISTENERS=0.0.0.0:1883
- KAFKA_MQTT_TOPIC_REGEX_LIST=emergency_generators:.*
- KAFKA_MQTT_CONFLUENT_TOPIC_REPLICATION_FACTOR=1
networks:
mqtt:
name: mqtt
所以为了解释一下,我的Kafka集群仍然由Confluent Cloud托管和维护。我在我的笔记本电脑上运行这个docker-compose。所以本质上,我们有一个混合架构,通过MQTT代理实现边缘到云的通信。让我们分析一下docker-compose文件
generators: 这是我们生成器模拟器,我不会在这里花费太多时间。环境变量允许你选择MQTT客户端想要连接到的代理,以及要启动的生成器的数量。
Kafka-mqtt-proxy: 如果你熟悉Kafka配置,你知道你通常必须定义像这样的属性文件
properties
topic.regex.list=temperature:.*temperature, brightness:.*brightness
listeners=0.0.0.0:1883
bootstrap.servers=PLAINTEXT://127.0.0.1:9092
confluent.topic.replication.factor=1
当处理Confluent Kafka容器时,这会稍微抽象一些。相反,我们必须将它们定义为环境变量。我将在这里链接文档这里。一般来说
- 全大写
- 被_替换
- 所有环境变量都附加了服务名称。文档会在这里指导你,但在我们的情况下是:KAFKA_MQTT
现在进入环境变量的内容。我将将它们分为两个类别,因为大部分使用环境变量都是用于连接和验证Confluent Cloud
-
Confluent Cloud身份验证:其中大部分可以由Confluent Cloud自动生成。然后你需要将属性转换为Docker环境变量格式,如果你使用Docker。或者,你最好使用我的环境变量,并替换以下内容
- KAFKA_MQTT_BOOTSTRAP_SERVERS
- KAFKA_MQTT_SASL_JAAS_CONFIG(用户名+密码)
- KAFKA_MQTT_PRODUCER_SASL_JAAS_CONFIG(用户名+密码)
-
MQTT代理:这里有很多参数可以添加。我将在这里留下完整的列表这里。对于这个博客,我们使用的是基本配置
- KAFKA_MQTT_LISTENERS:代理将监听哪个端口上的客户端。在我们的例子中,我们使用未经身份验证的1883。你可以使用带有适当配置的TLS 8883。
- Kafka_MQTT 主题正则表达式列表:这是 MQTT 主题如何映射到 Kafka 主题。一般规则
<KAFKA_TOPIC_NAME>:.<MQTT_REGEX>
。尽管这并不是一个好的做法,但我建议如果可能的话,从根级别开始使用通配符,以确保收集数据,然后根据需要缩小主题范围。 - Kafka_MQTT_CONFLUENT 主题复制因子:你的有效负载应该复制的次数。在生产级别,建议至少复制 3 次。
InfluxDB 同步连接器 -> InfluxDB -> Influx 源连接器
因此,我们已经启动并运行了我们物联网数据管道架构的第一个阶段。现在我们可以开始更加富有创意地处理我们的数据。让我们深入一层原始图表,看看我们试图实现什么:
因此,在这个架构中,InfluxDB 已经发展成为不仅仅是一个数据存储。以下是数据旅程
- 首先,我们使用 InfluxDB 同步连接器将数据写入 InfluxDB 桶 Kafka_raw。因此,根据你的有效负载格式,你可以定义哪些字段应该是标签,但我们对于 MQTT 代理有些限制。我们将把这个留给一个 Flux 任务。
- 接下来,我们将使用一个 Flux 任务来收集我们新写入的原始桶中的数据并执行以下 enrichments 任务
- 我们通过选择自上次任务运行以来每个生成器的最后一个样本来聚合我们的数据。
- 然后我们创建一个名为 alarm 的新字段。我们将根据每个生成器的当前油位来计算警报级别。
- 我们将设置一个新的标签 called region。
- 我们将将富集的数据存储在我们的第二个桶 Kafka_downsampled 中。
- 最后,我们将使用 InfluxDB 源查询 Kafka_downsampled 中的数据并将其发布到新的 Kafka 主题以供进一步消费。
配置
让我们一起摇滚吧。
InfluxDB 同步连接器
这部分相当直接
- 我们选择 InfluxDB 2 源插件。
- 填写论坛以构建连接器属性。它应该看起来像这样
{ "name": "InfluxDB2SinkConnector_0", "config": { "topics": "emergency_generators", "input.data.format": "JSON", "connector.class": "InfluxDB2Sink", "name": "InfluxDB2SinkConnector_0", "kafka.auth.mode": "KAFKA_API_KEY", "influxdb.url": "https://us-east-1-1.aws.cloud2.influxdata.com", "influxdb.org.id": "05ea551cd21fb6e4", "influxdb.bucket": "kafka", "measurement.name.format": "genData", "tasks.max": "1" } }
- 你现在应该能看到数据出现在你的 InfluxDB 桶中。如果你看到数据进入 DLQ,这通常是因为选择了错误的数据格式。
Flux 任务
现在我们可以开始丰富我们的数据
- 创建一个新任务。
- 让我们给它一个名字和一个触发时间。我们希望我们的聚合数据尽可能快,所以我选择了 1 分钟的间隔。
- 现在让我们创建任务
import "influxdata/influxdb/tasks"
option task = {name: "kafka_downsample", every: 1m, offset: 0s}
from(bucket: "kafka")
|> range(start: tasks.lastSuccess(orTime: -1h))
|> filter(fn: (r) => r["_measurement"] == "genData")
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> group(columns: ["generatorID"], mode: "by")
|> last(column: "fuel")
|> map(fn: (r) => ({r with alarm: if r.fuel < 500 then "refuel" else "no action"}))
|> set(key: "region", value: "US")
|> to(
bucket: "kafka_downsampled",
tagColumns: ["generatorID", "region"],
fieldFn: (r) =>
({
"alarm": r.alarm,
"fuel": r.fuel,
"lat": r.lat,
"lon": r.lon,
"power": r.power,
"load": r.load,
"temperature": r.temperature,
}),
)
快速概述
-
我们使用 range() + tasks.lastSuccess 收集自上次任务运行以来的一系列数据。如果是第一次运行,我们默认为静态值 (-1h)
-
接下来,我们筛选出所有在测量 genData 下的数据。
-
Pivot() 用于将存储的值从垂直格式转换为水平格式,类似于使用 SQL 数据库(这对于使用我们的 map 函数非常重要)。
-
我们通过 generatorID 列 group() 来分组。这将每个生成器的数据分离到其自己的表中。
-
Last() 将选择并返回每个表的最后一行。
-
然后我们使用 map() 和一些条件逻辑来检查当前的油位。我们创建一个名为 alarm 的新列。警报值将根据条件逻辑填充。
-
set() 允许我们创建一个新列并手动为每行填充相同的值。
-
最后,to() 允许我们将数据传输到 kafka_downsampled。我们提供了一些映射逻辑来定义哪些列是字段和标签。
InfluxDB 源连接器
最后,让我们将数据流回 Kafka 主题
- 我们选择了 InfluxDB 2 源插件。
- 填写论坛以构建连接器属性。它应该看起来像这样
{ "name": "InfluxDB2SourceConnector_0", "config": { "connector.class": "InfluxDB2Source", "name": "InfluxDB2SourceConnector_0", "kafka.auth.mode": "KAFKA_API_KEY", "influxdb.url": "https://us-east-1-1.aws.cloud2.influxdata.com", "influxdb.org.id": "05ea551cd21fb6e4", "influxdb.bucket": "kafka_downsampled", "mode": "timestamp", "topic.mapper": "bucket", "topic.prefix": "EG", "output.data.format": "JSON", "tasks.max": "1" } }
- 现在,我们应该能够通过主题消息面板看到从我们的存储桶中拉取的数据。注意,我们在源配置中启用了一个非常酷的功能“模式:时间戳”。这将检查发送到 Kafka 主题的最后一个时间戳,并且只返回此时间戳之后的样本。您还可以使用批量模式。
结果和展望
虽然视觉效果不佳,但以下是结果
原始数据
丰富数据
我认为这个影响的含义非常有趣
-
这表明了一种架构,其中 InfluxDB 不是您数据旅程的最终步骤。
-
我们展示了一个解耦架构的绝佳例子,我们可以在不修改架构的其他阶段的情况下,修改 InfluxDB 中数据转换的行为。
-
综上所述,这个非常基础的例子在规划之后仅花费了 1-2 个小时。在设置这个架构时,代码量也非常少。
所以我们做到了!我们已经完成了关于 MQTT 与 Kafka 的三篇系列文章的最后部分。编写这个系列博客让我学到了很多东西!第一个重大发现是标题应该改为:MQTT 与 Kafka -> MQTT & Kafka:物联网的混合架构。非常认真地,让我们来总结一下
- 在第一部分中,我们学习了基础知识,并尝试直接比较 MQTT 和 Kafka。从表面上看,它们具有相似的术语
- 主题和代理
- 发布者和订阅者
- 在第一部分中,我们还明显地得出结论,将 Kafka 视为 MQTT 等同于用法拉利引擎为电动滑板车供电。我们没有充分利用 Kafka 这样一个有趣的流式协议的特性。
- 在第二部分中,我们更深入地探讨了 MQTT 和 Kafka 之间的差异。例如,我们深入探讨了主题和分区的概念。我们还通过 Kafka Connect 学习了企业连接器。
- 我们还得出结论,虽然 Kafka 具有很多强大功能,但在物联网架构中也有一些不足之处
- Kafka 是为稳定网络和良好基础设施部署而构建的。
- Kafka 不部署关键数据交付功能,如 Keep-Alive 和 Last Will。
我们已经构建了一个架构,它保留了 MQTT 为物联网架构提供的优势(轻量级、为低连接性而构建、有效地处理数千个连接)。然后,我们通过 Kafka 推荐与 MQTT 相结合,提供高可用性、高吞吐量和企业级连接器。最后,我们展示了如何将 InfluxDB 包含进来,通过提供扩展存储和数据转换功能来增强我们的物联网数据管道。
所以我希望您喜欢这个系列,就像我喜欢写它一样。我坚信,学习新事物的唯一方法就是尝试!理论只能带你走这么远。我还想分享社区中的一小点见解,我认为它真的很棒。有一家公司叫 WaterStream,它正在提供 Kafka 和 MQTT 之间的进一步集成。他们做了一个超级酷的演示,值得一看!
下次再聊!从我们的社区仓库下载代码,并告诉我您的想法。谁知道呢?也许我们能够根据足够的反馈启动第四部分的社区版。
一些思考...
在上面的例子中,我们大量依赖Kafka基础设施来传输数据。关键的是,Kafka为我们从边缘到云的数据传输提供了一种可行的方式。在某些情况下,我们可能不会这么幸运。让我们看看一种替代架构
上述架构是通过InfluxDB的一个新功能“边缘数据复制”实现的。边缘数据复制功能使我们能够在将数据写入本地InfluxDB之前,将其自动传输到InfluxDB云。这对于以下用例非常有用:
-
边缘基础设施已经存在。您可能已经在边缘使用InfluxDB,但希望有一种简单的方法将您的架构扩展到云。
-
您需要本地存储和数据的可见性。
我们仍然可以在云中保留我们的Kafka解决方案,但这使得InfluxDB能够提供从边缘到云的数据传输的骨干。