从Chess.com收集数据:编写自定义Telegraf插件的方法

导航至

本文由InfluxData 2021年夏季实习生Noé Garcia、Mya Longmire、Aidan Tai、Dane Strandboge和Merrick Tian撰写。

Telegraf是一个强大的工具,能够使您从堆栈、传感器或系统中收集指标和信息。Telegraf处理繁重的工作和计算,让您有更多时间专注于如何使用新收集的数据。因此,我们的团队决定为Telegraf构建一个简单的插件,从Chess.com收集信息和用户指标。我们使用官方Chess.com的发布数据API来收集信息。在本博文中,我们将介绍我们如何构建此插件,以指导读者通过此过程,以便他们可以构建自己的插件。

开始

为了开始我们的插件,我们从其官方GitHub仓库分叉并下载了Telegraf项目。所有Telegraf插件都位于plugins目录中,由于我们从外部来源收集系统信息,因此我们将源代码放在了plugins/inputs目录中。在那里,我们创建了一个名为chess的目录来存放所有代码。我们的源代码工作路径如下:telegraf/plugins/inputs/chess

文档提供了一个示例Telegraf配置,我们可以基于此配置构建我们的插件。以下是我们可以在插件目录中的chess.go文件中放置的示例配置

package simple

// simple.go

import (
   "github.com/influxdata/telegraf"
   "github.com/influxdata/telegraf/plugins/inputs"
)

type Simple struct {
   Ok  bool            `toml:"ok"`
   Log telegraf.Logger `toml:"-"`
}

func (s *Simple) Description() string {
   return "a demo plugin"
}

func (s *Simple) SampleConfig() string {
   return `
 ## Indicate if everything is fine
 ok = true
`
}

// Init is for setup, and validating config.
func (s *Simple) Init() error {
 return nil
}

func (s *Simple) Gather(acc telegraf.Accumulator) error {
   if s.Ok {
       acc.AddFields("state", map[string]interface{}{"value": "pretty good"}, nil)
   } else {
       acc.AddFields("state", map[string]interface{}{"value": "not great"}, nil)
   }

   return nil
}

func init() {
   inputs.Add("simple", func() telegraf.Input { return &Simple{} })
}

将示例代码放入我们的Go文件后,我们可以开始修改代码以适应我们的插件。首先,我们需要将我们的包名更新为插件的名称,通过写入package chess。接下来,我们需要更改import语句下面的数据结构,以表示我们的插件配置并与README匹配。最后,我们可以更新代码中更改的任何其他名称或数据类型。这样,我们就得到了以下内容

package chess

// chess.go

import (
   "github.com/influxdata/telegraf"
   "github.com/influxdata/telegraf/plugins/inputs"
)

type Chess struct {
   Profiles    []string        `toml:"ok"`
   Leaderboard bool            `toml:"ok"`
   Streamers   bool            `toml:"ok"`
   Countries   []string        `toml:"ok"`
   Log         telegraf.Logger `toml:"-"`
}

func (c *Chess) Description() string {
   return "a chess plugin"
}

func (c *Chess) SampleConfig() string {
   return `
 ## Indicate if everything is fine
 ok = true
`
}

// Init is for setup, and validating config.
func (c *Chess) Init() error {
 return nil
}

func (c *Chess) Gather(acc telegraf.Accumulator) error {
   if s.Ok {
       acc.AddFields("state", map[string]interface{}{"value": "pretty good"}, nil)
   } else {
       acc.AddFields("state", map[string]interface{}{"value": "not great"}, nil)
   }

   return nil
}

func init() {
   inputs.Add("chess", func() telegraf.Input { return &Chess{} })
}

收集数据

现在我们可以开始从Chess.com API收集数据了。在上面的代码中,有一个名为Gather()的方法。该方法将包含从API收集信息并将其传递给Telegraf的累积器的所有逻辑。为了说明如何做到这一点,以下将解释我们如何从API收集排行榜信息,将json响应反序列化为中间结构,并将这些数据传递给累积器。

为了存储从API调用传递过来的信息,我们创建了中间结构来保存数据。从API获取排行榜数据的调用返回以下格式的json

{
   "daily":[
      {
         "player_id": "integer",
         "@id": "URL",
         "url": "URL",
         "username": "string",
         "score": "integer",
         "rank": "integer" /* [1..50] */
      },
      [...]
   ],
   …
}

因此,我们构建了两个结构来保存API返回的数据

type ResponseLeaderboards struct {
   PlayerID int    `json:"player_id"`
   Username string `json:"username"`
   Rank     int    `json:"rank"`
   Score    int    `json:"score"`
}

type Leaderboards struct {
   Daily []ResponseLeaderboards `json:"daily"`
}

使用上述定义的结构,我们能够将响应中的json信息反序列化并传递给累加器。以下代码替换了当前的Gather()方法实现

func (c *Chess) Gather(acc telegraf.Accumulator) error {
   if c.Leaderboard {
       // Obtain all public leaderboard information from the
       // chess.com api

       var leaderboards Leaderboards
       // request and unmarshall leaderboard information
       // and add it to the accumulator
       resp, err := http.Get("https://api.chess.com/pub/leaderboards")
       if err != nil {
           c.Log.Errorf("failed to GET leaderboards json: %w", err)
           return err
       }

       data, err := io.ReadAll(resp.Body)
       defer resp.Body.Close()
       if err != nil {
           c.Log.Errorf("failed to read leaderboards json response body: %w", err)
           return err
       }

       //unmarshall the data
       err = json.Unmarshal(data, &leaderboards)
       if err != nil {
           c.Log.Errorf("failed to unmarshall leaderboards json: %w", err)
           return err
       }

       for _, stat := range leaderboards.Daily {
           var fields = make(map[string]interface{}, len(leaderboards.Daily))
           var tags = map[string]string{
               "playerId": strconv.Itoa(stat.PlayerID),
           }
           fields["username"] = stat.Username
           fields["rank"] = stat.Rank
           fields["score"] = stat.Score
           acc.AddFields("leaderboards", fields, tags)
       }
   }
   return nil;
}

在上面的代码中,我们从chess.com API调用排行榜端点,并将结果存储在resp变量中,最终将所有json信息读入到data变量中。从那里,我们可以使用json.Unmarshal()来提取data中的json信息,并将其存储在LeaderBoards结构中。一旦完成,我们遍历排行榜的Daily值(其中包含ResponseLeaderBoard结构的数组),将信息传递到fieldstags映射中。在这个例子中,我们选择玩家ID作为每个fields映射的标签。最后,我们将字符串“leaderboards”以及两个映射fieldstags一起传递给acc.AddFields()方法,将数据添加到累加器中。就这样,我们开始收集来自Chess.com的排行榜信息。

用README和配置文件记录你的插件

plugins/inputs/EXAMPLE_README.md中提供了一个模板README。我们创建了一个示例配置文件来展示插件提供的输入。以下是我们的示例配置文件

[[inputs.chess]]
# A list of profiles for monitoring 
  profiles = ["Hikaru", "Firouzja2003", "Mackus"]
# list the leaderboard 
  leaderboard = true
# List the current streamers on chess.com
  streamers = true

编写README时,首先简要描述你的插件及其预期实现的目标。下一部分是配置部分。它应该概述TOML并描述每个字段。示例配置还应包括用户未设置的默认字段值。还应包含一个示例输出。我们使用了运行默认配置文件输出的结果。

将你的插件做成外部插件

接下来,我们决定将我们的国际象棋插件做成外部插件。为了做到这一点,我们遵循了官方仓库中列出的Telegraf外部插件指南。外部插件提供了更多的灵活性,它们在Telegraf之外构建,并通过execd插件运行。由于我们的插件是用Golang编写的,我们将开始使用execd go shim。创建外部插件的第一步是按照上述描述编写插件本身。接下来,将项目移动到外部仓库,确保保留路径结构;我们的结构是plugins/inputs/chess。将项目下的main.go复制到cmd文件夹中。这充当入口点。建议每个仓库只包含一个插件。将插件添加到main.go导入中;我们添加了以下代码

_ "github.com/influxdata/telegraf/plugins/inputs/chess"

下一步是添加特定于插件的配置文件。这必须与Telegraf的其他配置分开。构建插件 go build -o chess cmd/main.go 并测试二进制文件 ./chess --config chess.conf。最后一步是配置Telegraf以调用象棋插件。

[[inputs.execd]]
  command = ["src/chess", "-config", "chess.conf"]
  signal = "none"

"src/chess" 需要指向可执行文件的路径,而 "chess.conf" 需要指向配置文件的路径。外部插件现在可以使用了!下一步是考虑将您的外部插件发布到GitHub,并向Telegraf仓库提交一个pull request,以便让Telegraf团队知道。

最后的想法

最终,开发Telegraf插件的过程并不太困难。Telegraf提供的这种易用性和灵活性是许多团队最初选择这项技术的原因。它是一个非常灵活且强大的工具。读者可以使用我们的经验和开发过程中我们所经历的来开发自己的Telegraf插件。可以为其他服务、应用程序甚至游戏制作一个。可能性几乎是无限的,那么你还在等什么?加入进来,开始吧!要了解更多关于Telegraf的信息,请在此处查看文档

特别感谢Sebastian Spaink在此项目中挺身而出并为我们提供指导。务必关注InfluxData博客,了解其他项目、教程或故事。