使用Python、Go和JavaScript客户端库将JSON数据导入InfluxDB
作者:Josh Powers / 产品
2022年11月23日
导航到
设备、开发人员、应用程序和服务每天都会产生和利用大量的JSON数据。其中一部分数据包含时间戳事件或指标,非常适合存储和分析在InfluxDB中。为了帮助开发人员构建未来的应用程序,InfluxDB提供了几种简单地将JSON数据导入InfluxDB的方法。
本文将演示如何将JSON数据转换为行协议,并使用InfluxDB客户端库将其发送到InfluxDB。
什么是JSON和行协议?
JSON,或JavaScript对象表示法,是一种基于文本的数据格式,由属性值对和数组组成,可由人类和机器读取。这种格式源于无状态服务器到浏览器通信的需求。通常与Web上的REST服务(如API)相关联,如今在许多场景中广泛用于发送和接收数据。
以下是一个来自虚构的天气传感器的JSON数据点的示例,其中包含各种传感器读数
{
"location": {
"lat": "43.56704 N",
"lon": "116.24053 W",
"elev": 2822
},
"name": "KBOI",
"sensors": {
"temperature": 48,
"dew_point": 34,
"humidity": 61,
"wind": 9,
"direction": "NW",
"pressure": 30.23
],
"updated": "25 Oct 12:05 PM MDT"
}
为了将此数据导入InfluxDB,以下工具展示了如何将这些行转换为行协议。 行协议 由以下项目组成
- 测量名称:要存储和稍后查询的数据名称
- 字段:将要查询的实际数据
- 标签:可选的字符串字段,用于索引和帮助过滤数据
- 时间戳:可选;如果未提供,则使用当前时间戳
使用上面的JSON示例,目标是将数据转换为以下行格式
weather,station=KBOI dew_point=34i,direction="NW",humidity=61i,pressure=30.23,temperature=48i,wind=9i 1666721100000000000
客户端库
《InfluxDB 客户端库》(点击查看) 是针对特定语言的包,与 InfluxDB v2 API 集成。这些库为开发者提供了强大的发送、查询和管理 InfluxDB 的方法。您可以查看此处的TL;DR,了解客户端库的概述。这些库支持多种语言,包括 Python、JavaScript、Go、C#、Java 等。
客户端库为开发者提供了将读取和转换后的数据快速发送到 InfluxDB 的工具。这样,开发者可以在他们最舒适或问题空间所需的语言中工作,同时有一个直接的方法将数据注入 InfluxDB。
本篇文章的其余部分将演示如何使用 Python、Go 和 JavaScript 客户端库读取上述示例中的 JSON 数据。
Python
Python 语言和生态系统展示了出色的数据解析和清洗方式。用户构建基于 Python 的工具来处理各种用例中的数据,从物联网设备到企业应用程序。简单的语法和强大的库使其成为将数据导入 InfluxDB 的首选。我们有一系列视频介绍 Python 客户端库(点击查看)。
以下示例演示了如何将天气 JSON 数据作为文件读取到 Python 字典中,生成数据点,并将其发送到 InfluxDB。
#!/usr/bin/env python3
"""Example to read JSON data and send to InfluxDB."""
from dateutil import parser
import json
from influxdb_client import InfluxDBClient, Point, WriteOptions
from influxdb_client.client.write_api import SYNCHRONOUS
point = Point("weather")
with open("../data/weather.json", "r") as json_file:
data = json.load(json_file)
point.tag("station", data["name"])
point.time(parser.parse(data["updated"]))
for key, value in data["sensors"].items():
point.field(key, value)
with InfluxDBClient.from_config_file("config.toml") as client:
with client.write_api(write_options=SYNCHRONOUS) as writer:
try:
writer.write(bucket="my-bucket", record=[point])
except InfluxDBError as e:
print(e)
此示例使用 Point
辅助器。这个辅助器允许开发者通过提供的辅助函数快速、轻松地生成与行协议相符的数据。它创建天气数据点,然后根据站名设置一个标签,使用数据的时间戳作为时间戳,然后取所有传感器读数并作为字段。
使用 Point 辅助器是写入数据的方式之一。开发者可以用 Python 字典的形式发送数据,指定度量名称、标签、字段和时间戳,或者生成简单的字符串。有关更多示例,请查看深入 WriteAPI 的博客文章。
Go
Go 语言以其高性能、可读性和效率而闻名。虽然它最初因服务器端、后端开发而流行,但现在 Go 被用于 CLI 应用程序、物联网设备和科学应用。
在此示例中,创建了一个 Weather
结构体,以便轻松解析天气站数据。然后使用 NewPoint
辅助器创建数据点,并将其最终发送到 InfluxDB。
package main
import (
"encoding/json"
"os"
"time"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
)
type Weather struct {
Name string `json:"name"`
Timestamp string `json:"updated"`
Location Location `json:"location"`
Sensors map[string]any `json:"sensors"`
}
type Location struct {
Lat string `json:"lat"`
Lon string `json:"lon"`
Elev int `json:"elev"`
}
type Sensor struct {
Temperature int `json:"temperature"`
DewPoint int `json:"dew_point"`
Humidity int `json:"humidity"`
Wind int `json:"wind"`
Direction string `json:"direction"`
Pressure float64 `json:"pressure"`
}
func main() {
bytes, err := os.ReadFile("../../data/weather.json")
if err != nil {
panic(err)
}
data := Weather{}
if err := json.Unmarshal(bytes, &data); err != nil {
panic(err)
}
timestamp, err := time.Parse(
"2006 02 Jan 03:04 PM -0700",
data.Timestamp
)
if err != nil {
panic(err)
}
p := influxdb2.NewPoint(
"weather",
map[string]string{
"station": data.Name,
},
data.Sensors,
timestamp,
)
client := influxdb2.NewClient("https://127.0.0.1:8086/", "my-token")
writer := client.WriteAPI("my-org", "my-bucket")
writer.WritePoint(p)
client.Close()
}
此示例读取 JSON 文件,然后使用内部解析方法解析数据,并使用 Go 标准时间格式解析时间戳。
在此例中,NewPoint
辅助器允许用户通过 WritePoint()
函数快速、轻松地指定发送到 InfluxDB 的行协议的各个部分。开发者也可以使用 WriteRecord()
函数编写行协议字符串。
JavaScript/Node.js
JavaScript是网络的核心技术。随着Node.js的发布,用户可以在除浏览器之外的地方运行JavaScript。因此,JavaScript的使用从主要网站扩展到服务器端、移动和其他用户应用,增长显著。
此示例是一个Node.js脚本,类似于其他示例,它读取文件,创建一个点,并将其发送到InfluxDB。您可以根据许多其他场景对其进行调整。
#!/usr/bin/env node
import {readFileSync} from 'fs'
import {InfluxDB, Point, HttpError} from '@influxdata/influxdb-client'
import {url, token, org, bucket} from './env.mjs'
const data = JSON.parse(readFileSync('../data/weather.json', 'utf8'))
const point = new Point('weather')
.tag('station', data.name)
.intField('temperature', data.sensors.temperature)
.intField('dew_point', data.sensors.dew_point)
.intField('humidity', data.sensors.humidity)
.intField('wind', data.sensors.wind)
.stringField('direction', data.sensors.direction)
.floatField('pressure', data.sensors.pressure)
.timestamp(new Date(data.updated))
const writeApi = new InfluxDB({url, token}).getWriteApi(org, bucket, 'ns')
writeApi.writePoint(point)
try {
await writeApi.close()
} catch (e) {
if (e instanceof HttpError) {
console.error(e.statusCode)
console.error(e.statusMessage)
}
}
与Python类似,JavaScript使用Point
辅助函数生成有效的行协议数据点。用户指定各种标签、字段及其相应的数据类型,并在写入InfluxDB之前指定时间戳。
今天就将您的JSON数据导入InfluxDB
本篇文章展示了InfluxDB客户端库如何快速、简单、灵活。虽然上述示例只展示了JSON数据,但它开始展示了开发者在向InfluxDB发送数据时所拥有的强大能力。结合其他API,开发者有更多选项和潜力。