使用Python、Go和JavaScript客户端库将JSON数据导入InfluxDB

导航到

设备、开发人员、应用程序和服务每天都会产生和利用大量的JSON数据。其中一部分数据包含时间戳事件或指标,非常适合存储和分析在InfluxDB中。为了帮助开发人员构建未来的应用程序,InfluxDB提供了几种简单地将JSON数据导入InfluxDB的方法。

本文将演示如何将JSON数据转换为行协议,并使用InfluxDB客户端库将其发送到InfluxDB。

什么是JSON和行协议?

JSON,或JavaScript对象表示法,是一种基于文本的数据格式,由属性值对和数组组成,可由人类和机器读取。这种格式源于无状态服务器到浏览器通信的需求。通常与Web上的REST服务(如API)相关联,如今在许多场景中广泛用于发送和接收数据。

以下是一个来自虚构的天气传感器的JSON数据点的示例,其中包含各种传感器读数

{
    "location": {
        "lat": "43.56704 N",
        "lon": "116.24053 W",
        "elev": 2822
    },
    "name": "KBOI",
    "sensors": {
        "temperature": 48,
        "dew_point": 34,
        "humidity": 61,
        "wind": 9,
        "direction": "NW",
        "pressure": 30.23
    ],
    "updated": "25 Oct 12:05 PM MDT"
}

为了将此数据导入InfluxDB,以下工具展示了如何将这些行转换为行协议。 行协议 由以下项目组成

  • 测量名称:要存储和稍后查询的数据名称
  • 字段:将要查询的实际数据
  • 标签:可选的字符串字段,用于索引和帮助过滤数据
  • 时间戳:可选;如果未提供,则使用当前时间戳

使用上面的JSON示例,目标是将数据转换为以下行格式

weather,station=KBOI dew_point=34i,direction="NW",humidity=61i,pressure=30.23,temperature=48i,wind=9i 1666721100000000000

客户端库

《InfluxDB 客户端库》(点击查看) 是针对特定语言的包,与 InfluxDB v2 API 集成。这些库为开发者提供了强大的发送、查询和管理 InfluxDB 的方法。您可以查看此处的TL;DR,了解客户端库的概述。这些库支持多种语言,包括 PythonJavaScriptGoC#Java 等。

客户端库为开发者提供了将读取和转换后的数据快速发送到 InfluxDB 的工具。这样,开发者可以在他们最舒适或问题空间所需的语言中工作,同时有一个直接的方法将数据注入 InfluxDB。

本篇文章的其余部分将演示如何使用 Python、Go 和 JavaScript 客户端库读取上述示例中的 JSON 数据。

Python

Python 语言和生态系统展示了出色的数据解析和清洗方式。用户构建基于 Python 的工具来处理各种用例中的数据,从物联网设备到企业应用程序。简单的语法和强大的库使其成为将数据导入 InfluxDB 的首选。我们有一系列视频介绍 Python 客户端库(点击查看)

以下示例演示了如何将天气 JSON 数据作为文件读取到 Python 字典中,生成数据点,并将其发送到 InfluxDB。

#!/usr/bin/env python3
"""Example to read JSON data and send to InfluxDB."""

from dateutil import parser
import json

from influxdb_client import InfluxDBClient, Point, WriteOptions
from influxdb_client.client.write_api import SYNCHRONOUS

point = Point("weather")
with open("../data/weather.json", "r") as json_file:
    data = json.load(json_file)

    point.tag("station", data["name"])
    point.time(parser.parse(data["updated"]))
    for key, value in data["sensors"].items():
        point.field(key, value)

with InfluxDBClient.from_config_file("config.toml") as client:
    with client.write_api(write_options=SYNCHRONOUS) as writer:
        try:
            writer.write(bucket="my-bucket", record=[point])
        except InfluxDBError as e:
            print(e)

此示例使用 Point 辅助器。这个辅助器允许开发者通过提供的辅助函数快速、轻松地生成与行协议相符的数据。它创建天气数据点,然后根据站名设置一个标签,使用数据的时间戳作为时间戳,然后取所有传感器读数并作为字段。

使用 Point 辅助器是写入数据的方式之一。开发者可以用 Python 字典的形式发送数据,指定度量名称、标签、字段和时间戳,或者生成简单的字符串。有关更多示例,请查看深入 WriteAPI 的博客文章。

Go

Go 语言以其高性能、可读性和效率而闻名。虽然它最初因服务器端、后端开发而流行,但现在 Go 被用于 CLI 应用程序、物联网设备和科学应用。

在此示例中,创建了一个 Weather 结构体,以便轻松解析天气站数据。然后使用 NewPoint 辅助器创建数据点,并将其最终发送到 InfluxDB。

package main

import (
	"encoding/json"
	"os"
	"time"

	influxdb2 "github.com/influxdata/influxdb-client-go/v2"
)

type Weather struct {
	Name      string         `json:"name"`
	Timestamp string         `json:"updated"`
	Location  Location       `json:"location"`
	Sensors   map[string]any `json:"sensors"`
}

type Location struct {
	Lat  string `json:"lat"`
	Lon  string `json:"lon"`
	Elev int    `json:"elev"`
}

type Sensor struct {
	Temperature int     `json:"temperature"`
	DewPoint    int     `json:"dew_point"`
	Humidity     int    `json:"humidity"`
	Wind        int     `json:"wind"`
	Direction   string  `json:"direction"`
	Pressure    float64 `json:"pressure"`
}

func main() {
	bytes, err := os.ReadFile("../../data/weather.json")
	if err != nil {
		panic(err)
	}

	data := Weather{}
	if err := json.Unmarshal(bytes, &data); err != nil {
		panic(err)
	}

	timestamp, err := time.Parse(
            "2006 02 Jan 03:04 PM -0700", 
            data.Timestamp
      )
	if err != nil {
		panic(err)
	}

	p := influxdb2.NewPoint(
		"weather",
		map[string]string{
			"station": data.Name,
		},
		data.Sensors,
		timestamp,
	)

	client := influxdb2.NewClient("https://127.0.0.1:8086/", "my-token")
	writer := client.WriteAPI("my-org", "my-bucket")
	writer.WritePoint(p)
	client.Close()
}

此示例读取 JSON 文件,然后使用内部解析方法解析数据,并使用 Go 标准时间格式解析时间戳。

在此例中,NewPoint 辅助器允许用户通过 WritePoint() 函数快速、轻松地指定发送到 InfluxDB 的行协议的各个部分。开发者也可以使用 WriteRecord() 函数编写行协议字符串。

JavaScript/Node.js

JavaScript是网络的核心技术。随着Node.js的发布,用户可以在除浏览器之外的地方运行JavaScript。因此,JavaScript的使用从主要网站扩展到服务器端、移动和其他用户应用,增长显著。

此示例是一个Node.js脚本,类似于其他示例,它读取文件,创建一个点,并将其发送到InfluxDB。您可以根据许多其他场景对其进行调整。

#!/usr/bin/env node

import {readFileSync} from 'fs'

import {InfluxDB, Point, HttpError} from '@influxdata/influxdb-client'
import {url, token, org, bucket} from './env.mjs'

const data = JSON.parse(readFileSync('../data/weather.json', 'utf8'))

const point = new Point('weather')
    .tag('station', data.name)
    .intField('temperature', data.sensors.temperature)
    .intField('dew_point', data.sensors.dew_point)
    .intField('humidity', data.sensors.humidity)
    .intField('wind', data.sensors.wind)
    .stringField('direction', data.sensors.direction)
    .floatField('pressure', data.sensors.pressure)
    .timestamp(new Date(data.updated))

const writeApi = new InfluxDB({url, token}).getWriteApi(org, bucket, 'ns')
writeApi.writePoint(point)

try {
    await writeApi.close()
} catch (e) {
    if (e instanceof HttpError) {
        console.error(e.statusCode)
        console.error(e.statusMessage)
    }
}

与Python类似,JavaScript使用Point辅助函数生成有效的行协议数据点。用户指定各种标签、字段及其相应的数据类型,并在写入InfluxDB之前指定时间戳。

今天就将您的JSON数据导入InfluxDB

本篇文章展示了InfluxDB客户端库如何快速、简单、灵活。虽然上述示例只展示了JSON数据,但它开始展示了开发者在向InfluxDB发送数据时所拥有的强大能力。结合其他API,开发者有更多选项和潜力。

考虑您可能在哪里使用InfluxDB客户端库,并尝试一下!