使用 Go 和 InfluxDB 入门
作者:社区 / 产品,用例,开发者,入门
2021年11月19日
导航到
本文由 Alexandre Couëdelo 编写,最初发表在 The New Stack 上。向下滚动以查看作者的图片和简介。
传统的数据库,如 PostgreSQL 或 MongoDB,在安全地以表格或文档格式保存系统状态方面做得很好,但时间依赖性数据:系统度量、物联网设备测量或应用程序状态变化怎么办呢?
对于这些事情,你需要一种更适合的数据库类型,这种数据库旨在更好地管理具有时间特性的半结构化数据。
InfluxDB 是一个专为时序数据设计的性能高效的数据存储。InfluxData 不仅提供了数据库,还提供了工具来摄入、转换和可视化你的数据。例如,Telegraf 提供了超过 200 个插件来摄入数据。但是,如果你想直接将 InfluxDB 集成到你的后端应用程序中,你需要使用专门的 客户端库。
本教程将指导你如何使用 InfluxDB Go 客户端库,创建与数据库的连接,并从中存储和查询数据。
开始使用 InfluxDB
你即将将 InfluxDB 添加到你的应用程序堆栈中。在本教程结束时,你将拥有一个代码库,演示如何将 Go 应用程序与 InfluxDB 接口。但是首先,让我们为此演示创建一些背景。
你正在设计一款新的智能恒温器 IoT 产品。你的 IoT 传感器频繁地接收温度测量数据。假设你将温度测量数据存储在 InfluxDB 数据库中。此外,你的用户可以通过你的应用程序调整智能恒温器的温度。每次用户更改恒温器时,你都会更新你在经典数据库中的恒温器状态。
此外,你希望保留所有恒温器温度设置的历史记录,与温度测量数据一起。温度设置和测量数据一起,使你能够分析用户行为。有了这些数据,你可以后来通过预测用户在采取行动之前的变化,使你的智能恒温器变得更加智能。
本教程的要求
本教程与操作系统无关,并假定你已经安装了 GO 1.16+ 和 Docker。
我选择了 Docker 安装,因为它最适合持续集成。然而,InfluxDB 支持 许多平台(Linux、macOS、Windows、Docker、Kubernetes)。
启动本地数据库
要开始设置,你需要定义一个 docker-compose.yml
文件,它定义了 Docker 中 InfluxDB 的配置。
services:
influxdb:
image: influxdb:2.0.7
environment:
DOCKER_INFLUXDB_INIT_MODE: setup
DOCKER_INFLUXDB_INIT_USERNAME: ${INFLUXDB_USERNAME}
DOCKER_INFLUXDB_INIT_PASSWORD: ${INFLUXDB_PASSWORD}
DOCKER_INFLUXDB_INIT_ORG: iot
DOCKER_INFLUXDB_INIT_BUCKET: users_business_events
DOCKER_INFLUXDB_INIT_ADMIN_TOKEN: ${INFLUXDB_TOKEN}
ports:
- "8086:8086"
运行 docker-compose up
以启动数据库。为了测试目的,请在一个名为 test_influxdb.env
的文件中定义你的环境变量。以下是我的 test_influxdb.env
的内容:
docker-compose --env-file test_influxdb.env up
INFLUXDB_USERNAME=admin
INFLUXDB_PASSWORD=admin1234
INFLUXDB_TOKEN=F-QFQpmCL9UkR3qyoXnLkzWj03s6m4eCvYgDl1ePfHBf9ph7yxaSgQ6WN0i9giNgRTfONwVMK1f977r_g71oNQ==
INFLUXDB_URL="https://127.0.0.1:8086"
你需要添加 --env-file
标志到你的 docker-compose 命令中,以强制 Docker 考虑该文件。
docker-compose --env-file test_influxdb.env up
InfluxDB 配备了易于使用的 UI。访问 https://127.0.0.1:8086
并查看。
熟悉 InfluxDB
InfluxDB 不仅仅是一个时序数据库。它是 influxdata
生态系统的核心元素。这个生态系统包括:
- 一个 UI。网页界面提供了管理界面,如查询构建器和数据可视化等开发工具。
- 一个命令行界面(CLI)influx。
influx
CLI 可以管理数据库,从 CSV 加载数据,插入数据并查询它。它是开发和调试应用程序时的良好伴侣。
开始使用 Go 客户端库
你的数据库已经准备好了,那么让我们写一些 Go 语言代码。初始化你的 Go 模块。
go mod init github.com/xNok/Getting-Started-with-Go-and-InfluxDB
将 influxdb-client-go
添加为你的项目的依赖项。
使用go get命令安装influxdb-client-go/v2库:go get github.com/influxdata/influxdb-client-go/v2
建立连接
让我们创建一个帮助你连接到数据库的函数。你已经在test_influxdb.env文件中定义了一个令牌,变量名为INFLUXDB_TOKEN。你将使用这个令牌进行测试。你也可以通过UI创建一个新的令牌。你的InfluxDB实例应该仍在运行。
- 回到UI,生成一个新的认证令牌。
- 点击“数据”。
- 在“客户端库”中,选择“Go”。
本节允许你创建一个认证令牌并提供一些代码片段,以帮助你开始使用Go库。
这是你要创建的函数
// Connect to an Influx Database reading the credentials from
// environment variables INFLUXDB_TOKEN, INFLUXDB_URL
// return influxdb Client or errors
func connectToInfluxDB() (influxdb2.Client, error) {
}
接下来,创建测试函数。当你调用connectToInfluxDB
时,你将成功连接到数据库,你可以通过调用influxdb2.Client
中的Health
方法来验证这一点。如你所见,我使用了godotenv.Load("../test_influxdb.env")
来获取你在Docker中为InfluxDB定义的凭据。(你需要在你的项目中添加godotenv
作为依赖项)。
func Test_connectToInfluxDB(t *testing.T) {
//load environment variable from a file for test purposes
godotenv.Load("../test_influxdb.env")
tests := []struct {
name string
wantErr bool
}{
{
name: "Successful connection to InfluxDB",
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := ConnectToInfluxDB()
if (err != nil) != tt.wantErr {
t.Errorf("ConnectToInfluxDB() error = %v, wantErr %v", err, tt.wantErr)
return
}
health, err := got.Health(context.Background())
if (err != nil) && health.Status == domain.HealthCheckStatusPass {
t.Errorf("connectToInfluxDB() error. database not healthy")
return
}
got.Close()
})
}
}
在建立连接时,调用influxdb2客户端初始化构造函数。包括从环境变量中读取凭据,并对connectToInfluxDB
进行验证,代码如下所示
// Connect to an Influx Database reading the credentials from
// environement variables INFLUXDB_TOKEN, INFLUXDB_URL
// return influxdb Client or errors
func ConnectToInfluxDB() (influxdb2.Client, error) {
dbToken := os.Getenv("INFLUXDB_TOKEN")
if dbToken == "" {
return nil, errors.New("INFLUXDB_TOKEN must be set")
}
dbURL := os.Getenv("INFLUXDB_URL")
if dbURL == "" {
return nil, errors.New("INFLUXDB_URL must be set")
}
client := influxdb2.NewClient(dbURL, dbToken)
// validate client connection health
_, err := client.Health(context.Background())
return client, err
}
如果你通过了测试,你现在可以开始使用InfluxDB实现一些功能了。但是,你还没有准备好进入生产环境。
在生产环境中,强烈建议启用SSL/TLS加密。由于你使用的是本地Docker环境,你在这个教程中不需要它。
在你的应用程序中,你需要将证书传递给你的InfluxDB客户端。
// read the certificate
cer, _ := tls.LoadX509KeyPair("server.crt", "server.key")
// define the certificate
client := influxdb2.NewClientWithOptions(dbURL, dbToken,
influxdb2.DefaultOptions().
SetTLSConfig(&tls.Config{
Certificates: []tls.Certificate{cer}
}))
插入数据
第一步:对你的数据进行建模。你的需求是将“恒温器设置”的变化发送到InfluxDB。该设置包含用户标识符和房间中所需平均温度和最高温度。
type ThermostatSetting struct {
user string
max float64 //temperature
avg float64 //temperature
}
第二步:编写一个测试函数。你可以使用几种可能的方法来插入数据,以找到最适合你的方法。
InfluxDB Go客户端库提供了三种插入数据的方法
- 行协议使用基于文本的数据库查询。
- 使用构造函数的数据点使用映射来填充数据。
- 使用流畅风格的data point使用构建器模式。
这里是一个通用的测试函数,用于此目的
func Test_write_event_with_line_protocol(t *testing.T) {
tests := []struct {
name string
f func(influxdb2.Client, []ThermostatSetting)
datas []ThermostatSetting
}{
{
name: "Write new record with line protocol",
// Your data Points
datas: []ThermostatSetting{{user: "foo", avg: 35.5, max: 42}},
f: func(c influxdb2.Client, datas []ThermostatSetting) {
// Send all the data to the DB
for _, data := range datas {
write_event_with_line_protocol(c, data)
}
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// helper to initialise and clean the database
client := init_testDB(t)
// call function under test
tt.f(client, tt.datas)
// TODO Validate the data
})
}
}
你需要一个小助手函数init_testDB
来初始化连接并在每次测试之前清理数据库。
func init_testDB(t *testing.T) influxdb2.Client {
t.Helper() // Tells `go test` that this is an helper
godotenv.Load("./test_influxdb.env") //load environement variable
client, err := drivers.ConnectToInfluxDB() // create the client
if err != nil {
t.Errorf("impossible to connect to DB")
}
// Clean the database by deleting the bucket
ctx := context.Background()
bucketsAPI := client.BucketsAPI()
dBucket, err := bucketsAPI.FindBucketByName(ctx, bucket)
if err == nil {
client.BucketsAPI().DeleteBucketWithID(context.Background(), *dBucket.Id)
}
// create new empty bucket
dOrg, _ := client.OrganizationsAPI().FindOrganizationByName(ctx, org)
_, err = client.BucketsAPI().CreateBucketWithNameWithID(ctx, *dOrg.Id, bucket)
if err != nil {
t.Errorf("impossible to new create bucket")
}
return client
}
最后,你已经准备好尝试每种类型的数据插入。
行协议的使用很简单,有点像SQL查询。InfluxDB中的一个记录由三个元素组成:measurementName
、fields
和tags
。InfluxDB的关键概念包括
measurementName
,它指代一个数据集fields
,它们是键/值对tags
,它们也是键/值对,但作为你的记录的索引。
func write_event_with_line_protocol(client influxdb2.Client, t ThermostatSetting) {
// get non-blocking write client
writeAPI := client.WriteAPI(org, bucket)
// write line protocol
writeAPI.WriteRecord(fmt.Sprintf("thermostat,unit=temperature,user=%s avg=%f,max=%f", t.user, t.avg, t.max))
// Flush writes
writeAPI.Flush()
}
点数据方法写起来比较长,但提供了更多的结构。当数据参数已经是所需格式时,它很方便。
func write_event_with_params_constror(client influxdb2.Client, t ThermostatSetting) {
// Use blocking write client for writes to desired bucket
writeAPI := client.WriteAPI(org, bucket)
// Create point using full params constructor
p := influxdb2.NewPoint("thermostat",
map[string]string{"unit": "temperature", "user": t.user},
map[string]interface{}{"avg": t.avg, "max": t.max},
time.Now())
writeAPI.WritePoint(p)
// Flush writes
writeAPI.Flush()
}
或者,您可以使用构建器 NewPointWithMeasurement
一步一步构建查询,这很容易阅读。
func write_event_with_fluent_Style(client influxdb2.Client, t ThermostatSetting) {
// Use blocking write client for writes to desired bucket
writeAPI := client.WriteAPI(org, bucket)
// create point using fluent style
p := influxdb2.NewPointWithMeasurement("thermostat").
AddTag("unit", "temperature").
AddTag("user", t.user).
AddField("avg", t.avg).
AddField("max", t.max).
SetTime(time.Now())
writeAPI.WritePoint(p)
// Flush writes
writeAPI.Flush()
}
哪种插入方法最适合您?别忘了更新测试来验证您的实现。
批处理
请注意,InfluxDB 客户端使用批处理将数据发送到数据库。默认情况下,直到达到批大小(默认为5,000个点)之前,都不会向数据库发送数据,这是在数据库负载和数据可用性之间的一种权衡。较小的批大小意味着更高的速度,这可能会影响数据库的性能。另一方面,等待达到批大小意味着数据仍然存储在您的应用程序内存中,而不是数据库中。
您可以在调用 influxdb2
的初始化构造函数时调整 Batch Size
。
client := influxdb2.NewClientWithOptions(dbURL, dbToken,
influxdb2.DefaultOptions().SetBatchSize(20))
此外,您还可以使用 Flush()
强制客户端发送数据。您在之前的例子中已经看到了这个。
writeAPI.Flush()
然而,根据我使用时序数据库的经验,不要在所有地方都使用 Flush
方法。即使将数据立即写入数据库似乎合情合理,但它可能会显著影响性能。相反,使用 Batch Size
选项。
阻塞与非阻塞
虽然 InfluxDB 的默认行为是使用异步调用和批处理(即非阻塞 I/O),但您可以选择同步写入点。此选项适用于需要立即提交到数据库的偶尔写入。
func write_event_with_blocking_write(client influxdb2.Client) {
// Get blocking write client
writeAPI := client.WriteAPIBlocking(org, bucket)
// write line protocol
writeAPI.WriteRecord(context.Background(), fmt.Sprintf("stat,unit=temperature1 avg=%f,max=%f", 23.5, 45.0))
}
查询数据
InfluxDB 使用一种名为 Flux 的查询语言。Flux 使用函数式方法来选择、过滤和聚合数据。一旦掌握了基础知识,它就很容易阅读和理解。此外,InfluxDB 提供了一个强大的查询构建器,可以根据您摄取的数据设计查询。
为了完成本教程,运行您的集成测试以将数据点添加到数据库中。接下来,使用查询构建器创建一个查询,以隔离您所需的数据。这是使用 QueryBuilder 构建的查询
from(bucket: "users_business_events")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "thermostat")
|> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
|> yield(name: "mean")
现在您需要实现一个函数,使用 Go 客户端查询您的数据。有两种查询数据的方法。
第一种是使用 QueryTableResult。您会注意到,将数据放回 ThermostatSetting
结构体需要一些工作。即使您发送 ThermostatSetting
的内容作为一个数据点,字段 avg
和 max
也会作为两个单独的记录输出。
func read_events_as_query_table_result(client influxdb2.Client) map[time.Time]ThermostatSetting {
// Get query client
queryAPI := client.QueryAPI(org)
// Query. You need to change a bit the Query from the Query Builder
// Otherwise it won't work
fluxQuery := fmt.Sprintf(`from(bucket: "users_business_events")
|> range(start: -1h)
|> filter(fn: (r) => r["_measurement"] == "thermostat")
|> yield(name: "mean")`)
result, err := queryAPI.Query(context.Background(), fluxQuery)
// Putting back the data in share requires a bit of work
var resultPoints map[time.Time]ThermostatSetting
resultPoints = make(map[time.Time]ThermostatSetting)
if err == nil {
// Iterate over query response
for result.Next() {
// Notice when group key has changed
if result.TableChanged() {
fmt.Printf("table: %s\n", result.TableMetadata().String())
}
val, ok := resultPoints[result.Record().Time()]
if !ok {
val = ThermostatSetting{
user: fmt.Sprintf("%v", result.Record().ValueByKey("user")),
}
}
switch field := result.Record().Field(); field {
case "avg":
val.avg = result.Record().Value().(float64)
case "max":
val.max = result.Record().Value().(float64)
default:
fmt.Printf("unrecognized field %s.\n", field)
}
resultPoints[result.Record().Time()] = val
}
// check for an error
if result.Err() != nil {
fmt.Printf("query parsing error: %s\n", result.Err().Error())
}
} else {
panic(err)
}
return resultPoints
}
第二种选项是 QueryRaw()
,它返回一个未解析的结果字符串。
func read_events_as_raw_string(client influxdb2.Client) {
// Get query client
queryAPI := client.QueryAPI(org)
// Query
fluxQuery := fmt.Sprintf(`from(bucket: "users_business_events")
|> range(start: -1h)
|> filter(fn: (r) => r["_measurement"] == "thermostat")
|> yield(name: "mean")`)
result, err := queryAPI.QueryRaw(context.Background(), fluxQuery, influxdb2.DefaultDialect())
if err == nil {
fmt.Println("QueryResult:")
fmt.Println(result)
} else {
panic(err)
}
}
最后,您需要更新您的测试函数并查看它是否按预期工作。
func Test_write_event_with_line_protocol(t *testing.T) {
tests := []struct {
name string
f func(influxdb2.Client, []ThermostatSetting)
datas []ThermostatSetting
}{
[...]
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
client := init_testDB(t)
// call function to test
tt.f(client, tt.datas)
// test can be flicky if the query is done before that data is ready in the database
time.Sleep(time.Millisecond * 1000)
// Option one: QueryTableResult
results := read_events_as_query_table_result(client)
// convert results to array to compare with data
resultsArr := []ThermostatSetting{}
for _, v := range results {
resultsArr = append(resultsArr, v)
}
if eq := reflect.DeepEqual(resultsArr, tt.datas); !eq {
t.Errorf("want %v, got %v", tt.datas, resultsArr)
}
// Option two: query raw data
// TODO add validation
read_events_as_raw_string(client)
client.Close()
})
}
}
结论
如果您完成了本教程,您将拥有一个经过彻底测试的应用程序,该应用程序使用 InfluxDB。您使用了四种不同的数据查询方法(三种非阻塞插入风格加上一种阻塞插入),并且您知道在投入生产之前需要启用 SSL/TLS 证书。
您有机会在 InfluxDB UI 中插入数据并可视化它,从而可以快速构建查询,并在应用程序中使用这些数据。总的来说,您可以在同一数据结构中插入和检索数据,以在您的应用程序中使用。您的未来智能恒温器公司无疑走在正确的道路上。
如果您对进一步学习 InfluxDB 感兴趣,请阅读官方文档,以更熟悉Go 客户端库。
关于作者
亚历山大是一位复杂系统工程与管理专家。自从他开始职业生涯,通过参与加拿大一家领先金融机构的数字化转型,他就一直拥抱 DevOps 文化。他对 DevOps 革命和工业工程充满热情。他喜欢自己有足够的前瞻性,能够从两者中汲取精华。