使用 InfluxDB 和 Telegraf 监控比特币和加密货币
作者:Russ Savage / 产品, 开发者
2017 年 11 月 29 日
导航至
监控比特币
加密货币市场比以往任何时候都更加火爆。比特币和以太坊是两种最流行的加密货币,它们的价格受到新闻报道和思想领袖公开声明推动的投机行为的严重影响。为了帮助您跟踪瞬息万变的加密货币世界,我将引导您了解如何为 Telegraf 构建自定义服务输入插件,该插件将监控 Twitter 直播中的关键字并将它们发送到 InfluxDB。我们将使用 Chronograf 来探索数据并构建一个简单的仪表板。最后,我们将得到一个类似于以下的仪表板。
当然,除非您是少数幸运儿之一,可以访问 Twitter 消防水管,否则这一切都只是为了好玩,但这是一种很好的入门方式,可以学习为任何您需要监控的系统或设备编写自定义 Telegraf 插件。
设置 InfluxDB 和 TICK 堆栈
我们需要做的第一件事是在本地机器上启动并运行 TICK 堆栈。如果您刚开始使用 TICK 堆栈,我建议您通读最新的 Chronograf “入门” 指南。这将使您在短时间内启动并运行 Telegraf、InfluxDB、Chronograf 和 Kapacitor。
开发者环境设置
Telegraf 目前没有附带用于监控 Twitter 推文的插件。但开源软件的伟大之处在于能够自定义和扩展它,因此让我们前往 Telegraf GitHub 仓库 并开始吧。
此仓库保存了 Telegraf 和所有默认可用插件的代码。如果您还没有这样做,现在是 fork Telegraf 仓库 到您自己的 GitHub 帐户的好时机。有关如何 fork 仓库的说明可以在 GitHub 文档 中找到。
Telegraf 完全用 Go 编写,这意味着为了为其做出贡献,您需要安装 Go 并配置您的开发环境。我使用的是 Mac,所以我 使用 Homebrew 安装了 Go,并使用 a href=”https://atom.io/packages/go-plus”>go-plus 插件为开发设置了 Atom(当然,您可以使用您最喜欢的 IDE)。
如果您使用 GitHub 用其他语言进行开发,您可能会想前往您的 fork 并将其克隆到您的机器上。由于包路径的设置方式,这在 Go 中不起作用。在 Go 中执行此操作的方法是使用 git 远程仓库。Scott Mansfield 在 在 Go 中使用 fork 上发表了一篇很好的博客文章,其中更详细地描述了这个过程。简而言之,您将运行以下命令以启动并运行您的 fork。 go get github.com/influxdata/telegraf cd $GOPATH/src/github.com/influxdata/telegraf git remote rename origin upstream git remote add origin https://github.com/[YOUR-GITHUB-ID]/telegraf
有关从头开始构建 Telegraf 的说明可以在 主 README 文件 中找到,但它归结为只需在 telegraf 目录中运行 make
。在进行更改之前,请确保您可以成功构建 Telegraf。如果您遇到任何问题,快速 Google 搜索通常可以解决大多数问题。如果不能,请随时在 InfluxData 社区 论坛上寻求帮助。
将输入插件包添加到 Telegraf
向 Telegraf 添加输入插件需要几个步骤,然后才能编写实际代码。首先,我们需要在 plugins
文件夹中,在我们正在构建的特定插件类型下添加一个目录。由于我们正在构建输入插件,让我们在 plugins/inputs
目录中添加一个名为 twitter
的文件夹,然后在其中创建一个名为 twitter.go
的文件。这是我们将为插件添加代码的文件。
接下来,我们需要在 plugins/inputs/all/all.go
文件中添加一行,以便 Telegraf 知道它需要加载我们的 twitter 包的代码。将行 _ "github.com/influxdata/telegraf/plugins/inputs/twitter"
添加到文件的导入部分,并按字母顺序排列。
输入插件和服务输入插件
现在我们准备开始编写插件的代码了。Telegraf 中有两种类型的输入插件,它们以 Go 接口的形式实现。标准输入插件用于以规则的时间间隔轮询特定源。如果我们想每分钟搜索 Twitter 上的特定关键字,这可能是一个不错的选择。第二种类型是服务输入插件,它启动一个长时间运行的进程来监听发送给它的数据。由于我们对与我们的加密货币相关的任何新推文感兴趣,并且我们想使用 Twitter Streaming API,因此此选项对我们来说更有意义。
项目根目录中的 input.go
文件为我们提供了服务输入插件的 Go 接口的定义。与标准输入插件的唯一区别是添加了两个函数:Start
和 Stop
。
Telegraf 仓库中的 CONTRIBUTING 文件 为我们提供了输入插件的基本结构,因此让我们将其复制到我们的 twitter.go
文件中并开始吧。我们将随着进行添加额外的 Start
和 Stop
函数。
导入正确的 Go 库
快速搜索 Twitter API 的 Go 库,我们得到了三个不同的库可以查看。我选择了 Anaconda,因为它看起来是最受欢迎的仓库。让我们浏览一下我们的服务输入插件的每个部分并进行一些更改。
文件中的第一行告诉 Go 此源代码的包是什么。这需要与我们稍早在 all.go
中添加的导入行匹配,因此让我们将其称为 twitter
包。
接下来是 import
部分,我们可以在其中告诉 Go 引入我们稍后将使用的 Anaconda 库。我们的插件已经依赖于两个 Telegraf 库,但我们也需要添加 Anaconda 库。此外,还需要名为 context
、net/url
、strings
和 sync
的通用 Go 库。Go 中的通用库通常具有简短或缩写的名称。当您开始编码时,您的 IDE 很可能会将这些库引入。
插件配置和 Twitter API 密钥
config struct
是下一个,它将是保存我们插件的所有配置选项的对象。根据 Anaconda README 文件,我们需要四个 ID 才能使用 Twitter API 进行身份验证:Consumer Key、Consumer Secret、Access Token 和 Access Token Secret。当您 创建新应用 时,您将从 Twitter 获得这些值。请注意,这些名称以大写字母开头,表示这些值可以从另一个包设置。这是必需的,以便 Telegraf 可以根据用户的 telegraf.conf
文件管理填充这些值。
我们还需要用户感兴趣跟踪的关键字列表。我只是使用了一个简单的逗号分隔列表,稍后可以将其拆分为数组。
最后,您会注意到我们配置中一些较短的、小写的变量。这些内部变量用于在我们包中的不同函数之间传递信息。名称是小写的,因为它们应该只由我们在我们的包代码中设置。我们正在存储对 TwitterApi 库的引用,然后是其他三个变量,用于管理我们稍后将讨论的长时间运行的 goroutine。
接下来的两个函数 Description
和 SampleConfig
是帮助函数,用于使用 Telegraf 文档 中看到的 config
选项自动生成插件的配置。我们可以通过返回简单的字符串来填写这些函数,这些字符串描述了插件的功能以及包含用户可以为此插件设置的所有选项的空配置块。可以使用 #
注释掉选项,以便用户了解这些选项,并在需要时取消注释它们。
Gather、Start 和 Stop
最后,我们开始接触插件的真正核心。对于普通输入插件,Telegraf 每间隔调用一次 Gather
函数,该函数可用于轮询指标并通过 Accumulator 将其发送到输出插件。由于我们正在编写服务输入插件,因此我们将重点关注 Start
和 Stop
函数,而不是 Gather
函数。我们可以让我们的 Gather
函数返回 nil
。
Start
函数不是我们原始模板的一部分,但我们可以添加它以及 Stop
函数,以将此插件转换为服务输入插件。Start
函数的签名与 Gather
函数相同。在 Start
函数中,我们将设置我们的 Anaconda 库以验证 Twitter,然后为 fetchTweets
函数触发单独的 goroutine,因为它将持续运行并监听新推文的到达。由于 fetchTweets
函数旨在持续运行,我们需要设置一些工具来帮助我们在退出 Telegraf 时终止它。context
对象 可用于在用户停止 Telegraf 时向我们的 goroutine 发出停止信号。结合 WaitGroup,这有助于我们的插件在关闭期间优雅地退出。然后,Stop
函数可以简单地使用 cancel
函数来通知 goroutine 是时候退出了,然后等待它优雅地关闭。Go 并发模型超出了本文的范围,但如果您要编写自己的服务输入插件,那么您需要熟悉它。
从 Streaming API 获取新推文
我找到了一篇博客文章,其中包含有关如何使用 Anaconda 读取流式推文的示例。我们代码的主循环基于该文章以及来自其他插件的一些逻辑,用于将字段和标签发送到累加器。我不是 Go 专家,因此可能还有更优化的代码编写方式。
首先,我们用逗号分隔关键字列表,以便稍后我们可以在推文中搜索每个关键字。接下来,我们设置 Twitter Streaming API 所需的 URL 参数。在这种情况下,我们只是在查找关键字列表,但您可以利用任何 可用的 API 选项。
最后,我们连接到流式端点并持续循环,直到我们获得一条新推文。此处使用 select
子句,以便我们可以捕获任何关闭和优雅退出的尝试。默认行为是循环,直到我们从流中获得与推文匹配的事件。然后,我们可以为测量设置我们的字段和标签,检查推文中的关键字,并将其他一些推文属性添加到我们的点中。这将仅插入在推文中找到的最后一个关键字。如果需要,可以轻松修改此代码以更改该行为。
最后但并非最不重要的一点是,我们需要插件的 init
函数。这是 Telegraf 将调用以创建配置对象的函数。
如您所见,此代码需要进行更多的错误检查,但目前它可以完成任务。您可以在此 Gist 中找到此插件完整代码的副本。
构建和运行我们的新插件
完成编码后,让我们构建我们的 Telegraf 版本并更新我们的配置。构建说明可以在 contributing.md
文件中找到,但截至撰写本文时,您只需在目录中运行 make
即可。
要创建配置文件以测试您的插件,您可以利用我们之前添加的 SampleConfig
函数,并使用 sample-config
标志自动创建配置文件。
>telegraf -sample-config -input-filter twitter -output-filter influxdb > telegraf.conf
打开新创建的 telegraf.conf
文件,添加您的 Twitter 应用程序凭据,然后添加您要跟踪的关键字列表。我使用了 bitcoin,$btc,ethereum,$eth
,但您可以添加任何其他您想要的加密货币。最后,使用以下命令启动 Telegraf。
>./telegraf --config telegraf.conf
如果一切顺利,您现在应该正在监听与您的关键字匹配的新推文,并将该信息发送到默认 telegraf
数据库中的 InfluxDB。
在 Chronograf 中探索结果
验证一切是否正常工作的最简单方法是跳转到 Chronograf 并检查您的数据是否在那里。单击数据浏览器,并在 telegraf.autogen 数据库中找到 tweets 测量。一旦您开始看到数据,您就可以开始创建一些仪表板。
结论
这只是启动并运行服务输入 Telegraf 插件所需的最低限度。我们仍然需要添加单元测试,验证我们的标签和值是否合适,并根据仓库中 示例 整理一个漂亮的 README 文件。
希望这为您构建自己的输入插件提供了一个良好的起点。趁您还在做这件事,为什么不成为一名开源开发者并将您的插件与世界分享呢?
您可以在此视频中了解有关用例的更多信息