Go 和 InfluxDB 入门
作者: Community / 产品, 用例, 开发者, 入门
2021年11月19日
导航至
本文由 Alexandre Couëdelo 撰写,最初由 The New Stack 发布。向下滚动查看作者的照片和简介。
诸如 PostgreSQL 或 MongoDB 之类的传统数据库非常擅长以表格或文档格式安全地保存系统状态,但是对于时间相关的数据(系统指标、物联网设备测量或应用程序状态更改)呢?
对于这些情况,您需要一种更合适的数据库类型,一种旨在更好地管理具有时间特征的半结构化数据的数据库。
InfluxDB 是一个专门为时间序列数据编写的高性能数据存储。InfluxData 不仅提供数据库,还提供用于摄取、转换和可视化数据的工具。例如,Telegraf 提供了 200 多个插件来摄取数据。但是,如果您想将 InfluxDB 直接集成到您的后端应用程序中,则需要使用专用的 客户端库。
本教程将引导您了解如何使用 InfluxDB Go 客户端库,创建与数据库的连接以及存储和查询数据。
InfluxDB 入门
您即将将 InfluxDB 添加到您的应用程序堆栈中。在本教程结束时,您将获得一个代码库,说明如何将 Go 应用程序与 InfluxDB 接口。但首先,让我们为这个演示创建一个上下文。
您正在设计一款新的智能恒温器物联网产品。您会收到来自物联网传感器的频繁温度测量值。假设您将温度测量值存储在 InfluxDB 数据库中。此外,您的用户可以使用您的应用程序调整智能恒温器的温度。每次用户更改恒温器时,您都会在您的传统数据库中更新恒温器的状态。
此外,您希望保留所有恒温器温度设置的历史记录以及温度测量值。温度设置和测量值一起使您能够分析用户行为。有了这些数据,您以后可以通过在用户采取行动之前预测变化,使您的智能恒温器更加智能。
本教程的要求
本教程与操作系统无关,并假设您已安装 GO 1.16+ 和 Docker。
我选择了 Docker 安装,因为它最适合持续集成。但是,InfluxDB 支持多种平台 (Linux、macOS、Windows、Docker、Kubernetes)。
启动本地数据库
要开始设置,您需要定义一个 docker-compose.yml
文件,该文件定义 Docker 中 InfluxDB 的配置。
services:
influxdb:
image: influxdb:2.0.7
environment:
DOCKER_INFLUXDB_INIT_MODE: setup
DOCKER_INFLUXDB_INIT_USERNAME: ${INFLUXDB_USERNAME}
DOCKER_INFLUXDB_INIT_PASSWORD: ${INFLUXDB_PASSWORD}
DOCKER_INFLUXDB_INIT_ORG: iot
DOCKER_INFLUXDB_INIT_BUCKET: users_business_events
DOCKER_INFLUXDB_INIT_ADMIN_TOKEN: ${INFLUXDB_TOKEN}
ports:
- "8086:8086"
运行 docker-compose up
以启动数据库。出于测试目的,请在名为 test_influxdb.env
的文件中定义您的环境变量。以下是我的 test_influxdb.env
的内容
docker-compose --env-file test_influxdb.env up
INFLUXDB_USERNAME=admin
INFLUXDB_PASSWORD=admin1234
INFLUXDB_TOKEN=F-QFQpmCL9UkR3qyoXnLkzWj03s6m4eCvYgDl1ePfHBf9ph7yxaSgQ6WN0i9giNgRTfONwVMK1f977r_g71oNQ==
INFLUXDB_URL="http://localhost:8086"
您需要将标志 --env-file
添加到您的 docker-compose 命令中,以强制 Docker 将该文件考虑在内。
docker-compose --env-file test_influxdb.env up
InfluxDB 配备了易于使用的 UI。访问 http://localhost:8086
并查看。
熟悉 InfluxDB
InfluxDB 不仅仅是一个时间序列数据库。它是 influxdata
生态系统背后的核心元素。这个生态系统包括
- UI。Web 界面提供管理界面、开发工具,例如查询构建器和数据可视化。
- 命令行界面 (CLI) influx。
influx
CLI 可以管理数据库、从 CSV 加载数据、插入数据和查询数据。它是开发和调试应用程序的好帮手。
Go 客户端库入门
您的数据库已准备就绪,所以让我们编写一些 Go 代码。初始化您的 Go 模块。
go mod init github.com/xNok/Getting-Started-with-Go-and-InfluxDB
将 influxdb-client-go
作为依赖项添加到您的项目中。
go get github.com/influxdata/influxdb-client-go/v2
建立连接
让我们创建一个帮助您连接到数据库的函数。您已经在 test_influxdb.env
中使用变量 INFLUXDB_TOKEN
定义了一个令牌。您将使用此令牌进行测试。您还可以通过 UI 创建一个新令牌。您的 InfluxDB 实例应仍在运行。
- 返回 UI 并生成一个新的身份验证令牌。
- 点击数据。
- 在客户端库中,选择Go。
本节允许您创建身份验证令牌,并提供一些代码片段以开始使用 Go 库。
这是您旨在创建的函数
// Connect to an Influx Database reading the credentials from
// environment variables INFLUXDB_TOKEN, INFLUXDB_URL
// return influxdb Client or errors
func connectToInfluxDB() (influxdb2.Client, error) {
}
接下来,创建测试函数。当您调用 connectToInfluxDB
时,您将成功连接到数据库,并且可以通过调用 influxdb2.Client
中的 Health
方法来验证这一点。如您所见,我使用了 godotenv.Load("../test_influxdb.env")
来获取您为 Docker 中的 InfluxDB 定义的凭据。(您需要将 godotenv
作为依赖项添加到您的项目中)。
func Test_connectToInfluxDB(t *testing.T) {
//load environment variable from a file for test purposes
godotenv.Load("../test_influxdb.env")
tests := []struct {
name string
wantErr bool
}{
{
name: "Successful connection to InfluxDB",
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := ConnectToInfluxDB()
if (err != nil) != tt.wantErr {
t.Errorf("ConnectToInfluxDB() error = %v, wantErr %v", err, tt.wantErr)
return
}
health, err := got.Health(context.Background())
if (err != nil) && health.Status == domain.HealthCheckStatusPass {
t.Errorf("connectToInfluxDB() error. database not healthy")
return
}
got.Close()
})
}
}
当涉及到创建连接时,调用 influxdb2
客户端初始化构造函数。包括从环境变量读取凭据和验证 connectToInfluxDB
的代码看起来像这样
// Connect to an Influx Database reading the credentials from
// environement variables INFLUXDB_TOKEN, INFLUXDB_URL
// return influxdb Client or errors
func ConnectToInfluxDB() (influxdb2.Client, error) {
dbToken := os.Getenv("INFLUXDB_TOKEN")
if dbToken == "" {
return nil, errors.New("INFLUXDB_TOKEN must be set")
}
dbURL := os.Getenv("INFLUXDB_URL")
if dbURL == "" {
return nil, errors.New("INFLUXDB_URL must be set")
}
client := influxdb2.NewClient(dbURL, dbToken)
// validate client connection health
_, err := client.Health(context.Background())
return client, err
}
如果您通过了测试,您就可以使用 InfluxDB 实现一些功能了。但是,您尚未准备好投入生产环境。
强烈建议在生产环境中启用 SSL/TLS 加密。在本教程中您不需要它,因为您使用的是本地 Docker 环境。
在您的应用程序中,您需要将证书传递给您的 InfluxDB 客户端。
// read the certificate
cer, _ := tls.LoadX509KeyPair("server.crt", "server.key")
// define the certificate
client := influxdb2.NewClientWithOptions(dbURL, dbToken,
influxdb2.DefaultOptions().
SetTLSConfig(&tls.Config{
Certificates: []tls.Certificate{cer}
}))
插入数据
第一步:对您的数据进行建模。您的要求是将“恒温器设置”中的更改发送到 InfluxDB。该设置包含用户的标识符以及房间内所需的平均和最高温度。
type ThermostatSetting struct {
user string
max float64 //temperature
avg float64 //temperature
}
第二步:编写一个测试函数。您可以使用几种可能的方式来插入数据,以找到最适合您的一种。
InfluxDB Go 客户端库提供了三种插入数据的方式
- Line protocol 使用基于文本的数据库查询。
- 带有构造函数的数据点使用映射来填充数据。
- 具有流畅风格的数据点使用构建器模式。
这是一个为此目的而制作的通用测试函数
func Test_write_event_with_line_protocol(t *testing.T) {
tests := []struct {
name string
f func(influxdb2.Client, []ThermostatSetting)
datas []ThermostatSetting
}{
{
name: "Write new record with line protocol",
// Your data Points
datas: []ThermostatSetting{{user: "foo", avg: 35.5, max: 42}},
f: func(c influxdb2.Client, datas []ThermostatSetting) {
// Send all the data to the DB
for _, data := range datas {
write_event_with_line_protocol(c, data)
}
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// helper to initialise and clean the database
client := init_testDB(t)
// call function under test
tt.f(client, tt.datas)
// TODO Validate the data
})
}
}
您将需要一个小型的辅助函数 init_testDB
,以在每次测试之前初始化连接并清理数据库。
func init_testDB(t *testing.T) influxdb2.Client {
t.Helper() // Tells `go test` that this is an helper
godotenv.Load("./test_influxdb.env") //load environement variable
client, err := drivers.ConnectToInfluxDB() // create the client
if err != nil {
t.Errorf("impossible to connect to DB")
}
// Clean the database by deleting the bucket
ctx := context.Background()
bucketsAPI := client.BucketsAPI()
dBucket, err := bucketsAPI.FindBucketByName(ctx, bucket)
if err == nil {
client.BucketsAPI().DeleteBucketWithID(context.Background(), *dBucket.Id)
}
// create new empty bucket
dOrg, _ := client.OrganizationsAPI().FindOrganizationByName(ctx, org)
_, err = client.BucketsAPI().CreateBucketWithNameWithID(ctx, *dOrg.Id, bucket)
if err != nil {
t.Errorf("impossible to new create bucket")
}
return client
}
最后,您已准备好尝试每种类型的数据插入。
Line protocol 易于使用,并且有点像 SQL 查询。InfluxDB 中的记录由三个元素组成:measurementName
、fields
和 tags
。InfluxDB 的 关键概念 包括
measurementName
,它指的是一个数据集fields
,它们是键/值对tags
,它们也是键/值对,但充当记录的索引。
func write_event_with_line_protocol(client influxdb2.Client, t ThermostatSetting) {
// get non-blocking write client
writeAPI := client.WriteAPI(org, bucket)
// write line protocol
writeAPI.WriteRecord(fmt.Sprintf("thermostat,unit=temperature,user=%s avg=%f,max=%f", t.user, t.avg, t.max))
// Flush writes
writeAPI.Flush()
}
点数据方法编写起来很冗长,但也提供了更多的结构。当数据参数已经采用所需的格式时,它很方便。
func write_event_with_params_constror(client influxdb2.Client, t ThermostatSetting) {
// Use blocking write client for writes to desired bucket
writeAPI := client.WriteAPI(org, bucket)
// Create point using full params constructor
p := influxdb2.NewPoint("thermostat",
map[string]string{"unit": "temperature", "user": t.user},
map[string]interface{}{"avg": t.avg, "max": t.max},
time.Now())
writeAPI.WritePoint(p)
// Flush writes
writeAPI.Flush()
}
或者,您可以使用构建器 NewPointWithMeasurement
逐步构建查询,这易于阅读。
func write_event_with_fluent_Style(client influxdb2.Client, t ThermostatSetting) {
// Use blocking write client for writes to desired bucket
writeAPI := client.WriteAPI(org, bucket)
// create point using fluent style
p := influxdb2.NewPointWithMeasurement("thermostat").
AddTag("unit", "temperature").
AddTag("user", t.user).
AddField("avg", t.avg).
AddField("max", t.max).
SetTime(time.Now())
writeAPI.WritePoint(p)
// Flush writes
writeAPI.Flush()
}
哪种插入方法最适合您?不要忘记更新测试以验证您的实现。
批处理
请注意,InfluxDB 客户端使用批处理将数据发送到数据库。默认情况下,在达到批处理大小(默认为 5,000 个点)之前,不会将任何数据发送到数据库,这是数据库负载和数据可用性之间的权衡。较小的批处理大小意味着更高的速度,因此可能会影响数据库的性能。另一方面,等待达到批处理大小意味着数据仍保留在应用程序的内存中,而不是在数据库中。
您可以在调用 influxdb2
的初始化构造函数时调整 Batch Size
client := influxdb2.NewClientWithOptions(dbURL, dbToken,
influxdb2.DefaultOptions().SetBatchSize(20))
此外,您可以强制客户端使用 Flush()
发送数据。您在前面的示例中已经看到了这一点。
writeAPI.Flush()
但是,根据我在时间序列数据库方面的经验,不要到处使用 Flush
方法。即使将数据立即写入数据库看起来很合理,但它可能会严重影响性能。相反,请使用 Batch Size
选项。
阻塞与非阻塞
虽然 InfluxDB 的默认行为是使用异步调用和批处理(即非阻塞 I/O),但您可以选择同步写入点。对于需要立即提交到数据库的非频繁写入,建议使用此选项。
func write_event_with_blocking_write(client influxdb2.Client) {
// Get blocking write client
writeAPI := client.WriteAPIBlocking(org, bucket)
// write line protocol
writeAPI.WriteRecord(context.Background(), fmt.Sprintf("stat,unit=temperature1 avg=%f,max=%f", 23.5, 45.0))
}
查询数据
InfluxDB 使用一种名为 Flux 的查询语言。Flux 使用一种函数式方法来选择、过滤和聚合数据。一旦您掌握了它的基本知识,就很容易阅读和理解。此外,InfluxDB 提供了一个强大的查询构建器,用于根据摄取的数据设计您的查询。
为了完成本教程,请运行您的集成测试以向数据库添加数据点。接下来,使用查询构建器创建一个查询,以隔离您需要的数据。这是使用 QueryBuilder 构建的查询
from(bucket: "users_business_events")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "thermostat")
|> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
|> yield(name: "mean")
现在剩下要做的就是实现一个使用 Go 客户端查询数据的函数。有两种查询数据的方法。
第一种是使用 QueryTableResult。您会注意到将数据放回 ThermostatSetting
结构中需要做一些工作。即使您将 ThermostatSetting
的内容作为一个数据点发送,字段 avg
和 max
也会作为两个单独的记录输出。
func read_events_as_query_table_result(client influxdb2.Client) map[time.Time]ThermostatSetting {
// Get query client
queryAPI := client.QueryAPI(org)
// Query. You need to change a bit the Query from the Query Builder
// Otherwise it won't work
fluxQuery := fmt.Sprintf(`from(bucket: "users_business_events")
|> range(start: -1h)
|> filter(fn: (r) => r["_measurement"] == "thermostat")
|> yield(name: "mean")`)
result, err := queryAPI.Query(context.Background(), fluxQuery)
// Putting back the data in share requires a bit of work
var resultPoints map[time.Time]ThermostatSetting
resultPoints = make(map[time.Time]ThermostatSetting)
if err == nil {
// Iterate over query response
for result.Next() {
// Notice when group key has changed
if result.TableChanged() {
fmt.Printf("table: %s\n", result.TableMetadata().String())
}
val, ok := resultPoints[result.Record().Time()]
if !ok {
val = ThermostatSetting{
user: fmt.Sprintf("%v", result.Record().ValueByKey("user")),
}
}
switch field := result.Record().Field(); field {
case "avg":
val.avg = result.Record().Value().(float64)
case "max":
val.max = result.Record().Value().(float64)
default:
fmt.Printf("unrecognized field %s.\n", field)
}
resultPoints[result.Record().Time()] = val
}
// check for an error
if result.Err() != nil {
fmt.Printf("query parsing error: %s\n", result.Err().Error())
}
} else {
panic(err)
}
return resultPoints
}
第二个选项是 QueryRaw()
,它返回一个未解析的结果字符串。
func read_events_as_raw_string(client influxdb2.Client) {
// Get query client
queryAPI := client.QueryAPI(org)
// Query
fluxQuery := fmt.Sprintf(`from(bucket: "users_business_events")
|> range(start: -1h)
|> filter(fn: (r) => r["_measurement"] == "thermostat")
|> yield(name: "mean")`)
result, err := queryAPI.QueryRaw(context.Background(), fluxQuery, influxdb2.DefaultDialect())
if err == nil {
fmt.Println("QueryResult:")
fmt.Println(result)
} else {
panic(err)
}
}
最后,您需要更新您的测试函数,看看它是否按预期工作。
func Test_write_event_with_line_protocol(t *testing.T) {
tests := []struct {
name string
f func(influxdb2.Client, []ThermostatSetting)
datas []ThermostatSetting
}{
[...]
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
client := init_testDB(t)
// call function to test
tt.f(client, tt.datas)
// test can be flicky if the query is done before that data is ready in the database
time.Sleep(time.Millisecond * 1000)
// Option one: QueryTableResult
results := read_events_as_query_table_result(client)
// convert results to array to compare with data
resultsArr := []ThermostatSetting{}
for _, v := range results {
resultsArr = append(resultsArr, v)
}
if eq := reflect.DeepEqual(resultsArr, tt.datas); !eq {
t.Errorf("want %v, got %v", tt.datas, resultsArr)
}
// Option two: query raw data
// TODO add validation
read_events_as_raw_string(client)
client.Close()
})
}
}
结论
如果您完成了本教程,您就拥有了一个经过全面测试的、使用 InfluxDB 的应用程序。您正在使用四种不同的方式来查询数据(三种风格的非阻塞插入和一种阻塞插入),并且您知道在投入生产环境之前需要启用 SSL/TLS 证书。
您有机会插入数据并在 InfluxDB UI 中对其进行可视化,从中您可以快速构建查询,然后在您的应用程序中使用数据。总而言之,您可以插入数据并以相同的数据结构检索它,以便在您的应用程序中使用。您未来的智能恒温器公司肯定走在正确的轨道上。
如果您有兴趣进一步了解 InfluxDB,请阅读 官方文档,以更熟悉 Go 客户端库。
关于作者
Alexandre 是一位复杂系统工程和管理专家。自从他开始职业生涯,通过为加拿大一家领先金融机构的数字化转型做出贡献以来,他就一直在拥抱 DevOps 文化。他的热情是 DevOps 革命和工业工程。他喜欢他有足够的后见之明来兼顾两全。