Telegraf 插件入门指南

导航到

Telegraf 宠物透明 png也许你们的数据都来自已经有 Telegraf 插件的来源,但我觉得情况并非如此。我迄今为止广泛使用了 http Telegraf 插件,目前正在做一个使用 MQTT 插件的项目,但我一直在考虑自己编写一个插件。

正如你们所知,我大量使用了这里的 Particle.io Photon——主要是因为我有一堆方便的工具,但未来不久我将与其他 IoT 设备进行一些工作!我主要在 Particle Cloud 上进行了一些修改,以便能够直接从 Photon 设备中插入数据到 InfluxDB,但为了这次练习,我想构建一个可以直接与 Particle Cloud 的 Webhooks 架构交互的 Telegraf 插件。

首先,我们将查看现有的 Webhooks 插件,以了解它们的构建和结构,然后我们将修改其中一个来创建我们自己的 Particle.io Webhooks Telegraf 插件。

Particle.io Webhooks

首先,让我们看看 Particle.io Webhooks,这样我们就可以在稍后构建我们的 Telegraf 插件时知道如何操作。正如他们在 文档 中概述的那样

SafariScreenSnapz028

非常直接,我们将会构建上面显示的“Web 服务”之一。因此,让我们去 Particle.io 控制台创建一个新的集成,并将集成类型选择为“Webhook”。我将把这个 webhook 添加到我的现有 IoT 示例项目 中,所以我将事件名称设置为“temperature”,并将 URL 设置为我在云中的 Telegraf 实例的 URL。

对于 Particle 来说,可能有一个独特之处是发布浮点数不受支持。为了解决这个问题,我创建了一个宏

#define temp(x) String(x)

这样我就可以将浮点数转换为字符串。然后我在程序循环中添加了发布调用

Particle.publish("temperature", temp(fTemp), PRIVATE);

我已经准备好了。我可以在 Particle Web 控制台中看到每秒都在发布 'temperature' 事件。所以接下来就是制作一个 Telegraf 插件,将这些事件拉入 InfluxDB!

Telegraf Webhooks 插件

在 GitHub 上有一个专门的部分,里面全是用于 GitHub、Rollbar 等服务的 Webhooks 插件。所以我们将从这里开始,查看 Rollbar 插件。我将 Telegraf 仓库分叉到了我的账户下,以便我可以添加 Particle 插件。(注意: 我不得不将 Telegraf 的所有内容都分叉,但这并不是什么大问题。)现在所有东西都在这里了,是时候深入研究一下了。

首先,我只是简单地复制了整个 Rollbar 目录,并将其重命名为 'particle',然后开始进行我想要的更改。我需要更改的两个文件是 'rollbar_webhooks_events.go' 和 'rollbar_webhooks.go',但首要任务是更改所有提到 'Rollbar'' 的地方,使其变为 'particle'。在所有文件中进行全局替换,将 'Rollbar'' 替换为 'particle',以及将 'Rollbar'' 替换为 'Particle'。

接下来,我从 'rollbar_webhooks.go' 中移除了一堆我不需要的东西。

func NewEvent(dummyEvent *dummyEvent, data []byte) (Event, error) {
 switch dummyEvent.EventName {
 case "new_item":
 return generateEvent(&NewItem{}, data)
 case "deploy":
 return generateEvent(&Deploy{}, data)
 default:
 return nil, errors.New("Not implemented type: " + dummyEvent.EventName)
 }
}

由于这里只生成一种类型的事件,为了简单起见。下面是我的整个 'particle_webhooks.go' 文件

package particle
import (
   "encoding/json"
   "github.com/gorilla/mux"
   "github.com/influxdata/telegraf"
   "io/ioutil"
   "log"
   "net/http"
   "time"
)
type ParticleWebhook struct {
   Path string
   acc telegraf.Accumulator
}
func (rb *ParticleWebhook) Register(router *mux.Router, acc telegraf.Accumulator) {
   router.HandleFunc(rb.Path, rb.eventHandler).Methods("POST")
   log.Printf("I! Started the webhooks_particle on %s\n", rb.Path)
   rb.acc = acc
}
func (rb *ParticleWebhook) eventHandler(w http.ResponseWriter, r *http.Request) {
   defer r.Body.Close()
   data, err := ioutil.ReadAll(r.Body)
   if err != nil {
       w.WriteHeader(http.StatusBadRequest)
       return
   }
   var event ParticleData
   err = json.Unmarshal(data, &event)
   if err != nil {
       w.WriteHeader(http.StatusBadRequest)
       return
   }
   fields, err := event.Fields()
   if err != nil {
       w.WriteHeader(http.StatusBadRequest)
       return
   }
   rb.acc.AddFields("particle_webhooks", fields, event.Tags(), time.Now())
   w.WriteHeader(http.StatusOK)
}

相当直接,即使是作为一个非 Go 程序员,我也能够理解这里发生了什么并使其工作。对我来说作为非 Go 程序员(目前还不是!)更棘手的是 'particle_webhooks_events.go' 中的下一部分,所以我得到了一些帮助。

基本上,我需要定义一些结构,这些结构与来自 Particle 的 incoming json 中的字段相对应——稍后我会详细介绍 Particle 的 json。下面是我们最终得到的结果

package particle

import (
    "strconv"
)

type Event interface {
    Tags() map[string]string
    Fields() map[string]interface{}
}

type ParticleData struct {
    Event string `json:"name"`
    Data  string `json:"data"`
    coreid string `json:"coreid"`
}

func (pd *ParticleData) Tags() map[string]string {
    return map[string]string{
	"id": pd.coreid,
    }
}

func (pd *ParticleData) Fields() (map[string]interface{}, error) {
    f, err := strconv.ParseFloat(pd.Data, 64)
    if err != nil {
	return nil, err
    }
    return map[string]interface{}{
	"temp_f": f,
    }, nil
}

让我们简要地过一下,以防它对你和我一样让人困惑。首先是 ParticleData struct,它列出了我从 incoming json 对象中感兴趣的字段。对我来说这很清楚。两个映射函数则不太清楚。基本上,它们将 ParticleData 对象中的 “coreid” 字段映射到数据库中的 'id' 标签,然后将 'Data' 字段映射到数据库中的 'temp_f' 字段。这里有一点小麻烦,因为我不得不将 Data 字段从字符串转换为双精度值,因为 Particle 设备只能发布 INTs 和字符串。

完成这些后,我必须回到 Telegraf 源树的顶层,使用 'make' 构建Telegraf,然后将生成的二进制文件部署到我的 InfluxDB 服务器上。(提示: 如果你像我一样在 Mac 上开发并在 Linux 服务器上部署,设置两个环境变量 “GOOS=linux” 和 “GOARCH=amd64”,你将得到一个交叉编译的版本。我爱这个!)

调整粒子 json

现在我们来说说粒子设备本身,以及来自粒子的整个 json 业务。我们已经将粒子设备的输出更改为发布温度数据,所以它运行良好。接下来,我们需要进入粒子控制台,为已发布的活动定义 Webhook

SafariScreenSnapz029

您可以看到我已经定义了它,但我也会带您了解定义过程。重要的是要记住,我们之前构建的粒子 Webhook 将在 http://<yourhost>:1619/particle 上运行,所以在您的 Webhook 定义中,您需要将其作为 URL。现在,这里有一个难点,我调试了这个问题花了很长时间,所以请跟随我的步骤。默认情况下,Webhook 将以 webform-编码的数据发送您的数据,但这并不是您想要的。但是,如果您去查看控制台的事件查看器中的事件,您不会知道它们是 webform-编码的,因为它们被列出来

{"data":"78.763999","ttl":60,"published_at":"2017-09-28T18:22:00.701Z","coreid":"xxxxxxxxx","userid":"yyyyyyyyy","version":10,"public":false,"productID":5343,"name":"temperature"}

这看起来可疑地像 json 数据。但实际上发送的不是这些!所以,当您定义您的 Webhook 时,请查看“高级设置”,并在 json 字段中定义 任何 内容。

SafariScreenSnapz030

这将强制将数据以 json 格式发送到您的端点。我不知道为什么它会这样工作,但它确实有效。

现在,使用我自定义的 Telegraf 版本以及新定义的来自粒子的 Webhook,我的设备通过我的粒子 Telegraf 插件将数据发送到 InfluxDB 实例。

最终思考

这真是一个意外的简单任务,除了由于 json 问题导致的奇怪行为调试之外。在一位同事的帮助下,以及一些进一步的工作,编写 Telegraf 插件花费了我大约一天的时间。最终,我希望对其进行扩展和泛化,以便它不仅仅针对 的需求——解析它发送的读数,并根据 json 中的其他字段将它们放入数据库——但到目前为止,我有一个粒子 Telegraf 插件可以记录我想要的数据。

SafariScreenSnapz031