开始使用:将数据流式传输到 InfluxDB

导航至

这是 InfluxDB v2 入门教程的第二部分。如果您是 InfluxDB v2 的新手,我建议首先学习关于将静态数据批量写入 InfluxDB v2 的不同方法,请参阅本入门系列教程的第一部分。这是一个关于如何以及何时使用将实时数据写入 InfluxDB v2 的初学者教程

  1. Telegraf 和 Exec 插件
  2. Telegraf 和 Tail 插件

本教程的 repo 在此处。在本教程中,我使用了 Alpha Vantage 股票和加密货币 API 来获取日内时间序列数据。如果您想了解更多关于如何使用时间序列数据的信息,您可以查看关于市场数据 API 的这篇文章。在您声明您的密钥后,Alpha Vantage 提供 5 分钟分辨率的实时日内 BTC 价格和交易量数据。这里是输出数据的一个示例。

1. Telegraf 和 Exec 插件

将 Telegraf 与 Exec 输入插件 结合使用,用户可以按设定的时间间隔执行命令,以检索指标并将它们写入 InfluxDB。首先,我向 Alpha Vantage 发出请求,并将最后一个数据点转换为行协议,这是 InfluxDB 的数据摄取格式,并使用函数 data_requests() 实现了纳秒级精度。我只对收集最后一个数据点感兴趣,因为我不想每 5 分钟重写 24 小时的数据。

#exec.py
import pandas as pd
import requests
from alphavantage_auth import key
import datetime
import time

#Using Alpha Vantage to get BTC prices every 5 make_lines
#Get your key here: https://www.alphavantage.co/support/#api-key
apikey = key
url = "https://www.alphavantage.co/query?"
function = "DIGITAL_CURRENCY_INTRADAY"
symbol = "BTC"
market = "USD"
#build target url
target_url = url + "function=" + function + "&symbol=" + symbol + "&market=" + market + "&apikey=" + apikey
#make request
def data_request():
    data = requests.get(target_url).json()
    #data is returned in the following format: https://www.alphavantage.co/query?function=DIGITAL_CURRENCY_INTRADAY&symbol=BTC&market=EUR&apikey=demo
    #we only want the last datapoint
    t = [t for t in data['Time Series (Digital Currency Intraday)']]
    t = t[0]
    #convert human readable time to unix time
    t = datetime.datetime.strptime(t, "%Y-%m-%d %H:%M:%S")
    unix = int(t.strftime("%s"))
    #convert timestamp to nanosecond precision
    unix_ns = str(unix) + "000000000"
    fields = [v for k, v in data['Time Series (Digital Currency Intraday)'].items()]
    #convert to line protocol
    line = str("price"
         + ",type=BTC"
         + " "
         + "price=" + str(fields[0]['1a. price (USD)']) + ","
         + "volume=" + str(fields[0]['2. volume'])
         + " " + unix_ns)
    # print("data gathered and converted")
    return(line)

现在我们有了一个返回行协议数据的 Python 脚本,创建一个包含 Exec 输入插件和 InfluxDB v2 输出插件 的 Telegraf 配置。请按照文档使用 UI 手动配置 Telegraf。确保您向插件添加以下更改

  • 将数据收集间隔设置为 300 秒(5 分钟),因为 Alpha Vantage 仅提供 5 分钟分辨率的数据: interval = "300s"
  • 指定时间戳精度: precision = "ns"
  • 省略 “host” 标签: omit_hostname = true
  • 指定您的 InfluxDB 实例: urls = ["https://us-west-2-1.aws.cloud2.influxdata.com"] # 必需
  • 指定您的存储桶、令牌和组织
  • 指定 exec 插件要执行的命令(第 228 行): commands = ["python /Users/anaisdotis-georgiou/Desktop/GettingStarted_StreamingData/exec.py" ]
  • 因为我只有一个命令,所以我可以注释掉名称后缀: #name_suffix = "_mycollector"
  • 指定要使用的数据格式。由于数据已转换为行协议,因此选择 influx: data_format = "influx"

2. Telegraf 和 Tail 插件

Tail 输入插件 跟踪日志文件并解析指标,Telegraf 将数据写入 InfluxDB。Tail.py 与 exec.py 类似,只是每个新点都先附加到 tail.txt,而不是直接写入 InfluxDB。此外,运行延迟在 tail.py 中使用 time.sleep() 定义,而不是像 Exec 插件那样由配置中的执行间隔指定。

import pandas as pd
import requests
import time
from alphavantage_auth import key
import datetime

#Using Alphavantage to get BTC prices every 5 make_lines
#Get your key here: https://www.alphavantage.co/support/#api-key
apikey = key
url = "https://www.alphavantage.co/query?"
function = "DIGITAL_CURRENCY_INTRADAY"
symbol = "BTC"
market = "USD"
#build target url
target_url = url + "function=" + function + "&symbol=" + symbol + "&market=" + market + "&apikey=" + apikey
while True:
    #make request
    data = requests.get(target_url).json()
    #data is returned in the following format: https://www.alphavantage.co/query?function=DIGITAL_CURRENCY_INTRADAY&symbol=BTC&market=EUR&apikey=demo
    #we only want the last datapoint
    t = [t for t in data['Time Series (Digital Currency Intraday)']]
    t = t[0]
    t = datetime.datetime.strptime(t, "%Y-%m-%d %H:%M:%S")
    unix = int(t.strftime("%s"))
    #convert to nanosecond precision
    unix_ns = str(unix) + "000000000"
    fields = [v for k, v in data['Time Series (Digital Currency Intraday)'].items()]
    #convert to line protocol
    line = ["price"
         + ",type=BTC"
         + " "
         + "price=" + str(fields[0]['1a. price (USD)']) + ","
         + "volume=" + str(fields[0]['2. volume'])
         + " " + unix_ns]
    thefile = open('Data/tail.txt', 'a+')
    for item in line:
        thefile.write("%s\n" % item)
    print("line added")
    #Alphavantage only adds points every 5 min, so set script to sleep for 5 min as well
    time.sleep(300)

现在我有一个 Python 脚本将行协议数据附加到 txt 文件,我手动配置 Telegraf 以运行包含 Tail 输入插件和 InfluxDB_v2 输出插件的配置,并进行以下更改

  • 指定数据的精度(第 64 行)。对于此示例,我们的 BTC 数据已转换为纳秒精度,因此: precision = "ns"
  • 由于我们没有执行监视任务,因此我们不关心设置 “host” 标签。设置 omit_hostname = true 以便 Telegraf 不设置 “host” 标签(第 93 行)。
  • 导航到 OUTPUT PLUGIN 部分。
  • 指定您的 InfluxDB 实例(第 111 行): urls = ["https://us-west-2-1.aws.cloud2.influxdata.com"] # 必需
  • 指定您的存储桶、令牌和组织(第 113-120 行)
  • 导航到 SERVICES INPUT PLUGIN 部分。
  • 指定您的行协议 txt 文件的绝对路径(第 544 行)
  • 确保 Tail 插件仅通过将 from_beginning 设置为 false 来读取新点(第 546 行)
    • from_beginning = false
  • 指定数据摘要方法(最后一行)
    • data_format = "influx"

我希望您开始对使用 Telegraf 和 InfluxDB 感到更舒适。如果您有任何问题,请在社区站点上发布,或在 Twitter 上 @InfluxDB 上发推文给我们。谢谢!