MQTT与Kafka:物联网倡导者的视角(第二部分——强大的Kafka)
作者:Jay Clifford / 用例,开发者
2022年4月25日
导航到
在本系列的第一部分中,我们开始比较在物联网基础设施中Kafka和MQTT的用法。结论是在物联网设备的基本发布和订阅模型中,Kafka可能有些过度。然而,我们也了解到Kafka有一些我们希望在物联网架构中使用的有价值的特性。
- 模式设计
- 高可用性
- 数据持久性和持久性
在这篇文章中,我想重点介绍重新设计我们的上一个演示,以便使用一些我们忽略的核心Kafka功能(主题设计和分区),以及引入新的主题(不是字面意思)如第三方集成。
在我们开始之前,这里有一些整理工作要做
主题和分区
在我们的原始演示中,我们的主题结构如下所示
基本上无论我们启动多少紧急发电机,我们的有效载荷都由同一个主题处理。在有限数量的发电机下,这是合理的。然而,这也留下了一些明显的问题
- 消费者必须引入自己的逻辑来处理/忽略有效载荷。例如,一个消费者可能只对处理1-2号引擎的数据或基于特定地区的数据感兴趣。
- 性能也会受到影响,因为我们的消费者将不得不处理所有消息,而不是特定的子集。
那么我们有什么不同的方法吗?
MQTT
在MQTT中,我们可以建立主题层次结构。HiveMQ团队撰写了一篇优秀的博客文章,概述了构建主题时的一些“该做”和“不该做”的事情
- 切勿使用开头的正斜杠
- 主题中不要使用空格
- 不要订阅#
- 不要忘记可扩展性
- 使用特定主题,而不是通用主题
我强烈推荐查看这篇帖子。
那么,让我们将一些逻辑应用到我们的应用程序中。目前,我们的有效载荷看起来像这样
{'generatorID': 'generator4', 'lat': 40.65995, 'lon': -111.99633, 'temperature': 74, 'pressure': 198, 'fuel': 901}
我们的主题如下
emergency_generator
由于我们在有效载荷中有了generatorID,我们可以考虑将其附加为新的主题层
def publish_to_topic(self, topic: str, data: dict):
topic = topic +"/"+ str(data["generatorID"])
message = json.dumps(data)
self.client.publish(topic, message)
这给了我们emergency_generator主题及其下面的后续层
emergency_generator/generator1
emergency_generator/generator2
emergency_generator/generator3
使用这种层次结构结构提供了设备(在我们的情况下是发电机)之间自然划分的额外好处。我们能否实现与Kafka相似的架构?是否有额外的优势?
Kafka
Kafka以一种简单而强大的方式自然地隔离我们的数据流。让我们再次查看这段代码
self.p.produce(topic=topic, key=str(uuid4()), value=data, on_delivery=delivery_report)
作为数据发布的一部分,我们分配一个键。这个键被分配给我们的数据作为键值对。虽然这个键是可选的,但它与分区结合时非常强大。我们将在下一部分详细介绍,但首先让我们更新我们的键,使用generatorID。
self.p.produce(topic=topic, key=str(data['generatorID']), value=data, on_delivery=delivery_report)
现在有趣的部分来了,我们在本博客系列的第1部分中提到,Kafka在以高效方式处理大量数据方面部署了一些独特的功能。这就是分区发挥作用的地方。你可以将分区想象成记录(我们的数据)的简单队列。分区的好处是提供记录的一致排序和并行性以提高性能,但这是如何实现的?这就是我们的键发挥作用的地方。一个很好的想象方式是机场入境排队,你的护照就是键。通常有一个国家公民的队列(分区)、快速通道如欧洲护照以及所有护照的队列。所有护照持有者都将进入各自的队列进行办理。这相当于我们的Kafka架构,Generator1记录将始终进入例如分区1,Generator2分区2。尽管键和分区之间通常不是一一对应的关系,但随着规模的扩大,它们有一些很好的好处。包括
- 主题并行性:假设我们的发电机每秒产生1000条记录。我们有一个微服务试图消费这些消息并执行一些逻辑。我们知道微服务每秒只能处理500条记录,所以我们的消费者永远无法与我们的生产者保持一致性。相反,使用分区,我们可以启动第二个或第三个微服务实例来并行消费这些记录的一部分。从而实现更高的吞吐量。
- 主题复制:这允许我们为保留记录副本的分区预留空间。由于我们的主题能够在许多服务器(如Kubernetes)上分布,这有助于防止因故障而造成的数据丢失。如果一个分区不可达,那么我们的消费者将简单地从备份分区中获取记录。
正如你所看到的,我们开始利用一些将Kafka与其他pub-sub消息协议区分开来的强大功能。
Kafka还能做什么?
Apache Kafka是一种企业消息协议,它了解它需要适应现有的生态系统才能有效。这就是Kafka Connect发挥作用的地方。
Kafka Connect 提供了一系列功能强大的预制连接器,适用于知名的企业系统和数据存储引擎,如 InfluxDB。因此,既然我有偏见,让我们使用 InfluxDB 插件来讨论源和目标。
Kafka Connect 将连接器分为两大类
- 源:我们可以从中提取数据并将其写入 Kafka 主题的地方。
- 同步:我们可以从中写入数据到主题的地方。
我们现在暂时将 InfluxDB 源放在一边(剧透一下:我将在博客系列的第三部分中使用它)。让我们设置一个 InfluxDB 同步。
注意:为了方便演示,我使用的是 Confluent Cloud:https://confluent.cloud/
- 在 数据集成 下,前往 连接器 并搜索 InfluxDB。
- 选择 InfluxDB 2 源
- 现在我们可以开始配置连接器了。这相当标准,以下是我的配置截图:
具体如下
- 我们希望订阅的主题。在我们的例子中,只有一个,emergency_generators。
- 最大任务数,这与我们之前讨论的微服务概念相当。由于我们使用的是 Confluent Cloud 的免费版,所以我们被限制在一个。
- Kafka 服务身份验证,用于安全(阻止恶意服务尝试连接到您的数据流)。
- 我们希望指向的 InfluxDB 实例。
- 我们的 InfluxDB 实例的组织。
- 我们存储数据的数据桶。
- 写入精度(秒、毫秒等)。
- 测量格式。我们选择使用主题名称作为我们的 InfluxDB 测量名称。
您现在应该能够部署连接器了。然后我们可以在流线图中查看数据流。
InfluxDB 是 Kafka Connect 的许多连接器之一。我们的 JSON 有效负载相对简单,这意味着它可以直接转换为接受 JSON 有效负载结构的许多连接器。然而,Kafka 还可以应用简单的 转换 到数据有效负载,以帮助提高互操作性。
展望和结论
我们现在对 MQTT 和 Kafka 的部署架构都有了一些更深入的了解。尽管 MQTT 和 Kafka 都使用主题来描述消息有效负载的容器,但我们现在看到它们在内部是相当不同的。从我的角度来看,我现在可以看到使用 Kafka 分区和记录键的性能影响。我们还探讨了使用 InfluxDB 提供的企业级连接器的功能。考虑到所有这些,这仍然不能抵消 Kafka 仅 IoT 解决方案的一些不足。
- Kafka 是为稳定网络和良好的基础设施部署而构建的。
- 它不部署诸如 Keep-Alive 和 Last Will 等关键数据传输功能。
MQTT 擅长的地方。那么接下来怎么办?
加入我,查看本系列的最后一篇博客 《MQTT 与 Kafka IoT:倡导者的视角,第三部分》,我们将结合所学知识,使用真实的 IoT 用例创建一个真正的混合 IoT 架构。我们将了解 Kafka 的 MQTT 代理,并进一步探索 Kafka 的企业级连接器,如 InfluxDB 源。