使用 Python、Go 和 JavaScript 客户端库将 JSON 数据导入 InfluxDB

导航至

设备、开发者、应用程序和服务每天生成和使用大量的 JSON 数据。其中一部分数据由时间戳事件或指标组成,非常适合在 InfluxDB 中存储和分析。为了帮助开发者构建未来的应用程序,InfluxDB 提供了几种轻松将 JSON 数据导入 InfluxDB 的方法。

这篇博文演示了如何使用 InfluxDB 客户端库获取 JSON 数据,将其转换为行协议,并发送到 InfluxDB。

什么是 JSON 和行协议?

JSON,或 JavaScript 对象表示法,是一种基于文本的数据格式,由属性-值对和数组组成,人类和机器都可读取。该格式的出现源于对无状态服务器到浏览器通信的需求。它通常与 REST 服务(如 Web 上的 API)相关联,如今已广泛应用于许多场景中以发送和接收数据。

下面是一个虚构的 JSON 数据点示例,来自一个包含各种传感器读数的气象传感器

{
    "location": {
        "lat": "43.56704 N",
        "lon": "116.24053 W",
        "elev": 2822
    },
    "name": "KBOI",
    "sensors": {
        "temperature": 48,
        "dew_point": 34,
        "humidity": 61,
        "wind": 9,
        "direction": "NW",
        "pressure": 30.23
    ],
    "updated": "25 Oct 12:05 PM MDT"
}

为了将这些数据导入 InfluxDB,以下工具展示了如何将这些行转换为行协议。行协议由以下项目组成

  • 测量名称:要存储和稍后查询的数据的名称
  • 字段:将被查询的实际数据
  • 标签:可选的字符串字段,用于索引和帮助过滤数据
  • 时间戳:可选;如果未提供,则使用当前时间戳

使用上面的 JSON 示例,目标状态是将数据转换为以下行格式

weather,station=KBOI dew_point=34i,direction="NW",humidity=61i,pressure=30.23,temperature=48i,wind=9i 1666721100000000000

客户端库

InfluxDB 客户端库是特定于语言的软件包,与 InfluxDB v2 API 集成。这些库为开发者提供了发送、查询和管理 InfluxDB 的强大方法。查看此 TL;DR,以获得客户端库的精彩概述。这些库以多种语言提供,包括 PythonJavaScriptGoC#Java 等。

客户端库为开发者提供了快速发送读取和转换后的数据到 InfluxDB 所需的工具。这样,开发者可以使用他们最熟悉的特定语言或问题空间要求的语言进行工作。同时,他们还拥有将数据直接注入 InfluxDB 的方法。

本文的其余部分演示了如何使用 Python、Go 和 JavaScript 客户端库从上面的示例中读取 JSON 数据。

Python

Python 语言和生态系统展示了解析和清理数据的绝佳方式。用户构建基于 Python 的工具来处理许多不同用例中的数据,从物联网设备到企业应用程序。简单的语法和强大的库使其成为将数据导入 InfluxDB 的绝佳首选。我们有一个视频系列介绍了 Python 客户端库,请点击此处

以下示例演示了如何将天气 JSON 数据作为文件读取到 Python 字典中,生成数据点,并将该数据点发送到 InfluxDB

#!/usr/bin/env python3
"""Example to read JSON data and send to InfluxDB."""

from dateutil import parser
import json

from influxdb_client import InfluxDBClient, Point, WriteOptions
from influxdb_client.client.write_api import SYNCHRONOUS

point = Point("weather")
with open("../data/weather.json", "r") as json_file:
    data = json.load(json_file)

    point.tag("station", data["name"])
    point.time(parser.parse(data["updated"]))
    for key, value in data["sensors"].items():
        point.field(key, value)

with InfluxDBClient.from_config_file("config.toml") as client:
    with client.write_api(write_options=SYNCHRONOUS) as writer:
        try:
            writer.write(bucket="my-bucket", record=[point])
        except InfluxDBError as e:
            print(e)

此示例使用 Point 助手。此助手允许开发者使用提供的助手函数快速轻松地生成行协议中的数据。这会创建天气数据点,然后根据站点名称设置单个标签,使用数据的时间戳作为时间戳,然后获取所有传感器读数并将其设为字段。

使用 Point 助手是写入数据的多种方法之一。开发者可以拥有以 Python 字典形式发送的数据,该字典指定指标名称、标签、字段和时间戳,或生成一个简单的字符串。有关这些的更多示例,请查看 深入 WriteAPI 博文。

Go

Go 确立了自己作为一种高性能、可读且高效的语言的地位。虽然它最初因服务器端后端开发而广受欢迎,但 Go 也被用于 CLI 应用程序、物联网设备和科学应用程序。

在此示例中,创建了一个 Weather 结构来轻松地解组气象站数据。然后,使用 NewPoint 助手创建数据点,最后将其发送到 InfluxDB。

package main

import (
	"encoding/json"
	"os"
	"time"

	influxdb2 "github.com/influxdata/influxdb-client-go/v2"
)

type Weather struct {
	Name      string         `json:"name"`
	Timestamp string         `json:"updated"`
	Location  Location       `json:"location"`
	Sensors   map[string]any `json:"sensors"`
}

type Location struct {
	Lat  string `json:"lat"`
	Lon  string `json:"lon"`
	Elev int    `json:"elev"`
}

type Sensor struct {
	Temperature int     `json:"temperature"`
	DewPoint    int     `json:"dew_point"`
	Humidity     int    `json:"humidity"`
	Wind        int     `json:"wind"`
	Direction   string  `json:"direction"`
	Pressure    float64 `json:"pressure"`
}

func main() {
	bytes, err := os.ReadFile("../../data/weather.json")
	if err != nil {
		panic(err)
	}

	data := Weather{}
	if err := json.Unmarshal(bytes, &data); err != nil {
		panic(err)
	}

	timestamp, err := time.Parse(
            "2006 02 Jan 03:04 PM -0700", 
            data.Timestamp
      )
	if err != nil {
		panic(err)
	}

	p := influxdb2.NewPoint(
		"weather",
		map[string]string{
			"station": data.Name,
		},
		data.Sensors,
		timestamp,
	)

	client := influxdb2.NewClient("http://localhost:8086/", "my-token")
	writer := client.WriteAPI("my-org", "my-bucket")
	writer.WritePoint(p)
	client.Close()
}

此示例读取 JSON 文件,然后解组数据并解析时间戳。这是通过内部解析方法以及使用 Go 标准时间来解释格式的格式来完成的。

在本例中,NewPoint 助手允许用户快速轻松地指定发送到 InfluxDB 的行协议的各个部分,并使用 WritePoint() 函数。开发者还可以使用 WriteRecord() 函数编写行协议字符串。

JavaScript/Node.js

JavaScript 是 Web 的核心技术。随着 Node.js 的发布,用户可以在浏览器以外的地方运行 JavaScript。因此,JavaScript 的使用量从主要用于网站显着增长到包括服务器端、移动和其他用户应用程序。

此示例是一个 Node.js 脚本,与其他示例类似,它读取文件、创建 Point 并将其发送到 InfluxDB。您可以调整此脚本以满足许多其他场景

#!/usr/bin/env node

import {readFileSync} from 'fs'

import {InfluxDB, Point, HttpError} from '@influxdata/influxdb-client'
import {url, token, org, bucket} from './env.mjs'

const data = JSON.parse(readFileSync('../data/weather.json', 'utf8'))

const point = new Point('weather')
    .tag('station', data.name)
    .intField('temperature', data.sensors.temperature)
    .intField('dew_point', data.sensors.dew_point)
    .intField('humidity', data.sensors.humidity)
    .intField('wind', data.sensors.wind)
    .stringField('direction', data.sensors.direction)
    .floatField('pressure', data.sensors.pressure)
    .timestamp(new Date(data.updated))

const writeApi = new InfluxDB({url, token}).getWriteApi(org, bucket, 'ns')
writeApi.writePoint(point)

try {
    await writeApi.close()
} catch (e) {
    if (e instanceof HttpError) {
        console.error(e.statusCode)
        console.error(e.statusMessage)
    }
}

与 Python 类似,JavaScript 使用 Point 助手来生成有效的行协议数据点。用户在将数据写入 InfluxDB 之前,指定各种标签、字段及其各自的数据类型以及时间戳。

立即将您的 JSON 数据导入 InfluxDB

这篇文章展示了 InfluxDB 客户端库的快速、简便和灵活。虽然上面只演示了 JSON 数据,但它开始说明开发者在将数据发送到 InfluxDB 时拥有的强大功能。结合其他 API,开发者拥有更多选择和潜力。

考虑一下您可能可以在哪里使用 InfluxDB客户端库,并立即试用一下!