从Chess.com收集数据:编写自定义Telegraf插件的方法
作者:社区 / 用例,产品,开发者
2021年7月26日
导航至
本文由InfluxData 2021年夏季实习生Noé Garcia、Mya Longmire、Aidan Tai、Dane Strandboge和Merrick Tian撰写。
Telegraf是一个强大的工具,能够使您从堆栈、传感器或系统中收集指标和信息。Telegraf处理繁重的工作和计算,让您有更多时间专注于如何使用新收集的数据。因此,我们的团队决定为Telegraf构建一个简单的插件,从Chess.com收集信息和用户指标。我们使用官方Chess.com的发布数据API来收集信息。在本博文中,我们将介绍我们如何构建此插件,以指导读者通过此过程,以便他们可以构建自己的插件。
开始
为了开始我们的插件,我们从其官方GitHub仓库分叉并下载了Telegraf项目。所有Telegraf插件都位于plugins
目录中,由于我们从外部来源收集系统信息,因此我们将源代码放在了plugins/inputs
目录中。在那里,我们创建了一个名为chess
的目录来存放所有代码。我们的源代码工作路径如下:telegraf/plugins/inputs/chess
。
文档提供了一个示例Telegraf配置,我们可以基于此配置构建我们的插件。以下是我们可以在插件目录中的chess.go
文件中放置的示例配置
package simple
// simple.go
import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
)
type Simple struct {
Ok bool `toml:"ok"`
Log telegraf.Logger `toml:"-"`
}
func (s *Simple) Description() string {
return "a demo plugin"
}
func (s *Simple) SampleConfig() string {
return `
## Indicate if everything is fine
ok = true
`
}
// Init is for setup, and validating config.
func (s *Simple) Init() error {
return nil
}
func (s *Simple) Gather(acc telegraf.Accumulator) error {
if s.Ok {
acc.AddFields("state", map[string]interface{}{"value": "pretty good"}, nil)
} else {
acc.AddFields("state", map[string]interface{}{"value": "not great"}, nil)
}
return nil
}
func init() {
inputs.Add("simple", func() telegraf.Input { return &Simple{} })
}
将示例代码放入我们的Go文件后,我们可以开始修改代码以适应我们的插件。首先,我们需要将我们的包名更新为插件的名称,通过写入package chess
。接下来,我们需要更改import语句下面的数据结构,以表示我们的插件配置并与README匹配。最后,我们可以更新代码中更改的任何其他名称或数据类型。这样,我们就得到了以下内容
package chess
// chess.go
import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
)
type Chess struct {
Profiles []string `toml:"ok"`
Leaderboard bool `toml:"ok"`
Streamers bool `toml:"ok"`
Countries []string `toml:"ok"`
Log telegraf.Logger `toml:"-"`
}
func (c *Chess) Description() string {
return "a chess plugin"
}
func (c *Chess) SampleConfig() string {
return `
## Indicate if everything is fine
ok = true
`
}
// Init is for setup, and validating config.
func (c *Chess) Init() error {
return nil
}
func (c *Chess) Gather(acc telegraf.Accumulator) error {
if s.Ok {
acc.AddFields("state", map[string]interface{}{"value": "pretty good"}, nil)
} else {
acc.AddFields("state", map[string]interface{}{"value": "not great"}, nil)
}
return nil
}
func init() {
inputs.Add("chess", func() telegraf.Input { return &Chess{} })
}
收集数据
现在我们可以开始从Chess.com API收集数据了。在上面的代码中,有一个名为Gather()
的方法。该方法将包含从API收集信息并将其传递给Telegraf的累积器的所有逻辑。为了说明如何做到这一点,以下将解释我们如何从API收集排行榜信息,将json响应反序列化为中间结构,并将这些数据传递给累积器。
为了存储从API调用传递过来的信息,我们创建了中间结构来保存数据。从API获取排行榜数据的调用返回以下格式的json
{
"daily":[
{
"player_id": "integer",
"@id": "URL",
"url": "URL",
"username": "string",
"score": "integer",
"rank": "integer" /* [1..50] */
},
[...]
],
}
因此,我们构建了两个结构来保存API返回的数据
type ResponseLeaderboards struct {
PlayerID int `json:"player_id"`
Username string `json:"username"`
Rank int `json:"rank"`
Score int `json:"score"`
}
type Leaderboards struct {
Daily []ResponseLeaderboards `json:"daily"`
}
使用上述定义的结构,我们能够将响应中的json信息反序列化并传递给累加器。以下代码替换了当前的Gather()
方法实现
func (c *Chess) Gather(acc telegraf.Accumulator) error {
if c.Leaderboard {
// Obtain all public leaderboard information from the
// chess.com api
var leaderboards Leaderboards
// request and unmarshall leaderboard information
// and add it to the accumulator
resp, err := http.Get("https://api.chess.com/pub/leaderboards")
if err != nil {
c.Log.Errorf("failed to GET leaderboards json: %w", err)
return err
}
data, err := io.ReadAll(resp.Body)
defer resp.Body.Close()
if err != nil {
c.Log.Errorf("failed to read leaderboards json response body: %w", err)
return err
}
//unmarshall the data
err = json.Unmarshal(data, &leaderboards)
if err != nil {
c.Log.Errorf("failed to unmarshall leaderboards json: %w", err)
return err
}
for _, stat := range leaderboards.Daily {
var fields = make(map[string]interface{}, len(leaderboards.Daily))
var tags = map[string]string{
"playerId": strconv.Itoa(stat.PlayerID),
}
fields["username"] = stat.Username
fields["rank"] = stat.Rank
fields["score"] = stat.Score
acc.AddFields("leaderboards", fields, tags)
}
}
return nil;
}
在上面的代码中,我们从chess.com API调用排行榜端点,并将结果存储在resp
变量中,最终将所有json信息读入到data
变量中。从那里,我们可以使用json.Unmarshal()
来提取data
中的json信息,并将其存储在LeaderBoards
结构中。一旦完成,我们遍历排行榜的Daily
值(其中包含ResponseLeaderBoard结构的数组),将信息传递到fields
和tags
映射中。在这个例子中,我们选择玩家ID作为每个fields
映射的标签。最后,我们将字符串“leaderboards”以及两个映射fields
和tags
一起传递给acc.AddFields()
方法,将数据添加到累加器中。就这样,我们开始收集来自Chess.com的排行榜信息。
用README和配置文件记录你的插件
在plugins/inputs/EXAMPLE_README.md
中提供了一个模板README。我们创建了一个示例配置文件来展示插件提供的输入。以下是我们的示例配置文件
[[inputs.chess]]
# A list of profiles for monitoring
profiles = ["Hikaru", "Firouzja2003", "Mackus"]
# list the leaderboard
leaderboard = true
# List the current streamers on chess.com
streamers = true
编写README时,首先简要描述你的插件及其预期实现的目标。下一部分是配置部分。它应该概述TOML并描述每个字段。示例配置还应包括用户未设置的默认字段值。还应包含一个示例输出。我们使用了运行默认配置文件输出的结果。
将你的插件做成外部插件
接下来,我们决定将我们的国际象棋插件做成外部插件。为了做到这一点,我们遵循了官方仓库中列出的Telegraf外部插件指南。外部插件提供了更多的灵活性,它们在Telegraf之外构建,并通过execd
插件运行。由于我们的插件是用Golang编写的,我们将开始使用execd go shim
。创建外部插件的第一步是按照上述描述编写插件本身。接下来,将项目移动到外部仓库,确保保留路径结构;我们的结构是plugins/inputs/chess
。将项目下的main.go复制到cmd
文件夹中。这充当入口点。建议每个仓库只包含一个插件。将插件添加到main.go
导入中;我们添加了以下代码
_ "github.com/influxdata/telegraf/plugins/inputs/chess"
下一步是添加特定于插件的配置文件。这必须与Telegraf的其他配置分开。构建插件 go build -o chess cmd/main.go
并测试二进制文件 ./chess --config chess.conf
。最后一步是配置Telegraf以调用象棋插件。
[[inputs.execd]]
command = ["src/chess", "-config", "chess.conf"]
signal = "none"
"src/chess"
需要指向可执行文件的路径,而 "chess.conf"
需要指向配置文件的路径。外部插件现在可以使用了!下一步是考虑将您的外部插件发布到GitHub,并向Telegraf仓库提交一个pull request,以便让Telegraf团队知道。
最后的想法
最终,开发Telegraf插件的过程并不太困难。Telegraf提供的这种易用性和灵活性是许多团队最初选择这项技术的原因。它是一个非常灵活且强大的工具。读者可以使用我们的经验和开发过程中我们所经历的来开发自己的Telegraf插件。可以为其他服务、应用程序甚至游戏制作一个。可能性几乎是无限的,那么你还在等什么?加入进来,开始吧!要了解更多关于Telegraf的信息,请在此处查看文档。
特别感谢Sebastian Spaink在此项目中挺身而出并为我们提供指导。务必关注InfluxData博客,了解其他项目、教程或故事。