使用Kafka和InfluxDB构建数据流管道

导航到

InfluxDB和Kafka不是竞争对手,而是互补的。流数据,尤其是时序数据,以高容量和高速度流动。将InfluxDB添加到您的Kafka集群中,可以为您的时序数据提供专门的解决方案。这种解决方案包括实时查询和分析,以及与最前沿的机器学习和人工智能技术的集成。像Hulu这样的公司已经将InfluxDB实例与Kafka配对。在数据源和InfluxDB之间实施Kafka集群可以提供额外的冗余层,并增加对数据输入和输出的更多控制。

您只需要几行TOML代码就可以将数据从Kafka发送到InfluxDB。 Telegraf 是一个基于插件的开放源代码工具,只需几行易于理解的代码即可将您的Kafka主题连接到InfluxDB。下面的教程将全面介绍如何将数据发送到Kafka主题,通过Telegraf连接到主题,并将数据发送到InfluxDB Cloud Serverless。示例数据集模拟了花园传感器的数据。Python代码生成模拟传感器数据的样本数据,并将其发送到Kafka主题。

Aykut Bulgu是此项目的原始创建者,该项目后来已被修改以适应InfluxDB Cloud Serverless。如果您希望直接从GitHub拉取此代码文件,您可以在此处找到它。

先决条件

构建代码文件

此代码文件的难度很低。构建将分阶段进行。首先,我们将构建包含数据生成函数、Dockerfile和说明的应用程序部分。接下来,我们将添加配置文件。最后,我们将连接代码文件到InfluxDB Cloud Serverless。

创建应用程序

创建一个名为“app”的文件夹。

以下文件都位于app文件夹内。

app/garden_sensor_gateway.py

此文件包含数据生成函数。此数据集发送到InfluxDB,包括花园的监控细节。该函数为温度、湿度、风速和土壤生成随机整数。然后,该函数将数据转换为JSON格式。

此文件初始化Kafka类并连接到Kafka主机,以便向名为“garden_sensor_data”的Kafka主题发送消息。

import time
import json
import random

from kafka import KafkaProducer

def random_temp_cels():
    return round(random.uniform(-10, 50), 1)

def random_humidity():
    return round(random.uniform(0, 100), 1)

def random_wind():
    ret
    data["temperature"] = random_temp_cels()
    data["humidity"] = random_humidity()
    data["wind"] = random_windurn round(random.uniform(0, 10), 1)

def random_soil():
    return round(random.uniform(0, 100), 1)

def get_json_data():
    data = {}
()
    data["soil"] = random_soil()

    return json.dumps(data) 

def main():
    producer = KafkaProducer(bootstrap_servers=['kafka:9092'])

    for _ in range(20000):
        json_data = get_json_data()
        producer.send('garden_sensor_data', bytes(f'{json_data}','UTF-8'))
        print(f"Sensor data is sent: {json_data}")
        time.sleep(5)

if __name__ == "__main__":
    main()

app/Dockerfile

此文件包含将‘garden_sensor_gateway.py’转换为容器的指令。

FROM python:3.11

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD ["python", "garden_sensor_gateway.py"]

app/requirements.txt

此文件包含kafka-python库。

kafka-python

资源

创建一个名为“resources”的新文件夹。此文件夹不在app文件夹内。

以下文件位于resources文件夹内。

resources/docker-compose.yaml

docker-compose.yaml 包含构建 kafkazookeeperpython garden_sensor_gatewaytelegraf 容器所需的所有信息。

version: '3.3'

services:
  kafka:
    container_name: kafka
    image: quay.io/strimzi/kafka:0.28.0-kafka-3.1.0
    command: [
      "sh", "-c",
      "bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}"
    ]
    depends_on:
    - zookeeper
    ports:
    - "9092:9092"
    environment:
      LOG_DIR: "/tmp/logs"
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    healthcheck:
      test: ["CMD", "nc", "-z", "kafka", "9092"]
      interval: 30s
      timeout: 10s
      retries: 5

  zookeeper:
    container_name: zookeeper
    image: quay.io/strimzi/kafka:0.28.0-kafka-3.1.0
    command: [
        "sh", "-c",
        "bin/zookeeper-server-start.sh config/zookeeper.properties"
      ]
    ports:
    - "2181:2181"
    environment:
      LOG_DIR: /tmp/logs

  garden_sensor_gateway:
    container_name: garden_sensor_gateway
    image: garden_sensor_gateway
    build: ../app
    # networks:
    #  - my_network
    # command: ["tail", "-f", "/dev/null"]
    depends_on:
      kafka:
       condition: service_healthy

  telegraf:
    container_name: telegraf
    image: telegraf:latest
    command: ["telegraf", "--debug", "--config", "/etc/telegraf/telegraf.conf"]
    volumes:
      - ./mytelegraf.conf:/etc/telegraf/telegraf.conf
    depends_on:
      kafka:
       condition: service_healthy

Telegraf 命令不特定于本例。与本项目的大部分内容一样,您可以使用此容器配置用于其他项目。第一个命令是 “telegraf” 命令。“—debug” 标志在终端中提供代理的工作细节。然后,必须在配置之前包含 “—config” 标志,该配置为 "/etc/telegraf/telegraf.conf"。

resources/mytelegraf.conf

mytelegraf.conf 是 Telegraf 配置 TOML 文件。与其他文件不同,Telegraf 设置直接来自您的 InfluxDB 云无服务器账户。

让我们跳转到我们的 InfluxDB 云无服务器账户。登录 InfluxDB 云无服务器后,第一步是设置一个存储桶。将存储桶想象成数据库中的数据库。Telegraf 将从 Kafka 主题读取数据并发送到 InfluxDB 云无服务器中指定的存储桶。

在资源中心选择“管理数据库和安全”,或在页面左侧图标菜单中的向上箭头处悬停。

InfluxDB Cloud Serverless Resource Center

页面右侧的蓝色“创建存储桶”按钮是您可以创建存储桶的地方。在弹出窗口中创建您的存储桶。

Create Bucket Button

弹出窗口关闭后,“Telegraf”将出现在“加载数据”标题下的突出显示的“存储桶”旁边。选择“Telegraf”。这将开始 mytelegraf.conf 设置。

第1部分:InfluxDB 输出插件

在页面中间/右侧,您会看到“InfluxDB 输出插件”。这是 mytelegraf.conf 连接到 InfluxDB 云无服务器的方式。复制到剪贴板,并将整个文件添加到 mytelegraf.conf 中。

下一步是创建一个令牌。在本教程中,我们将创建一个全访问 API 令牌。在“Telegraf”标题旁边是“API 令牌”标题。生成 API 令牌。您可以选择创建环境变量或将内容粘贴到令牌字符串中(这不是最安全的方法)。

urls 使您连接到您的 InfluxDB 云无服务器账户。《code>organization 是组织名称。这是与账户一起设置的。《code>bucket 是您希望发送数据的存储桶。

第2部分:输入

导航回 InfluxDB 云无服务器账户中的“Telegraf”。在存储桶下拉菜单中选择您的存储桶。我们要查找的数据源选项是“Kafka 消费者”。我们不是创建配置,而是复制代码并将其粘贴到 mytelegraf.conf 文件中。

Telegraf Input Configuration

代码需要一些修改。brokers 行必须与发送数据的代理匹配。在本例中,localhost:9092 需要更改为 kafka:9092

接下来,调整主题以匹配代码文件中的 Kafka 主题。最后,代码的最后一行是 data_format = “influx”。这意味着 InfluxDB 期望行协议。然而,python 函数以 JSON 格式发送数据。这里的修复是将 data_format = “json”

最终的 mytelegraf.conf 文件将非常类似于这个

[[outputs.influxdb_v2]]
  ## The URLs of the InfluxDB cluster nodes.
  ##
  ## Multiple URLs can be specified for a single cluster, only ONE of the
  ## urls will be written to each interval.
  ##   ex: urls = ["https://us-west-2-1.aws.cloud2.influxdata.com"]
  urls = ["https://your-cloud-url"]

  ## API token for authentication.
  token = "token here"

  ## Organization is the name of the organization you wish to write to; must exist.
  organization = "Testing"

  ## Destination bucket to write into.
  bucket = "kafkaRebuild"

# Read metrics from Kafka topics
[[inputs.kafka_consumer]]
  ## Kafka brokers.
  brokers = ["kafka:9092"]

  ## Topics to consume.
  topics = ["garden_sensor_data"]

  ## When set this tag will be added to all metrics with the topic as the value.
  # topic_tag = ""

  ## Optional Client id
  # client_id = "Telegraf"

  ## Set the minimal supported Kafka version.  Setting this enables the use of new
  ## Kafka features and APIs.  Must be 0.10.2.0 or greater.
  ##   ex: version = "1.1.0"
  # version = ""

  ## Optional TLS Config
  # tls_ca = "/etc/telegraf/ca.pem"
  # tls_cert = "/etc/telegraf/cert.pem"
  # tls_key = "/etc/telegraf/key.pem"
  ## Use TLS but skip chain & host verification
  # insecure_skip_verify = false

  ## SASL authentication credentials.  These settings should typically be used
  ## with TLS encryption enabled
  # sasl_username = "kafka"
  # sasl_password = "secret"

  ## Optional SASL:
  ## one of: OAUTHBEARER, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI
  ## (defaults to PLAIN)
  # sasl_mechanism = ""

  ## used if sasl_mechanism is GSSAPI (experimental)
  # sasl_gssapi_service_name = ""
  # ## One of: KRB5_USER_AUTH and KRB5_KEYTAB_AUTH
  # sasl_gssapi_auth_type = "KRB5_USER_AUTH"
  # sasl_gssapi_kerberos_config_path = "/"
  # sasl_gssapi_realm = "realm"
  # sasl_gssapi_key_tab_path = ""
  # sasl_gssapi_disable_pafxfast = false

  ## used if sasl_mechanism is OAUTHBEARER (experimental)
  # sasl_access_token = ""

  ## SASL protocol version.  When connecting to Azure EventHub set to 0.
  # sasl_version = 1

  # Disable Kafka metadata full fetch
  # metadata_full = false

  ## Name of the consumer group.
  # consumer_group = "telegraf_metrics_consumers"

  ## Compression codec represents the various compression codecs recognized by
  ## Kafka in messages.
  ##  0 : None
  ##  1 : Gzip
  ##  2 : Snappy
  ##  3 : LZ4
  ##  4 : ZSTD
  # compression_codec = 0
  ## Initial offset position; one of "oldest" or "newest".
  # offset = "oldest"

  ## Consumer group partition assignment strategy; one of "range", "roundrobin" or "sticky".
  # balance_strategy = "range"

  ## Maximum length of a message to consume, in bytes (default 0/unlimited);
  ## larger messages are dropped
  max_message_len = 1000000

  ## Maximum messages to read from the broker that have not been written by an
  ## output.  For best throughput set based on the number of metrics within
  ## each message and the size of the output's metric_batch_size.
  ##
  ## For example, if each message from the queue contains 10 metrics and the
  ## output metric_batch_size is 1000, setting this to 100 will ensure that a
  ## full batch is collected and the write is triggered immediately without
  ## waiting until the next flush_interval.
  # max_undelivered_messages = 1000

  ## Maximum amount of time the consumer should take to process messages. If
  ## the debug log prints messages from sarama about 'abandoning subscription
  ## to [topic] because consuming was taking too long', increase this value to
  ## longer than the time taken by the output plugin(s).
  ##
  ## Note that the effective timeout could be between 'max_processing_time' and
  ## '2 * max_processing_time'.
  # max_processing_time = "100ms"

  ## Data format to consume.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  data_format = "json"

将数据写入 InfluxDB 云无服务器

代码文件的最后一步是启动容器并将数据写入 InfluxDB 云无服务器。在新终端窗口中,cd 到 resources 文件夹并运行命令 docker compose up --build。容器相互依赖,因此请等待几分钟,然后再检查数据。

如果 Telegraf 正确地将数据读取和写入到 InfluxDB 云无服务器,它将在终端中显示以下消息。

Telegraf Success Logs

查询数据

回到InfluxDB Cloud Serverless账户,您可以通过左侧图标菜单中的图形图标访问数据探索器。选择存储桶和测量,以及您要查询的时间窗口,然后运行查询。

InfluxDB Cloud Serverless Data Explorer

将您的Kafka集群连接到InfluxDB Cloud Serverless真的非常简单。

继续处理时间序列数据

这只是您可以用InfluxDB Cloud Serverless和Kafka做到的一小部分。为了在我们社区内继续工作,请访问我们的社区页面。此页面包括我们的Slack工作区链接以及更多帮助您入门的项目。

如果您准备好探索InfluxDB Cloud Serverless或我们的其他产品,如云专用集群式,您可以通过此处联系我们的销售团队。