使用 Go InfluxDB v3 客户端库

导航至

在本教程中,我们将学习如何使用 Go InfluxDB v3 客户端库。InfluxDB 3.0 是 InfluxDB 的最新版本,与之前的版本相比,它具有 显著的性能改进。请务必查看我们最新的基准测试,其中包括以下亮点

  • 与 InfluxDB OSS 相比,InfluxDB 3.0 提供了 45 倍的写入吞吐量。
  • 与 InfluxDB OSS 相比,InfluxDB 3.0 具有 4.5 倍的存储压缩率。
  • 与 InfluxDB OSS 相比,InfluxDB 3.0 对于给定的硬件资源更有效率,并且可以将存储成本降低 >90%。
  • 与 InfluxDB OSS 相比,InfluxDB 3.0 查询在各种查询类型中,对于最近的数据 (5m) 快 2.5-45 倍。
  • 与 InfluxDB OSS 相比,InfluxDB 3.0 查询在各种查询类型中,对于过去 1 小时的时间范围快 5-25 倍。

Go InfluxDB v3 客户端库 是一个软件包,提供了一组工具和函数,用于使用 Go 编程语言与 InfluxDB 交互。它允许开发人员高效地从/向 InfluxDB 3.0 查询和写入时序数据,从而简化了 InfluxDB 与 Go 应用程序的集成。与 Go 客户端库的先前版本一样,v3 通过 /write API 端点实现写入。查询与以前的版本略有不同,因为它使用 Apache Arrow Flight 客户端库,并利用 Arrow Format 和 Flight gRPC 协议来提供高效的序列化和反序列化以及双向流式传输。

要求、安装和初始化

要学习本教程,您需要以下内容

将客户端包的最新版本添加到您的项目依赖项 (go.mod)

go get github.com/InfluxCommunity/influxdb3-go/influxdb3

要初始化客户端,您需要将您的包导入到您的 main.go 文件中并提供您的凭据

package main.go

import (
	"context"
	"fmt"
	"os"
	"time"

	"github.com/InfluxCommunity/influxdb3-go/influxdb3"
)
func main() {
	// Use env variables to initialize client
	url := os.Getenv("INFLUXDB_URL")
	token := os.Getenv("INFLUXDB_TOKEN")
	database := os.Getenv("INFLUXDB_DATABASE")

	// Create a new client using an InfluxDB server base URL and an authentication token
client, err := influxdb3.New(influxdb3.ClientConfig{
    Host: url,
    Token: token,
    Database: database,
})

使用 Go InfluxDB v3 客户端库写入数据

要将数据写入 InfluxDB,请使用行协议格式和数据库(或 bucket)名称调用 client.Write。行协议是 InfluxDB 的摄取格式。当将行协议点写入 InfluxDB 时,请使用标签在您的实例中存储元数据。字段包含您的实际时序值。但是,字段和标签都会转换为 InfluxDB 表中的列。实际上,这些列在数据库中的功能相同,因此这种区分仅用于用户的组织目的。数据是同步写入的。以下是将行协议写入 InfluxDB 的示例

line := "stat,unit=temperature avg=23.5,max=45.0"
err = client.Write(context.Background(), database, []byte(line))
if err != nil {
		panic(err)
	}

另一种方法是使用 client.WritePoints 方法写入 Point 对象。您还可以将点或行协议附加到数组,并通过将数组传递到 client.WritePoints 方法中来写入点数组到 InfluxDB。

// Create point using full params constructor
	p := influx.NewPoint("stat",
		map[string]string{"unit": "temperature"},
		map[string]interface{}{"avg": 24.5, "max": 45.0},
		time.Now())
	// write point synchronously
	err = client.WritePoints(context.Background(), database, p)
	if err != nil {
		panic(err)
	}

关于 upsert 的重要说明

您可以 upsert 字段,但不能 upsert 标签。例如,如果您添加第二个点(请注意字段值中添加了“2”),您将 upsert 这些字段值,并且您之前的值将被新的字段值覆盖

# Adding first point
line := "stat,unit=temperature avg=23.5,max=45.0 1690218372000000000"
err = client.Write(context.Background(), database, []byte(line))
if err != nil {
		panic(err)
	}

# Adding second point
line := "stat,unit=temperature avg=23.52,max=45.2 1690218378000000000"
err = client.Write(context.Background(), database, []byte(line))
if err != nil {
		panic(err)

但是,如果您添加第二个点(请注意标签中添加了“2”),您将不会 upsert 这些值。您只会添加更多标签值。

# Adding first point
line := "stat,unit=temperature avg=23.5,max=45.0 1690218372000000000"
err = client.Write(context.Background(), database, []byte(line))
if err != nil {
		panic(err)
	}

# Adding second point
line := "stat,unit=temperature2 avg=23.52,max=45.2 1690218378000000000"
err = client.Write(context.Background(), database, []byte(line))
if err != nil {
		panic(err)

使用 Go InfluxDB v3 客户端库查询数据

您可以使用 Go InfluxDB v3 客户端库使用 SQL 和 InfluxQL(InfluxDB 的类 SQL 查询语言,专为时序数据构建)查询 InfluxDB 3.0。

SQL

默认情况下,client.Query 方法使用 SQL 查询 InfluxDB。您只需构建您的 SQL 查询,并将其与您希望查询的数据库一起传递到该方法中。

query := `
			SELECT *
			FROM "stat"
			WHERE
			time >= now() - interval '5 minute'
			AND
			"unit" IN ('temperature')
	`

	iterator, err := client.Query(context.Background(), query)

	if err != nil {
		panic(err)
	}

	for iterator.Next() {
		value := iterator.Value()

		fmt.Printf("avg is %f\n", value["avg"])
		fmt.Printf("max is %f\n", value["max"])
	}

InfluxQL

您可以使用 client.QueryWithOptions 方法使用 InfluxQL 查询 InfluxDB。在这种方法中,您创建选项并将它们传递到该方法中。

influxQLQuery := "SHOW MEASUREMENTS"

	options := influxdb3.QueryOptions{
		QueryType: influxdb3.InfluxQL,
	}

	iterator, err := client.QueryWithOptions(context.Background(), &options, influxQLQuery)

	if err != nil {
		panic(err)
	}

	for iterator.Next() {
		value := iterator.Value()
		fmt.Printf("measurement is:", value["name"])
	}

最终想法

InfluxDB 是存储所有时序数据的绝佳工具。它构建在 Apache 生态系统之上,并利用 DataFusionArrowParquet 等技术来实现高效的写入、存储和查询。它还提供了与许多其他工具的互操作性,因此您可以利用它们来满足您的特定预测分析需求。Go 客户端库有助于支持这种互操作性。

此处开始使用 InfluxDB Cloud 3.0。如果您需要任何帮助,请使用我们的 社区站点Slack 频道联系我们。我很乐意了解您尝试实现的目标以及您希望 InfluxDB 拥有的功能。