MQTT vs Kafka:物联网倡导者的观点(第一部分 - 基础知识)
作者:Jay Clifford / 用例, 开发者
2022 年 4 月 20 日
导航至
随着 Kafka Summit 大会临近,我认为是时候亲自动手看看它到底是怎么回事了。作为物联网的倡导者,我听说过 Kafka,但过于专注于 MQTT 等协议,而没有进一步研究。对于像我这样不熟悉的人来说,这两种协议看起来非常相似,甚至几乎是相互竞争的。然而,我已经了解到事实远非如此,实际上,在许多情况下,它们是互补的。
在本博客系列中,我希望总结 Kafka 和 MQTT 是什么,以及它们如何融入物联网架构。为了帮助解释一些概念,我认为使用过去的情景会很实用
在之前的博客中,我们讨论了一个我们想要监控应急燃料发电机的场景。我们使用 InfluxDB Python 客户端库创建了一个模拟器,将发电机数据发送到 InfluxDB Cloud。对于这篇博客,我决定重用该模拟器,但将客户端库替换为 MQTT 发布者和 Kafka 生产者,以了解每个协议背后的核心机制。
您可以在此处找到此演示的代码。
了解基础知识
那么 Kafka 是什么?Kafka 被描述为一个事件流平台。它符合发布者-订阅者架构,并增加了数据持久化的好处(要了解更多基本原理,请查看这篇博客)。Kafka 还在物联网领域推广了一些非常棒的优势
- 高吞吐量
- 高可用性
- 连接到知名的第三方平台
那么为什么我不直接使用 Kafka 构建我的整个物联网平台呢?嗯,这归结为几个关键问题
- Kafka 是为部署良好基础设施的稳定网络而构建的
- 它不部署关键的数据交付功能,例如 Keep-Alive 和 Last Will
话虽如此,让我们继续比较一下编写基本 Kafka 生产者的实现,并将其与应急发电机演示上下文中的 MQTT 发布者进行比较
假设: 为了本演示的目的,我将使用 Mosquitto MQTT Broker 和 Confluent 平台 (Kafka)。我们不会在此处介绍初始创建/设置,但您可以相应地查阅这些说明
- Mosquito Broker
- Confluent(我强烈建议您使用 Confluent Cloud 的免费试用版来检查 Kafka 是否适合您,然后再让自己陷入本地部署的困境)
初始化
让我们从 MQTT 发布者和 Kafka 生产者的初始化开始
MQTT
MQTT 发布者的最低要求(省略安全性)如下
- 主机:托管 Mosquitto 服务器的平台的地址/IP
- 端口:MQTT 生产者将与之通信的端口。基本连接通常为 1883,TLS 为 8883。
- Keep Alive 间隔:通信之间允许的时间量(秒)。
self.client.connect(host=self.mqttBroker,port=self.port, keepalive=MQTT_KEEPALIVE_INTERVAL)
Kafka
在 Kafka 方面,需要做更多后台工作。我们必须建立与两个不同 Kafka 实体的连接
- Kafka 集群:这是给定的,我们将在这里发送我们的有效负载。
- Schema 注册表:注册表位于 Kafka 集群的范围之外。它处理主题模式的存储和交付。换句话说,这迫使生产者以 Kafka 消费者期望的格式交付数据。稍后详细介绍。
那么让我们设置与这两个实体的连接
Schema 注册表
schema_registry_conf = {'url': 'https://psrc-8vyvr.eu-central-1.aws.confluent.cloud',
'basic.auth.user.info': <USERNAME>:<PASSWORD>'}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)
分解
- url:您的 schema 注册表的地址。Confluent 支持创建注册表以进行托管。
- authentication:与任何存储库一样,它包含基本安全性以确保您的 schema 设计安全。
Kafka 集群
self.json_serializer = JSONSerializer(self.schema_str, schema_registry_client, engine_to_dict)
self.p = SerializingProducer({
'bootstrap.servers': 'pkc-41wq6.eu-west-2.aws.confluent.cloud:9092',
'sasl.mechanism': 'PLAIN',
'security.protocol': 'SASL_SSL',
'sasl.username': '######',
'sasl.password': '######',
'error_cb': error_cb,
'key.serializer': StringSerializer('utf_8'),
'value.serializer': self.json_serializer
})
分解
- bootstrap.servers:简而言之,地址指向托管我们 Kafka 集群的 Confluent Cloud;更具体地说,是 Kafka broker。(Kafka 也有 broker 的概念,但按主题划分)。Bootstrap 是指生产者在全球集群中建立其存在。
- sasl.*:简单安全身份验证协议;这些是连接到 Confluent Kafka 的最低要求。我不会在此处介绍这一点,因为它与我们的总体比较无关。
- error_cb:处理 Kafka 错误处理。
- key_serializer:这描述了消息键将如何存储在 Kafka 中。键是 Kafka 如何处理有效负载的极其重要的组成部分。更多内容将在下一篇博客中介绍。
- Value.serializer:我们将在接下来介绍这一点,简而言之,我们必须描述我们的生产者将发送的数据类型。这就是为什么定义我们的 schema 注册表非常重要。
主题和交付
现在我们已经初始化了 MQTT 发布者和 Kafka 生产者,是时候发送我们的应急发电机数据了。为此,这两种协议都需要建立主题并在交付前准备数据
MQTT
在 MQTT 的世界中,主题是一个 UTF-8 字符串,用于建立有效负载之间的逻辑过滤。
主题名称 | 有效负载 |
温度 | 36 |
燃料 | 400 |
在本系列的第二部分中,我们更详细地分解了 MQTT 和 Kafka 主题的功能和差异。现在,我们将建立一个主题来发送我们所有的应急发电机数据(这不是最佳实践,但在本项目复杂性的构建中是合乎逻辑的)。
message = json.dumps(data)
self.client.publish(topic="emergency_generator", message)
MQTT 的好处是能够在交付有效负载期间按需生成主题。如果主题已存在,则有效负载只是发送到已建立的主题。如果不存在,则创建主题。这使我们的代码相对简单。我们定义我们的主题名称和我们计划发送的 JSON 字符串。默认情况下,MQTT 有效负载非常灵活,这有利有弊。从积极的方面来看,您不需要为您的数据定义严格的 schema 类型。另一方面,您依赖于您的订阅者足够健壮,能够处理超出规范的传入消息。
Kafka
因此我必须承认,我带着幼稚的乐观情绪来到这里,认为通过 Kafka 发送 JSON 有效负载就像 publish() 一样简单。我真是大错特错了!让我们一起来看看
self.schema_str = """{
"$schema": "https://json-schema.fullstack.org.cn/draft-07/schema#",
"title": "Generator",
"description": "A fuel engines health data",
"type": "object",
"properties": {
"generatorID": {
"description": "UniqueID of generator",
"type": "string"
},
"lat": {
"description": "latitude",
"type": "number"
},
"lon": {
"description": "longitude",
"type": "number"
},
"temperature": {
"description": "temperature",
"type": "number"
},
"pressure": {
"description": "pressure",
"type": "number"
},
"fuel": {
"description": "fuel",
"type": "number"
}
},
"required": [ "generatorID", "lat", "lon", "temperature", "pressure", "fuel" ]
}"""
schema_registry_conf = {'url': 'https://psrc-8vyvr.eu-central-1.aws.confluent.cloud',
'basic.auth.user.info': environ.get('SCHEMEA_REGISTRY_LOGIN')}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)
self.json_serializer = JSONSerializer(self.schema_str, schema_registry_client, engine_to_dict)
self.p = SerializingProducer({
'bootstrap.servers': 'pkc-41wq6.eu-west-2.aws.confluent.cloud:9092',
'sasl.mechanism': 'PLAIN',
'security.protocol': 'SASL_SSL',
'sasl.username': environ.get('SASL_USERNAME'),
'sasl.password': environ.get('SASL_PASSWORD'),
'error_cb': error_cb,
'key.serializer': StringSerializer('utf_8'),
'value.serializer': self.json_serializer
})
我们列表上的首要任务是建立一个 JSON schema。JSON schema 描述了我们数据的预期结构。在我们的示例中,我们定义了我们的发电机仪表读数(温度、压力、燃料)以及我们的元数据(generdatorID、lat、lon)。注意,在定义中,我们定义了它们的数据类型以及每个有效负载需要发送哪些数据点。
我们之前已经讨论过连接到我们的 schema 注册表。接下来,我们想要在注册表中注册我们的 JSON schema 并创建一个 JSON 序列化器。为此,我们需要三个参数
- schema_str:我们讨论的 schema 设计
- schema _registry_client:我们的对象连接到注册表
- engine_to_dict:JSON 序列化器,它允许您编写自定义函数来构建 Python 字典结构,该结构将被转换为 JSON 格式。
然后将 json_serializer 对象包含在 Serializing Producer 的初始化中。
最后,要发送数据,我们调用我们的生产者对象
self.p.produce(topic=topic, key=str(uuid4()), value=data, on_delivery=delivery_report)
要将数据发送到我们的 Kafka 集群,我们
- 定义我们的主题名称(默认情况下,Kafka 需要手动生成主题。您可以通过 broker/集群中的设置来允许自动生成)。
- 为我们的数据创建一个唯一的键、我们希望发布的数据(这将通过我们的自定义函数和交付报告进行处理(定义用于提供有关有效负载成功或未成功交付的反馈的函数))。
我对强类型/schemer-based 设计的第一印象是:“哇,这肯定会让系统设计人员维护大量代码,并且学习曲线陡峭”。当我实现这个示例时,我意识到您可能会避免很多未来的技术债务。Schemer 强制新的生产者/消费者符合当前预期的数据结构或生成新的 schema 版本。这允许当前系统继续不受连接到您的 Kafka 集群的恶意生产者的阻碍。我将在本博客系列的第二部分中更详细地介绍这一点。
展望和结论
那么,我们做了什么?嗯,以最简单粗暴的形式,我们创建了一个 Kafka 生产者和一个 MQTT 发布者来传输我们的发电机数据。从表面上看,对于相同的结果,Kafka 在设置上似乎比 MQTT 复杂得多。
在这个层面上,您是对的。然而,我们只是略微触及了 Kafka 可以做什么以及它应该如何在真正的物联网架构中部署的表面。我计划在本系列中发布另外两篇博客
- 第二部分:我介绍了 Kafka 独有的更多功能,例如更深入地了解主题、可扩展性和第三方集成(包括 InfluxDB)。
- 第三部分:我们将运用我们所学到的知识,并将最佳实践应用于真实的物联网项目。我们将使用 Kafka 的 MQTT 代理,并更深入地研究第三方集成,以充分利用您的 Kafka 基础设施。
在那之前,请查看代码,运行它,玩转它并改进它。下一篇博客(本系列的第二部分)我们将更详细地介绍主题。