构建更好的 Telegraf 插件(仍适用于初学者)
作者 David G. Simmons / 产品,用例,开发者
2017 年 9 月 30 日
导航至
感觉就像是在昨天,我还在写一篇关于 如何为初学者构建 Telegraf 插件 的博客。等等,那真的是昨天!然而,我现在又回来了,再次写一篇。如果你昨天读了那篇帖子,一直读到结尾,你会记得我说过
最终,我想扩展和泛化它,使其不仅仅针对我自己的需求——让它解析它发送的读取值,并根据 JSON 中的其他字段将它们放入数据库中
在那篇帖子的“发布”按钮刚刚按下后,我就决定“为什么不直接做呢?”所以我做了。而且,它比我想象的还要好!那个其他的插件我只发布了一个值,这实际上并不太有用。实际上,我的小传感器 发布了 6 个值,所以发布 6 个不同的消息,一个对应每个值,将会非常低效并且过于耗费资源。我必须在设备、Particle Cloud 和插件中进行更改,才能使这一切正常工作,所以我将带你一一了解。
在 Particle 设备和云平台
以前,我使用行协议一次性发布了所有的读取值和标签,所以我认为我会重新利用这一点来为这个插件。为了回顾一下,这是我之前发布的方式
request.body = String::format("influxdata_sensors,id=%s,location=%s temp_c=%f,temp_f=%f,humidity=%f,pressure=%f,altitude=%f,broadband=%d,infrared=%d,lux=%f", myID.c_str(), myName.c_str(), temperature, fTemp, humidity, pressure, altitude, broadband, infrared, lux);
http.post(request, response, headers);
所有标签、字段和值都整齐地放在一个HTTP POST请求中。实际上,从设备的角度来看,我所要做的就是重新格式化字符串,将其构建成正确的JSON格式,然后发布到Particle云
String data = String::format("{ \"tags\" : {\"id\": \"%s\", \"location\": \"%s\"}, \"values\": {\"temp_c\": %f, \"temp_f\": %f, \"humidity\": %f, \"pressure\": %f, \"altitude\": %f, \"broadband\": %d, \"infrared\": %d, \"lux\": %d}}", myID.c_str(), myName.c_str(), temperature, fTemp, humidity, pressure, altitude, broadband, infrared, lux);
Particle.publish("sensor", data, PRIVATE);
这将发布一个JSON字符串,经过Particle云处理后的样子如下
{"data":"{ "tags" : { "id": "2b123b123123123123123123", "location": "MyLocation" }, "values": { "temp_c": 23.459999, "temp_f": 74.227998, "humidity": 45.628906, "pressure": 1017.543274, "altitude": -35.683643, "broadband": 34, "infrared": 19, "lux": 0 } }", "ttl":60, "published_at":"2017-09-30T17:00:27.832Z", "coreid":"2b003b001047343438323536", "userid":"XXXX", "version":11, "public":false, "productID":5343, "name":"sensor", "influx_db":"influxdata_sensors" }
我在Particle.io集成页面添加了那个最后字段,方式如下
当谈到插件详情时,你就会明白为什么了。
这件事最好的部分是,我减少了设备上的代码大小约80%,因为我不需要再在循环中每次都创建和销毁自己的HTTP通信通道。直接使用内置的Particle云调用更有效率。
在插件方面
现在设备已经将所有读取数据一次性发送到Particle云,我也设置了Particle集成,所以我需要修复Telegraf插件以处理即将接收的所有新数据——以及更复杂的JSON。这并没有我想象的那么困难。首先,由于新的JSON有多个层级,我必须创建一个“虚拟”结构,以便将接收到的JSON解包到其中。从那里,我将部分结构解包到一个新的结构中,该结构包含标签、字段和值。
type DummyData struct { Event string `json:"event"` Data string `json:"data"` Ttl int `json:"ttl"` PublishedAt string `json:"published_at"` InfluxDB string `json:"influx_db"` }
我可以提取产品ID、版本和其他字段,但我对这些并不感兴趣。我将所有传感器数据拉出到“数据”元素中,然后我可以进入那里获取所需的所有数据。注意“InfluxDB”字段。这是我在Particle云上定义Webhook时插入的字段,我将使用它将数据插入我想要的InfluxDB数据库。
type ParticleData struct { Event string `json:"event"` Tags map[string]string `json:"tags"` Fields map[string]interface{} `json:"values"` }
由于我可以使用映射来处理标签和值,因为它们已经在JSON中正确设置,以便它们被插入到数据库中。这就是使这一切变得如此简单的原因。还有一件事我想做,因为多个设备将通过此插件推送数据,所以我想使用提交的时间戳而不是让插件对数据进行时间戳标记。为此,我需要将“PublishedAt”字段解析为正确的InfluxDB时间戳,如下所示
func (d *DummyData) Time() (time.Time, error) { return time.Parse("2006-01-02T15:04:05Z", d.PublishedAt) }
所以这是我创建的对象,这是使用它们的方式
func (rb *ParticleWebhook) eventHandler(w http.ResponseWriter, r *http.Request) { defer r.Body.Close() data, err := ioutil.ReadAll(r.Body) if err != nil { w.WriteHeader(http.StatusBadRequest) return } dummy := &DummyData{} if err := json.Unmarshal(data, dummy); err != nil { w.WriteHeader(http.StatusBadRequest) return } pd := &ParticleData{} if err := json.Unmarshal([]byte(dummy.Data), pd); err != nil { w.WriteHeader(http.StatusBadRequest) return } pTime, err := dummy.Time() if err != nil { log.Printf("Time Conversion Error") } rb.acc.AddFields(dummy.InfluxDB, pd.Fields, pd.Tags, pTime) w.WriteHeader(http.StatusOK) }
如你所见,我将接收到的数据解包到DummyData结构中,然后将DummyData的Data字段解包到ParticleData结构中。我解析时间戳,然后将所有数据写入我选择的InfluxDB数据库。但这里真正酷的地方是,有两点。我可以只发送标签和字段的映射,它们都会被正确排序,我可以在Particle云级别选择使用哪个数据库。在我的情况下,我能够无缝地将设备过渡到使用插件,而无需在我的数据库中移动数据。我还可以通过Particle的一个JSON定义将其他Particle产品指向其他数据库。
几乎完成了
我现在已经完成了我的插件,我可以非常容易地使用它来处理几乎任何我想要的Particle项目。但还没有完全完成,因为你还不能使用它!所以我的待办事项列表中的最后一件事是完成此插件的单元测试,然后在GitHub上提交一个Pull Request,以便它可以在未来的版本中包含。同时,我会将它推送到我的GitHub,以便任何人都可以自行构建。