面向初学者的 Telegraf 插件编写指南
作者:David G. Simmons / 产品, 用例, 开发者
2017 年 9 月 28 日
导航至
也许您的所有数据都来自已经有现有 Telegraf 插件的源,但我想情况并非如此。到目前为止,我广泛使用了 http Telegraf 插件,并且我正在进行一个使用 MQTT 插件的项目,但我一直在考虑编写自己的插件。
众所周知,我在这里广泛使用了 Particle.io Photon——主要是因为我手头有很多,但请期待在不久的将来与其他物联网设备合作!我主要通过破解 Particle Cloud 来实现从 Photon 设备直接将数据插入到 InfluxDB 中,但为了这次练习,我考虑构建一个 Telegraf 插件,该插件可以直接与 Particle Cloud 的 Webhooks 架构进行交互。
首先,我们将查看现有的 Webhooks 插件,了解它们的构建和结构方式,然后我们将修改其中一个插件以创建我们自己的 Particle.io Webhooks Telegraf 插件。
Particle.io Webhooks
让我们首先看看 Particle.io Webhooks,这样我们稍后就知道如何构建我们的 Telegraf 插件。正如他们的 文档 中概述的那样
非常简单明了,我们将构建的是上面显示的“Web 服务”之一。因此,我们前往 Particle.io 控制台创建一个新的集成,并选择“Webhook”作为集成类型。我将把这个 Web Hook 添加到我现有的 物联网演示项目 中,因此我将事件名称设置为“temperature”,对于 URL,我将使用我在云中的 Telegraf 实例的 URL。
对于 Particle 来说,可能有一件事是独一无二的,那就是不支持发布浮点数。为了解决这个问题,我只是创建了一个宏
#define temp(x) String(x)
这样我就可以将浮点数转换为字符串。然后,我将发布调用添加到我的程序循环中
Particle.publish("temperature", temp(fTemp), PRIVATE);
我准备好了。然后我可以在 Particle Web 控制台中看到每秒都在发布“temperature”事件。现在,是时候制作一个 Telegraf 插件,将这些事件拉入 InfluxDB 了!
Telegraf Webhooks 插件
在 GitHub 上,有一个 整个部分 都是 Webhooks 插件,用于 GitHub 本身、Rollbar 等服务。因此,我们将从那里开始,查看 Rollbar 插件。我将 Telegraf 存储库分叉到我自己的帐户中,以便我可以更改它以添加 Particle 插件。(注意: 我必须分叉所有 的 Telegraf 才能做到这一点,但这没什么大不了的。)现在我已经拥有了所有这些,是时候深入研究了。
首先,我只是简单地复制了整个 Rollbar 目录并将其重命名为“particle”,然后开始进行我想要的更改。我需要 更改的两个文件是“rollbar_webhooks_events.go”和“rollbar_webhooks.go”,但首要任务是将所有说“Rollbar”的内容更改为“particle”。在所有文件中全局替换“Rollbar”为“particle”和“Rollbar”为“Particle”也是有必要的。
接下来,我从“rollbar_webhooks.go”中取出了一堆我不需要的东西。具体来说是
func NewEvent(dummyEvent *dummyEvent, data []byte) (Event, error) {
switch dummyEvent.EventName {
case "new_item":
return generateEvent(&NewItem{}, data)
case "deploy":
return generateEvent(&Deploy{}, data)
default:
return nil, errors.New("Not implemented type: " + dummyEvent.EventName)
}
}
因为我在这里只生成一种类型的事件,为了简单起见。这是我的整个“particle_webhooks.go”文件
package particle
import (
"encoding/json"
"github.com/gorilla/mux"
"github.com/influxdata/telegraf"
"io/ioutil"
"log"
"net/http"
"time"
)
type ParticleWebhook struct {
Path string
acc telegraf.Accumulator
}
func (rb *ParticleWebhook) Register(router *mux.Router, acc telegraf.Accumulator) {
router.HandleFunc(rb.Path, rb.eventHandler).Methods("POST")
log.Printf("I! Started the webhooks_particle on %s\n", rb.Path)
rb.acc = acc
}
func (rb *ParticleWebhook) eventHandler(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
data, err := ioutil.ReadAll(r.Body)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
var event ParticleData
err = json.Unmarshal(data, &event)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
fields, err := event.Fields()
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
rb.acc.AddFields("particle_webhooks", fields, event.Tags(), time.Now())
w.WriteHeader(http.StatusOK)
}
非常简单明了,即使作为一名非 Go 程序员,我也能够弄清楚这里发生了什么并使其工作。对于我这个非 Go 程序员(目前!)来说,更棘手的部分是“particle_webhooks_events.go”中的下一部分,所以我得到了一些帮助。
基本上,我需要定义一些结构,这些结构将对应于来自 Particle 的传入 json 中的字段——稍后会详细介绍 Particle json。这就是我们最终得到的结果
package particle
import (
"strconv"
)
type Event interface {
Tags() map[string]string
Fields() map[string]interface{}
}
type ParticleData struct {
Event string `json:"name"`
Data string `json:"data"`
coreid string `json:"coreid"`
}
func (pd *ParticleData) Tags() map[string]string {
return map[string]string{
"id": pd.coreid,
}
}
func (pd *ParticleData) Fields() (map[string]interface{}, error) {
f, err := strconv.ParseFloat(pd.Data, 64)
if err != nil {
return nil, err
}
return map[string]interface{}{
"temp_f": f,
}, nil
}
让我们稍微了解一下,以防您像我一样感到困惑。首先是 ParticleData struct,它列出了我对传入 json 对象感兴趣的字段。这对我来说足够清楚了。两个映射函数不太清楚。基本上,它们将 ParticleData 对象中的“coreid”字段映射到我数据库中的“id”标签,然后将“Data”字段映射到我数据库中的“temp_f”字段。这里有一个小问题,我必须将 Data 字段从字符串转换为双精度值,因为 Particle 设备只能发布 INTs 和字符串。
完成后,我必须返回到 Telegraf 源代码树的顶层,并使用“make”构建 Telegraf,然后将生成的二进制文件部署到我的 InfluxDB 服务器。(提示: 如果像我一样,您在 Mac 上开发并在 Linux 盒子上部署,请设置两个环境变量“GOOS=linux”和“GOARCH=amd64”,您将获得一个交叉编译版本。我喜欢这样!)
调整 Particle json
现在是时候谈谈 particle 设备本身,以及来自 Particle 的整个 json 业务了。我们已经更改了 Particle 设备本身的输出以发布温度数据,因此这可以正常工作。接下来,我们必须进入 Particle 控制台并为发布的事件定义 Webhook。
您可以看到我已经定义了它,但我也会引导您完成定义。重要的是要记住,我们之前构建的 Particle Webhook 将在 http://<yourhost>:1619/particle 上运行,因此在您的 Webhook 定义中,您将把它作为 URL 放置。现在,这是棘手的部分,我花了一段时间才调试出来,所以请继续关注。 Webhook 的默认设置是以 webform 编码的数据发送您的数据,而这不是 您想要的。但是,如果您去查看控制台事件查看器上的事件,您不会知道它是 webform 编码的,因为它们被列为
{"data":"78.763999","ttl":60,"published_at":"2017-09-28T18:22:00.701Z","coreid":"xxxxxxxxx","userid":"yyyyyyyyy","version":10,"public":false,"productID":5343,"name":"temperature"}
这看起来非常像 json 数据。但这不是 实际发送的内容!因此,当您定义 Webhook 时,请查看“高级设置”并在 json 字段中定义某些内容,任何内容
这将强制数据以 json 形式发送到您的端点。我不知道它为什么会这样工作,但它确实有效。
现在,使用我自定义构建的 Telegraf 版本,以及我从 Particle 新定义的 Webhook,我的设备正在通过我的 Particle Telegraf 插件将数据发送到 InfluxDB 实例。
最终想法
除了由于 json 恶作剧而导致的奇怪行为调试之外,这是一项非常容易的任务。在一位 GO 方面的同事的帮助下,以及随后的一些进一步工作,编写 Telegraf 插件花了我大约一天左右的时间。最终,我想稍微扩展和概括它,使其不那么特定于我的 需求——让它解析发送给它的读数,并根据 json 中的其他字段将其放入数据库中——但就目前而言,我有一个 Particle Telegraf 插件,可以记录我想要的数据。