Go 和 InfluxDB 入门

导航至

本文由 Alexandre Couëdelo 撰写,最初由 The New Stack 发布。向下滚动查看作者的照片和简介。 

诸如 PostgreSQL 或 MongoDB 之类的传统数据库非常擅长以表格或文档格式安全地保存系统状态,但是对于时间相关的数据(系统指标、物联网设备测量或应用程序状态更改)呢?

对于这些情况,您需要一种更合适的数据库类型,一种旨在更好地管理具有时间特征的半结构化数据的数据库。

InfluxDB 是一个专门为时间序列数据编写的高性能数据存储。InfluxData 不仅提供数据库,还提供用于摄取、转换和可视化数据的工具。例如,Telegraf 提供了 200 多个插件来摄取数据。但是,如果您想将 InfluxDB 直接集成到您的后端应用程序中,则需要使用专用的 客户端库

本教程将引导您了解如何使用 InfluxDB Go 客户端库,创建与数据库的连接以及存储和查询数据。

InfluxDB 入门

您即将将 InfluxDB 添加到您的应用程序堆栈中。在本教程结束时,您将获得一个代码库,说明如何将 Go 应用程序与 InfluxDB 接口。但首先,让我们为这个演示创建一个上下文。

您正在设计一款新的智能恒温器物联网产品。您会收到来自物联网传感器的频繁温度测量值。假设您将温度测量值存储在 InfluxDB 数据库中。此外,您的用户可以使用您的应用程序调整智能恒温器的温度。每次用户更改恒温器时,您都会在您的传统数据库中更新恒温器的状态。

此外,您希望保留所有恒温器温度设置的历史记录以及温度测量值。温度设置和测量值一起使您能够分析用户行为。有了这些数据,您以后可以通过在用户采取行动之前预测变化,使您的智能恒温器更加智能。

本教程的要求

本教程与操作系统无关,并假设您已安装 GO 1.16+Docker

我选择了 Docker 安装,因为它最适合持续集成。但是,InfluxDB 支持多种平台 (Linux、macOS、Windows、Docker、Kubernetes)。

启动本地数据库

要开始设置,您需要定义一个 docker-compose.yml 文件,该文件定义 Docker 中 InfluxDB 的配置。

services:
    influxdb:
        image: influxdb:2.0.7
        environment:
            DOCKER_INFLUXDB_INIT_MODE: setup
            DOCKER_INFLUXDB_INIT_USERNAME: ${INFLUXDB_USERNAME}
            DOCKER_INFLUXDB_INIT_PASSWORD: ${INFLUXDB_PASSWORD}
            DOCKER_INFLUXDB_INIT_ORG: iot
            DOCKER_INFLUXDB_INIT_BUCKET: users_business_events
            DOCKER_INFLUXDB_INIT_ADMIN_TOKEN: ${INFLUXDB_TOKEN}
        ports:
        - "8086:8086"

运行 docker-compose up 以启动数据库。出于测试目的,请在名为 test_influxdb.env 的文件中定义您的环境变量。以下是我的 test_influxdb.env 的内容

docker-compose --env-file test_influxdb.env up

INFLUXDB_USERNAME=admin
INFLUXDB_PASSWORD=admin1234
INFLUXDB_TOKEN=F-QFQpmCL9UkR3qyoXnLkzWj03s6m4eCvYgDl1ePfHBf9ph7yxaSgQ6WN0i9giNgRTfONwVMK1f977r_g71oNQ==
INFLUXDB_URL="http://localhost:8086"

您需要将标志 --env-file 添加到您的 docker-compose 命令中,以强制 Docker 将该文件考虑在内。

docker-compose --env-file test_influxdb.env up

InfluxDB 配备了易于使用的 UI。访问 http://localhost:8086 并查看。

InfluxDB UI - load data

熟悉 InfluxDB

InfluxDB 不仅仅是一个时间序列数据库。它是 influxdata 生态系统背后的核心元素。这个生态系统包括

  • UI。Web 界面提供管理界面、开发工具,例如查询构建器和数据可视化。
  • 命令行界面 (CLI) influxinflux CLI 可以管理数据库、从 CSV 加载数据、插入数据和查询数据。它是开发和调试应用程序的好帮手。

Go 客户端库入门

您的数据库已准备就绪,所以让我们编写一些 Go 代码。初始化您的 Go 模块。

go mod init github.com/xNok/Getting-Started-with-Go-and-InfluxDB

influxdb-client-go 作为依赖项添加到您的项目中。

go get github.com/influxdata/influxdb-client-go/v2

建立连接

让我们创建一个帮助您连接到数据库的函数。您已经在 test_influxdb.env 中使用变量 INFLUXDB_TOKEN 定义了一个令牌。您将使用此令牌进行测试。您还可以通过 UI 创建一个新令牌。您的 InfluxDB 实例应仍在运行。

  1. 返回 UI 并生成一个新的身份验证令牌。
  2. 点击数据
  3. 客户端库中,选择Go

本节允许您创建身份验证令牌,并提供一些代码片段以开始使用 Go 库。

Getting Started with Go and InfluxDB

这是您旨在创建的函数

// Connect to an Influx Database reading the credentials from
// environment variables INFLUXDB_TOKEN, INFLUXDB_URL
// return influxdb Client or errors
func connectToInfluxDB() (influxdb2.Client, error) {

}

接下来,创建测试函数。当您调用 connectToInfluxDB 时,您将成功连接到数据库,并且可以通过调用 influxdb2.Client 中的 Health 方法来验证这一点。如您所见,我使用了 godotenv.Load("../test_influxdb.env") 来获取您为 Docker 中的 InfluxDB 定义的凭据。(您需要将 godotenv 作为依赖项添加到您的项目中)。

func Test_connectToInfluxDB(t *testing.T) {

    //load environment variable from a file for test purposes
    godotenv.Load("../test_influxdb.env")

    tests := []struct {
        name string
        wantErr bool
    }{
        {
            name:    "Successful connection to InfluxDB",
            wantErr: false,
        },
    }
    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            got, err := ConnectToInfluxDB()
            if (err != nil) != tt.wantErr {
                t.Errorf("ConnectToInfluxDB() error = %v, wantErr %v", err, tt.wantErr)
                return
            }
            health, err := got.Health(context.Background())
            if (err != nil) && health.Status == domain.HealthCheckStatusPass {
                t.Errorf("connectToInfluxDB() error. database not healthy")
                return
            }
            got.Close()
        })
    }
}

当涉及到创建连接时,调用 influxdb2 客户端初始化构造函数。包括从环境变量读取凭据和验证 connectToInfluxDB 的代码看起来像这样

// Connect to an Influx Database reading the credentials from
// environement variables INFLUXDB_TOKEN, INFLUXDB_URL
// return influxdb Client or errors
func ConnectToInfluxDB() (influxdb2.Client, error) {

    dbToken := os.Getenv("INFLUXDB_TOKEN")
    if dbToken == "" {
        return nil, errors.New("INFLUXDB_TOKEN must be set")
    }

    dbURL := os.Getenv("INFLUXDB_URL")
    if dbURL == "" {
        return nil, errors.New("INFLUXDB_URL must be set")
    }

    client := influxdb2.NewClient(dbURL, dbToken)

    // validate client connection health
    _, err := client.Health(context.Background())

    return client, err
}

如果您通过了测试,您就可以使用 InfluxDB 实现一些功能了。但是,您尚未准备好投入生产环境。

强烈建议在生产环境中启用 SSL/TLS 加密。在本教程中您不需要它,因为您使用的是本地 Docker 环境。

在您的应用程序中,您需要将证书传递给您的 InfluxDB 客户端。

// read the certificate
cer, _ := tls.LoadX509KeyPair("server.crt", "server.key")
// define the certificate
client := influxdb2.NewClientWithOptions(dbURL, dbToken, 
    influxdb2.DefaultOptions().
        SetTLSConfig(&tls.Config{
            Certificates: []tls.Certificate{cer}
        }))

插入数据

第一步:对您的数据进行建模。您的要求是将“恒温器设置”中的更改发送到 InfluxDB。该设置包含用户的标识符以及房间内所需的平均和最高温度。

type ThermostatSetting struct {
    user string
    max  float64 //temperature
    avg  float64 //temperature
}

第二步:编写一个测试函数。您可以使用几种可能的方式来插入数据,以找到最适合您的一种。

InfluxDB Go 客户端库提供了三种插入数据的方式

  • Line protocol 使用基于文本的数据库查询。
  • 带有构造函数的数据点使用映射来填充数据。
  • 具有流畅风格的数据点使用构建器模式。

这是一个为此目的而制作的通用测试函数

func Test_write_event_with_line_protocol(t *testing.T) {
    tests := []struct {
        name  string
        f     func(influxdb2.Client, []ThermostatSetting)
        datas []ThermostatSetting 
    }{
        {
            name:  "Write new record with line protocol",
            // Your data Points
            datas: []ThermostatSetting{{user: "foo", avg: 35.5, max: 42}},
            f: func(c influxdb2.Client, datas []ThermostatSetting) {
                // Send all the data to the DB
                for _, data := range datas {
                    write_event_with_line_protocol(c, data)
                }
            },
        },
    }
    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            // helper to initialise and clean the database
            client := init_testDB(t)
            // call function under test
            tt.f(client, tt.datas)
            // TODO Validate the data
        })
    }
}

您将需要一个小型的辅助函数 init_testDB,以在每次测试之前初始化连接并清理数据库。

func init_testDB(t *testing.T) influxdb2.Client {
    t.Helper() // Tells `go test` that this is an helper
    godotenv.Load("./test_influxdb.env") //load environement variable
    client, err := drivers.ConnectToInfluxDB() // create the client

    if err != nil {
        t.Errorf("impossible to connect to DB")
    }

    // Clean the database by deleting the bucket
    ctx := context.Background()
    bucketsAPI := client.BucketsAPI()
    dBucket, err := bucketsAPI.FindBucketByName(ctx, bucket)
    if err == nil {
        client.BucketsAPI().DeleteBucketWithID(context.Background(), *dBucket.Id)
    }

    // create new empty bucket
    dOrg, _ := client.OrganizationsAPI().FindOrganizationByName(ctx, org)
    _, err = client.BucketsAPI().CreateBucketWithNameWithID(ctx, *dOrg.Id, bucket)

    if err != nil {
        t.Errorf("impossible to new create bucket")
    }

    return client
}

最后,您已准备好尝试每种类型的数据插入。

Line protocol 易于使用,并且有点像 SQL 查询。InfluxDB 中的记录由三个元素组成:measurementNamefieldstags。InfluxDB 的 关键概念 包括

  • measurementName,它指的是一个数据集
  • fields,它们是键/值对
  • tags,它们也是键/值对,但充当记录的索引。
func write_event_with_line_protocol(client influxdb2.Client, t ThermostatSetting) {
    // get non-blocking write client
    writeAPI := client.WriteAPI(org, bucket)
    // write line protocol
    writeAPI.WriteRecord(fmt.Sprintf("thermostat,unit=temperature,user=%s avg=%f,max=%f", t.user, t.avg, t.max))
    // Flush writes
    writeAPI.Flush()
}

点数据方法编写起来很冗长,但也提供了更多的结构。当数据参数已经采用所需的格式时,它很方便。

func write_event_with_params_constror(client influxdb2.Client, t ThermostatSetting) {
    // Use blocking write client for writes to desired bucket
    writeAPI := client.WriteAPI(org, bucket)
    // Create point using full params constructor
    p := influxdb2.NewPoint("thermostat",
        map[string]string{"unit": "temperature", "user": t.user},
        map[string]interface{}{"avg": t.avg, "max": t.max},
        time.Now())
    writeAPI.WritePoint(p)
    // Flush writes
    writeAPI.Flush()
}

或者,您可以使用构建器 NewPointWithMeasurement 逐步构建查询,这易于阅读。

func write_event_with_fluent_Style(client influxdb2.Client, t ThermostatSetting) {
    // Use blocking write client for writes to desired bucket
    writeAPI := client.WriteAPI(org, bucket)
    // create point using fluent style
    p := influxdb2.NewPointWithMeasurement("thermostat").
        AddTag("unit", "temperature").
        AddTag("user", t.user).
        AddField("avg", t.avg).
        AddField("max", t.max).
        SetTime(time.Now())
    writeAPI.WritePoint(p)
    // Flush writes
    writeAPI.Flush()
}

哪种插入方法最适合您?不要忘记更新测试以验证您的实现。

批处理

请注意,InfluxDB 客户端使用批处理将数据发送到数据库。默认情况下,在达到批处理大小(默认为 5,000 个点)之前,不会将任何数据发送到数据库,这是数据库负载和数据可用性之间的权衡。较小的批处理大小意味着更高的速度,因此可能会影响数据库的性能。另一方面,等待达到批处理大小意味着数据仍保留在应用程序的内存中,而不是在数据库中。

您可以在调用 influxdb2 的初始化构造函数时调整 Batch Size

client := influxdb2.NewClientWithOptions(dbURL, dbToken,
        influxdb2.DefaultOptions().SetBatchSize(20))

此外,您可以强制客户端使用 Flush() 发送数据。您在前面的示例中已经看到了这一点。

writeAPI.Flush()

但是,根据我在时间序列数据库方面的经验,不要到处使用 Flush 方法。即使将数据立即写入数据库看起来很合理,但它可能会严重影响性能。相反,请使用 Batch Size 选项。

阻塞与非阻塞

虽然 InfluxDB 的默认行为是使用异步调用和批处理(即非阻塞 I/O),但您可以选择同步写入点。对于需要立即提交到数据库的非频繁写入,建议使用此选项。

func write_event_with_blocking_write(client influxdb2.Client) {
    // Get blocking write client
    writeAPI := client.WriteAPIBlocking(org, bucket)

    // write line protocol
    writeAPI.WriteRecord(context.Background(), fmt.Sprintf("stat,unit=temperature1 avg=%f,max=%f", 23.5, 45.0))
}

查询数据

InfluxDB 使用一种名为 Flux 的查询语言。Flux 使用一种函数式方法来选择、过滤和聚合数据。一旦您掌握了它的基本知识,就很容易阅读和理解。此外,InfluxDB 提供了一个强大的查询构建器,用于根据摄取的数据设计您的查询。

InfluxDB - querying data

为了完成本教程,请运行您的集成测试以向数据库添加数据点。接下来,使用查询构建器创建一个查询,以隔离您需要的数据。这是使用 QueryBuilder 构建的查询

from(bucket: "users_business_events")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "thermostat")
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
  |> yield(name: "mean")

现在剩下要做的就是实现一个使用 Go 客户端查询数据的函数。有两种查询数据的方法。

第一种是使用 QueryTableResult。您会注意到将数据放回 ThermostatSetting 结构中需要做一些工作。即使您将 ThermostatSetting 的内容作为一个数据点发送,字段 avgmax 也会作为两个单独的记录输出。

func read_events_as_query_table_result(client influxdb2.Client) map[time.Time]ThermostatSetting {

    // Get query client
    queryAPI := client.QueryAPI(org)

    // Query. You need to change a bit the Query from the Query Builder
    // Otherwise it won't work
    fluxQuery := fmt.Sprintf(`from(bucket: "users_business_events")
    |> range(start: -1h)
    |> filter(fn: (r) => r["_measurement"] == "thermostat")
    |> yield(name: "mean")`)

    result, err := queryAPI.Query(context.Background(), fluxQuery)

    // Putting back the data in share requires a bit of work
    var resultPoints map[time.Time]ThermostatSetting
    resultPoints = make(map[time.Time]ThermostatSetting)

    if err == nil {
        // Iterate over query response
        for result.Next() {
            // Notice when group key has changed
            if result.TableChanged() {
                fmt.Printf("table: %s\n", result.TableMetadata().String())
            }

            val, ok := resultPoints[result.Record().Time()]

            if !ok {
                val = ThermostatSetting{
                    user: fmt.Sprintf("%v", result.Record().ValueByKey("user")),
                }
            }

            switch field := result.Record().Field(); field {
            case "avg":
                val.avg = result.Record().Value().(float64)
            case "max":
                val.max = result.Record().Value().(float64)
            default:
                fmt.Printf("unrecognized field %s.\n", field)
            }

            resultPoints[result.Record().Time()] = val

        }
        // check for an error
        if result.Err() != nil {
            fmt.Printf("query parsing error: %s\n", result.Err().Error())
        }
    } else {
        panic(err)
    }

    return resultPoints

}

第二个选项是 QueryRaw(),它返回一个未解析的结果字符串。

func read_events_as_raw_string(client influxdb2.Client) {
    // Get query client
    queryAPI := client.QueryAPI(org)

    // Query
    fluxQuery := fmt.Sprintf(`from(bucket: "users_business_events")
    |> range(start: -1h)
    |> filter(fn: (r) => r["_measurement"] == "thermostat")
    |> yield(name: "mean")`)

    result, err := queryAPI.QueryRaw(context.Background(), fluxQuery, influxdb2.DefaultDialect())
    if err == nil {
        fmt.Println("QueryResult:")
        fmt.Println(result)
    } else {
        panic(err)
    }
}

最后,您需要更新您的测试函数,看看它是否按预期工作。

func Test_write_event_with_line_protocol(t *testing.T) {
    tests := []struct {
        name  string
        f     func(influxdb2.Client, []ThermostatSetting)
        datas []ThermostatSetting
    }{
        [...]
    }
    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            client := init_testDB(t)

            // call function to test
            tt.f(client, tt.datas)
            // test can be flicky if the query is done before that data is ready in the database
            time.Sleep(time.Millisecond * 1000)

            // Option one: QueryTableResult
            results := read_events_as_query_table_result(client)
            // convert results to array to compare with data
            resultsArr := []ThermostatSetting{}
            for _, v := range results {
                resultsArr = append(resultsArr, v)
            }

            if eq := reflect.DeepEqual(resultsArr, tt.datas); !eq {
                t.Errorf("want %v, got %v", tt.datas, resultsArr)
            }

            // Option two: query raw data
            // TODO add validation
            read_events_as_raw_string(client)

            client.Close()
        })
    }
}

结论

如果您完成了本教程,您就拥有了一个经过全面测试的、使用 InfluxDB 的应用程序。您正在使用四种不同的方式来查询数据(三种风格的非阻塞插入和一种阻塞插入),并且您知道在投入生产环境之前需要启用 SSL/TLS 证书。

您有机会插入数据并在 InfluxDB UI 中对其进行可视化,从中您可以快速构建查询,然后在您的应用程序中使用数据。总而言之,您可以插入数据并以相同的数据结构检索它,以便在您的应用程序中使用。您未来的智能恒温器公司肯定走在正确的轨道上。

如果您有兴趣进一步了解 InfluxDB,请阅读 官方文档,以更熟悉 Go 客户端库

关于作者

Alexandre Couëdelo

Alexandre 是一位复杂系统工程和管理专家。自从他开始职业生涯,通过为加拿大一家领先金融机构的数字化转型做出贡献以来,他就一直在拥抱 DevOps 文化。他的热情是 DevOps 革命和工业工程。他喜欢他有足够的后见之明来兼顾两全。