使用 Go 和 InfluxDB 入门

导航到

本文由 Alexandre Couëdelo 编写,最初发表在 The New Stack 上。向下滚动以查看作者的图片和简介。

传统的数据库,如 PostgreSQL 或 MongoDB,在安全地以表格或文档格式保存系统状态方面做得很好,但时间依赖性数据:系统度量、物联网设备测量或应用程序状态变化怎么办呢?

对于这些事情,你需要一种更适合的数据库类型,这种数据库旨在更好地管理具有时间特性的半结构化数据。

InfluxDB 是一个专为时序数据设计的性能高效的数据存储。InfluxData 不仅提供了数据库,还提供了工具来摄入、转换和可视化你的数据。例如,Telegraf 提供了超过 200 个插件来摄入数据。但是,如果你想直接将 InfluxDB 集成到你的后端应用程序中,你需要使用专门的 客户端库

本教程将指导你如何使用 InfluxDB Go 客户端库,创建与数据库的连接,并从中存储和查询数据。

开始使用 InfluxDB

你即将将 InfluxDB 添加到你的应用程序堆栈中。在本教程结束时,你将拥有一个代码库,演示如何将 Go 应用程序与 InfluxDB 接口。但是首先,让我们为此演示创建一些背景。

你正在设计一款新的智能恒温器 IoT 产品。你的 IoT 传感器频繁地接收温度测量数据。假设你将温度测量数据存储在 InfluxDB 数据库中。此外,你的用户可以通过你的应用程序调整智能恒温器的温度。每次用户更改恒温器时,你都会更新你在经典数据库中的恒温器状态。

此外,你希望保留所有恒温器温度设置的历史记录,与温度测量数据一起。温度设置和测量数据一起,使你能够分析用户行为。有了这些数据,你可以后来通过预测用户在采取行动之前的变化,使你的智能恒温器变得更加智能。

本教程的要求

本教程与操作系统无关,并假定你已经安装了 GO 1.16+Docker

我选择了 Docker 安装,因为它最适合持续集成。然而,InfluxDB 支持 许多平台(Linux、macOS、Windows、Docker、Kubernetes)

启动本地数据库

要开始设置,你需要定义一个 docker-compose.yml 文件,它定义了 Docker 中 InfluxDB 的配置。

services:
    influxdb:
        image: influxdb:2.0.7
        environment:
            DOCKER_INFLUXDB_INIT_MODE: setup
            DOCKER_INFLUXDB_INIT_USERNAME: ${INFLUXDB_USERNAME}
            DOCKER_INFLUXDB_INIT_PASSWORD: ${INFLUXDB_PASSWORD}
            DOCKER_INFLUXDB_INIT_ORG: iot
            DOCKER_INFLUXDB_INIT_BUCKET: users_business_events
            DOCKER_INFLUXDB_INIT_ADMIN_TOKEN: ${INFLUXDB_TOKEN}
        ports:
        - "8086:8086"

运行 docker-compose up 以启动数据库。为了测试目的,请在一个名为 test_influxdb.env 的文件中定义你的环境变量。以下是我的 test_influxdb.env 的内容:

docker-compose --env-file test_influxdb.env up

INFLUXDB_USERNAME=admin
INFLUXDB_PASSWORD=admin1234
INFLUXDB_TOKEN=F-QFQpmCL9UkR3qyoXnLkzWj03s6m4eCvYgDl1ePfHBf9ph7yxaSgQ6WN0i9giNgRTfONwVMK1f977r_g71oNQ==
INFLUXDB_URL="https://127.0.0.1:8086"

你需要添加 --env-file 标志到你的 docker-compose 命令中,以强制 Docker 考虑该文件。

docker-compose --env-file test_influxdb.env up

InfluxDB 配备了易于使用的 UI。访问 https://127.0.0.1:8086 并查看。

InfluxDB UI - load data

熟悉 InfluxDB

InfluxDB 不仅仅是一个时序数据库。它是 influxdata 生态系统的核心元素。这个生态系统包括:

  • 一个 UI。网页界面提供了管理界面,如查询构建器和数据可视化等开发工具。
  • 一个命令行界面(CLI)influxinflux CLI 可以管理数据库,从 CSV 加载数据,插入数据并查询它。它是开发和调试应用程序时的良好伴侣。

开始使用 Go 客户端库

你的数据库已经准备好了,那么让我们写一些 Go 语言代码。初始化你的 Go 模块。

go mod init github.com/xNok/Getting-Started-with-Go-and-InfluxDB

influxdb-client-go 添加为你的项目的依赖项。

使用go get命令安装influxdb-client-go/v2库:go get github.com/influxdata/influxdb-client-go/v2

建立连接

让我们创建一个帮助你连接到数据库的函数。你已经在test_influxdb.env文件中定义了一个令牌,变量名为INFLUXDB_TOKEN。你将使用这个令牌进行测试。你也可以通过UI创建一个新的令牌。你的InfluxDB实例应该仍在运行。

  1. 回到UI,生成一个新的认证令牌。
  2. 点击“数据”。
  3. 在“客户端库”中,选择“Go”。

本节允许你创建一个认证令牌并提供一些代码片段,以帮助你开始使用Go库。

Getting Started with Go and InfluxDB

这是你要创建的函数

// Connect to an Influx Database reading the credentials from
// environment variables INFLUXDB_TOKEN, INFLUXDB_URL
// return influxdb Client or errors
func connectToInfluxDB() (influxdb2.Client, error) {

}

接下来,创建测试函数。当你调用connectToInfluxDB时,你将成功连接到数据库,你可以通过调用influxdb2.Client中的Health方法来验证这一点。如你所见,我使用了godotenv.Load("../test_influxdb.env")来获取你在Docker中为InfluxDB定义的凭据。(你需要在你的项目中添加godotenv作为依赖项)。

func Test_connectToInfluxDB(t *testing.T) {

    //load environment variable from a file for test purposes
    godotenv.Load("../test_influxdb.env")

    tests := []struct {
        name string
        wantErr bool
    }{
        {
            name:    "Successful connection to InfluxDB",
            wantErr: false,
        },
    }
    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            got, err := ConnectToInfluxDB()
            if (err != nil) != tt.wantErr {
                t.Errorf("ConnectToInfluxDB() error = %v, wantErr %v", err, tt.wantErr)
                return
            }
            health, err := got.Health(context.Background())
            if (err != nil) && health.Status == domain.HealthCheckStatusPass {
                t.Errorf("connectToInfluxDB() error. database not healthy")
                return
            }
            got.Close()
        })
    }
}

在建立连接时,调用influxdb2客户端初始化构造函数。包括从环境变量中读取凭据,并对connectToInfluxDB进行验证,代码如下所示

// Connect to an Influx Database reading the credentials from
// environement variables INFLUXDB_TOKEN, INFLUXDB_URL
// return influxdb Client or errors
func ConnectToInfluxDB() (influxdb2.Client, error) {

    dbToken := os.Getenv("INFLUXDB_TOKEN")
    if dbToken == "" {
        return nil, errors.New("INFLUXDB_TOKEN must be set")
    }

    dbURL := os.Getenv("INFLUXDB_URL")
    if dbURL == "" {
        return nil, errors.New("INFLUXDB_URL must be set")
    }

    client := influxdb2.NewClient(dbURL, dbToken)

    // validate client connection health
    _, err := client.Health(context.Background())

    return client, err
}

如果你通过了测试,你现在可以开始使用InfluxDB实现一些功能了。但是,你还没有准备好进入生产环境。

在生产环境中,强烈建议启用SSL/TLS加密。由于你使用的是本地Docker环境,你在这个教程中不需要它。

在你的应用程序中,你需要将证书传递给你的InfluxDB客户端。

// read the certificate
cer, _ := tls.LoadX509KeyPair("server.crt", "server.key")
// define the certificate
client := influxdb2.NewClientWithOptions(dbURL, dbToken, 
    influxdb2.DefaultOptions().
        SetTLSConfig(&tls.Config{
            Certificates: []tls.Certificate{cer}
        }))

插入数据

第一步:对你的数据进行建模。你的需求是将“恒温器设置”的变化发送到InfluxDB。该设置包含用户标识符和房间中所需平均温度和最高温度。

type ThermostatSetting struct {
    user string
    max  float64 //temperature
    avg  float64 //temperature
}

第二步:编写一个测试函数。你可以使用几种可能的方法来插入数据,以找到最适合你的方法。

InfluxDB Go客户端库提供了三种插入数据的方法

  • 行协议使用基于文本的数据库查询。
  • 使用构造函数的数据点使用映射来填充数据。
  • 使用流畅风格的data point使用构建器模式。

这里是一个通用的测试函数,用于此目的

func Test_write_event_with_line_protocol(t *testing.T) {
    tests := []struct {
        name  string
        f     func(influxdb2.Client, []ThermostatSetting)
        datas []ThermostatSetting 
    }{
        {
            name:  "Write new record with line protocol",
            // Your data Points
            datas: []ThermostatSetting{{user: "foo", avg: 35.5, max: 42}},
            f: func(c influxdb2.Client, datas []ThermostatSetting) {
                // Send all the data to the DB
                for _, data := range datas {
                    write_event_with_line_protocol(c, data)
                }
            },
        },
    }
    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            // helper to initialise and clean the database
            client := init_testDB(t)
            // call function under test
            tt.f(client, tt.datas)
            // TODO Validate the data
        })
    }
}

你需要一个小助手函数init_testDB来初始化连接并在每次测试之前清理数据库。

func init_testDB(t *testing.T) influxdb2.Client {
    t.Helper() // Tells `go test` that this is an helper
    godotenv.Load("./test_influxdb.env") //load environement variable
    client, err := drivers.ConnectToInfluxDB() // create the client

    if err != nil {
        t.Errorf("impossible to connect to DB")
    }

    // Clean the database by deleting the bucket
    ctx := context.Background()
    bucketsAPI := client.BucketsAPI()
    dBucket, err := bucketsAPI.FindBucketByName(ctx, bucket)
    if err == nil {
        client.BucketsAPI().DeleteBucketWithID(context.Background(), *dBucket.Id)
    }

    // create new empty bucket
    dOrg, _ := client.OrganizationsAPI().FindOrganizationByName(ctx, org)
    _, err = client.BucketsAPI().CreateBucketWithNameWithID(ctx, *dOrg.Id, bucket)

    if err != nil {
        t.Errorf("impossible to new create bucket")
    }

    return client
}

最后,你已经准备好尝试每种类型的数据插入。

行协议的使用很简单,有点像SQL查询。InfluxDB中的一个记录由三个元素组成:measurementNamefieldstags。InfluxDB的关键概念包括

  • measurementName,它指代一个数据集
  • fields,它们是键/值对
  • tags,它们也是键/值对,但作为你的记录的索引。
func write_event_with_line_protocol(client influxdb2.Client, t ThermostatSetting) {
    // get non-blocking write client
    writeAPI := client.WriteAPI(org, bucket)
    // write line protocol
    writeAPI.WriteRecord(fmt.Sprintf("thermostat,unit=temperature,user=%s avg=%f,max=%f", t.user, t.avg, t.max))
    // Flush writes
    writeAPI.Flush()
}

点数据方法写起来比较长,但提供了更多的结构。当数据参数已经是所需格式时,它很方便。

func write_event_with_params_constror(client influxdb2.Client, t ThermostatSetting) {
    // Use blocking write client for writes to desired bucket
    writeAPI := client.WriteAPI(org, bucket)
    // Create point using full params constructor
    p := influxdb2.NewPoint("thermostat",
        map[string]string{"unit": "temperature", "user": t.user},
        map[string]interface{}{"avg": t.avg, "max": t.max},
        time.Now())
    writeAPI.WritePoint(p)
    // Flush writes
    writeAPI.Flush()
}

或者,您可以使用构建器 NewPointWithMeasurement 一步一步构建查询,这很容易阅读。

func write_event_with_fluent_Style(client influxdb2.Client, t ThermostatSetting) {
    // Use blocking write client for writes to desired bucket
    writeAPI := client.WriteAPI(org, bucket)
    // create point using fluent style
    p := influxdb2.NewPointWithMeasurement("thermostat").
        AddTag("unit", "temperature").
        AddTag("user", t.user).
        AddField("avg", t.avg).
        AddField("max", t.max).
        SetTime(time.Now())
    writeAPI.WritePoint(p)
    // Flush writes
    writeAPI.Flush()
}

哪种插入方法最适合您?别忘了更新测试来验证您的实现。

批处理

请注意,InfluxDB 客户端使用批处理将数据发送到数据库。默认情况下,直到达到批大小(默认为5,000个点)之前,都不会向数据库发送数据,这是在数据库负载和数据可用性之间的一种权衡。较小的批大小意味着更高的速度,这可能会影响数据库的性能。另一方面,等待达到批大小意味着数据仍然存储在您的应用程序内存中,而不是数据库中。

您可以在调用 influxdb2 的初始化构造函数时调整 Batch Size

client := influxdb2.NewClientWithOptions(dbURL, dbToken,
        influxdb2.DefaultOptions().SetBatchSize(20))

此外,您还可以使用 Flush() 强制客户端发送数据。您在之前的例子中已经看到了这个。

writeAPI.Flush()

然而,根据我使用时序数据库的经验,不要在所有地方都使用 Flush 方法。即使将数据立即写入数据库似乎合情合理,但它可能会显著影响性能。相反,使用 Batch Size 选项。

阻塞与非阻塞

虽然 InfluxDB 的默认行为是使用异步调用和批处理(即非阻塞 I/O),但您可以选择同步写入点。此选项适用于需要立即提交到数据库的偶尔写入。

func write_event_with_blocking_write(client influxdb2.Client) {
    // Get blocking write client
    writeAPI := client.WriteAPIBlocking(org, bucket)

    // write line protocol
    writeAPI.WriteRecord(context.Background(), fmt.Sprintf("stat,unit=temperature1 avg=%f,max=%f", 23.5, 45.0))
}

查询数据

InfluxDB 使用一种名为 Flux 的查询语言。Flux 使用函数式方法来选择、过滤和聚合数据。一旦掌握了基础知识,它就很容易阅读和理解。此外,InfluxDB 提供了一个强大的查询构建器,可以根据您摄取的数据设计查询。

InfluxDB - querying data

为了完成本教程,运行您的集成测试以将数据点添加到数据库中。接下来,使用查询构建器创建一个查询,以隔离您所需的数据。这是使用 QueryBuilder 构建的查询

from(bucket: "users_business_events")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "thermostat")
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
  |> yield(name: "mean")

现在您需要实现一个函数,使用 Go 客户端查询您的数据。有两种查询数据的方法。

第一种是使用 QueryTableResult。您会注意到,将数据放回 ThermostatSetting 结构体需要一些工作。即使您发送 ThermostatSetting 的内容作为一个数据点,字段 avgmax 也会作为两个单独的记录输出。

func read_events_as_query_table_result(client influxdb2.Client) map[time.Time]ThermostatSetting {

    // Get query client
    queryAPI := client.QueryAPI(org)

    // Query. You need to change a bit the Query from the Query Builder
    // Otherwise it won't work
    fluxQuery := fmt.Sprintf(`from(bucket: "users_business_events")
    |> range(start: -1h)
    |> filter(fn: (r) => r["_measurement"] == "thermostat")
    |> yield(name: "mean")`)

    result, err := queryAPI.Query(context.Background(), fluxQuery)

    // Putting back the data in share requires a bit of work
    var resultPoints map[time.Time]ThermostatSetting
    resultPoints = make(map[time.Time]ThermostatSetting)

    if err == nil {
        // Iterate over query response
        for result.Next() {
            // Notice when group key has changed
            if result.TableChanged() {
                fmt.Printf("table: %s\n", result.TableMetadata().String())
            }

            val, ok := resultPoints[result.Record().Time()]

            if !ok {
                val = ThermostatSetting{
                    user: fmt.Sprintf("%v", result.Record().ValueByKey("user")),
                }
            }

            switch field := result.Record().Field(); field {
            case "avg":
                val.avg = result.Record().Value().(float64)
            case "max":
                val.max = result.Record().Value().(float64)
            default:
                fmt.Printf("unrecognized field %s.\n", field)
            }

            resultPoints[result.Record().Time()] = val

        }
        // check for an error
        if result.Err() != nil {
            fmt.Printf("query parsing error: %s\n", result.Err().Error())
        }
    } else {
        panic(err)
    }

    return resultPoints

}

第二种选项是 QueryRaw() ,它返回一个未解析的结果字符串。

func read_events_as_raw_string(client influxdb2.Client) {
    // Get query client
    queryAPI := client.QueryAPI(org)

    // Query
    fluxQuery := fmt.Sprintf(`from(bucket: "users_business_events")
    |> range(start: -1h)
    |> filter(fn: (r) => r["_measurement"] == "thermostat")
    |> yield(name: "mean")`)

    result, err := queryAPI.QueryRaw(context.Background(), fluxQuery, influxdb2.DefaultDialect())
    if err == nil {
        fmt.Println("QueryResult:")
        fmt.Println(result)
    } else {
        panic(err)
    }
}

最后,您需要更新您的测试函数并查看它是否按预期工作。

func Test_write_event_with_line_protocol(t *testing.T) {
    tests := []struct {
        name  string
        f     func(influxdb2.Client, []ThermostatSetting)
        datas []ThermostatSetting
    }{
        [...]
    }
    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            client := init_testDB(t)

            // call function to test
            tt.f(client, tt.datas)
            // test can be flicky if the query is done before that data is ready in the database
            time.Sleep(time.Millisecond * 1000)

            // Option one: QueryTableResult
            results := read_events_as_query_table_result(client)
            // convert results to array to compare with data
            resultsArr := []ThermostatSetting{}
            for _, v := range results {
                resultsArr = append(resultsArr, v)
            }

            if eq := reflect.DeepEqual(resultsArr, tt.datas); !eq {
                t.Errorf("want %v, got %v", tt.datas, resultsArr)
            }

            // Option two: query raw data
            // TODO add validation
            read_events_as_raw_string(client)

            client.Close()
        })
    }
}

结论

如果您完成了本教程,您将拥有一个经过彻底测试的应用程序,该应用程序使用 InfluxDB。您使用了四种不同的数据查询方法(三种非阻塞插入风格加上一种阻塞插入),并且您知道在投入生产之前需要启用 SSL/TLS 证书。

您有机会在 InfluxDB UI 中插入数据并可视化它,从而可以快速构建查询,并在应用程序中使用这些数据。总的来说,您可以在同一数据结构中插入和检索数据,以在您的应用程序中使用。您的未来智能恒温器公司无疑走在正确的道路上。

如果您对进一步学习 InfluxDB 感兴趣,请阅读官方文档,以更熟悉Go 客户端库

关于作者

Alexandre Couëdelo

亚历山大是一位复杂系统工程与管理专家。自从他开始职业生涯,通过参与加拿大一家领先金融机构的数字化转型,他就一直拥抱 DevOps 文化。他对 DevOps 革命和工业工程充满热情。他喜欢自己有足够的前瞻性,能够从两者中汲取精华。