使用InfluxDB创建比特币买卖警报
作者:Grace Ma / 用例,产品,开发者
2022年5月25日
导航至
本文最初发表在 The New Stack。
鉴于比特币价格波动剧烈,一个自动警报系统对于保持我们的注意力和理智非常有价值。我们只有在价格动作有趣时才关注比特币。动量,即购买过去表现良好的资产,一直是效果最持久的交易策略之一——参见 Clif Asness: “Value and Momentum Everywhere” 和 Tzouvanas (2019)。我们可以使用 InfluxDB 帮助我们计算一个简单的动量信号,并在这个信号指向“买入”或“卖出”时发出警报。
对于这个教程,我选择了一个常见的动量信号——短期/长期交叉。这个信号是当 1 天移动平均比特币价格(1 天 MA)穿过 5 天移动平均(5 天 MA)时。当 1 天 MA 从下方穿过 5 天 MA,这是一个买入信号。当它从上方穿过,这是一个卖出信号。
在步骤方面,我们首先需要使用 Telegraf 从 CoinMarketCap API 获取比特币价格并将其存储到 InfluxDB 中,然后使用 Flux(InfluxDB 的自定义语言)使用存储的比特币价格计算动量信号,然后根据动量信号值设置警报。
初始设置
- 登录到 InfluxDB Cloud 应用程序(或在此处创建账户 here)。
- 创建一个名为“crypto_px”的数据集单元(Bucket)。我们将把比特币价格存储在这个 Bucket 中。[加载数据 > Bucket > 创建 Bucket]
- 创建一个 Influx API 令牌。[加载数据 > API 令牌 > 生成 API 令牌]
- 在您的机器上安装 Telegraf。[说明]
- 创建一个 CoinMarketCap API 令牌。[说明]
使用 Telegraf 拉取比特币价格
Telegraf 是 InfluxDB 的代理,用于从各种不同的来源收集数据,包括 API。它通过终端在我们的收集机器(即我们的笔记本电脑)上本地运行,并将数据发送到中央 InfluxDB Bucket。
现在我们可以设置我们的 Telegraf 代理,在这种情况下,它将收集“HTTP”数据并将数据存储到我们刚刚创建的“crypto_px”Bucket 中。[转到 加载数据 > Telegraf > 创建配置]
一旦打开 Telegraf HTTP 模板,它可能看起来有些令人生畏。但实际上我们只需要自定义几个部分。
首先,在 [agent] 部分,我们希望指定 Telegraf 调用 API 的间隔。在这里,间隔设置为“5m”或 5 分钟。
接下来,我们需要更改 [[inputs.http]] 部分以指向正确的 CoinMarketCap API 地址。
输入配置说明
urls
CoinMarketCap API url 的格式为
https://pro-api.coinmarketcap.com/v1/cryptocurrency/listings/latest?CMC_PRO_API_KEY=[YOUR_CMC_API_TOKEN]&limit=10
CMC_PRO_API_KEY 必须用您的 CoinMarketCap API 令牌填充。
limit 变量是要拉取价格的加密货币资产的数量。在这里,我们拉取前 10 个加密货币的价格,这将包括比特币。
json_query
由于CoinMarketCap API返回嵌套的JSON对象,我们可以使用json_query字段来指定和查询实际存在的加密货币价格数据的JSON键。对于这个JSON对象,键称为“data”。
标签键
我们可以指定标签,类似于分类变量。在这里,将加密货币资产符号作为标签很有用,这样我们就可以过滤到特定的加密货币资产,如“BTC”。
CoinMarketCap API返回JSON,因此我们将data_format更改为“json”。
由于InfluxDB是时间序列数据库,我们必须有一个时间序列列。为此,json_time_key称为“last_updated”,其json_time_format为“RF3339Nano”(您可以在这里查找您的时间格式)。
启动Telegraf代理
一旦我们正确配置了Telegraf配置文件,我们就可以使用终端在我们的收集机器(在这种情况下,我的笔记本电脑)上启动Telegraf。
我们可以从云端或本地副本运行Telegraf配置文件。在这种情况下,我下载了Telegraf配置文件并将其保存为“coinmarketcap_api.conf”。
要运行Telegraf,请在终端中输入以下命令
telegraf --config [YOUR_CONFIG_FOLDER]/coinmarketcap_api.conf
– config指定了配置文件的位置
注意:您可以通过只运行一次来测试您的配置文件是否正确
telegraf --once --config [YOUR_CONFIG_FOLDER]/coinmarketcap_api.conf
检查我们的数据是否已存储
现在,Telegraf正在运行,我们应该以5分钟为间隔捕获比特币价格,并将这些数据存储在我们的crypto_px桶中。我们可以通过在Notebooks选项卡中可视化数据来检查。(注意,Notebooks功能,类似于Jupyter Notebooks,是一个存储和测试Flux代码的有用地方)。
我们使用Flux查询我们的crypto_px桶。价格存储在_field“quote_USD_price”中,我们过滤我们的符号标签为“BTC”以检索比特币价格。
from(bucket: "crypto_px")
|> range(start: -5d, stop: now())
|> filter(fn: (r) => r["_measurement"] == "coinmarketcap")
|> filter(fn: (r) => r["_field"] == "quote_USD_price")
|> filter(fn: (r) => r["symbol"] == "BTC")
|> yield(name: "PX_Last")
使用Flux计算动量信号
Flux语言允许我们执行许多基本时间序列转换。为了计算我们的动量信号(即1日/5日移动平均交叉),我们需要计算移动平均并计算它们之间的差异。
为了快速可视化移动平均将如何看起来,我们可以将它们添加到我们之前的查询中,即创建图表的查询。
from(bucket: "crypto_px")
|> range(start: -5d, stop: now())
|> filter(fn: (r) => r["_measurement"] == "coinmarketcap")
|> filter(fn: (r) => r["_field"] == "quote_USD_price")
|> filter(fn: (r) => r["symbol"] == "BTC")
|> yield(name: "PX_Last")
// Add Moving Average Lines
|> timedMovingAverage(every: 1h, period: 1d)
|> yield(name: "PX_MA1D")
|> timedMovingAverage(every: 1h, period: 5d)
|> yield(name: "PX_MA5D")
要绘制移动平均线,我们使用timedMovingAverage函数。我们计算1天的平均(period: 1d)以1小时为间隔(every: 1h)。
我们可以使用yield
函数在图表上将此计算的行命名为“PX_MA1D”。
接下来,为了将这些移动平均线作为新_field保存,需要更多的Flux代码。
首先,我们可以创建另一个名为“px_transformed”的桶来保存新的计算移动平均线。
然后,我们必须将我们的Flux计算保存为一个重复的任务。任务计算这些Flux计算,并将结果定期保存到我们的px_transformed桶中,例如每5分钟一次。
我们转到云应用程序侧边栏中的任务并安排一个新的任务,将其称为“Crypto PX移动平均计算”。
我们将它安排为每5分钟运行一次,有20秒的偏移(即任务每次在mm:20s开始)。
然后我们将我们的Flux转换放入任务代码面板。
// Task Options
option task = {name: "Crypto PX Moving Average Calculations", every: 5m, offset: 20s}
option v = {timeRangeStart: -30d, timeRangeStop: now()}
data1 = from(bucket: "crypto_px")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "coinmarketcap")
|> filter(fn: (r) => r["_field"] == "quote_USD_price")
|> filter(fn: (r) => r["symbol"] == "BTC")
|> pivot(
rowKey:["_time"],
columnKey: ["_field"],
valueColumn: "_value")
|> timedMovingAverage(column: "quote_USD_price", every: 5m, period: 1d)
|> rename(columns: {quote_USD_price: "PX_MA1d"})
data2 = from(bucket: "crypto_px")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "coinmarketcap")
|> filter(fn: (r) => r["_field"] == "quote_USD_price")
|> filter(fn: (r) => r["symbol"] == "BTC")
|> pivot(
rowKey:["_time"],
columnKey: ["_field"],
valueColumn: "_value")
|> timedMovingAverage(column: "quote_USD_price", every: 5m, period: 5d)
|> rename(columns: {quote_USD_price: "PX_MA5d"})
data3 = join(
tables: {t1: data1, t2: data2},
on: ["_time", "symbol", "_measurement"],
)
|> keep(columns: ["_time", "_measurement", "symbol", "PX_MA1d", "PX_MA5d"])
|> map(fn: (r) => ({ r with Diff_1d_5d: r.PX_MA1d - r.PX_MA5d }))
|> to(
bucket: "px_transformed",
fieldFn: (r) => ({"Diff_1d_5d": r.Diff_1d_5d, "PX_MA1d": r.PX_MA1d, "PX_MA5d": r.PX_MA5d}),
)
Flux代码的解释
我们必须将2个新变量保存到同一数据集中,然后取它们的差异。为此,我们需要pivot
函数,它将每个_field旋转到新列。
我们创建2个新的移动平均字段,分别称为“PX_MA1d”和“PX_MA5d”,并通过旋转函数将它们转换为列。
然后我们使用join
函数将这两个列合并到一个主表中。
然后我们可以使用 map
函数对我们的数据集中的 2 列进行操作。在这里,我们创建了一个名为“Diff_1d_5d”的新列,通过减去“PX_MA1d”和“PX_MA5d”。
最后, to
函数将输出保存到指定的桶中。
以下是作为表格显示的结果新 px_transformed
为了验证任务是否正确运行,我们可以绘制我们的动量信号(即 _field “Diff_1d_5d”)。
from(bucket: "px_transformed")
|> range(start: -7d, stop: now())
|> filter(fn: (r) => r["_measurement"] == "coinmarketcap")
|> filter(fn: (r) => r["_field"] == "Diff_1d_5d")
|> filter(fn: (r) => r["symbol"] == "BTC")
当 1-D / 5-D 交叉为零时设置警报
我们的任务现在已创建 1-D / 5-D 移动平均信号,作为 _field “Diff_1d_5d”,并保存在 px_transformed 桶中。最后一步是当这个比特币信号为零时提醒我们,这将是一个买入或卖出的信号。
警报基本上是一个专业的任务。Notebooks 提供了 Flux 的警报模板,帮助我们设置警报。
但是首先,我们需要 设置我们的通知渠道。在免费账户中,唯一的选择是使用 Slack 的 webhook 功能来通知 Slack 通道。(您可以升级以将警报发送到其他渠道,例如电子邮件)。
以下是设置步骤 [由 Slack 提供的教程在这里 这里]
- 创建一个 Slack 应用。我称之为“Crypto Alerts”
- 在应用中激活入站 webhook
- 点击“添加新 webhook 到工作区”以生成 webhook url
webhook url 应该看起来像这样
https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX
现在我们将创建一个警报(如前所述,一个专业的任务),以将警报消息发布到我们刚刚创建的通知端点(即我们的 Slack 通道)。
回到我们之前一直在工作的笔记本中,我们添加了一些针对警报的修改
import "interpolate"
from(bucket: "px_transformed")
|> range(start: -15m, stop: now())
|> filter(fn: (r) => r["_measurement"] == "coinmarketcap")
|> filter(fn: (r) => r["_field"] == "Diff_1d_5d")
|> interpolate.linear(every: 30s)
第一个修改是将查询的时间范围更改为拉取最后 15 分钟的数据,以捕获仅最新的数据点。然后我们对数据进行插值,以平滑数据跳跃。
现在我们可以在我们刚刚构建的查询中添加一个新的警报面板。
添加另一个面板 > 警报
这打开了一个警报模板
我们将把这个警报任务设置为检查 _field “Diff_1d_5d”是否在 -1 到 1 之间;即,当它接近 0。我们正在检查 1 日移动平均是否穿过 5 日移动平均。[注意:由于“Diff_1d_5d”很少恰好等于 0,因此检查范围更稳健。]
警报将每 5 分钟检查一次,偏移量为 50 秒(在我们的加密 PX 移动平均计算任务运行后)。
然后输入您生成的 Slack webhook url,以及 webhook 指向的 Slack 通道名称。您可能需要为这个创建一个新的 Slack 通道。我称之为“BTC_alert_channel”。
您可以通过点击“测试端点”来测试这个通道。
您可以修改消息格式。如下所示
"${r.symbol} 在 ${time(v: r._source_timestamp)} 触发买入/卖出信号!"
最后,导出警报。
现在,当是时候买入(或卖出)比特币时,我们应该已经设置了在 Slack 上自动警报!
Slack 警报消息
其他想法
当然,这只是开始。将当前设置扩展到更多货币,如以太坊或索拉纳,非常简单。还可以使用Flux为比特币构建更复杂的交易规则,例如RSI(相对强弱指数)、波动率调整的动量(除以价格标准差)、均值回归指标(在价格低时买入)或许多其他时间序列预测方法。最后,我们可以通过使用其他API(例如雅虎财经)拉取股票价格来设置股票警报。