使用 InfluxDB 创建比特币买卖提醒

导航至

本文最初发表于 The New Stack

鉴于比特币价格的波动性,自动化警报系统对于保持我们的注意力和理智非常有价值。我们可以在价格波动有趣时才关注比特币。动量,即购买过去表现良好的资产,一直是持续有效的交易策略之一——请参阅 Clif Asness:“Value and Momentum Everywhere”Tzouvanas (2019)。我们可以使用 InfluxDB 来帮助我们计算一个简单的动量信号,并在该信号指向“买入”或“卖出”时提醒我们。

在本教程中,我选择了一个常见的动量信号——短期/长期均线交叉。信号将是 1 天移动平均比特币价格(1 天均线)穿过 5 天移动平均线(5 天均线)时。当 1 天均线从下方穿过 5 天均线时,这是一个买入信号。当它从上方穿过时,这是一个卖出信号。

在步骤方面,我们首先需要使用 Telegraf 从 CoinMarketCap API 获取比特币价格并存储到 InfluxDB 中,然后使用 Flux(InfluxDB 的自定义语言)使用存储的比特币价格计算动量信号,然后根据动量信号的值设置警报。

初始设置

  1. 登录 InfluxDB Cloud 应用程序(或 在此处 创建帐户)。
  2. 创建一个名为“crypto_px”的 Bucket(数据集单元)。我们将比特币价格存储到这个 Bucket 中。[加载数据 > Buckets > 创建 Bucket]
  3. 创建一个 Influx API 令牌。[加载数据 > API 令牌 > 生成 API 令牌]
  4. 在您的机器上安装 Telegraf。[说明]
  5. 创建一个 CoinMarketCap API 令牌。[说明]

使用 Telegraf 拉取比特币价格

Telegraf 是 InfluxDB 的代理,用于从各种不同的来源(包括 API)收集数据。它通过终端在我们的收集机器(即我们的笔记本电脑)上本地运行,并将数据发送到中央 InfluxDB bucket 中。

现在我们可以设置我们的 Telegraf 代理,在本例中,它将收集“HTTP”数据并将数据存储到我们刚刚创建的“crypto_px”bucket 中。[转到 加载数据 > Telegraf > 创建配置]

Create Telegraf configuration

一旦您打开 Telegraf HTTP 模板,它看起来可能有些令人生畏。但我们实际上只需要自定义几个部分。

首先,在 [agent] 部分,我们想要指定 Telegraf 调用 API 的间隔。这里,间隔设置为“5m”或 5 分钟。

Configuration for Telegraf agent

接下来,我们需要更改 [[inputs.http]] 部分以指向正确的 CoinMarketCap API 地址。

Point to the right CoinMarketCap API address

输入配置说明

urls

CoinMarketCap API url 的格式为

https://pro-api.coinmarketcap.com/v1/cryptocurrency/listings/latest?CMC_PRO_API_KEY=[YOUR_CMC_API_TOKEN]&limit=10

CMC_PRO_API_KEY 必须用您的 CoinMarketCap API 令牌填充。

limit 变量是要拉取价格的加密资产的数量。这里,我们拉取前 10 名加密货币,其中将包括比特币。

json_query

由于 CoinMarketCap API 返回一个嵌套的 JSON 对象,我们可以使用 json_query 字段来指定和查询实际存放加密货币价格数据的 JSON 键。对于这个 JSON 对象,键名为“data”。

tag_keys

我们可以指定标签,概念上类似于分类变量。这里,将加密资产符号作为标签很有用,这样我们就可以过滤到特定的加密资产,例如“BTC”。

CoinMarketCap API 返回 JSON,因此我们将 data_format 更改为“json”。

由于 InfluxDB 是一个时间序列数据库,我们必须有一个时间序列列。为此,json_time_key 名为“last_updated”,其 json_time_format 为 “RF3339Nano”(您可以在 此处 查找您的时间格式)。

开始运行 Telegraf 代理

一旦我们正确配置了 Telegraf 配置文件,我们就可以使用终端在我们的收集机器(在本例中,我的笔记本电脑)上开始运行 Telegraf。

我们可以从云端或本地副本运行 Telegraf 配置文件。在本例中,我下载了 Telegraf 配置文件并将其保存为“coinmarketcap_api.conf”。

要运行 Telegraf,请在终端中键入以下命令

telegraf --config [YOUR_CONFIG_FOLDER]/coinmarketcap_api.conf

– config 告诉配置文件的位置

注意:您可以通过仅运行一次来测试您的配置文件是否正确

telegraf --once --config [YOUR_CONFIG_FOLDER]/coinmarketcap_api.conf

检查我们的数据是否已存储

现在 Telegraf 正在工作,我们应该以 5 分钟的间隔捕获比特币价格,并将数据存储在我们的 crypto_px bucket 中。我们可以通过可视化 Notebooks 选项卡中的数据来检查。(请注意,Notebooks 功能,类似于 Jupyter Notebooks,是存储和测试 Flux 代码的有用场所)。

我们使用 Flux 查询我们的 crypto_px bucket。价格存储在 _field “quote_USD_price” 中,我们过滤我们的符号标签为 “BTC” 以检索比特币价格。

from(bucket: "crypto_px")
|> range(start: -5d, stop: now())
|> filter(fn: (r) => r["_measurement"] == "coinmarketcap")
|> filter(fn: (r) => r["_field"] == "quote_USD_price")
|> filter(fn: (r) => r["symbol"] == "BTC")
|> yield(name: "PX_Last")

use-Flux-to-query-crypto_px-bucket

使用 Flux 计算动量信号

Flux 语言允许我们进行许多基本的 时间序列转换。要计算我们的动量信号(即 1 天/5 天移动平均线交叉),我们需要计算移动平均线,然后取它们之间的差值。

为了快速可视化移动平均线的外观,我们可以将它们添加到我们之前的查询中,即创建图形的查询。

from(bucket: "crypto_px")
|> range(start: -5d, stop: now())
|> filter(fn: (r) => r["_measurement"] == "coinmarketcap")
|> filter(fn: (r) => r["_field"] == "quote_USD_price")
|> filter(fn: (r) => r["symbol"] == "BTC")
|> yield(name: "PX_Last")

// Add Moving Average Lines
|> timedMovingAverage(every: 1h, period: 1d)
|> yield(name: "PX_MA1D")
|> timedMovingAverage(every: 1h, period: 5d)
|> yield(name: "PX_MA5D")

visualize-what-moving-averages-will-look-like

要绘制移动平均线图,我们使用 timedMovingAverage 函数。我们正在计算 1 天平均值(周期:1d),间隔为 1 小时(每:1h)。

我们可以使用 yield 函数将这条计算出的线在图表上命名为 “PX_MA1D”。

接下来,要将这些移动平均线保存 为新的 _fields 需要更多的 Flux 代码。

首先,我们可以创建另一个名为 “px_transformed” 的 bucket 来保存新计算出的移动平均线。

然后,我们必须将我们的 Flux 计算保存为周期性的 任务。任务计算这些 Flux 计算,并将结果定期(例如每 5 分钟)保存到我们的 px_transformed bucket 中。

我们转到云应用程序侧边栏中的任务,并安排一个新任务,并将其命名为 “Crypto PX Moving Average Calculations”。

我们将安排它每 5 分钟运行一次,偏移 20 秒(即任务每次在 mm:20s 开始)。

然后我们将我们的 Flux 转换放入任务代码面板。

// Task Options
option task = {name: "Crypto PX Moving Average Calculations", every: 5m, offset: 20s}
option v = {timeRangeStart: -30d, timeRangeStop: now()}

data1 = from(bucket: "crypto_px")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "coinmarketcap")
|> filter(fn: (r) => r["_field"] == "quote_USD_price")
|> filter(fn: (r) => r["symbol"] == "BTC")

|> pivot(
rowKey:["_time"],
columnKey: ["_field"],
valueColumn: "_value")

|> timedMovingAverage(column: "quote_USD_price", every: 5m, period: 1d)
|> rename(columns: {quote_USD_price: "PX_MA1d"})

data2 = from(bucket: "crypto_px")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "coinmarketcap")
|> filter(fn: (r) => r["_field"] == "quote_USD_price")
|> filter(fn: (r) => r["symbol"] == "BTC")

|> pivot(
rowKey:["_time"],
columnKey: ["_field"],
valueColumn: "_value")

|> timedMovingAverage(column: "quote_USD_price", every: 5m, period: 5d)
|> rename(columns: {quote_USD_price: "PX_MA5d"})

data3 = join(
tables: {t1: data1, t2: data2},
on: ["_time", "symbol", "_measurement"],
)
|> keep(columns: ["_time", "_measurement", "symbol", "PX_MA1d", "PX_MA5d"])
|> map(fn: (r) => ({ r with Diff_1d_5d: r.PX_MA1d - r.PX_MA5d }))
|> to(
bucket: "px_transformed",
fieldFn: (r) => ({"Diff_1d_5d": r.Diff_1d_5d, "PX_MA1d": r.PX_MA1d, "PX_MA5d": r.PX_MA5d}),
)

Flux 代码说明

我们必须将 2 个新变量保存到同一个数据集,然后取它们的差值。为此,我们需要 pivot 函数,它将每个 _field 透视为一个新列。

我们创建 2 个新的移动平均线字段,名为 “PX_MA1d” 和 “PX_MA5d”,并通过 pivot 函数将它们转换为列。

然后我们使用 join 函数将这两个列合并到一个主表中。

然后我们可以使用  map 函数对我们数据集中的 2 列执行操作。在这里,我们通过减去 “PX_MA1d” 和 “PX_MA5d” 来创建一个名为 “Diff_1d_5d” 的新列。

最后, to 函数将输出保存到指定的 bucket。

以下是生成的新的 px_transformed 表

resulting new px_transformed as a table

为了验证任务是否正确运行,我们可以绘制我们的动量信号图(即 _field “Diff_1d_5d”)。

from(bucket: "px_transformed") 
|> range(start: -7d, stop: now())
|> filter(fn: (r) => r["_measurement"] == "coinmarketcap")
|> filter(fn: (r) => r["_field"] == "Diff_1d_5d")
|> filter(fn: (r) => r["symbol"] == "BTC")

Graph-Variable-Diff_1d_5d

设置 1 天/5 天均线交叉为零时的警报

我们的任务现在已经创建了 1 天/5 天移动平均线信号作为 _field “Diff_1d_5d”,它保存在 px_transformed bucket 中。最后一步是在比特币的这个信号为零时提醒我们,这将是买入或卖出信号。

警报基本上是一个专门的任务。Notebooks 在 Flux 中有警报模板,以帮助我们设置警报。

但首先,我们需要设置我们的通知渠道。在免费帐户中,唯一的选择是使用 Slack 的 webhook 功能通知 Slack 频道。(您可以升级到警报到其他渠道,例如电子邮件)。

以下是设置步骤 [Slack 提供的教程 在此处]

  1. 创建一个 Slack App。我将其命名为 “Crypto Alerts”
  2. 在 App 上激活 Incoming Webhooks
  3. 点击 “Add New Webhook to Workspace” 以生成 webhook url

webhook url 应如下所示

https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX

现在我们将创建一个警报(如前所述,是一个专门的任务)以将警报消息发布到我们刚刚创建的通知端点(即我们的 Slack 频道)。

回到我们一直在处理的 Notebook,我们添加一些修改以进行警报

import "interpolate"

from(bucket: "px_transformed")
|> range(start: -15m, stop: now())
|> filter(fn: (r) => r["_measurement"] == "coinmarketcap")
|> filter(fn: (r) => r["_field"] == "Diff_1d_5d")
|> interpolate.linear(every: 30s)

第一个修改是将查询的时间范围更改为拉取最近 15 分钟的数据,以仅捕获最新的数据点。然后我们插值数据以平滑数据中的跳跃。

现在我们可以为我们刚刚构建的查询添加一个新的警报面板。

添加另一个面板 > 警报

Add Another Panel > Alert

这将打开警报模板

Alert template

我们将设置此警报任务以检查 _field “Diff_1d_5d” 是否在 -1 到 1 之间;即,当它接近 0 时。我们正在检查 1 天移动平均线是否穿过 5 天移动平均线。[注意:由于 “Diff_1d_5d” 很少完全等于 0,因此检查范围更稳健。]

警报将每 5 分钟检查一次,偏移 50 秒(在我们的 Crypto PX Moving Average Calculations 任务运行之后)。

然后放入您生成的 Slack webhook url,以及 webhook 指向的 Slack 频道名称。您可能需要为此创建一个新的 Slack 频道。我将其命名为 “BTC_alert_channel”。

您可以通过点击 “Test Endpoint” 来测试此频道。

您可以修改消息格式。这里是

"${r.symbol} 的买/卖信号在 ${time(v: r._source_timestamp)} 时触发!"

最后,导出警报。

现在我们应该设置好 Slack 上的自动警报,以便在是时候买入(或卖出)比特币时收到通知!

Slack 警报消息

Slack Alert Message

更多想法

当然,这仅仅是个开始。将当前设置扩展到更多币种(例如以太坊或 Solana)非常简单。也可以使用 Flux 为比特币构建更复杂的交易规则,例如 RSI(相对强度指数)、波动率调整动量(除以价格标准差)、均值回归指标(价格低时买入)或许多其他 时间序列预测方法。最后,我们可以通过使用其他 API(例如 Yahoo Finance)拉取股票价格来设置股票警报。