MQTT 与 Kafka:物联网倡导者的视角(第二部分 – 强大的 Kafka)

导航至

在本系列的第一部分中,我们开始比较 Kafka 和 MQTT 在物联网基础设施中的应用。结论是,在物联网设备的基本发布-订阅模型中,Kafka 可能 просто 过于复杂。但是,我们也了解到 Kafka 具有一些我们希望在物联网架构中利用的宝贵功能

  • 模式设计
  • 高可用性
  • 数据持久性和耐用性

在这篇文章中,我想重点重新设计我们上次的演示,以利用我们忽略的一些核心 Kafka 功能(主题设计和分区),并介绍新主题(双关语)如第三方集成。

在我们开始之前,这里有一些小的说明

  1. 去看看第一部分,这将有助于您了解一些背景信息。
  2. 您可以在此处找到演示代码。

主题和分区

在我们最初的演示中,我们的主题结构如下所示

Kafka-MQTT-Generator-simulator

基本上,无论我们启动多少个应急发电机,我们的有效负载都由同一个主题处理。在有限数量的发电机下,这是合理的。但是,这确实给我们留下了一些明显的问题

  1. 消费者必须暗示自己的逻辑来处理/忽略有效负载。例如,消费者可能只对处理来自引擎 1-2 或基于特定区域的数据感兴趣。
  2. 性能也会受到影响,因为我们的消费者将不得不处理所有消息,而不是特定的子集。

那么我们可以做些什么不同的事情呢?

MQTT

因此,在 MQTT 中,我们可以建立主题层次结构。HiveMQ 团队撰写了一篇很棒的博客文章,概述了构建主题时的一些注意事项

  • 永远不要使用前导斜杠
  • 永远不要在主题中使用空格
  • 不要订阅 #
  • 不要忘记可扩展性
  • 使用具体的主题,而不是一般的主题

我强烈建议查看这篇文章

因此,让我们将一些逻辑部署到我们的应用程序中。目前,我们的有效负载如下所示

{'generatorID': 'generator4', 'lat': 40.65995, 'lon': -111.99633, 'temperature': 74, 'pressure': 198, 'fuel': 901}

我们的主题如下

emergency_generator

由于我们在有效负载中具有 generatorID,因此我们可以考虑将其作为新的主题层附加

def publish_to_topic(self, topic: str, data: dict):
        topic = topic +"/"+ str(data["generatorID"])
        message = json.dumps(data)
        self.client.publish(topic, message)

这为我们提供了 emergency_generator 主题以及下面的后续层

emergency_generator/generator1
emergency_generator/generator2
emergency_generator/generator3

使用这种层次结构提供了设备(在我们的例子中是发电机)之间自然划分的额外好处。我们可以在 Kafka 中实现类似的架构吗?是否有额外的好处?

Kafka

Kafka 有一种非常简单而强大的方法可以自然地隔离我们的数据流。让我们再次看一下这部分代码

self.p.produce(topic=topic, key=str(uuid4()), value=data, on_delivery=delivery_report)

作为数据发布的一部分,我们分配一个键。此键作为键值对分配给我们的数据。虽然此键是可选的,但当与分区结合使用时,它非常强大。我们将在稍后讨论这个问题,但首先让我们更新我们的键以使用我们的 generatorID 代替。

self.p.produce(topic=topic, key=str(data['generatorID']), value=data, on_delivery=delivery_report)

现在到了有趣的部分,我们在本博客系列的第一部分中说过,Kafka 部署了一些独特的功能,可以高效地处理大量数据。这就是分区发挥作用的地方。您可以将分区想象成记录(我们的数据)的简单队列。分区的好处是可以为性能目的提供记录和并行性的一致排序,但如何实现呢?这就是我们的键发挥作用的地方。一个很好的想象方式是机场移民队列,而您的护照就是键。通常,对于国民、快速通道(如欧洲护照)和所有护照都存在队列(分区)。所有护照持有人将进入各自的队列进行处理。这相当于我们的 Kafka 架构。例如,Generator1 记录将始终进入分区 1,Generator2 进入分区 2。虽然键和分区之间通常不是一对一的关系,但随着我们扩展,会有一些很大的好处。它们包括

  1. 主题并行性:假设我们的发电机每秒生成 1000 条记录。我们有一个微服务试图消费这些消息并执行一些逻辑。我们知道微服务每秒只能处理 500 条记录,因此我们的消费者永远无法与我们的生产者保持一致性。相反,使用分区,我们可以启动第二个甚至第三个微服务实例,以彼此并行地消费这些记录的一部分。实现更高的吞吐量。
  2. 主题复制:这允许我们保留分区来保存我们记录的副本。由于我们的主题能够分布在许多服务器上(如 Kubernetes),这有助于防止由于中断造成的数据丢失。如果一个分区无法访问,那么我们的消费者将 просто 从备份分区中获取记录。

如您所见,我们开始利用一些强大的功能,这些功能使 Kafka 与其他发布-订阅消息传递协议区分开来。

Kafka 还能做什么?

Apache Kafka 是一种企业消息传递协议,它了解它需要适应预先存在的生态系统才能有效。这就是 Kafka Connect 的用武之地。

Kafka Connect

Kafka Connect 为知名的企业系统和数据存储引擎(如 InfluxDB)提供了强大的预制连接器目录。因此,由于我存在偏见,让我们讨论一下使用 InfluxDB 插件的源和接收器。

Kafka Connect 将连接器分为两类

  1. 源:我们可以从中提取数据并将其写入 Kafka 主题的位置。
  2. 接收器:我们可以将数据从主题写入到的位置。

我们现在将暂停 InfluxDB 源(剧透警告,我将在本博客系列的第 3 部分中使用它)。让我们设置一个 InfluxDB 接收器

注意:为了便于演示,我正在使用 Confluent Cloud:https://confluent.cloud/

  1. 数据集成下,转到连接器并搜索 InfluxDBConnectors
  2. 选择 InfluxDB 2 接收器
  3. 现在我们可以开始配置我们的连接器。这非常标准,这是我的快照:Connector-Summary

细分如下

  1. 我们希望订阅的主题。在我们的例子中,只有一个,emergency_generators
  2. 最大任务数,这相当于我们之前讨论的微服务概念。由于我们使用的是 Confluent Cloud 的免费层,因此我们仅限于一个。
  3. Kafka 服务身份验证,用于安全(阻止恶意服务尝试连接到您的数据流)。
  4. 我们希望指向的 InfluxDB 实例。
  5. 我们的 InfluxDB 实例的组织。
  6. 我们存储数据的存储桶。
  7. 写入精度(秒、毫秒等)。
  8. 测量格式。我们选择使用主题名称作为我们的 InfluxDB 测量名称。

您现在应该能够部署连接器。然后我们可以在流沿袭中查看数据流。

Data flow within the stream lineage

InfluxDB 只是 Kafka Connect 拥有的众多连接器之一。我们的 JSON 有效负载相对简单,这意味着它可以直接转换为许多接受 JSON 有效负载结构的连接器。但是,Kafka 还可以对数据有效负载应用简单的转换,用于即时转换,有助于提高互操作性。

展望与结论

我们现在更深入地研究了 MQTT 和 Kafka 的部署架构。虽然 MQTT 和 Kafka 都使用主题来描述消息有效负载的容器,但我们现在看到,在底层,它们截然不同。从我的角度来看,我现在可以看到使用 Kafka 分区和记录键的性能影响。我们还探索了使用 InfluxDB 提供的企业级连接器的能力。考虑到所有这些,这仍然无法弥补仅 Kafka 物联网解决方案的一些缺点

  • Kafka 是为部署良好基础设施的稳定网络而构建的
  • 它不部署关键的数据交付功能,如 Keep-Alive 和 Last Will

MQTT 在这些方面表现出色。那么接下来呢?

请加入我的本系列最后一篇博客,我们将在其中结合我们所学的知识,使用真实的物联网用例创建一个真正的混合物联网架构。我们将了解 Kafka 的 MQTT 代理,并进一步探索 Kafka 的企业连接器,如 InfluxDB 源。