Kafka、Telegraf 和 InfluxDB v3 入门
作者:Anais Dotis-Georgiou / 开发者, 入门
2024 年 10 月 15 日
导航至
在智能园艺领域,跟踪湿度、温度、风和土壤湿度等环境条件是确保植物茁壮成长的关键。但是,如何以高效且可扩展的方式将所有这些数据汇集在一起呢?强大的 Kafka、Telegraf 和 InfluxDB Cloud v3 三剑客应运而生。在本指南中,我们将引导您设置一个无缝管道,该管道通过 Telegraf 收集来自花园传感器的实时数据,通过 Kafka 流式传输,并将其存储在 InfluxDB 中以进行监控和分析。无论您是这些工具的新手还是希望扩展您的物联网工具包,本示例都将向您展示如何开始。本教程的对应代码仓库可以在这里找到。
要求和运行
在我们深入了解设置之前,需要先满足一些要求。首先,您需要在系统上安装 Docker 和 Docker Compose,因为该示例依赖于容器化服务来简化部署。您还应该拥有 InfluxDB Cloud v3 帐户,并准备好您的 URL、令牌、组织和存储桶信息。这些详细信息对于配置 Telegraf 将花园传感器数据写入 InfluxDB 至关重要。此外,请确保您已安装 Python,因为花园传感器网关脚本依赖于 Python 的 Kafka 包来模拟和发送传感器数据。最后,熟悉 Kafka、Telegraf 和 InfluxDB 的基本概念将有助于您更轻松地理解本文。
要运行此示例,请按照以下步骤操作
- 克隆项目并导航到目录。
- 打开 resources/mytelegraf.conf 文件,并插入您的 InfluxDB Cloud v3 URL、令牌、组织和存储桶名称。如果您愿意,也可以使用环境变量文件。
- 通过将“directory”更改为 resources 并运行命令
docker-compose up --build -d
来启动容器。 - 等待大约 30 秒,以便 Telegraf 初始化并开始写入指标。
- 一旦一切启动并运行,花园传感器网关将开始生成随机的湿度、温度、风和土壤数据,通过 Kafka 发送,并将其存储在您的 InfluxDB Cloud v3 实例中以进行监控和分析。
代码解释
在本节中,我们将分解示例的组件,并解释每个部分如何组合在一起,以创建一个无缝的数据管道,用于使用 Kafka、Telegraf 和 InfluxDB Cloud v3 监控花园传感器数据。
1. app/Dockerfile
app 目录中的 Dockerfile 负责创建一个容器化环境来运行 garden_sensor_gateway.py 脚本。
2. app/garden_sensor_gateway.py
import time
import json
import random
from kafka import KafkaProducer
def random_temp_cels():
return round(random.uniform(-10, 50), 1)
def random_humidity():
return round(random.uniform(0, 100), 1)
def random_wind():
return round(random.uniform(0, 10), 1)
def random_soil():
return round(random.uniform(0, 100), 1)
def get_json_data():
data = {}
data["temperature"] = random_temp_cels()
data["humidity"] = random_humidity()
data["wind"] = random_wind()
data["soil"] = random_soil()
return json.dumps(data)
def main():
producer = KafkaProducer(bootstrap_servers=['kafka:9092'])
for _ in range(20000):
json_data = get_json_data()
producer.send('garden_sensor_data', bytes(f'{json_data}','UTF-8'))
print(f"Sensor data is sent: {json_data}")
time.sleep(5)
if __name__ == "__main__":
main()
这个 Python 脚本模拟花园传感器数据并将其发送到 Kafka 主题。让我们看看它是如何工作的
- 导入库:该脚本从 kafka-python 包导入必要的库,如 time、json、random 和 KafkaProducer。
- 数据生成函数:诸如 random_temp_cels()、random_humidity()、random_wind() 和 random_soil() 等函数分别生成温度、湿度、风和土壤湿度的随机值。这些值四舍五入到小数点后一位,以模拟真实的传感器读数。
- 数据格式化:get_json_data() 函数将这些生成的值收集到一个字典中,并使用 json.dumps(data) 将其转换为 JSON 字符串。
- Kafka 生产者:main() 函数使用 KafkaProducer(bootstrap_servers=['kafka:9092']) 初始化一个 Kafka 生产者,将其指向在容器中运行的 Kafka Broker。然后,它进入一个循环,在循环中生成传感器数据,将其发送到 Kafka 主题 garden_sensor_data,并将数据打印到控制台。循环运行 20,000 次,每次迭代之间延迟 5 秒。
3. resources/docker-compose.yml
resources 目录中的 docker-compose.yml 文件定义了项目所需的服务,协调 Kafka、Zookeeper、Telegraf 和花园传感器网关的容器。以下是每个服务的作用
- Kafka 和 Zookeeper:这些服务设置 Kafka Broker 和 Zookeeper,Kafka 依赖 Zookeeper 进行分布式协调。Kafka 在端口 9092 上公开,Zookeeper 在端口 2181 上公开。
- 花园传感器网关:此服务使用 app 目录中的 Dockerfile 为 garden_sensor_gateway.py 脚本构建容器。它依赖于 Kafka,以确保在脚本开始运行之前 Kafka 已启动并运行正常。
- Telegraf:Telegraf 服务配置为从 Kafka 主题 garden_sensor_data 消费消息,并将它们写入 InfluxDB Cloud v3。Telegraf 配置文件 mytelegraf.conf 被挂载到容器中,以提供必要的设置。
4. resources/mytelegraf.conf
[[inputs.kafka_consumer]]
## Kafka brokers.
brokers = ["kafka:9092"]
## Topics to consume.
topics = ["garden_sensor_data"]
## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "json"
此配置文件用于设置 Telegraf 以处理花园传感器数据:``
- InfluxDB 输出:[[outputs.influxdb_v2]] 部分配置 Telegraf 将数据写入 InfluxDB Cloud v3。您必须将占位符替换为您的 InfluxDB URL、令牌、组织和存储桶详细信息。
- Kafka 消费者输入:[[inputs.kafka_consumer]] 部分配置 Telegraf 订阅 Kafka 上的 garden_sensor_data 主题。它消费 JSON 格式的传感器数据,这些数据被发送到 InfluxDB 以进行存储和分析。
总而言之,这些组件创建了一个强大的管道,其中花园传感器数据被生成、发送到 Kafka、由 Telegraf 处理并存储在 InfluxDB Cloud v3 中,使您能够实时监控您的花园环境。
结论
这篇博文介绍了如何开始使用 InfluxDB、Kafka 和 Telegraf。一个 Python 脚本生成花园数据并将其发送到 Kafka 主题,Telegraf 从 Kafka 主题读取数据并将其写入 InfluxDB。与往常一样,请在此处开始使用 InfluxDB v3 Cloud。在下一篇文章中,我们将介绍如何运行该项目,深入探讨架构和逻辑,并讨论所选技术栈的一些优缺点。如果您需要帮助,请在我们的社区网站或 Slack 频道上联系我们。如果您也在使用 InfluxDB 进行数据管道项目,我很乐意听到您的消息!