从Python和InfluxDB开始
由社区 / 产品,用例,开发者,入门
2022年3月14日
导航至
本文由Pravin Rahul Banerjee撰写,最初发布于The New Stack。向下滚动查看作者的图片和简介。
尽管时序数据可以存储在MySQL或PostgreSQL数据库中,但这并不特别高效。如果您想存储每分钟都会变化的(一年中超过五十万个数据点!)来自数千个不同传感器、服务器、容器或设备的数据,您不可避免地会遇到可扩展性问题。使用关系数据库查询或对此类数据进行聚合也会导致性能问题。
另一方面,时序数据库(TSDB)针对存储时序数据点进行了优化。这在以下情况下尤其有用:
- 分析股价的金融趋势。
- 销售预测。
- 监控API或Web服务的日志和指标。
- 为了安全目的监控汽车或飞机的传感器数据。
- 跟踪物联网设备(如智能电网)的电力使用情况。
- 跟踪运动员在比赛中的生命体征和表现。
InfluxDB已创建了一个开源的时序数据库,使开发者能够更容易地处理时序数据。本文将向您展示如何使用Python设置InfluxDB,并通过Yahoo Finance API获取股票数据。
您可以在这个repo中访问本教程中编写的所有代码。
为什么使用InfluxDB?
InfluxDB自带预构建的仪表板,您可以无需太多前期工作即可分析时序数据。别忘了它比Elasticsearch和Cassandra表现更好。
它有一个免费的本地开源版本,还有一个支持主要云服务(如AWS、GCP和Azure)的云版本。
使用Python设置InfluxDB
在开始之前,请确保您的计算机上已安装Python 3.6或更高版本。您还需要一个虚拟环境。本文使用venv,但您也可以使用conda、pipenv或pyenv。
最后,您需要具备一些Flux查询经验。
本指南使用influxdb-client-python模块与InfluxDB交互。该库仅支持InfluxDB 2.x和InfluxDB 1.8+,且需要Python 3.6或更高版本。
一切准备就绪?让我们开始安装和连接客户端库。
如果您在计算机上安装了Docker,可以简单地使用以下命令运行InfluxDB的Docker镜像
docker run --name influxdb -p 8086:8086 influxdb:2.1.0
如果您没有Docker,请在此处下载您操作系统对应的软件并安装它。如果您在Mac上运行InfluxDB,可以使用Homebrew来安装。
brew install influxdb
如果您正在运行Docker镜像,可以直接访问localhost 8086。但是,如果您下载了软件并安装了它,您需要在命令行中输入以下内容
influxd
您应该在localhost 8086
上看到以下屏幕
点击开始使用,这将带您到以下页面
对于本教程,请选择快速开始并在该页面上填写您的信息
您还可以稍后创建组织和存储桶,但现在是,只需为每个这些字段选择一个简单的名称。
注册后,您应该会出现在仪表板页面上。点击加载数据然后选择Python客户端库。
现在您应该看到以下屏幕
在令牌下,应该已经列出了一个令牌。但是,如果您愿意,您可以为本教程生成一个新的令牌。点击生成令牌并选择所有访问令牌,因为您将在教程中稍后更新和删除数据。
请注意,InfluxDB在此点会发出警告,但您现在可以忽略它。
现在,您需要设置一个Python虚拟环境。为教程创建一个新文件夹
mkdir influxDB-Tutorial
然后切换到新文件夹
cd influxDB-Tutorial
创建一个虚拟环境
python3 -m venv venv
激活它。
source venv/bin/activate
最后,安装InfluxDB的客户端库
pip install influxdb-client
创建一个名为__init__.py
的新文件,然后回到InfluxDB UI
选择合适的令牌和存储桶,然后复制初始化客户端
下的代码片段并将其粘贴到Python文件中。如果您更改了令牌/存储桶选择,代码片段将自动更新。
接下来,运行您的Python文件
python3 __init__.py
如果终端中没有显示错误消息,则您已成功连接到InfluxDB。
为了遵循最佳实践,您可以将凭据存储在.env
文件中。创建一个名为.env的文件并存储以下信息
TOKEN = 'YOUR TOKEN'
ORG = 'YOUR ORG NAME'
BUCKET = 'YOUR BUCKET NAME'
然后安装python-dotenv
模块来读取.env变量
pip3 install python-dotenv
最后,更新您的Python文件以从.env文件加载数据
from datetime import datetime
from dotenv import load_dotenv, main
import os
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
load_dotenv()
# You can generate a Token from the "Tokens Tab" in the UI
token = os.getenv('TOKEN')
org = os.getenv('ORG')
bucket = os.getenv('BUCKET')
client = InfluxDBClient(url="https://127.0.0.1:8086", token=token)
请注意,如果您正在使用InfluxDB云账户,则需要更改url
参数。URL将取决于您选择的云区域。云URL可以在文档中找到。
导入DateTime模块和InfluxDB库的行将在教程的稍后部分需要。将所有导入语句放在开始处是一个好习惯。但是,如果您选择的话,也可以在需要时导入。
或者,您可以将凭据存储在扩展名为.ini
或.toml
的文件中,并使用from_config_file
函数连接到InfluxDB。
使用influxdb-client-python进行CRUD操作
本文使用Python中的yfinance模块收集了一些历史股票数据。使用以下命令安装它
pip install yfinance
您可以使用以下代码片段来获取数据
import yfinance as yf
data = yf.download("MSFT", start="2021-01-01", end="2021-10-30")
print(data.to_csv())
确保将文件名参数传递给to_csv
方法;这将本地存储CSV文件,以便您稍后读取数据。
或者,您可以从GitHub仓库获取CSV文件。
接下来,创建一个类并添加CRUD操作为其方法
class InfluxClient:
def __init__(self,token,org,bucket):
self._org=org
self._bucket = bucket
self._client = InfluxDBClient(url="https://127.0.0.1:8086", token=token)
如果您使用的是InfluxDB的云实例,则需要将URL参数替换为正确的云区域。
要创建类的实例,请使用以下命令
IC = InfluxClient(token,org,bucket)
写入数据
InfluxDBClient有一个名为write_api
的方法,用于将数据写入您的数据库。以下是此方法的代码片段
from influxdb_client.client.write_api import SYNCHRONOUS, ASYNCHRONOUS
def write_data(self,data,write_option=SYNCHRONOUS):
write_api = self._client.write_api(write_option)
write_api.write(self._bucket, self._org , data,write_precision='s')
InfluxDBClient支持异步和同步写入,并且您可以根据需要指定写入类型。
有关异步写入的更多信息,请参阅如何在influxdb-client中使用Asyncio。
参数data
可以以下三种方式之一进行写入,如下所示
行协议字符串
# Data Write Method 1
IC.write_data(["MSFT,stock=MSFT Open=62.79,High=63.84,Low=62.13"])
请注意,字符串必须遵循特定的格式
measurementName,tagKey=tagValue fieldKey1="fieldValue1",fieldKey2=fieldValue2 timestamp
在tagValue和第一个fieldKey之间有一个空格,在最后一个fieldValue和timeStamp之间也有一个空格。在解析时,这些空格用作分隔符;因此,您必须按照上述方式格式化它。请注意,在这种情况下,我假设第一个字段值fieldValue1
是字符串,而fieldValue2
是数字。因此,fieldValue1
应该出现在引号内。
请注意,时间戳是可选的。如果没有提供时间戳,InfluxDB将使用其主机的系统时间(UTC)。您可以在这里了解更多有关行协议的信息。
数据点结构
# Data Write Method 2
IC.write_data(
[
Point('MSFT')
.tag("stock","MSFT")
.field("Open",62.79)
.field("High",63.38)
.field("Low",62.13)
.time(int(datetime.strptime('2021-11-07','%Y-%m-%d').timestamp()))
],
)
如果您不想处理行协议字符串中的格式,则可以使用Point()类。这确保了您的数据以正确的行协议进行了适当的序列化。
字典样式
# Data Write Method 3
IC.write_data([
{
"measurement": "MSFT",
"tags": {"stock": "MSFT"},
"fields": {
"Open": 62.79,
"High": 63.38,
"Low": 62.13,
},
"time": int(datetime.strptime('2021-11-07','%Y-%m-%d').timestamp())
},
{
"measurement": "MSFT_DATE",
"tags": {"stock": "MSFT"},
"fields": {
"Open": 62.79,
"High": 63.38,
"Low": 62.13,
},
}
],write_option=ASYNCHRONOUS)
在此方法中,您正在传递两个数据点并设置写入选项为ASYNCHRONOUS
。这对于Python来说很友好,因为数据是以字典的形式传递的。
所有不同的数据写入方式都汇总在下面的代码片段中
# Data Write Method 1
IC.write_data(["MSFT_2021-11-07_Line_Protocol,stock=MSFT Open=62.79,High=63.84,Low=62.13"])
# Data Write Method 2
IC.write_data(
[
Point('MSFT_2021-11-07_Point_Class')
.tag("stock","MSFT")
.field("Open",65)
.field("High",63.38)
.field("Low",62.13)
.time(int(datetime.strptime('2021-11-07','%Y-%m-%d').timestamp()))
],
)
# Data Write Method 3
IC.write_data([
{
"measurement": "MSFT_2021-11-07_Dictionary_Method",
"tags": {"stock": "MSFT"},
"fields": {
"Open": 66,
"High": 63.38,
"Low": 62.13,
},
"time": int(datetime.strptime('2021-11-07','%Y-%m-%d').timestamp())
},
{
"measurement": "MSFT_DATE",
"tags": {"stock": "MSFT"},
"fields": {
"Open": 67,
"High": 63.38,
"Low": 62.13,
},
}
],write_option=ASYNCHRONOUS)
接下来,插入MSFT股票和AAPL股票的所有数据。由于数据存储在CSV文件中,因此您可以使用第一种方法——行协议字符串——来写入数据
import csv
MSFT_file = open('Data/MSFT.csv')
csvreader = csv.reader(MSFT_file)
header = next(csvreader)
rows = []
for row in csvreader:
date,open,high,low = row[0],row[1],row[2],row[3]
line_protocol_string = ''
line_protocol_string+=f'MSFT_{date},'
line_protocol_string+=f'stock=MSFT '
line_protocol_string+=f'Open={open},High={high},Low={low} '
line_protocol_string+=str(int(datetime.strptime(date,'%Y-%m-%d').timestamp()))
rows.append(line_protocol_string)
IC.write_data(rows)
您可以通过更改文件路径和字符串从MSFT更改为AAPL来插入AAPL股票的数据
AAPL_file = open('Data/AAPL.csv')
csvreader = csv.reader(AAPL_file)
读取数据
InfluxDBClient
还有一个名为query_api
的方法,可用于读取数据。您可以使用查询执行各种操作,例如根据特定日期过滤数据、在时间范围内聚合数据、查找时间范围内的最高/最低值等。它们类似于您在SQL中使用的查询。在从InfluxDB读取数据时,您需要使用查询。
以下代码是我们的类读取方法
def query_data(self,query):
query_api = self._client.query_api()
result = query_api.query(org=self._org, query=query)
results = []
for table in result:
for record in table.records:
results.append((record.get_value(), record.get_field()))
print(results)
return results
在这里,它接受一个查询并执行它。查询的返回值是匹配您查询的Flux对象的集合。Flux对象有以下方法
.get_measurement()
.get_field()
.values.get("<your tags>")
.get_time()
以下显示了两个查询示例,展示了query_data
函数的作用。第一个查询返回自2021年10月1日以来MSFT股票的最高值,第二个查询返回2021年10月29日MSFT股票的最高值。
'''
Return the High Value for MSFT stock for since 1st October,2021
'''
query1 = 'from(bucket: "TestBucket")\
|> range(start: 1633124983)\
|> filter(fn: (r) => r._field == "High")\
|> filter(fn: (r) => r.stock == "MSFT")'
IC.query_data(query1)
'''
Return the High Value for the MSFT stock on 2021-10-29
'''
query2 = 'from(bucket: "TestBucket")\
|> range(start: 1633124983)\
|> filter(fn: (r) => r._field == "High")\
|> filter(fn: (r) => r._measurement == "MSFT_2021-10-29")'
IC.query_data(query2)
确保根据需要更改查询开头处的bucket名称。在我的情况下,我的bucket名称是TestBucket。
更新数据
与写入和查询API不同,InfluxDB没有更新API。以下语句摘自他们关于如何处理重复数据点的文档。
对于具有相同测量名称、标签集和时间的点,InfluxDB会创建旧字段集和新字段集的并集。对于任何匹配的字段键,InfluxDB使用新点的字段值。
要更新数据点,您需要具有名称、标签集和时间,然后简单地进行写入操作。
删除数据
您可以使用delete_api
来删除数据。以下是一些演示如何删除数据的代码。
def delete_data(self,measurement):
delete_api = self._client.delete_api()
start = "1970-01-01T00:00:00Z"
stop = "2021-10-30T00:00:00Z"
delete_api.delete(start, stop, f'_measurement="{measurement}"', bucket=self._bucket, org=self._org)
删除函数需要数据点的测量值。以下代码显示了删除函数的一个简单用例。
'''
Delete Data Point with measurement = 2021-10-29
'''
IC.delete_data("MSFT_2021-10-29")
'''
Return the High Value for the MSFT stock on 2021-10-29
'''
query2 = 'from(bucket: "TestBucket")\
|> range(start: 1633124983)\
|> filter(fn: (r) => r._field == "High")\
|> filter(fn: (r) => r._measurement == "MSFT_2021-10-29")'
IC.query_data(query2)
InfluxDB的文档包括一份关于如何写入数据的最佳实践列表。还有一些关于数据布局和模式设计的最佳实践,您应遵循这些最佳实践以获得最佳结果。
时间序列数据库的一些实际用例
本文探讨了TSDB存储股票价值的简单用例,以便您可以分析历史股票价格并预测未来值。但是,您也可以与物联网设备、销售数据以及任何其他随时间变化的数据系列一起工作。
其他一些实际用例包括
结论
希望本指南使您能够设置自己的InfluxDB实例。您学习了如何使用InfluxDB的Python客户端库构建一个简单的应用程序以执行CRUD操作,但如果您想深入了解任何内容,您可以在这里找到包含整个源代码的repo。
查看InfluxDB的开源TSDB。它为Python、C++和JavaScript等十种编程语言提供了客户端库,并且它还包含许多内置的可视化工具,您可以查看您的数据究竟在做什么。
关于作者
Rahul是一名喜欢摆弄不同库和API的计算机工程学生。