MQTT与Kafka:物联网倡导者的观点(第一部分 - 基础知识)
作者:Jay Clifford / 用例,开发者
2022年4月20日
导航至
随着Kafka峰会即将举行,我认为是时候动手了解它的基本情况了。作为一名物联网倡导者,我听说过Kafka,但由于深陷MQTT等协议中,未能进一步调查。对于初学者(像我一样),这两种协议似乎非常相似,如果不是几乎相互竞争。然而,我了解到这远远不是事实,实际上,在许多情况下,它们实际上是互补的。
在这个博客系列中,我希望总结Kafka和MQTT是什么,以及它们如何都适合物联网架构。为了解释一些概念,我想使用一个过去的场景来实际说明。
在之前的博客中,我们讨论了一个场景,我们想要监控应急燃料发电机。我们使用InfluxDB Python客户端库创建了一个模拟器,将发电机数据发送到InfluxDB Cloud。对于这个博客,我决定重用那个模拟器,但用MQTT发布者和Kafka生产者替换客户端库,以了解每个的核心机制。
您可以在这里找到这个演示的代码。
理解基础知识
那么Kafka是什么?Kafka被描述为事件流平台。它遵循发布-订阅架构,并增加了数据持久化的好处(要了解更多的基础知识,请查看这篇博客)。Kafka还在物联网领域促进了相当多的好处。
- 高吞吐量
- 高可用性
- 连接到知名的第三方平台
那么我为什么不只用Kafka构建整个物联网平台呢?好吧,这归结为几个关键问题。
- Kafka是为稳定网络和良好基础设施部署而构建的。
- 它不部署关键数据交付功能,如Keep-Alive和Last Will。
话虽如此,让我们继续比较基本的Kafka生产者和MQTT发布者在应急发电机演示中的实现。
假设:出于这个演示的目的,我将使用Mosquitto MQTT代理和Confluent平台(Kafka)。我们不会在这里介绍初始创建/设置,但你可以根据这些说明进行咨询。
初始化
让我们从初始化我们的MQTT发布者和Kafka生产者开始。
MQTT
MQTT发布者的最低要求(省略安全性)如下
- 主机:托管Mosquitto服务器的地址/ IP
- 端口:MQTT生产者将与之通信的端口。通常是1883用于基本连接,8883 TLS。
- 保持活动间隔:通信之间允许的时间(秒)。
self.client.connect(host=self.mqttBroker,port=self.port, keepalive=MQTT_KEEPALIVE_INTERVAL)
Kafka
在处理Kafka时,需要做更多背景工作。我们需要建立与两个不同的Kafka实体的连接。
- Kafka集群:这是我们发送数据的地方。
- 模式注册表:该注册表位于Kafka集群之外。它负责存储和交付主题模式。换句话说,这迫使生产者以Kafka消费者期望的格式交付数据。稍后对此进行更多介绍。
因此,让我们设置与这两个实体的连接。
模式注册表
schema_registry_conf = {'url': 'https://psrc-8vyvr.eu-central-1.aws.confluent.cloud',
'basic.auth.user.info': <USERNAME>:<PASSWORD>'}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)
分解如下
- url:模式注册表的地址。Confluent支持创建注册表以托管。
- 身份验证:像任何存储库一样,它包含基本安全措施,以保护您的模式设计安全。
Kafka集群
self.json_serializer = JSONSerializer(self.schema_str, schema_registry_client, engine_to_dict)
self.p = SerializingProducer({
'bootstrap.servers': 'pkc-41wq6.eu-west-2.aws.confluent.cloud:9092',
'sasl.mechanism': 'PLAIN',
'security.protocol': 'SASL_SSL',
'sasl.username': '######',
'sasl.password': '######',
'error_cb': error_cb,
'key.serializer': StringSerializer('utf_8'),
'value.serializer': self.json_serializer
})
分解如下
- bootstrap.servers:简而言之,地址指向托管我们的Kafka集群的Confluent Cloud;更具体地说,是一个Kafka代理。(Kafka还有代理的标记,但基于每个主题)。Bootstrap是指生产者在集群中建立其全球存在的引用。
- sasl.*:简单安全认证协议;这是连接到Confluent Kafka的最低要求。这里不涉及,因为这对我们整体比较没有兴趣。
- error_cb:处理Kafka错误处理。
- key_serializer:这描述了消息键如何在Kafka中存储。键是Kafka处理有效负载的重要组成部分。在下一篇博客中对此进行更多介绍。
- Value.serializer:我们将在下一部分介绍,简而言之,我们必须描述生产者将发送的数据类型。这就是为什么定义我们的模式注册表非常重要。
主题和交付
现在我们已经启动了MQTT发布者和Kafka生产者,是时候发送紧急发电机数据了。为此,这两个协议都需要在交付之前建立主题和数据准备。
MQTT
在MQTT的世界里,主题是一个UTF-8字符串,它建立了有效负载之间的逻辑过滤。
主题名称 | 有效负载 |
温度 | 36 |
燃料 | 400 |
在本系列的第二部分中,我们将更详细地介绍MQTT和Kafka主题的功能和差异。现在,我们将建立一个主题来发送所有紧急发电机数据(这不是最佳实践,但在本项目的复杂性构建中是合理的)。
message = json.dumps(data)
self.client.publish(topic="emergency_generator", message)
MQTT的好处是在交付有效负载时能够按需生成主题。如果主题已存在,有效负载将简单地发送到已建立的主题。如果不存在,则创建主题。这使得我们的代码相对简单。我们定义主题名称和计划发送的JSON字符串。MQTT有效负载默认非常灵活,这既有优点也有缺点。在积极方面,您不需要为您的数据定义严格的模式类型。另一方面,您依赖于您的订阅者足够强大,能够处理不符合常规的消息。
Kafka
所以我必须承认,我最初错误地乐观地认为通过Kafka发送JSON有效负载会像publish()一样简单!让我们一步一步来
self.schema_str = """{
"$schema": "https://json-schema.fullstack.org.cn/draft-07/schema#",
"title": "Generator",
"description": "A fuel engines health data",
"type": "object",
"properties": {
"generatorID": {
"description": "UniqueID of generator",
"type": "string"
},
"lat": {
"description": "latitude",
"type": "number"
},
"lon": {
"description": "longitude",
"type": "number"
},
"temperature": {
"description": "temperature",
"type": "number"
},
"pressure": {
"description": "pressure",
"type": "number"
},
"fuel": {
"description": "fuel",
"type": "number"
}
},
"required": [ "generatorID", "lat", "lon", "temperature", "pressure", "fuel" ]
}"""
schema_registry_conf = {'url': 'https://psrc-8vyvr.eu-central-1.aws.confluent.cloud',
'basic.auth.user.info': environ.get('SCHEMEA_REGISTRY_LOGIN')}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)
self.json_serializer = JSONSerializer(self.schema_str, schema_registry_client, engine_to_dict)
self.p = SerializingProducer({
'bootstrap.servers': 'pkc-41wq6.eu-west-2.aws.confluent.cloud:9092',
'sasl.mechanism': 'PLAIN',
'security.protocol': 'SASL_SSL',
'sasl.username': environ.get('SASL_USERNAME'),
'sasl.password': environ.get('SASL_PASSWORD'),
'error_cb': error_cb,
'key.serializer': StringSerializer('utf_8'),
'value.serializer': self.json_serializer
})
我们清单上的第一个任务是要建立一个JSON模式。JSON模式描述了我们的数据预期结构。在我们的示例中,我们定义了我们的发电机表计读数(温度、压力、燃料)以及我们的元数据(generdatorID、纬度、经度)。注意,在定义中,我们定义了它们的数据类型以及哪些数据点需要与每个有效负载一起发送。
我们之前已经讨论了连接到我们的模式注册表。接下来,我们想将我们的JSON模式注册到注册表中并创建一个JSON序列化器。为此,我们需要三个参数
- schema_str:我们讨论的方案设计
- schema_registry_client:我们的对象连接到注册表
- engine_to_dict:允许您编写自定义函数以构建将转换为JSON格式的Python字典结构的JSON序列化器。
json_serializer对象随后被包含在序列化生产者的初始化中。
最后,要发送数据,我们调用我们的生产者对象
self.p.produce(topic=topic, key=str(uuid4()), value=data, on_delivery=delivery_report)
要向我们的Kafka集群发送数据
- 定义我们的主题名称(默认情况下,Kafka需要手动生成主题。您可以通过设置代理/集群中的设置来允许自动生成)。
- 为我们的数据创建一个唯一键,我们希望发布的数据(这将通过我们的自定义函数和交付报告(一个定义用于提供有效负载成功或失败交付反馈的函数)进行处理)。
我对基于强类型/模式的设计的第一印象是:“哇,这必须让系统设计者维护大量代码并学习曲线陡峭”。当我实现示例时,我意识到您可能会以这种方式避免许多未来的技术债务。模式设计者强制新的生产者/消费者遵守当前预期的数据结构或生成一个新的模式版本。这允许当前系统不受恶意生产者连接到您的Kafka集群的影响。我将在本系列博客的第二部分中更详细地介绍这一点。
前景和结论
那么,我们做了什么?好吧,在最简化的形式中,我们创建了一个Kafka生产者和MQTT发布者来传输我们的发电机数据。从表面上看,它可能看起来Kafka在设置上比MQTT复杂得多,但结果相同。
在这个层面上,您是对的。然而,我们刚刚触及了Kafka的表面和如何在真正的物联网架构中部署它。我计划在这个系列中发布两篇更多的博客。
- 第二部分:我介绍了Kafka的独特功能,如对主题、可扩展性和第三方集成(包括InfluxDB)的更深入了解。
- 第三部分:我们将所学知识应用于实际的物联网项目。我们将使用Kafka的MQTT代理并更深入地了解第三方集成,以充分利用您的Kafka基础设施。
在此之前,请查看代码,运行它,玩弄它,并改进它。下一篇博客(本系列的第二部分)我们将更详细地介绍主题。