MQTT 与 Kafka:物联网倡导者的视角(第二部分 – 强大的 Kafka)
作者:Jay Clifford / 用例, 开发者
2022 年 4 月 25 日
导航至
在本系列的第一部分中,我们开始比较 Kafka 和 MQTT 在物联网基础设施中的应用。结论是,在物联网设备的基本发布-订阅模型中,Kafka 可能 просто 过于复杂。但是,我们也了解到 Kafka 具有一些我们希望在物联网架构中利用的宝贵功能
- 模式设计
- 高可用性
- 数据持久性和耐用性
在这篇文章中,我想重点重新设计我们上次的演示,以利用我们忽略的一些核心 Kafka 功能(主题设计和分区),并介绍新主题(双关语)如第三方集成。
在我们开始之前,这里有一些小的说明
主题和分区
在我们最初的演示中,我们的主题结构如下所示
基本上,无论我们启动多少个应急发电机,我们的有效负载都由同一个主题处理。在有限数量的发电机下,这是合理的。但是,这确实给我们留下了一些明显的问题
- 消费者必须暗示自己的逻辑来处理/忽略有效负载。例如,消费者可能只对处理来自引擎 1-2 或基于特定区域的数据感兴趣。
- 性能也会受到影响,因为我们的消费者将不得不处理所有消息,而不是特定的子集。
那么我们可以做些什么不同的事情呢?
MQTT
因此,在 MQTT 中,我们可以建立主题层次结构。HiveMQ 团队撰写了一篇很棒的博客文章,概述了构建主题时的一些注意事项
- 永远不要使用前导斜杠
- 永远不要在主题中使用空格
- 不要订阅 #
- 不要忘记可扩展性
- 使用具体的主题,而不是一般的主题
我强烈建议查看这篇文章。
因此,让我们将一些逻辑部署到我们的应用程序中。目前,我们的有效负载如下所示
{'generatorID': 'generator4', 'lat': 40.65995, 'lon': -111.99633, 'temperature': 74, 'pressure': 198, 'fuel': 901}
我们的主题如下
emergency_generator
由于我们在有效负载中具有 generatorID,因此我们可以考虑将其作为新的主题层附加
def publish_to_topic(self, topic: str, data: dict):
topic = topic +"/"+ str(data["generatorID"])
message = json.dumps(data)
self.client.publish(topic, message)
这为我们提供了 emergency_generator 主题以及下面的后续层
emergency_generator/generator1
emergency_generator/generator2
emergency_generator/generator3
使用这种层次结构提供了设备(在我们的例子中是发电机)之间自然划分的额外好处。我们可以在 Kafka 中实现类似的架构吗?是否有额外的好处?
Kafka
Kafka 有一种非常简单而强大的方法可以自然地隔离我们的数据流。让我们再次看一下这部分代码
self.p.produce(topic=topic, key=str(uuid4()), value=data, on_delivery=delivery_report)
作为数据发布的一部分,我们分配一个键。此键作为键值对分配给我们的数据。虽然此键是可选的,但当与分区结合使用时,它非常强大。我们将在稍后讨论这个问题,但首先让我们更新我们的键以使用我们的 generatorID 代替。
self.p.produce(topic=topic, key=str(data['generatorID']), value=data, on_delivery=delivery_report)
现在到了有趣的部分,我们在本博客系列的第一部分中说过,Kafka 部署了一些独特的功能,可以高效地处理大量数据。这就是分区发挥作用的地方。您可以将分区想象成记录(我们的数据)的简单队列。分区的好处是可以为性能目的提供记录和并行性的一致排序,但如何实现呢?这就是我们的键发挥作用的地方。一个很好的想象方式是机场移民队列,而您的护照就是键。通常,对于国民、快速通道(如欧洲护照)和所有护照都存在队列(分区)。所有护照持有人将进入各自的队列进行处理。这相当于我们的 Kafka 架构。例如,Generator1 记录将始终进入分区 1,Generator2 进入分区 2。虽然键和分区之间通常不是一对一的关系,但随着我们扩展,会有一些很大的好处。它们包括
- 主题并行性:假设我们的发电机每秒生成 1000 条记录。我们有一个微服务试图消费这些消息并执行一些逻辑。我们知道微服务每秒只能处理 500 条记录,因此我们的消费者永远无法与我们的生产者保持一致性。相反,使用分区,我们可以启动第二个甚至第三个微服务实例,以彼此并行地消费这些记录的一部分。实现更高的吞吐量。
- 主题复制:这允许我们保留分区来保存我们记录的副本。由于我们的主题能够分布在许多服务器上(如 Kubernetes),这有助于防止由于中断造成的数据丢失。如果一个分区无法访问,那么我们的消费者将 просто 从备份分区中获取记录。
如您所见,我们开始利用一些强大的功能,这些功能使 Kafka 与其他发布-订阅消息传递协议区分开来。
Kafka 还能做什么?
Apache Kafka 是一种企业消息传递协议,它了解它需要适应预先存在的生态系统才能有效。这就是 Kafka Connect 的用武之地。
Kafka Connect 为知名的企业系统和数据存储引擎(如 InfluxDB)提供了强大的预制连接器目录。因此,由于我存在偏见,让我们讨论一下使用 InfluxDB 插件的源和接收器。
Kafka Connect 将连接器分为两类
- 源:我们可以从中提取数据并将其写入 Kafka 主题的位置。
- 接收器:我们可以将数据从主题写入到的位置。
我们现在将暂停 InfluxDB 源(剧透警告,我将在本博客系列的第 3 部分中使用它)。让我们设置一个 InfluxDB 接收器
注意:为了便于演示,我正在使用 Confluent Cloud:https://confluent.cloud/
- 在数据集成下,转到连接器并搜索 InfluxDB。
- 选择 InfluxDB 2 接收器
- 现在我们可以开始配置我们的连接器。这非常标准,这是我的快照:
细分如下
- 我们希望订阅的主题。在我们的例子中,只有一个,emergency_generators
- 最大任务数,这相当于我们之前讨论的微服务概念。由于我们使用的是 Confluent Cloud 的免费层,因此我们仅限于一个。
- Kafka 服务身份验证,用于安全(阻止恶意服务尝试连接到您的数据流)。
- 我们希望指向的 InfluxDB 实例。
- 我们的 InfluxDB 实例的组织。
- 我们存储数据的存储桶。
- 写入精度(秒、毫秒等)。
- 测量格式。我们选择使用主题名称作为我们的 InfluxDB 测量名称。
您现在应该能够部署连接器。然后我们可以在流沿袭中查看数据流。
InfluxDB 只是 Kafka Connect 拥有的众多连接器之一。我们的 JSON 有效负载相对简单,这意味着它可以直接转换为许多接受 JSON 有效负载结构的连接器。但是,Kafka 还可以对数据有效负载应用简单的转换,用于即时转换,有助于提高互操作性。
展望与结论
我们现在更深入地研究了 MQTT 和 Kafka 的部署架构。虽然 MQTT 和 Kafka 都使用主题来描述消息有效负载的容器,但我们现在看到,在底层,它们截然不同。从我的角度来看,我现在可以看到使用 Kafka 分区和记录键的性能影响。我们还探索了使用 InfluxDB 提供的企业级连接器的能力。考虑到所有这些,这仍然无法弥补仅 Kafka 物联网解决方案的一些缺点
- Kafka 是为部署良好基础设施的稳定网络而构建的
- 它不部署关键的数据交付功能,如 Keep-Alive 和 Last Will
MQTT 在这些方面表现出色。那么接下来呢?
请加入我的本系列最后一篇博客,我们将在其中结合我们所学的知识,使用真实的物联网用例创建一个真正的混合物联网架构。我们将了解 Kafka 的 MQTT 代理,并进一步探索 Kafka 的企业连接器,如 InfluxDB 源。