入门教程:将数据流式传输到 InfluxDB

导航至

这是 InfluxDB v2 入门教程的第二部分。如果您是 InfluxDB v2 的初学者,我建议您首先在 本入门系列的第一部分 中了解将批量静态数据写入 InfluxDB v2 的不同方法。这是一个关于如何以及何时使用 Telegraf 和 Exec 插件 将实时数据写入 InfluxDB v2 的入门教程。

  1. Telegraf 和 Exec 插件
  2. Telegraf 和 Tail 插件

本教程的代码库位于 此处。对于本教程,我使用了 Alpha Vantage 股票和加密货币 API 来获取日内时间序列数据。如果您想了解更多关于如何处理时间序列数据的一般信息,您可以查看有关市场数据 API 的这篇文章 在您 申请密钥 后,Alpha Vantage 提供了 5 分钟分辨率的实时日内 BTC 价格和成交量数据。以下是一个输出数据的示例

1. Telegraf 和 Exec 插件

使用 Telegraf 与 Exec 输入插件 允许用户在设定的时间间隔执行命令以检索指标并将它们写入 InfluxDB。首先,我向 Alpha Vantage 发出请求,并使用 data_requests() 函数将最后的数据点转换为行协议,这是 InfluxDB 的数据摄取格式,具有纳秒级精度。我只对收集最后的数据点感兴趣,因为我不想每 5 分钟就重写 24 小时的数据。

#exec.py
import pandas as pd
import requests
from alphavantage_auth import key
import datetime
import time

#Using Alpha Vantage to get BTC prices every 5 make_lines
#Get your key here: https://www.alphavantage.co/support/#api-key
apikey = key
url = "https://www.alphavantage.co/query?"
function = "DIGITAL_CURRENCY_INTRADAY"
symbol = "BTC"
market = "USD"
#build target url
target_url = url + "function=" + function + "&symbol=" + symbol + "&market=" + market + "&apikey=" + apikey
#make request
def data_request():
    data = requests.get(target_url).json()
    #data is returned in the following format: https://www.alphavantage.co/query?function=DIGITAL_CURRENCY_INTRADAY&symbol=BTC&market=EUR&apikey=demo
    #we only want the last datapoint
    t = [t for t in data['Time Series (Digital Currency Intraday)']]
    t = t[0]
    #convert human readable time to unix time
    t = datetime.datetime.strptime(t, "%Y-%m-%d %H:%M:%S")
    unix = int(t.strftime("%s"))
    #convert timestamp to nanosecond precision
    unix_ns = str(unix) + "000000000"
    fields = [v for k, v in data['Time Series (Digital Currency Intraday)'].items()]
    #convert to line protocol
    line = str("price"
         + ",type=BTC"
         + " "
         + "price=" + str(fields[0]['1a. price (USD)']) + ","
         + "volume=" + str(fields[0]['2. volume'])
         + " " + unix_ns)
    # print("data gathered and converted")
    return(line)

现在我们有一个返回行协议数据的 Python 脚本,创建一个 Telegraf 配置,使用 Exec 输入插件和 InfluxDB v2 输出插件。请按照文档在 使用 UI 手动配置 Telegraf。确保您在插件中添加以下更改:

  • 将数据收集间隔设置为 300 秒(5 分钟),因为 Alpha Vantage 只提供 5 分钟分辨率的数据:interval = "300s"
  • 指定时间戳精度:precision = "ns"
  • 省略 "host" 标签:omit_hostname = true
  • 指定您的InfluxDB实例:urls = ["https://us-west-2-1.aws.cloud2.influxdata.com"] # 必需
  • 指定您的bucket、token和组织
  • 指定exec插件要执行的命令(第228行):commands = ["python /Users/anaisdotis-georgiou/Desktop/GettingStarted_StreamingData/exec.py" ]
  • 因为我只有一个命令,所以我可以取消注释名称后缀:#name_suffix = "_mycollector"
  • 指定要消费的数据格式。由于数据已被转换为行协议,因此选择了influx:data_format = "influx"

2. Telegraf和Tail插件

Tail输入插件会跟踪日志文件并解析指标,然后Telegraf将数据写入InfluxDB。Tail.py与exec.py类似,但是每个新点首先追加到tail.txt而不是直接写入InfluxDB。此外,运行延迟在tail.py中通过time.sleep() 定义,而不是像Exec插件那样在配置中指定执行间隔。

import pandas as pd
import requests
import time
from alphavantage_auth import key
import datetime

#Using Alphavantage to get BTC prices every 5 make_lines
#Get your key here: https://www.alphavantage.co/support/#api-key
apikey = key
url = "https://www.alphavantage.co/query?"
function = "DIGITAL_CURRENCY_INTRADAY"
symbol = "BTC"
market = "USD"
#build target url
target_url = url + "function=" + function + "&symbol=" + symbol + "&market=" + market + "&apikey=" + apikey
while True:
    #make request
    data = requests.get(target_url).json()
    #data is returned in the following format: https://www.alphavantage.co/query?function=DIGITAL_CURRENCY_INTRADAY&symbol=BTC&market=EUR&apikey=demo
    #we only want the last datapoint
    t = [t for t in data['Time Series (Digital Currency Intraday)']]
    t = t[0]
    t = datetime.datetime.strptime(t, "%Y-%m-%d %H:%M:%S")
    unix = int(t.strftime("%s"))
    #convert to nanosecond precision
    unix_ns = str(unix) + "000000000"
    fields = [v for k, v in data['Time Series (Digital Currency Intraday)'].items()]
    #convert to line protocol
    line = ["price"
         + ",type=BTC"
         + " "
         + "price=" + str(fields[0]['1a. price (USD)']) + ","
         + "volume=" + str(fields[0]['2. volume'])
         + " " + unix_ns]
    thefile = open('Data/tail.txt', 'a+')
    for item in line:
        thefile.write("%s\n" % item)
    print("line added")
    #Alphavantage only adds points every 5 min, so set script to sleep for 5 min as well
    time.sleep(300)

现在我有一个Python脚本,将行协议数据追加到txt文件中,我手动配置Telegraf运行一个包含Tail输入插件和InfluxDB_v2输出插件的配置,并进行这些更改

  • 指定数据精度(第64行)。对于此示例,我们的BTC数据已转换为ns精度,因此:precision = "ns"
  • 由于我们不是执行监控任务,我们不关心设置'host'标签。将omit_hostname = true 设置为true,以便Telegraf不设置'host'标签(第93行)。
  • 导航到OUTPUT PLUGIN部分。
  • 指定您的InfluxDB实例(第111行):urls = ["https://us-west-2-1.aws.cloud2.influxdata.com"] # 必需
  • 指定您的bucket、token和组织(第113-120行)
  • 导航到SERVICES INPUT PLUGIN部分。
  • 指定您的行协议txt文件的绝对路径(第544行)
  • 确保Tail插件仅通过设置from_beginning为false来读取新点(第546行)
    • from_beginning = false
  • 指定数据消化的方法(最后一行)
    • data_format = "influx"

我希望您开始感到更熟悉使用Telegraf和InfluxDB。如果您有任何问题,请在社区网站上发布,或者给我们@InfluxDB发推文。谢谢!