从 Kafka、Telegraf 和 InfluxDB v3 入门
作者:Anais Dotis-Georgiou / 开发者,入门
2024 年 10 月 15 日
导航至
在智能园艺的世界里,跟踪湿度、温度、风速和土壤湿度等环境条件是确保您的植物茁壮成长的关键。但是,您如何以高效和可扩展的方式将所有这些数据汇总在一起呢?请进入强大的 Kafka、Telegraf 和 InfluxDB Cloud v3 三位一体。在本指南中,我们将带您设置一个无缝的管道,该管道从花园传感器实时收集数据,通过 Kafka 流式传输,并使用 Telegraf 存储到 InfluxDB 以进行监控和分析。无论您是这些工具的新手还是想扩展您的物联网工具包,这个示例将向您展示如何入门。本教程的相应存储库可以在这里找到。
要求和运行
在我们深入了解设置之前,需要满足一些要求。首先,您需要在您的系统上安装 Docker 和 Docker Compose,因为示例依赖于容器化服务以简化部署。您还应拥有一个 InfluxDB Cloud v3 账户,您的 URL、令牌、组织和个人存储桶信息应随时可用。这些详细信息对于配置 Telegraf 将花园传感器数据写入 InfluxDB 至关重要。此外,请确保您已安装 Python,因为花园传感器网关脚本依赖于 Python 的 Kafka 包来模拟和发送传感器数据。最后,熟悉 Kafka、Telegraf 和 InfluxDB 的基本概念将帮助您更容易地跟随。
要运行此示例,请按照以下步骤操作
- 克隆项目并导航到目录。
- 打开resources/mytelegraf.conf文件,并插入您的 InfluxDB Cloud v3 URL、令牌、组织和个人存储桶名称。您也可以使用环境文件。
- 通过将“目录”更改为resources并运行命令
docker-compose up --build -d
来启动容器。 - 等待大约 30 秒以使 Telegraf 初始化并开始写入指标。
- 一旦一切正常运行,花园传感器网关将开始生成随机的湿度、温度、风速和土壤数据,通过 Kafka 发送,并存储在您的 InfluxDB Cloud v3 实例中进行监控和分析。
代码解释
在本节中,我们将分解示例的组件,并解释每个组件如何组合在一起,以创建一个无缝的数据管道,用于使用 Kafka、Telegraf 和 InfluxDB Cloud v3 监控花园传感器数据。
1. app/Dockerfile
应用程序目录中的Dockerfile负责创建一个容器化环境来运行garden_sensor_gateway.py脚本。
2. app/garden_sensor_gateway.py
import time
import json
import random
from kafka import KafkaProducer
def random_temp_cels():
return round(random.uniform(-10, 50), 1)
def random_humidity():
return round(random.uniform(0, 100), 1)
def random_wind():
return round(random.uniform(0, 10), 1)
def random_soil():
return round(random.uniform(0, 100), 1)
def get_json_data():
data = {}
data["temperature"] = random_temp_cels()
data["humidity"] = random_humidity()
data["wind"] = random_wind()
data["soil"] = random_soil()
return json.dumps(data)
def main():
producer = KafkaProducer(bootstrap_servers=['kafka:9092'])
for _ in range(20000):
json_data = get_json_data()
producer.send('garden_sensor_data', bytes(f'{json_data}','UTF-8'))
print(f"Sensor data is sent: {json_data}")
time.sleep(5)
if __name__ == "__main__":
main()
这个Python脚本模拟花园传感器数据并将其发送到Kafka主题。让我们看看它是如何工作的。
- 导入库:脚本导入必要的库,如time、json、random和来自kafka-python包的KafkaProducer。
- 数据生成函数:random_temp_cels()、random_humidity()、random_wind()和random_soil()等函数分别生成温度、湿度、风速和土壤湿度的随机值。这些值四舍五入到小数点后一位,以模拟真实的传感器读数。
- 数据格式化:get_json_data()函数将这些生成的值收集到一个字典中,并使用json.dumps(data)将其转换为JSON字符串。
- Kafka生产者:main()函数使用KafkaProducer(bootstrap_servers=[‘kafka:9092’])初始化一个Kafka生产者,将其指向容器中运行的Kafka代理。然后它进入一个循环,生成传感器数据,将其发送到Kafka主题garden_sensor_data,并将数据打印到控制台。循环运行20,000次,每次迭代之间有5秒的延迟。
3. resources/docker-compose.yml
位于resources目录的docker-compose.yml文件定义了项目所需的服务,协调Kafka、Zookeeper、Telegraf和花园传感器网关的容器。以下是每个服务的作用:
- Kafka和Zookeeper:这些服务设置Kafka代理和Zookeeper,这是Kafka依赖的分布式协调服务。Kafka在9092端口暴露,Zookeeper在2181端口暴露。
- 花园传感器网关:此服务使用应用程序目录中的Dockerfile构建garden_sensor_gateway.py脚本的容器。它依赖于Kafka,以确保在脚本开始运行之前Kafka已启动且状态良好。
- Telegraf:Telegraf服务配置为从Kafka主题garden_sensor_data接收消息并将其写入InfluxDB Cloud v3。Telegraf配置文件mytelegraf.conf被挂载到容器中,以提供必要的设置。
4. resources/mytelegraf.conf
[[inputs.kafka_consumer]]
## Kafka brokers.
brokers = ["kafka:9092"]
## Topics to consume.
topics = ["garden_sensor_data"]
## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "json"
此配置文件是Telegraf设置以处理花园传感器数据的地方:``
- InfluxDB输出:[[outputs.influxdb_v2]]部分配置Telegraf将数据写入InfluxDB Cloud v3。您必须用您的InfluxDB URL、令牌、组织和个人存储桶详细信息替换占位符。
- Kafka消费者输入:[[inputs.kafka_consumer]]部分配置Telegraf订阅Kafka上的garden_sensor_data主题。它消费JSON格式的传感器数据,并将其发送到InfluxDB进行存储和分析。
这些组件共同创建了一个强大的管道,其中花园传感器数据被生成、发送到Kafka、由Telegraf处理,并存储在InfluxDB Cloud v3中,允许您实时监控花园的环境。
结论
这篇博客文章介绍了如何开始使用InfluxDB、Kafka和Telegraf。一个Python脚本生成花园数据并发送到Kafka主题,Telegraf从Kafka主题读取数据并将其写入InfluxDB。一如既往,您可以从此处开始使用InfluxDB v3 Cloud。在下一篇文章中,我们将介绍如何运行项目,深入探讨架构和逻辑,并讨论所选堆栈的一些优缺点。如果您需要帮助,请通过我们的社区网站或Slack频道联系我们。如果您也在使用InfluxDB进行数据处理项目,我很乐意听到您的反馈!