Apache Kafka 和 InfluxDB 入门指南

导航到

本文由 Aykut Bulgu 撰写。向下滚动查看他的简介和照片。

随着越来越多的应用程序架构转向微服务或无服务器结构,应用程序和服务的数量每天都在增加。您可以使用实时聚合或计算输出为测量或指标的算法来处理这种不断增加的时序数据。这些指标需要被监控,以便您能够快速解决系统中的问题并做出相关更改。

系统中的变化可以通过多种方式捕捉和观察。最流行的一种,特别是在云原生环境中,是使用 事件

事件驱动系统成为创建松耦合分布式系统的标准。您可以通过事件驱动的方式收集您的应用程序指标、测量或日志,例如 事件源变更数据捕获 (CDC) 等,并将它们发送到消息骨干网,由其他资源(如数据库或可观察性工具)消费。在这种情况下,持久性和性能很重要,而传统的消息代理通常不提供这些功能。

Apache Kafka 相反,是一个持久性、高性能的消息系统,也被认为是一个分布式流处理平台。Apache Kafka 可以应用于许多用例,包括消息传递、数据集成、日志聚合和指标。

当涉及到指标时,仅有一个消息骨干或代理是不够的。虽然 Apache Kafka 是持久的,但它不是为运行指标和监控查询而设计的。这就是 InfluxDB 的作用。

InfluxDB 是一个时序数据库 (TSDB),它为监控、应用程序指标、物联网 (IoT) 传感器数据和实时分析提供存储和时序数据检索。它可以与 Apache Kafka 集成,用于发送或接收用于处理和监控的指标或事件数据。

在本教程中,您将学习Apache Kafka和InfluxDB。您还将了解它们如何结合使用来创建任务和警报,以及通过客户端查询数据。

先决条件

要完成本教程,您需要以下内容

  • 安装了Homebrew的macOS环境。
  • 最新的Docker版本。(在撰写本文时,使用了Docker Desktop 4.6.1。)
  • 最新的Docker Compose版本(此处使用了2.3.3版本)。
  • Python 3.8或更高版本。

为了跟随教程,您可以使用这个GitHub仓库

背景信息:物联网SaaS

在本文中,您将了解一家名为InfluxGarden的虚构物联网软件公司,该公司为园艺公司提供服务。最近,他们需要一位软件开发专家将Apache Kafka与其InfluxDB系统集成,他们计划将其作为软件即服务(SaaS)平台。

InfluxGarden拥有测量湿度、温度、土壤和风速等数据的传感器。在本文中,您将收集模拟应用程序的数据,然后将数据发送到Apache Kafka,以便由InfluxDB读取并在其数据探索器界面显示。

整体应用架构如下

The application architecture courtesy of Aykut Bulgu

什么是Apache Kafka?

如前所述,Apache Kafka是一个开源的分布式流处理平台,最初被创建为一个高性能的消息系统,并被超过80%的《财富》100强公司使用。

Apache Kafka以其高吞吐量和低延迟而闻名。它可以以多种方式使用,包括以下几种

  • 消息传递:Apache Kafka取代了许多(特别是基于Java的)传统消息系统,包括ActiveMQRabbitMQ

  • 流处理:它通过存储实时事件来提供事件骨干,用于聚合、丰富和处理。

  • 度量:Apache Kafka成为许多分布式组件或应用(如微服务)的集中聚合点。这些应用可以将实时度量发送到其他平台,包括InfluxDB。

  • 数据集成:可以捕获数据和事件更改并将其发送到Apache Kafka,然后它们被任何需要针对这些更改采取行动的应用程序消费。

  • 日志聚合:Apache Kafka可以作为日志流平台的消息骨干,将日志块转换为数据流。

安装后,Apache Kafka形成了一个由代理Zookeeper实例组成的集群。Zookeeper是一个独立于Apache Kafka项目开发的第三方开源平台。它是Apache Kafka用于诸如选择控制器代理等任务的一个依赖项。

Kafka社区一直在努力消除这个依赖关系,并在最新版本中引入了无Zookeeper模式(但目前尚未准备好投入生产使用)。

有关更多信息,请参阅Apache Kafka改进提案(KIP-500)

“经纪人”通常指Apache Kafka集群中的一部分,即Apache Kafka服务器实例。为了实现水平扩展和可用性,经纪人必须在不同的机器上运行,其中之一成为控制器。有关Apache Kafka结构的更多信息,请参阅官方Apache Kafka文档

Apache Kafka具有主题,这是逻辑存储单元,类似于关系型数据库中的表。主题通过分区在经纪人之间进行分发,提供可扩展性和弹性。

当客户端向Apache Kafka集群实例发送数据时,必须将其发送到主题。

此外,当客户端从Apache Kafka集群读取数据时,必须从主题中读取。向Apache Kafka发送数据的客户端成为生产者,而从Kafka集群读取数据的客户端成为消费者

与传统消息平台不同,在Apache Kafka中,经纪人比较简单,而生产者和消费者则比较智能。这意味着经纪人仅配置为保留一定时间内的数据;除此之外,所有复杂的配置都在客户端(生产者和消费者)上完成。

Kafka durability

当您从Apache Kafka主题中消费消息时,消息不会被删除。只要它存储在经纪人处,并且根据保留配置,您就可以重新消费任何消息。这是Apache Kafka的回放机制,对于许多用例,包括事件驱动架构,都非常重要。

什么是InfluxDB?

TSDB是一种针对时间序列数据优化的数据库,这些数据是随时间跟踪、监控和聚合的指标或事件。这些指标和事件以时间戳的形式保存在TSDB中。

InfluxDB专门为监控、应用程序指标、物联网、传感器数据和实时分析中的时间序列数据而创建。它可以与第三方数据存储集成,如MongoDBElasticsearch、API、服务以及消息队列,以便发送和接收指标数据。

InfluxDB提供了一个插件驱动的服务器代理Telegraf,它可以收集和报告来自任何支持源的性能指标,并将这些数据或反之输入InfluxDB。

Telegraf

Telegraf可以在Linux上作为sysvinit或systemd服务启动,或者您也可以将其作为终端命令运行。

Telegraf具有输入和输出插件,必须在启动之前进行配置。例如,它有一个Apache Kafka消费者输入插件,可以使其能够从Kafka主题中读取消息,以及一个输出插件,可以帮助将任何输入数据写入InfluxDB。在此,您将使用这些插件来运行Telegraf。

运行Apache Kafka

在本教程中,您将在容器中运行Apache Kafka。首先,下载具有Apache Kafka及其依赖项Zookeeper配置的Docker Compose YAML文件。该文件使用基于Strimzi的容器镜像。

文件内容应如下所示

version: '3.3'

services:

  zookeeper:
    container_name: zookeeper
    image: quay.io/strimzi/kafka:0.28.0-kafka-3.1.0
    command: [
        "sh", "-c",
        "bin/zookeeper-server-start.sh config/zookeeper.properties"
      ]
    ports:
    - "2181:2181"
    environment:
      LOG_DIR: /tmp/logs

  kafka:
    container_name: kafka
    image: quay.io/strimzi/kafka:0.28.0-kafka-3.1.0
    command: [
      "sh", "-c",
      "bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}"
    ]
    depends_on:
    - zookeeper
    ports:
    - "9092:9092"
    environment:
      LOG_DIR: "/tmp/logs"
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://127.0.0.1:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

  influxdb:
      container_name: influxdb
      ports:
          - '8086:8086'
      image: 'docker.io/influxdb:2.2.0'

在您的家目录下,创建一个名为 influxgarden_integration 的文件夹,并将文件命名为 docker-compose.yaml 放入其中。

然后打开一个终端窗口,运行以下命令在您的机器上执行 Apache Kafka

docker-compose -f '_YOUR_HOME_DIRECTORY_/influxgarden_integration/docker-compose.yaml' up

输出结果应如下所示

['podman', '--version', '']
using podman version: 4.1.0
** excluding:  set()
['podman', 'network', 'exists', 'kafka_default']
podman create --name=zookeeper --label io.podman.compose.config-hash=123 --label io.podman.compose.project=kafka --label io.podman.compose.version=0.0.1 --label com.docker.compose.project=kafka --label com.docker.compose.project.working_dir=/Users/mabulgu/github-repos/systemcraftsman/influxdb-kafka-demo/resources/kafka --label com.docker.compose.project.config_files=/Users/mabulgu/github-repos/systemcraftsman/influxdb-kafka-demo/resources/kafka/docker-compose.yaml --label com.docker.compose.container-number=1 --label com.docker.compose.service=zookeeper -e LOG_DIR=/tmp/logs --net kafka_default --network-alias zookeeper -p 2181:2181 quay.io/strimzi/kafka:0.28.0-kafka-3.1.0 sh -c bin/zookeeper-server-start.sh config/zookeeper.properties
9f3a20f17ab9c4ec2214649f3d92a70fd60790f7cc31d0b2493db83db78809aa
exit code: 0
['podman', 'network', 'exists', 'kafka_default']
podman create --name=kafka --label io.podman.compose.config-hash=123 --label io.podman.compose.project=kafka --label io.podman.compose.version=0.0.1 --label com.docker.compose.project=kafka --label com.docker.compose.project.working_dir=/Users/mabulgu/github-repos/systemcraftsman/influxdb-kafka-demo/resources/kafka --label com.docker.compose.project.config_files=/Users/mabulgu/github-repos/systemcraftsman/influxdb-kafka-demo/resources/kafka/docker-compose.yaml --label com.docker.compose.container-number=1 --label com.docker.compose.service=kafka -e LOG_DIR=/tmp/logs -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --net kafka_default --network-alias kafka -p 9092:9092 quay.io/strimzi/kafka:0.28.0-kafka-3.1.0 sh -c bin/kafka-server-start.sh config/server.properties --override listeners=${KAFKA_LISTENERS} --override advertised.listeners=${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=${KAFKA_ZOOKEEPER_CONNECT}
d3a8b43e84b810d757da603301216a84c5f7a22c0ff09183f7d3a78bf347b4ea
exit code: 0
podman start -a zookeeper
podman start -a kafka
...output omitted...
[2022-05-22 13:56:20,717] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
[2022-05-22 13:56:20,792] INFO [BrokerToControllerChannelManager broker=0 name=alterIsr]: Recorded new controller, from now on will use broker localhost:9092 (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)
[2022-05-22 13:56:20,841] INFO [BrokerToControllerChannelManager broker=0 name=forwarding]: Recorded new controller, from now on will use broker localhost:9092 (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)

现在您需要打开一个新的终端窗口,并运行以下命令以验证 Apache Kafka 和 Zookeeper 实例是否正在运行

docker ps
9f3a20f17ab9  quay.io/strimzi/kafka:0.28.0-kafka-3.1.0  sh -c bin/zookeep...  3 minutes ago  Up 3 minutes ago  0.0.0.0:2181->2181/tcp  zookeeper
d3a8b43e84b8  quay.io/strimzi/kafka:0.28.0-kafka-3.1.0  sh -c bin/kafka-s...  3 minutes ago  Up 3 minutes ago  0.0.0.0:9092->9092/tcp  kafka

上述输出显示 Apache Kafka 集群运行正常,没有任何问题。

运行 InfluxDB

在本教程中,您还将通过 Docker 在容器中运行 InfluxDB。

有关在其他平台上安装 InfluxDB 的更多信息,请参阅 此文档

在您的 macOS 环境中安装并启动 Docker 守护进程后,您需要打开一个新的终端窗口并运行以下命令以启动 InfluxDB

docker run -d --name influxdb -p 8086:8086 docker.io/influxdb:2.2.0

在您的网页浏览器中,导航到 localhost:8086 以验证安装

InfluxDB welcome page

选择 开始使用 按钮,并按照提示输入信息

Setup initial user

现在选择 继续,您应该被重定向到下一页以完成设置

Complete setup

完成设置后,选择 高级,您将被重定向到带有 存储桶 选项卡的 加载数据 页。请注意,列表中已有 garden_sensor_data 存储桶,请保持页面打开,您将在本教程的后续部分使用它。

安装、配置和运行 Telegraf

要安装 Telegraf 到 macOS,打开一个新的终端窗口并运行以下命令

brew install telegraf

如先决条件所述,您需要安装 Homebrew 才能进行此步骤。有关其他平台上的安装选项,您可以参考 此文档

安装 Telegraf 后,您需要配置它以集成 InfluxDB 和 Apache Kafka。

返回您之前导航到的 InfluxDB 加载数据 页,点击 Telegraf > InfluxDB 输出插件

Telegraf configuration page

选择 InfluxDB 输出插件 后,应该会弹出一个配置对话框,显示输出配置

[[outputs.influxdb_v2]]
  ## The URLs of the InfluxDB cluster nodes.
  ##
  ## Multiple URLs can be specified for a single cluster, only ONE of the
  ## urls will be written to each interval.
  ##   ex: urls = ["https://us-west-2-1.aws.cloud2.influxdata.com"]
  urls = ["https://127.0.0.1:8086"]

  ## API token for authentication.
  token = "$INFLUX_TOKEN"

  ## Organization is the name of the organization you wish to write to; must exist.
  organization = "InfluxGarden"

  ## Destination bucket to write into.
  bucket = "garden_sensor_data"

  ## The value of this tag will be used to determine the bucket.  If this
  ## tag is not set the 'bucket' option is used as the default.
  # bucket_tag = ""

  ## If true, the bucket tag will not be added to the metric.
  # exclude_bucket_tag = false

  ## Timeout for HTTP messages.
  # timeout = "5s"

  ## Additional HTTP headers
  # http_headers = {"X-Special-Header" = "Special-Value"}

  ## HTTP Proxy override, if unset values the standard proxy environment
  ## variables are consulted to determine which proxy, if any, should be used.
  # http_proxy = "http://corporate.proxy:3128"

  ## HTTP User-Agent
  # user_agent = "telegraf"

  ## Content-Encoding for write request body, can be set to "gzip" to
  ## compress body or "identity" to apply no encoding.
  # content_encoding = "gzip"

  ## Enable or disable uint support for writing uints influxdb 2.0.
  # influx_uint_support = false

  ## Optional TLS Config for use on HTTP connections.
  # tls_ca = "/etc/telegraf/ca.pem"
  # tls_cert = "/etc/telegraf/cert.pem"
  # tls_key = "/etc/telegraf/key.pem"
  ## Use TLS but skip chain & host verification
  # insecure_skip_verify = false

注意,组织名称和存储桶名称已在该配置中设置。

influxgarden_integration 目录下创建一个名为 telegraf.conf 的文件,并将上述配置复制到其中。通过在配置文件中添加 outputs 符号,您将 Telegraf 集成到 InfluxDB 中。

为了从源获取数据并通过 Telegraf 发送到 InfluxDB,您还需要在相同的配置文件中添加一个输入配置。

将以下输入配置追加到您创建的 telegraf.conf 文件中

[[inputs.kafka_consumer]]
  ## Kafka brokers.
  brokers = ["localhost:9092"]

  ## Topics to consume.
  topics = ["garden_sensor_data"]

  ## When set this tag will be added to all metrics with the topic as the value.
  # topic_tag = ""

  ## Optional Client id
  # client_id = "Telegraf"

  ## Set the minimal supported Kafka version.  Setting this enables the use of new
  ## Kafka features and APIs.  Must be 0.10.2.0 or greater.
  ##   ex: version = "1.1.0"
  # version = ""

  ## Optional TLS Config
  # tls_ca = "/etc/telegraf/ca.pem"
  # tls_cert = "/etc/telegraf/cert.pem"
  # tls_key = "/etc/telegraf/key.pem"
  ## Use TLS but skip chain & host verification
  # insecure_skip_verify = false

  ## SASL authentication credentials.  These settings should typically be used
  ## with TLS encryption enabled
  # sasl_username = "kafka"
  # sasl_password = "secret"

  ## Optional SASL:
  ## one of: OAUTHBEARER, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI
  ## (defaults to PLAIN)
  # sasl_mechanism = ""

  ## used if sasl_mechanism is GSSAPI (experimental)
  # sasl_gssapi_service_name = ""
  # ## One of: KRB5_USER_AUTH and KRB5_KEYTAB_AUTH
  # sasl_gssapi_auth_type = "KRB5_USER_AUTH"
  # sasl_gssapi_kerberos_config_path = "/"
  # sasl_gssapi_realm = "realm"
  # sasl_gssapi_key_tab_path = ""
  # sasl_gssapi_disable_pafxfast = false

  ## used if sasl_mechanism is OAUTHBEARER (experimental)
  # sasl_access_token = ""

  ## SASL protocol version.  When connecting to Azure EventHub set to 0.
  # sasl_version = 1

  ## Name of the consumer group.
  # consumer_group = "telegraf_metrics_consumers"

  ## Compression codec represents the various compression codecs recognized by
  ## Kafka in messages.
  ##  0 : None
  ##  1 : Gzip
  ##  2 : Snappy
  ##  3 : LZ4
  ##  4 : ZSTD
  # compression_codec = 0
  ## Initial offset position; one of "oldest" or "newest".
  # offset = "oldest"

  ## Consumer group partition assignment strategy; one of "range", "roundrobin" or "sticky".
  # balance_strategy = "range"

  ## Maximum length of a message to consume, in bytes (default 0/unlimited);
  ## larger messages are dropped
  max_message_len = 1000000

  ## Maximum messages to read from the broker that have not been written by an
  ## output.  For best throughput set based on the number of metrics within
  ## each message and the size of the output's metric_batch_size.
  ##
  ## For example, if each message from the queue contains 10 metrics and the
  ## output metric_batch_size is 1000, setting this to 100 will ensure that a
  ## full batch is collected and the write is triggered immediately without
  ## waiting until the next flush_interval.
  # max_undelivered_messages = 1000

  ## Data format to consume.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  data_format = "json"

注意,brokers 字段是 localhost:9092,这是您的 Apache Kafka 代理地址。

topics 字段是 garden_sensor_data,这意味着任何发布到该主题的消息都将被 Telegraf 捕获。您还应该注意,由于 InfluxGarden 需要处理其 JSON 传感器数据,所以 data_format 设置为 json。有关支持的数据格式的更多信息,您可以阅读 Telegraf GitHub 页面的“输入数据格式”

现在您已经配置了Telegraf,它已准备好运行。在运行 telegraf 命令之前,您必须在输出配置中定义 $INFLUX_TOKEN

您可以通过回到 加载数据 页面并选择 API令牌 来获取令牌。点击 flux_user的令牌 链接,应该会打开一个弹出窗口,显示您为 flux_user 用户创建的令牌

InfluxDB user token

复制令牌并回到您的终端窗口。出于安全考虑,不要直接在 telegraf.conf 文件中定义它,而是在您的计算机上将其设置为环境变量。然后运行以下命令,将 _YOUR_INFLUXDB_TOKEN_ 替换为您的令牌

export INFLUX_TOKEN=_YOUR_INFLUXDB_TOKEN_

在同一个终端窗口中,运行以下命令以执行Telegraf。确保您正确设置了此命令中的 _YOUR_HOME_DIRECTORY_

telegraf --config _YOUR_HOME_DIRECTORY_/influxgarden_integration/telegraf.conf

输出结果应如下所示

2022-05-21T19:38:37Z I! Starting Telegraf 1.22.4
2022-05-21T19:38:37Z I! Loaded inputs: kafka_consumer
2022-05-21T19:38:37Z I! Loaded aggregators:
2022-05-21T19:38:37Z I! Loaded processors:
2022-05-21T19:38:37Z I! Loaded outputs: influxdb_v2
2022-05-21T19:38:37Z I! Tags enabled: host=_YOUR_HOSTNAME_
2022-05-21T19:38:37Z I! [agent] Config: Interval:10s, Quiet:false, Hostname:"_YOUR_HOSTNAME_", Flush Interval:10s

运行花园传感器网关应用

InfluxGarden共享一个Python应用,模拟传感器,向garden_sensor_data Kafka主题生成各种传感器数据。这些传感器数据包括湿度、温度、土壤和风速等信息,这是InfluxGarden所需的传感器数据类型。

要下载生产者应用,复制此GitHub页面中的文件内容,并将其保存到 influxgarden_integration 目录中,命名为 garden_sensor_gateway.py

在运行传感器应用之前,请确保Kafka集群、InfluxDB和Telegraf实例仍在运行。然后运行以下命令以执行应用

python3 garden_sensor_gateway.py

当您运行应用时,它会以每五秒一次的间隔向Kafka发送随机生成的传感器数据。经过几个五秒间隔后,输出应如下所示

Sensor data is sent: {"temperature": 33.6, "humidity": 49.1, "wind": 1.1, "soil": 0.6}
Sensor data is sent: {"temperature": 0.9, "humidity": 81.6, "wind": 6.4, "soil": 23.3}
Sensor data is sent: {"temperature": 30.6, "humidity": 10.2, "wind": 4.0, "soil": 80.3}
Sensor data is sent: {"temperature": 8.1, "humidity": 92.9, "wind": 10.0, "soil": 0.9}
Sensor data is sent: {"temperature": 35.1, "humidity": 71.5, "wind": 4.9, "soil": 56.5}
Sensor data is sent: {"temperature": 48.7, "humidity": 22.7, "wind": 1.2, "soil": 38.2}
...output omitted...

要查看数据,请回到您的InfluxDB web界面。从左侧菜单中点击 探索,这将打开 数据探索器 页面

Data explorer

在页面底部,您应该会看到已经选中了 garden_sensor_data 存储桶。点击其旁边的 kafka_consumer 过滤器复选框;然后点击右侧的 提交 按钮来运行查询所有字段的查询。这将打开一个显示传感器数据变化的图表

Data explorer graph

要监控特定的传感器数据变化,您可以点击 _field 过滤器上的一个字段复选框。然后选择一个传感器数据字段(在本例中为 humidity)并点击 提交 以过滤结果

Data explorer graph for humidity

除了 图表 之外,您还可以从下拉菜单中选择其他InfluxDB数据可视化功能

Data explorer view modes

结论

在本教程中,您成功使用Telegraf将实时传感器数据从Apache Kafka流式传输到InfluxDB

InfluxDB为监控、应用程序度量、IoT传感器数据和实时分析提供了存储和时序数据分析。它可以很容易地通过Telegraf与Apache Kafka集成,以发送或接收度量或事件数据进行处理和监控。

您可以在这个GitHub存储库中找到本教程的资源。

其他资源

现在您已经使用InfluxDB存储了时序数据,您可以利用许多其他功能来获取数据的最价值。如果您对下一步要采取的步骤感兴趣,请查看这些资源。

关于作者

Aykut Bulgu,Red Hat 的服务内容架构师,拥有 15 年的软件工程师、顾问和培训师经验。他参与了众多企业项目——主要是 Java——并使用了包括 JBoss 中间件在内的许多开源项目。目前,他与 Apache Kafka、Camel 和 Strimzi 等开源项目合作,创建坚实的基础课程。他热衷于传播系统工艺文化,并共同组织 Software Craftsmanship Turkey 社区的活动。

网站: https://www.systemcraftsman.com/

推特: https://twitter.com/systemcraftsman