使用InfluxDB创建比特币买卖警报

导航至

本文最初发表在 The New Stack

鉴于比特币价格波动剧烈,一个自动警报系统对于保持我们的注意力和理智非常有价值。我们只有在价格动作有趣时才关注比特币。动量,即购买过去表现良好的资产,一直是效果最持久的交易策略之一——参见 Clif Asness: “Value and Momentum Everywhere”Tzouvanas (2019)。我们可以使用 InfluxDB 帮助我们计算一个简单的动量信号,并在这个信号指向“买入”或“卖出”时发出警报。

对于这个教程,我选择了一个常见的动量信号——短期/长期交叉。这个信号是当 1 天移动平均比特币价格(1 天 MA)穿过 5 天移动平均(5 天 MA)时。当 1 天 MA 从下方穿过 5 天 MA,这是一个买入信号。当它从上方穿过,这是一个卖出信号。

在步骤方面,我们首先需要使用 Telegraf 从 CoinMarketCap API 获取比特币价格并将其存储到 InfluxDB 中,然后使用 Flux(InfluxDB 的自定义语言)使用存储的比特币价格计算动量信号,然后根据动量信号值设置警报。

初始设置

  1. 登录到 InfluxDB Cloud 应用程序(或在此处创建账户 here)。
  2. 创建一个名为“crypto_px”的数据集单元(Bucket)。我们将把比特币价格存储在这个 Bucket 中。[加载数据 > Bucket > 创建 Bucket]
  3. 创建一个 Influx API 令牌。[加载数据 > API 令牌 > 生成 API 令牌]
  4. 在您的机器上安装 Telegraf。[说明]
  5. 创建一个 CoinMarketCap API 令牌。[说明]

使用 Telegraf 拉取比特币价格

Telegraf 是 InfluxDB 的代理,用于从各种不同的来源收集数据,包括 API。它通过终端在我们的收集机器(即我们的笔记本电脑)上本地运行,并将数据发送到中央 InfluxDB Bucket。

现在我们可以设置我们的 Telegraf 代理,在这种情况下,它将收集“HTTP”数据并将数据存储到我们刚刚创建的“crypto_px”Bucket 中。[转到 加载数据 > Telegraf > 创建配置]

Create Telegraf configuration

一旦打开 Telegraf HTTP 模板,它可能看起来有些令人生畏。但实际上我们只需要自定义几个部分。

首先,在 [agent] 部分,我们希望指定 Telegraf 调用 API 的间隔。在这里,间隔设置为“5m”或 5 分钟。

Configuration for Telegraf agent

接下来,我们需要更改 [[inputs.http]] 部分以指向正确的 CoinMarketCap API 地址。

Point to the right CoinMarketCap API address

输入配置说明

urls

CoinMarketCap API url 的格式为

https://pro-api.coinmarketcap.com/v1/cryptocurrency/listings/latest?CMC_PRO_API_KEY=[YOUR_CMC_API_TOKEN]&limit=10

CMC_PRO_API_KEY 必须用您的 CoinMarketCap API 令牌填充。

limit 变量是要拉取价格的加密货币资产的数量。在这里,我们拉取前 10 个加密货币的价格,这将包括比特币。

json_query

由于CoinMarketCap API返回嵌套的JSON对象,我们可以使用json_query字段来指定和查询实际存在的加密货币价格数据的JSON键。对于这个JSON对象,键称为“data”。

标签键

我们可以指定标签,类似于分类变量。在这里,将加密货币资产符号作为标签很有用,这样我们就可以过滤到特定的加密货币资产,如“BTC”。

CoinMarketCap API返回JSON,因此我们将data_format更改为“json”。

由于InfluxDB是时间序列数据库,我们必须有一个时间序列列。为此,json_time_key称为“last_updated”,其json_time_format为“RF3339Nano”(您可以在这里查找您的时间格式)。

启动Telegraf代理

一旦我们正确配置了Telegraf配置文件,我们就可以使用终端在我们的收集机器(在这种情况下,我的笔记本电脑)上启动Telegraf。

我们可以从云端或本地副本运行Telegraf配置文件。在这种情况下,我下载了Telegraf配置文件并将其保存为“coinmarketcap_api.conf”。

要运行Telegraf,请在终端中输入以下命令

telegraf --config [YOUR_CONFIG_FOLDER]/coinmarketcap_api.conf

– config指定了配置文件的位置

注意:您可以通过只运行一次来测试您的配置文件是否正确

telegraf --once --config [YOUR_CONFIG_FOLDER]/coinmarketcap_api.conf

检查我们的数据是否已存储

现在,Telegraf正在运行,我们应该以5分钟为间隔捕获比特币价格,并将这些数据存储在我们的crypto_px桶中。我们可以通过在Notebooks选项卡中可视化数据来检查。(注意,Notebooks功能,类似于Jupyter Notebooks,是一个存储和测试Flux代码的有用地方)。

我们使用Flux查询我们的crypto_px桶。价格存储在_field“quote_USD_price”中,我们过滤我们的符号标签为“BTC”以检索比特币价格。

from(bucket: "crypto_px")
|> range(start: -5d, stop: now())
|> filter(fn: (r) => r["_measurement"] == "coinmarketcap")
|> filter(fn: (r) => r["_field"] == "quote_USD_price")
|> filter(fn: (r) => r["symbol"] == "BTC")
|> yield(name: "PX_Last")

use-Flux-to-query-crypto_px-bucket

使用Flux计算动量信号

Flux语言允许我们执行许多基本时间序列转换。为了计算我们的动量信号(即1日/5日移动平均交叉),我们需要计算移动平均并计算它们之间的差异。

为了快速可视化移动平均将如何看起来,我们可以将它们添加到我们之前的查询中,即创建图表的查询。

from(bucket: "crypto_px")
|> range(start: -5d, stop: now())
|> filter(fn: (r) => r["_measurement"] == "coinmarketcap")
|> filter(fn: (r) => r["_field"] == "quote_USD_price")
|> filter(fn: (r) => r["symbol"] == "BTC")
|> yield(name: "PX_Last")

// Add Moving Average Lines
|> timedMovingAverage(every: 1h, period: 1d)
|> yield(name: "PX_MA1D")
|> timedMovingAverage(every: 1h, period: 5d)
|> yield(name: "PX_MA5D")

visualize-what-moving-averages-will-look-like

要绘制移动平均线,我们使用timedMovingAverage函数。我们计算1天的平均(period: 1d)以1小时为间隔(every: 1h)。

我们可以使用yield 函数在图表上将此计算的行命名为“PX_MA1D”。

接下来,为了将这些移动平均线作为新_field保存,需要更多的Flux代码。

首先,我们可以创建另一个名为“px_transformed”的桶来保存新的计算移动平均线。

然后,我们必须将我们的Flux计算保存为一个重复的任务。任务计算这些Flux计算,并将结果定期保存到我们的px_transformed桶中,例如每5分钟一次。

我们转到云应用程序侧边栏中的任务并安排一个新的任务,将其称为“Crypto PX移动平均计算”。

我们将它安排为每5分钟运行一次,有20秒的偏移(即任务每次在mm:20s开始)。

然后我们将我们的Flux转换放入任务代码面板。

// Task Options
option task = {name: "Crypto PX Moving Average Calculations", every: 5m, offset: 20s}
option v = {timeRangeStart: -30d, timeRangeStop: now()}

data1 = from(bucket: "crypto_px")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "coinmarketcap")
|> filter(fn: (r) => r["_field"] == "quote_USD_price")
|> filter(fn: (r) => r["symbol"] == "BTC")

|> pivot(
rowKey:["_time"],
columnKey: ["_field"],
valueColumn: "_value")

|> timedMovingAverage(column: "quote_USD_price", every: 5m, period: 1d)
|> rename(columns: {quote_USD_price: "PX_MA1d"})

data2 = from(bucket: "crypto_px")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "coinmarketcap")
|> filter(fn: (r) => r["_field"] == "quote_USD_price")
|> filter(fn: (r) => r["symbol"] == "BTC")

|> pivot(
rowKey:["_time"],
columnKey: ["_field"],
valueColumn: "_value")

|> timedMovingAverage(column: "quote_USD_price", every: 5m, period: 5d)
|> rename(columns: {quote_USD_price: "PX_MA5d"})

data3 = join(
tables: {t1: data1, t2: data2},
on: ["_time", "symbol", "_measurement"],
)
|> keep(columns: ["_time", "_measurement", "symbol", "PX_MA1d", "PX_MA5d"])
|> map(fn: (r) => ({ r with Diff_1d_5d: r.PX_MA1d - r.PX_MA5d }))
|> to(
bucket: "px_transformed",
fieldFn: (r) => ({"Diff_1d_5d": r.Diff_1d_5d, "PX_MA1d": r.PX_MA1d, "PX_MA5d": r.PX_MA5d}),
)

Flux代码的解释

我们必须将2个新变量保存到同一数据集中,然后取它们的差异。为此,我们需要pivot 函数,它将每个_field旋转到新列。

我们创建2个新的移动平均字段,分别称为“PX_MA1d”和“PX_MA5d”,并通过旋转函数将它们转换为列。

然后我们使用join函数将这两个列合并到一个主表中。

然后我们可以使用  map 函数对我们的数据集中的 2 列进行操作。在这里,我们创建了一个名为“Diff_1d_5d”的新列,通过减去“PX_MA1d”和“PX_MA5d”。

最后, to 函数将输出保存到指定的桶中。

以下是作为表格显示的结果新 px_transformed

resulting new px_transformed as a table

为了验证任务是否正确运行,我们可以绘制我们的动量信号(即 _field “Diff_1d_5d”)。

from(bucket: "px_transformed") 
|> range(start: -7d, stop: now())
|> filter(fn: (r) => r["_measurement"] == "coinmarketcap")
|> filter(fn: (r) => r["_field"] == "Diff_1d_5d")
|> filter(fn: (r) => r["symbol"] == "BTC")

Graph-Variable-Diff_1d_5d

当 1-D / 5-D 交叉为零时设置警报

我们的任务现在已创建 1-D / 5-D 移动平均信号,作为 _field “Diff_1d_5d”,并保存在 px_transformed 桶中。最后一步是当这个比特币信号为零时提醒我们,这将是一个买入或卖出的信号。

警报基本上是一个专业的任务。Notebooks 提供了 Flux 的警报模板,帮助我们设置警报。

但是首先,我们需要 设置我们的通知渠道。在免费账户中,唯一的选择是使用 Slack 的 webhook 功能来通知 Slack 通道。(您可以升级以将警报发送到其他渠道,例如电子邮件)。

以下是设置步骤 [由 Slack 提供的教程在这里 这里]

  1. 创建一个 Slack 应用。我称之为“Crypto Alerts”
  2. 在应用中激活入站 webhook
  3. 点击“添加新 webhook 到工作区”以生成 webhook url

webhook url 应该看起来像这样

https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX

现在我们将创建一个警报(如前所述,一个专业的任务),以将警报消息发布到我们刚刚创建的通知端点(即我们的 Slack 通道)。

回到我们之前一直在工作的笔记本中,我们添加了一些针对警报的修改

import "interpolate"

from(bucket: "px_transformed")
|> range(start: -15m, stop: now())
|> filter(fn: (r) => r["_measurement"] == "coinmarketcap")
|> filter(fn: (r) => r["_field"] == "Diff_1d_5d")
|> interpolate.linear(every: 30s)

第一个修改是将查询的时间范围更改为拉取最后 15 分钟的数据,以捕获仅最新的数据点。然后我们对数据进行插值,以平滑数据跳跃。

现在我们可以在我们刚刚构建的查询中添加一个新的警报面板。

添加另一个面板 > 警报

Add Another Panel > Alert

这打开了一个警报模板

Alert template

我们将把这个警报任务设置为检查 _field “Diff_1d_5d”是否在 -1 到 1 之间;即,当它接近 0。我们正在检查 1 日移动平均是否穿过 5 日移动平均。[注意:由于“Diff_1d_5d”很少恰好等于 0,因此检查范围更稳健。]

警报将每 5 分钟检查一次,偏移量为 50 秒(在我们的加密 PX 移动平均计算任务运行后)。

然后输入您生成的 Slack webhook url,以及 webhook 指向的 Slack 通道名称。您可能需要为这个创建一个新的 Slack 通道。我称之为“BTC_alert_channel”。

您可以通过点击“测试端点”来测试这个通道。

您可以修改消息格式。如下所示

"${r.symbol} 在 ${time(v: r._source_timestamp)} 触发买入/卖出信号!"

最后,导出警报。

现在,当是时候买入(或卖出)比特币时,我们应该已经设置了在 Slack 上自动警报!

Slack 警报消息

Slack Alert Message

其他想法

当然,这只是开始。将当前设置扩展到更多货币,如以太坊或索拉纳,非常简单。还可以使用Flux为比特币构建更复杂的交易规则,例如RSI(相对强弱指数)、波动率调整的动量(除以价格标准差)、均值回归指标(在价格低时买入)或许多其他时间序列预测方法。最后,我们可以通过使用其他API(例如雅虎财经)拉取股票价格来设置股票警报。