MQTT vs Kafka:物联网倡导者的观点(第一部分 - 基础知识)

导航至

随着 Kafka Summit 大会临近,我认为是时候亲自动手看看它到底是怎么回事了。作为物联网的倡导者,我听说过 Kafka,但过于专注于 MQTT 等协议,而没有进一步研究。对于像我这样不熟悉的人来说,这两种协议看起来非常相似,甚至几乎是相互竞争的。然而,我已经了解到事实远非如此,实际上,在许多情况下,它们是互补的。

在本博客系列中,我希望总结 Kafka 和 MQTT 是什么,以及它们如何融入物联网架构。为了帮助解释一些概念,我认为使用过去的情景会很实用

monitoring-emergency fuel generators

之前的博客中,我们讨论了一个我们想要监控应急燃料发电机的场景。我们使用 InfluxDB Python 客户端库创建了一个模拟器,将发电机数据发送到 InfluxDB Cloud。对于这篇博客,我决定重用该模拟器,但将客户端库替换为 MQTT 发布者和 Kafka 生产者,以了解每个协议背后的核心机制。

您可以在此处找到此演示的代码。

了解基础知识

那么 Kafka 是什么?Kafka 被描述为一个事件流平台。它符合发布者-订阅者架构,并增加了数据持久化的好处(要了解更多基本原理,请查看这篇博客)。Kafka 还在物联网领域推广了一些非常棒的优势

  • 高吞吐量
  • 高可用性
  • 连接到知名的第三方平台

那么为什么我不直接使用 Kafka 构建我的整个物联网平台呢?嗯,这归结为几个关键问题

  1. Kafka 是为部署良好基础设施的稳定网络而构建的
  2. 它不部署关键的数据交付功能,例如 Keep-Alive 和 Last Will

话虽如此,让我们继续比较一下编写基本 Kafka 生产者的实现,并将其与应急发电机演示上下文中的 MQTT 发布者进行比较

demo

假设: 为了本演示的目的,我将使用 Mosquitto MQTT Broker 和 Confluent 平台 (Kafka)。我们不会在此处介绍初始创建/设置,但您可以相应地查阅这些说明

  1. Mosquito Broker 
  2. Confluent(我强烈建议您使用 Confluent Cloud 的免费试用版来检查 Kafka 是否适合您,然后再让自己陷入本地部署的困境)

初始化

让我们从 MQTT 发布者和 Kafka 生产者的初始化开始

MQTT

MQTT 发布者的最低要求(省略安全性)如下

  1. 主机:托管 Mosquitto 服务器的平台的地址/IP
  2. 端口:MQTT 生产者将与之通信的端口。基本连接通常为 1883,TLS 为 8883。
  3. Keep Alive 间隔:通信之间允许的时间量(秒)。

self.client.connect(host=self.mqttBroker,port=self.port, keepalive=MQTT_KEEPALIVE_INTERVAL)

Kafka

在 Kafka 方面,需要做更多后台工作。我们必须建立与两个不同 Kafka 实体的连接

  1. Kafka 集群:这是给定的,我们将在这里发送我们的有效负载。
  2. Schema 注册表:注册表位于 Kafka 集群的范围之外。它处理主题模式的存储和交付。换句话说,这迫使生产者以 Kafka 消费者期望的格式交付数据。稍后详细介绍。

那么让我们设置与这两个实体的连接

Schema 注册表

schema_registry_conf = {'url': 'https://psrc-8vyvr.eu-central-1.aws.confluent.cloud', 
                                'basic.auth.user.info': <USERNAME>:<PASSWORD>'}
     schema_registry_client = SchemaRegistryClient(schema_registry_conf)

分解

  • url:您的 schema 注册表的地址。Confluent 支持创建注册表以进行托管。
  • authentication:与任何存储库一样,它包含基本安全性以确保您的 schema 设计安全。

Kafka 集群

self.json_serializer = JSONSerializer(self.schema_str, schema_registry_client, engine_to_dict)
    self.p = SerializingProducer({
        'bootstrap.servers': 'pkc-41wq6.eu-west-2.aws.confluent.cloud:9092',
        'sasl.mechanism': 'PLAIN',
        'security.protocol': 'SASL_SSL',
        'sasl.username': '######',
        'sasl.password': '######',
        'error_cb': error_cb,
        'key.serializer': StringSerializer('utf_8'),
        'value.serializer': self.json_serializer
        })

分解

  1. bootstrap.servers:简而言之,地址指向托管我们 Kafka 集群的 Confluent Cloud;更具体地说,是 Kafka broker。(Kafka 也有 broker 的概念,但按主题划分)。Bootstrap 是指生产者在全球集群中建立其存在。
  2. sasl.*:简单安全身份验证协议;这些是连接到 Confluent Kafka 的最低要求。我不会在此处介绍这一点,因为它与我们的总体比较无关。
  3. error_cb:处理 Kafka 错误处理。
  4. key_serializer:这描述了消息键将如何存储在 Kafka 中。键是 Kafka 如何处理有效负载的极其重要的组成部分。更多内容将在下一篇博客中介绍。
  5. Value.serializer:我们将在接下来介绍这一点,简而言之,我们必须描述我们的生产者将发送的数据类型。这就是为什么定义我们的 schema 注册表非常重要。

主题和交付

现在我们已经初始化了 MQTT 发布者和 Kafka 生产者,是时候发送我们的应急发电机数据了。为此,这两种协议都需要建立主题并在交付前准备数据

MQTT

在 MQTT 的世界中,主题是一个 UTF-8 字符串,用于建立有效负载之间的逻辑过滤。

主题名称 有效负载
温度 36
燃料 400

在本系列的第二部分中,我们更详细地分解了 MQTT 和 Kafka 主题的功能和差异。现在,我们将建立一个主题来发送我们所有的应急发电机数据(这不是最佳实践,但在本项目复杂性的构建中是合乎逻辑的)。

message = json.dumps(data)
self.client.publish(topic="emergency_generator", message)

MQTT 的好处是能够在交付有效负载期间按需生成主题。如果主题已存在,则有效负载只是发送到已建立的主题。如果不存在,则创建主题。这使我们的代码相对简单。我们定义我们的主题名称和我们计划发送的 JSON 字符串。默认情况下,MQTT 有效负载非常灵活,这有利有弊。从积极的方面来看,您不需要为您的数据定义严格的 schema 类型。另一方面,您依赖于您的订阅者足够健壮,能够处理超出规范的传入消息。

Kafka

因此我必须承认,我带着幼稚的乐观情绪来到这里,认为通过 Kafka 发送 JSON 有效负载就像 publish() 一样简单。我真是大错特错了!让我们一起来看看

self.schema_str = """{
            "$schema": "https://json-schema.fullstack.org.cn/draft-07/schema#",
            "title": "Generator",
            "description": "A fuel engines health data",
            "type": "object",
            "properties": {
                "generatorID": {
                "description": "UniqueID of generator",
                "type": "string"
                },
                "lat": {
                "description": "latitude",
                "type": "number"
                },
                "lon": {
                "description": "longitude",
                "type": "number"
                },
                "temperature": {
                "description": "temperature",
                "type": "number"
                },
                "pressure": {
                "description": "pressure",
                "type": "number"
                },
                "fuel": {
                "description": "fuel",
                "type": "number"
                }
            },
            "required": [ "generatorID", "lat", "lon", "temperature", "pressure", "fuel" ]
            }"""

        schema_registry_conf = {'url': 'https://psrc-8vyvr.eu-central-1.aws.confluent.cloud', 
                                'basic.auth.user.info': environ.get('SCHEMEA_REGISTRY_LOGIN')}
        schema_registry_client = SchemaRegistryClient(schema_registry_conf)

        self.json_serializer = JSONSerializer(self.schema_str, schema_registry_client, engine_to_dict)

        self.p = SerializingProducer({
        'bootstrap.servers': 'pkc-41wq6.eu-west-2.aws.confluent.cloud:9092',
        'sasl.mechanism': 'PLAIN',
        'security.protocol': 'SASL_SSL',
        'sasl.username': environ.get('SASL_USERNAME'),
        'sasl.password': environ.get('SASL_PASSWORD'),
        'error_cb': error_cb,
        'key.serializer': StringSerializer('utf_8'),
        'value.serializer': self.json_serializer
        })

我们列表上的首要任务是建立一个 JSON schema。JSON schema 描述了我们数据的预期结构。在我们的示例中,我们定义了我们的发电机仪表读数(温度、压力、燃料)以及我们的元数据(generdatorID、lat、lon)。注意,在定义中,我们定义了它们的数据类型以及每个有效负载需要发送哪些数据点。

我们之前已经讨论过连接到我们的 schema 注册表。接下来,我们想要在注册表中注册我们的 JSON schema 并创建一个 JSON 序列化器。为此,我们需要三个参数

  1. schema_str:我们讨论的 schema 设计
  2. schema _registry_client:我们的对象连接到注册表
  3. engine_to_dict:JSON 序列化器,它允许您编写自定义函数来构建 Python 字典结构,该结构将被转换为 JSON 格式。

然后将 json_serializer 对象包含在 Serializing Producer 的初始化中。

最后,要发送数据,我们调用我们的生产者对象

self.p.produce(topic=topic, key=str(uuid4()), value=data, on_delivery=delivery_report)

要将数据发送到我们的 Kafka 集群,我们

  1. 定义我们的主题名称(默认情况下,Kafka 需要手动生成主题。您可以通过 broker/集群中的设置来允许自动生成)。
  2. 为我们的数据创建一个唯一的键、我们希望发布的数据(这将通过我们的自定义函数和交付报告进行处理(定义用于提供有关有效负载成功或未成功交付的反馈的函数))。

我对强类型/schemer-based 设计的第一印象是:“哇,这肯定会让系统设计人员维护大量代码,并且学习曲线陡峭”。当我实现这个示例时,我意识到您可能会避免很多未来的技术债务。Schemer 强制新的生产者/消费者符合当前预期的数据结构或生成新的 schema 版本。这允许当前系统继续不受连接到您的 Kafka 集群的恶意生产者的阻碍。我将在本博客系列的第二部分中更详细地介绍这一点。

展望和结论

那么,我们做了什么?嗯,以最简单粗暴的形式,我们创建了一个 Kafka 生产者和一个 MQTT 发布者来传输我们的发电机数据。从表面上看,对于相同的结果,Kafka 在设置上似乎比 MQTT 复杂得多。

在这个层面上,您是对的。然而,我们只是略微触及了 Kafka 可以做什么以及它应该如何在真正的物联网架构中部署的表面。我计划在本系列中发布另外两篇博客

  • 第二部分:我介绍了 Kafka 独有的更多功能,例如更深入地了解主题、可扩展性和第三方集成(包括 InfluxDB)。
  • 第三部分:我们将运用我们所学到的知识,并将最佳实践应用于真实的物联网项目。我们将使用 Kafka 的 MQTT 代理,并更深入地研究第三方集成,以充分利用您的 Kafka 基础设施。

在那之前,请查看代码,运行它,玩转它并改进它。下一篇博客(本系列的第二部分)我们将更详细地介绍主题。