从 Chess.com 收集数据:自定义 Telegraf 插件编写指南
作者:社区 / 用例, 产品, 开发者
2021 年 7 月 26 日
导航至
本文由 InfluxData 2021 年夏季实习生 Noé Garcia、Mya Longmire、Aidan Tai、Dane Strandboge 和 Merrick Tian 撰写。
Telegraf 是一个强大的工具,使您能够从堆栈、传感器或系统收集指标和信息。Telegraf 处理繁重的工作和计算,让您可以花更多时间专注于如何使用新收集的数据。因此,我们的团队决定为 Telegraf 构建一个简单的插件,该插件从 Chess.com 收集信息和用户指标。我们使用官方 Chess.com 发布的数据 API 来收集信息。在这篇博文中,我们将介绍我们如何构建这个插件,试图指导读者完成这个过程,以便他们可以构建自己的插件。
入门
为了开始我们的插件,我们从其官方 GitHub 存储库 fork 并下载了 Telegraf 项目。所有 Telegraf 插件都位于 plugins
目录中,由于我们是从系统的外部源收集信息,因此我们将源代码放在 plugins/inputs
目录中。在那里,我们创建一个名为 chess
的目录来保存我们所有的代码。我们的源代码目录的工作路径如下:telegraf/plugins/inputs/chess
。
文档 提供了一个示例 Telegraf 配置,用于构建我们的插件。以下是可以放在插件目录中的 chess.go
文件中的示例配置
package simple
// simple.go
import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
)
type Simple struct {
Ok bool `toml:"ok"`
Log telegraf.Logger `toml:"-"`
}
func (s *Simple) Description() string {
return "a demo plugin"
}
func (s *Simple) SampleConfig() string {
return `
## Indicate if everything is fine
ok = true
`
}
// Init is for setup, and validating config.
func (s *Simple) Init() error {
return nil
}
func (s *Simple) Gather(acc telegraf.Accumulator) error {
if s.Ok {
acc.AddFields("state", map[string]interface{}{"value": "pretty good"}, nil)
} else {
acc.AddFields("state", map[string]interface{}{"value": "not great"}, nil)
}
return nil
}
func init() {
inputs.Add("simple", func() telegraf.Input { return &Simple{} })
}
将示例代码放在 Go 文件中后,我们可以开始修改代码以适应我们的插件。首先,我们需要通过编写 package chess
来更新我们的包名称为插件的名称。接下来,我们需要更改 imports 语句下方的数据结构,以表示我们的插件配置并匹配 README。最后,我们可以更新我们在代码中更改的任何其他名称或数据类型。这样,我们就得到了以下内容
package chess
// chess.go
import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
)
type Chess struct {
Profiles []string `toml:"ok"`
Leaderboard bool `toml:"ok"`
Streamers bool `toml:"ok"`
Countries []string `toml:"ok"`
Log telegraf.Logger `toml:"-"`
}
func (c *Chess) Description() string {
return "a chess plugin"
}
func (c *Chess) SampleConfig() string {
return `
## Indicate if everything is fine
ok = true
`
}
// Init is for setup, and validating config.
func (c *Chess) Init() error {
return nil
}
func (c *Chess) Gather(acc telegraf.Accumulator) error {
if s.Ok {
acc.AddFields("state", map[string]interface{}{"value": "pretty good"}, nil)
} else {
acc.AddFields("state", map[string]interface{}{"value": "not great"}, nil)
}
return nil
}
func init() {
inputs.Add("chess", func() telegraf.Input { return &Chess{} })
}
收集数据
我们现在可以开始致力于从 Chess.com API 收集数据。在上面的代码中,我们有一个名为 Gather()
的方法。此方法将包含从 API 收集信息并将其传递到 Telegraf 的累加器的所有逻辑。为了展示如何做到这一点,以下将解释我们从 API 收集排行榜信息、将 json 响应解组到中间结构中以及将数据传递到累加器的方法。
为了保存从 API 调用传递的信息,我们创建了一些中间结构来保存数据。从 API 调用排行榜数据返回的 json 格式如下
{
"daily":[
{
"player_id": "integer",
"@id": "URL",
"url": "URL",
"username": "string",
"score": "integer",
"rank": "integer" /* [1..50] */
},
[...]
],
}
因此,我们构建了两个结构来保存从 API 返回的数据
type ResponseLeaderboards struct {
PlayerID int `json:"player_id"`
Username string `json:"username"`
Rank int `json:"rank"`
Score int `json:"score"`
}
type Leaderboards struct {
Daily []ResponseLeaderboards `json:"daily"`
}
使用上面定义的结构,我们能够解组响应中的 json 信息并将其传递给累加器。以下代码替换了我们当前 Gather()
方法的实现
func (c *Chess) Gather(acc telegraf.Accumulator) error {
if c.Leaderboard {
// Obtain all public leaderboard information from the
// chess.com api
var leaderboards Leaderboards
// request and unmarshall leaderboard information
// and add it to the accumulator
resp, err := http.Get("https://api.chess.com/pub/leaderboards")
if err != nil {
c.Log.Errorf("failed to GET leaderboards json: %w", err)
return err
}
data, err := io.ReadAll(resp.Body)
defer resp.Body.Close()
if err != nil {
c.Log.Errorf("failed to read leaderboards json response body: %w", err)
return err
}
//unmarshall the data
err = json.Unmarshal(data, &leaderboards)
if err != nil {
c.Log.Errorf("failed to unmarshall leaderboards json: %w", err)
return err
}
for _, stat := range leaderboards.Daily {
var fields = make(map[string]interface{}, len(leaderboards.Daily))
var tags = map[string]string{
"playerId": strconv.Itoa(stat.PlayerID),
}
fields["username"] = stat.Username
fields["rank"] = stat.Rank
fields["score"] = stat.Score
acc.AddFields("leaderboards", fields, tags)
}
}
return nil;
}
在上面的代码中,我们从 chess.com API 调用排行榜端点,并将结果存储在 resp
变量中,最终将所有 json 信息读取到 data
变量中。从那里,我们可以使用 json.Unmarshal()
将 data
中的 json 信息存储在 LeaderBoards 结构 leaderboards
中。完成后,我们遍历排行榜的 Daily
值(其中包含 ResponseLeaderBoard 结构的数组)以将信息传输到 map fields
和 tags
。对于此示例,我们选择玩家 ID 作为用于每个 fields
map 的标签。最后,我们将字符串“leaderboards”以及两个 map fields
和 tags
提供给 acc.AddFields()
方法,以将数据添加到累加器。这样,我们就完成了从 Chess.com 收集有关排行榜的信息。
使用 README 记录您的插件并制作配置文件
样板 readme 在 plugins/inputs/EXAMPLE_README.md
中提供。我们创建了一个示例配置文件来演示我们的插件提供的输入。这是我们的示例配置文件
[[inputs.chess]]
# A list of profiles for monitoring
profiles = ["Hikaru", "Firouzja2003", "Mackus"]
# list the leaderboard
leaderboard = true
# List the current streamers on chess.com
streamers = true
创建 README 时,首先简要描述您的插件及其应完成的任务。下一节是配置。它应该概述 TOML 并给出每个字段的描述。示例配置还应包括字段的默认值(如果用户未设置)。还应包括示例输出。我们使用了运行默认配置文件的输出。
将您的插件制作为外部插件
接下来,我们决定将我们的 chess 插件制作为外部插件。为此,我们遵循了官方存储库中列出的 Telegraf 外部插件指南。外部插件允许更大的灵活性,它们是在 Telegraf 外部构建的,通过 execd
插件运行。由于我们的插件是使用 Golang 编写的,我们将开始使用 execd go shim
。制作外部插件的第一部分是按照上述说明编写插件本身。接下来,将项目移动到外部 repo,确保保留路径结构;我们的路径是 plugins/inputs/chess
。将 main.go 复制到 cmd
文件夹下的项目中。这充当入口点。建议每个 repo 只有一个插件。将插件添加到 main.go
imports;我们添加了以下代码
_ "github.com/influxdata/telegraf/plugins/inputs/chess"
接下来是添加特定于插件的配置文件。这必须与 Telegraf 的其余配置分开。构建插件 go build -o chess cmd/main.go
并测试二进制文件 ./chess --config chess.conf
。最后一步是将 Telegraf 配置为调用 chess 插件。
[[inputs.execd]]
command = ["src/chess", "-config", "chess.conf"]
signal = "none"
"src/chess"
需要是可执行文件的路径,"chess.conf"
需要是配置文件的路径。外部插件现在可以使用了!下一步是考虑将您的外部插件发布到 GitHub 并打开一个拉取请求回到 Telegraf repo,以告知 Telegraf 团队。
最终想法
最终,开发 Telegraf 插件的过程并不太困难。Telegraf 提供的这种易用性和灵活性是最初吸引许多团队成员使用这项技术的原因。它是一个非常通用且强大的工具。读者可以使用我们的经验和我们经历的过程来开发他们自己的 Telegraf 插件。人们可以为其他服务、应用程序甚至游戏制作一个。可能性实际上是无限的,所以还在等什么?立即加入并开始吧!要了解有关 Telegraf 的更多信息,请在此处浏览文档。
特别感谢 Sebastian Spaink 在这个项目期间介入并指导我们。请务必关注 InfluxData 博客,了解其他项目、教程或故事。