开始使用:将数据流式传输到 InfluxDB
作者:Anais Dotis-Georgiou / 产品, 用例, 开发者, 入门指南
2020 年 8 月 9 日
导航至
这是 InfluxDB v2 入门教程的第二部分。如果您是 InfluxDB v2 的新手,我建议首先学习关于将静态数据批量写入 InfluxDB v2 的不同方法,请参阅本入门系列教程的第一部分。这是一个关于如何以及何时使用将实时数据写入 InfluxDB v2 的初学者教程
本教程的 repo 在此处。在本教程中,我使用了 Alpha Vantage 股票和加密货币 API 来获取日内时间序列数据。如果您想了解更多关于如何使用时间序列数据的信息,您可以查看关于市场数据 API 的这篇文章。在您声明您的密钥后,Alpha Vantage 提供 5 分钟分辨率的实时日内 BTC 价格和交易量数据。这里是输出数据的一个示例。
1. Telegraf 和 Exec 插件
将 Telegraf 与 Exec 输入插件 结合使用,用户可以按设定的时间间隔执行命令,以检索指标并将它们写入 InfluxDB。首先,我向 Alpha Vantage 发出请求,并将最后一个数据点转换为行协议,这是 InfluxDB 的数据摄取格式,并使用函数 data_requests()
实现了纳秒级精度。我只对收集最后一个数据点感兴趣,因为我不想每 5 分钟重写 24 小时的数据。
#exec.py
import pandas as pd
import requests
from alphavantage_auth import key
import datetime
import time
#Using Alpha Vantage to get BTC prices every 5 make_lines
#Get your key here: https://www.alphavantage.co/support/#api-key
apikey = key
url = "https://www.alphavantage.co/query?"
function = "DIGITAL_CURRENCY_INTRADAY"
symbol = "BTC"
market = "USD"
#build target url
target_url = url + "function=" + function + "&symbol=" + symbol + "&market=" + market + "&apikey=" + apikey
#make request
def data_request():
data = requests.get(target_url).json()
#data is returned in the following format: https://www.alphavantage.co/query?function=DIGITAL_CURRENCY_INTRADAY&symbol=BTC&market=EUR&apikey=demo
#we only want the last datapoint
t = [t for t in data['Time Series (Digital Currency Intraday)']]
t = t[0]
#convert human readable time to unix time
t = datetime.datetime.strptime(t, "%Y-%m-%d %H:%M:%S")
unix = int(t.strftime("%s"))
#convert timestamp to nanosecond precision
unix_ns = str(unix) + "000000000"
fields = [v for k, v in data['Time Series (Digital Currency Intraday)'].items()]
#convert to line protocol
line = str("price"
+ ",type=BTC"
+ " "
+ "price=" + str(fields[0]['1a. price (USD)']) + ","
+ "volume=" + str(fields[0]['2. volume'])
+ " " + unix_ns)
# print("data gathered and converted")
return(line)
现在我们有了一个返回行协议数据的 Python 脚本,创建一个包含 Exec 输入插件和 InfluxDB v2 输出插件 的 Telegraf 配置。请按照文档使用 UI 手动配置 Telegraf。确保您向插件添加以下更改
- 将数据收集间隔设置为 300 秒(5 分钟),因为 Alpha Vantage 仅提供 5 分钟分辨率的数据:
interval = "300s"
- 指定时间戳精度:
precision = "ns"
- 省略 “host” 标签:
omit_hostname = true
- 指定您的 InfluxDB 实例:
urls = ["https://us-west-2-1.aws.cloud2.influxdata.com"] # 必需
- 指定您的存储桶、令牌和组织
- 指定 exec 插件要执行的命令(第 228 行):
commands = ["python /Users/anaisdotis-georgiou/Desktop/GettingStarted_StreamingData/exec.py" ]
- 因为我只有一个命令,所以我可以注释掉名称后缀:
#name_suffix = "_mycollector"
- 指定要使用的数据格式。由于数据已转换为行协议,因此选择 influx:
data_format = "influx"
2. Telegraf 和 Tail 插件
Tail 输入插件 跟踪日志文件并解析指标,Telegraf 将数据写入 InfluxDB。Tail.py 与 exec.py 类似,只是每个新点都先附加到 tail.txt,而不是直接写入 InfluxDB。此外,运行延迟在 tail.py 中使用 time.sleep()
定义,而不是像 Exec 插件那样由配置中的执行间隔指定。
import pandas as pd
import requests
import time
from alphavantage_auth import key
import datetime
#Using Alphavantage to get BTC prices every 5 make_lines
#Get your key here: https://www.alphavantage.co/support/#api-key
apikey = key
url = "https://www.alphavantage.co/query?"
function = "DIGITAL_CURRENCY_INTRADAY"
symbol = "BTC"
market = "USD"
#build target url
target_url = url + "function=" + function + "&symbol=" + symbol + "&market=" + market + "&apikey=" + apikey
while True:
#make request
data = requests.get(target_url).json()
#data is returned in the following format: https://www.alphavantage.co/query?function=DIGITAL_CURRENCY_INTRADAY&symbol=BTC&market=EUR&apikey=demo
#we only want the last datapoint
t = [t for t in data['Time Series (Digital Currency Intraday)']]
t = t[0]
t = datetime.datetime.strptime(t, "%Y-%m-%d %H:%M:%S")
unix = int(t.strftime("%s"))
#convert to nanosecond precision
unix_ns = str(unix) + "000000000"
fields = [v for k, v in data['Time Series (Digital Currency Intraday)'].items()]
#convert to line protocol
line = ["price"
+ ",type=BTC"
+ " "
+ "price=" + str(fields[0]['1a. price (USD)']) + ","
+ "volume=" + str(fields[0]['2. volume'])
+ " " + unix_ns]
thefile = open('Data/tail.txt', 'a+')
for item in line:
thefile.write("%s\n" % item)
print("line added")
#Alphavantage only adds points every 5 min, so set script to sleep for 5 min as well
time.sleep(300)
现在我有一个 Python 脚本将行协议数据附加到 txt 文件,我手动配置 Telegraf 以运行包含 Tail 输入插件和 InfluxDB_v2 输出插件的配置,并进行以下更改
- 指定数据的精度(第 64 行)。对于此示例,我们的 BTC 数据已转换为纳秒精度,因此:
precision = "ns"
- 由于我们没有执行监视任务,因此我们不关心设置 “host” 标签。设置
omit_hostname = true
以便 Telegraf 不设置 “host” 标签(第 93 行)。 - 导航到 OUTPUT PLUGIN 部分。
- 指定您的 InfluxDB 实例(第 111 行):
urls = ["https://us-west-2-1.aws.cloud2.influxdata.com"] # 必需
- 指定您的存储桶、令牌和组织(第 113-120 行)
- 导航到 SERVICES INPUT PLUGIN 部分。
- 指定您的行协议 txt 文件的绝对路径(第 544 行)
- 确保 Tail 插件仅通过将 from_beginning 设置为 false 来读取新点(第 546 行)
from_beginning = false
- 指定数据摘要方法(最后一行)
data_format = "influx"
我希望您开始对使用 Telegraf 和 InfluxDB 感到更舒适。如果您有任何问题,请在社区站点上发布,或在 Twitter 上 @InfluxDB 上发推文给我们。谢谢!