入门教程:将数据流式传输到 InfluxDB
作者:Anais Dotis-Georgiou / 产品,用例,开发者,入门
2020 年 8 月 9 日
导航至
这是 InfluxDB v2 入门教程的第二部分。如果您是 InfluxDB v2 的初学者,我建议您首先在 本入门系列的第一部分 中了解将批量静态数据写入 InfluxDB v2 的不同方法。这是一个关于如何以及何时使用 Telegraf 和 Exec 插件 将实时数据写入 InfluxDB v2 的入门教程。
本教程的代码库位于 此处。对于本教程,我使用了 Alpha Vantage 股票和加密货币 API 来获取日内时间序列数据。如果您想了解更多关于如何处理时间序列数据的一般信息,您可以查看有关市场数据 API 的这篇文章 。 在您 申请密钥 后,Alpha Vantage 提供了 5 分钟分辨率的实时日内 BTC 价格和成交量数据。以下是一个输出数据的示例 。
1. Telegraf 和 Exec 插件
使用 Telegraf 与 Exec 输入插件 允许用户在设定的时间间隔执行命令以检索指标并将它们写入 InfluxDB。首先,我向 Alpha Vantage 发出请求,并使用 data_requests()
函数将最后的数据点转换为行协议,这是 InfluxDB 的数据摄取格式,具有纳秒级精度。我只对收集最后的数据点感兴趣,因为我不想每 5 分钟就重写 24 小时的数据。
#exec.py
import pandas as pd
import requests
from alphavantage_auth import key
import datetime
import time
#Using Alpha Vantage to get BTC prices every 5 make_lines
#Get your key here: https://www.alphavantage.co/support/#api-key
apikey = key
url = "https://www.alphavantage.co/query?"
function = "DIGITAL_CURRENCY_INTRADAY"
symbol = "BTC"
market = "USD"
#build target url
target_url = url + "function=" + function + "&symbol=" + symbol + "&market=" + market + "&apikey=" + apikey
#make request
def data_request():
data = requests.get(target_url).json()
#data is returned in the following format: https://www.alphavantage.co/query?function=DIGITAL_CURRENCY_INTRADAY&symbol=BTC&market=EUR&apikey=demo
#we only want the last datapoint
t = [t for t in data['Time Series (Digital Currency Intraday)']]
t = t[0]
#convert human readable time to unix time
t = datetime.datetime.strptime(t, "%Y-%m-%d %H:%M:%S")
unix = int(t.strftime("%s"))
#convert timestamp to nanosecond precision
unix_ns = str(unix) + "000000000"
fields = [v for k, v in data['Time Series (Digital Currency Intraday)'].items()]
#convert to line protocol
line = str("price"
+ ",type=BTC"
+ " "
+ "price=" + str(fields[0]['1a. price (USD)']) + ","
+ "volume=" + str(fields[0]['2. volume'])
+ " " + unix_ns)
# print("data gathered and converted")
return(line)
现在我们有一个返回行协议数据的 Python 脚本,创建一个 Telegraf 配置,使用 Exec 输入插件和 InfluxDB v2 输出插件。请按照文档在 使用 UI 手动配置 Telegraf。确保您在插件中添加以下更改:
- 将数据收集间隔设置为 300 秒(5 分钟),因为 Alpha Vantage 只提供 5 分钟分辨率的数据:
interval = "300s"
- 指定时间戳精度:
precision = "ns"
- 省略 "host" 标签:
omit_hostname = true
- 指定您的InfluxDB实例:
urls = ["https://us-west-2-1.aws.cloud2.influxdata.com"] # 必需
- 指定您的bucket、token和组织
- 指定exec插件要执行的命令(第228行):
commands = ["python /Users/anaisdotis-georgiou/Desktop/GettingStarted_StreamingData/exec.py" ]
- 因为我只有一个命令,所以我可以取消注释名称后缀:
#name_suffix = "_mycollector"
- 指定要消费的数据格式。由于数据已被转换为行协议,因此选择了influx:
data_format = "influx"
2. Telegraf和Tail插件
Tail输入插件会跟踪日志文件并解析指标,然后Telegraf将数据写入InfluxDB。Tail.py与exec.py类似,但是每个新点首先追加到tail.txt而不是直接写入InfluxDB。此外,运行延迟在tail.py中通过time.sleep()
定义,而不是像Exec插件那样在配置中指定执行间隔。
import pandas as pd
import requests
import time
from alphavantage_auth import key
import datetime
#Using Alphavantage to get BTC prices every 5 make_lines
#Get your key here: https://www.alphavantage.co/support/#api-key
apikey = key
url = "https://www.alphavantage.co/query?"
function = "DIGITAL_CURRENCY_INTRADAY"
symbol = "BTC"
market = "USD"
#build target url
target_url = url + "function=" + function + "&symbol=" + symbol + "&market=" + market + "&apikey=" + apikey
while True:
#make request
data = requests.get(target_url).json()
#data is returned in the following format: https://www.alphavantage.co/query?function=DIGITAL_CURRENCY_INTRADAY&symbol=BTC&market=EUR&apikey=demo
#we only want the last datapoint
t = [t for t in data['Time Series (Digital Currency Intraday)']]
t = t[0]
t = datetime.datetime.strptime(t, "%Y-%m-%d %H:%M:%S")
unix = int(t.strftime("%s"))
#convert to nanosecond precision
unix_ns = str(unix) + "000000000"
fields = [v for k, v in data['Time Series (Digital Currency Intraday)'].items()]
#convert to line protocol
line = ["price"
+ ",type=BTC"
+ " "
+ "price=" + str(fields[0]['1a. price (USD)']) + ","
+ "volume=" + str(fields[0]['2. volume'])
+ " " + unix_ns]
thefile = open('Data/tail.txt', 'a+')
for item in line:
thefile.write("%s\n" % item)
print("line added")
#Alphavantage only adds points every 5 min, so set script to sleep for 5 min as well
time.sleep(300)
现在我有一个Python脚本,将行协议数据追加到txt文件中,我手动配置Telegraf运行一个包含Tail输入插件和InfluxDB_v2输出插件的配置,并进行这些更改
- 指定数据精度(第64行)。对于此示例,我们的BTC数据已转换为ns精度,因此:
precision = "ns"
- 由于我们不是执行监控任务,我们不关心设置'host'标签。将
omit_hostname = true
设置为true,以便Telegraf不设置'host'标签(第93行)。 - 导航到OUTPUT PLUGIN部分。
- 指定您的InfluxDB实例(第111行):
urls = ["https://us-west-2-1.aws.cloud2.influxdata.com"] # 必需
- 指定您的bucket、token和组织(第113-120行)
- 导航到SERVICES INPUT PLUGIN部分。
- 指定您的行协议txt文件的绝对路径(第544行)
- 确保Tail插件仅通过设置from_beginning为false来读取新点(第546行)
from_beginning = false
- 指定数据消化的方法(最后一行)
data_format = "influx"
我希望您开始感到更熟悉使用Telegraf和InfluxDB。如果您有任何问题,请在社区网站上发布,或者给我们@InfluxDB发推文。谢谢!