从Python和InfluxDB开始

导航至

本文由Pravin Rahul Banerjee撰写,最初发布于The New Stack。向下滚动查看作者的图片和简介。

尽管时序数据可以存储在MySQL或PostgreSQL数据库中,但这并不特别高效。如果您想存储每分钟都会变化的(一年中超过五十万个数据点!)来自数千个不同传感器、服务器、容器或设备的数据,您不可避免地会遇到可扩展性问题。使用关系数据库查询或对此类数据进行聚合也会导致性能问题。

另一方面,时序数据库(TSDB)针对存储时序数据点进行了优化。这在以下情况下尤其有用:

  • 分析股价的金融趋势。
  • 销售预测。
  • 监控API或Web服务的日志和指标。
  • 为了安全目的监控汽车或飞机的传感器数据。
  • 跟踪物联网设备(如智能电网)的电力使用情况。
  • 跟踪运动员在比赛中的生命体征和表现。

InfluxDB已创建了一个开源的时序数据库,使开发者能够更容易地处理时序数据。本文将向您展示如何使用Python设置InfluxDB,并通过Yahoo Finance API获取股票数据。

您可以在这个repo中访问本教程中编写的所有代码。

为什么使用InfluxDB?

InfluxDB自带预构建的仪表板,您可以无需太多前期工作即可分析时序数据。别忘了它比Elasticsearch和Cassandra表现更好

它有一个免费的本地开源版本,还有一个支持主要云服务(如AWS、GCP和Azure)的云版本。

使用Python设置InfluxDB

在开始之前,请确保您的计算机上已安装Python 3.6或更高版本。您还需要一个虚拟环境。本文使用venv,但您也可以使用conda、pipenv或pyenv。

最后,您需要具备一些Flux查询经验。

本指南使用influxdb-client-python模块与InfluxDB交互。该库仅支持InfluxDB 2.x和InfluxDB 1.8+,且需要Python 3.6或更高版本。

一切准备就绪?让我们开始安装和连接客户端库。

如果您在计算机上安装了Docker,可以简单地使用以下命令运行InfluxDB的Docker镜像

docker run --name influxdb -p 8086:8086 influxdb:2.1.0

如果您没有Docker,请在此处下载您操作系统对应的软件并安装它。如果您在Mac上运行InfluxDB,可以使用Homebrew来安装。

brew install influxdb

如果您正在运行Docker镜像,可以直接访问localhost 8086。但是,如果您下载了软件并安装了它,您需要在命令行中输入以下内容

influxd

您应该在localhost 8086上看到以下屏幕

You should see the following screen

点击开始使用,这将带您到以下页面

Click Get Started

对于本教程,请选择快速开始并在该页面上填写您的信息

choose Quick Start

您还可以稍后创建组织和存储桶,但现在是,只需为每个这些字段选择一个简单的名称。

注册后,您应该会出现在仪表板页面上。点击加载数据然后选择Python客户端库。

Load your data and choose Python client library

现在您应该看到以下屏幕

Python screen

令牌下,应该已经列出了一个令牌。但是,如果您愿意,您可以为本教程生成一个新的令牌。点击生成令牌并选择所有访问令牌,因为您将在教程中稍后更新和删除数据。

请注意,InfluxDB在此点会发出警告,但您现在可以忽略它。

ignore warning

现在,您需要设置一个Python虚拟环境。为教程创建一个新文件夹

mkdir influxDB-Tutorial

然后切换到新文件夹

cd influxDB-Tutorial

创建一个虚拟环境

python3 -m venv venv

激活它。

source venv/bin/activate

最后,安装InfluxDB的客户端库

pip install influxdb-client

创建一个名为__init__.py的新文件,然后回到InfluxDB UI

Python - create new filename

选择合适的令牌和存储桶,然后复制初始化客户端下的代码片段并将其粘贴到Python文件中。如果您更改了令牌/存储桶选择,代码片段将自动更新。

接下来,运行您的Python文件

python3 __init__.py

如果终端中没有显示错误消息,则您已成功连接到InfluxDB。

为了遵循最佳实践,您可以将凭据存储在.env文件中。创建一个名为.env的文件并存储以下信息

TOKEN = 'YOUR TOKEN'
ORG = 'YOUR ORG NAME'
BUCKET = 'YOUR BUCKET NAME'

然后安装python-dotenv模块来读取.env变量

pip3 install python-dotenv

最后,更新您的Python文件以从.env文件加载数据

from datetime import datetime
from dotenv import load_dotenv, main
import os
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS

load_dotenv()
# You can generate a Token from the "Tokens Tab" in the UI
token = os.getenv('TOKEN')
org = os.getenv('ORG')
bucket = os.getenv('BUCKET')

client = InfluxDBClient(url="https://127.0.0.1:8086", token=token)

请注意,如果您正在使用InfluxDB云账户,则需要更改url参数。URL将取决于您选择的云区域。云URL可以在文档中找到。

导入DateTime模块和InfluxDB库的行将在教程的稍后部分需要。将所有导入语句放在开始处是一个好习惯。但是,如果您选择的话,也可以在需要时导入。

或者,您可以将凭据存储在扩展名为.ini.toml的文件中,并使用from_config_file函数连接到InfluxDB

使用influxdb-client-python进行CRUD操作

本文使用Python中的yfinance模块收集了一些历史股票数据。使用以下命令安装它

pip install yfinance

您可以使用以下代码片段来获取数据

import yfinance as yf
data = yf.download("MSFT", start="2021-01-01", end="2021-10-30")
print(data.to_csv())

确保将文件名参数传递给to_csv方法;这将本地存储CSV文件,以便您稍后读取数据。

或者,您可以从GitHub仓库获取CSV文件。

接下来,创建一个类并添加CRUD操作为其方法

class InfluxClient:
    def __init__(self,token,org,bucket): 
        self._org=org 
        self._bucket = bucket
        self._client = InfluxDBClient(url="https://127.0.0.1:8086", token=token)

如果您使用的是InfluxDB的云实例,则需要将URL参数替换为正确的云区域。

要创建类的实例,请使用以下命令

IC = InfluxClient(token,org,bucket)

写入数据

InfluxDBClient有一个名为write_api的方法,用于将数据写入您的数据库。以下是此方法的代码片段

from influxdb_client.client.write_api import SYNCHRONOUS, ASYNCHRONOUS

def write_data(self,data,write_option=SYNCHRONOUS):
    write_api = self._client.write_api(write_option)
    write_api.write(self._bucket, self._org , data,write_precision='s')

InfluxDBClient支持异步和同步写入,并且您可以根据需要指定写入类型。

有关异步写入的更多信息,请参阅如何在influxdb-client中使用Asyncio

参数data可以以下三种方式之一进行写入,如下所示

行协议字符串

# Data Write Method 1
IC.write_data(["MSFT,stock=MSFT Open=62.79,High=63.84,Low=62.13"])

请注意,字符串必须遵循特定的格式

measurementName,tagKey=tagValue fieldKey1="fieldValue1",fieldKey2=fieldValue2 timestamp

在tagValue和第一个fieldKey之间有一个空格,在最后一个fieldValue和timeStamp之间也有一个空格。在解析时,这些空格用作分隔符;因此,您必须按照上述方式格式化它。请注意,在这种情况下,我假设第一个字段值fieldValue1是字符串,而fieldValue2是数字。因此,fieldValue1应该出现在引号内。

请注意,时间戳是可选的。如果没有提供时间戳,InfluxDB将使用其主机的系统时间(UTC)。您可以在这里了解更多有关行协议的信息。

数据点结构

# Data Write Method 2
IC.write_data(
[
Point('MSFT')
.tag("stock","MSFT")
.field("Open",62.79)
.field("High",63.38)
.field("Low",62.13)
.time(int(datetime.strptime('2021-11-07','%Y-%m-%d').timestamp()))
],
)

如果您不想处理行协议字符串中的格式,则可以使用Point()类。这确保了您的数据以正确的行协议进行了适当的序列化。

字典样式

# Data Write Method 3
IC.write_data([
{
"measurement": "MSFT",
"tags": {"stock": "MSFT"},
"fields": {
"Open": 62.79,
"High": 63.38,
"Low": 62.13,
},
"time": int(datetime.strptime('2021-11-07','%Y-%m-%d').timestamp())
},
{
"measurement": "MSFT_DATE",
"tags": {"stock": "MSFT"},
"fields": {
"Open": 62.79,
"High": 63.38,
"Low": 62.13,
},
}
],write_option=ASYNCHRONOUS)

在此方法中,您正在传递两个数据点并设置写入选项为ASYNCHRONOUS。这对于Python来说很友好,因为数据是以字典的形式传递的。

所有不同的数据写入方式都汇总在下面的代码片段中

# Data Write Method 1
IC.write_data(["MSFT_2021-11-07_Line_Protocol,stock=MSFT Open=62.79,High=63.84,Low=62.13"])

# Data Write Method 2
IC.write_data(
    [
        Point('MSFT_2021-11-07_Point_Class')
        .tag("stock","MSFT")
        .field("Open",65)
        .field("High",63.38)
        .field("Low",62.13)
        .time(int(datetime.strptime('2021-11-07','%Y-%m-%d').timestamp()))
    ],
    )

# Data Write Method 3
IC.write_data([
    {
        "measurement": "MSFT_2021-11-07_Dictionary_Method", 
        "tags": {"stock": "MSFT"}, 
        "fields": {
                "Open": 66,
                "High": 63.38,
                "Low": 62.13,
                }, 
        "time": int(datetime.strptime('2021-11-07','%Y-%m-%d').timestamp())
    },
    {
        "measurement": "MSFT_DATE", 
        "tags": {"stock": "MSFT"}, 
        "fields": {
                "Open": 67,
                "High": 63.38,
                "Low": 62.13,
                }, 
    }
],write_option=ASYNCHRONOUS)

接下来,插入MSFT股票和AAPL股票的所有数据。由于数据存储在CSV文件中,因此您可以使用第一种方法——行协议字符串——来写入数据

import csv
MSFT_file = open('Data/MSFT.csv')
csvreader = csv.reader(MSFT_file)
header = next(csvreader)
rows = []
for row in csvreader:
        date,open,high,low = row[0],row[1],row[2],row[3]
        line_protocol_string = ''
        line_protocol_string+=f'MSFT_{date},'
        line_protocol_string+=f'stock=MSFT '
        line_protocol_string+=f'Open={open},High={high},Low={low} '
        line_protocol_string+=str(int(datetime.strptime(date,'%Y-%m-%d').timestamp()))
        rows.append(line_protocol_string)

IC.write_data(rows)

您可以通过更改文件路径和字符串从MSFT更改为AAPL来插入AAPL股票的数据

AAPL_file = open('Data/AAPL.csv')
csvreader = csv.reader(AAPL_file)

读取数据

InfluxDBClient还有一个名为query_api的方法,可用于读取数据。您可以使用查询执行各种操作,例如根据特定日期过滤数据、在时间范围内聚合数据、查找时间范围内的最高/最低值等。它们类似于您在SQL中使用的查询。在从InfluxDB读取数据时,您需要使用查询。

以下代码是我们的类读取方法

def query_data(self,query):
    query_api = self._client.query_api()
    result = query_api.query(org=self._org, query=query)
    results = []
    for table in result:
        for record in table.records:
            results.append((record.get_value(), record.get_field()))
    print(results)
    return results

在这里,它接受一个查询并执行它。查询的返回值是匹配您查询的Flux对象的集合。Flux对象有以下方法

.get_measurement()
.get_field()
.values.get("<your tags>")
.get_time()

以下显示了两个查询示例,展示了query_data函数的作用。第一个查询返回自2021年10月1日以来MSFT股票的最高值,第二个查询返回2021年10月29日MSFT股票的最高值。

'''
    Return the High Value for MSFT stock for since 1st October,2021
'''
query1 = 'from(bucket: "TestBucket")\
|> range(start: 1633124983)\
|> filter(fn: (r) => r._field == "High")\
|> filter(fn: (r) => r.stock == "MSFT")'
IC.query_data(query1)

'''
    Return the High Value for the MSFT stock on 2021-10-29
'''
query2 = 'from(bucket: "TestBucket")\
|> range(start: 1633124983)\
|> filter(fn: (r) => r._field == "High")\
|> filter(fn: (r) => r._measurement == "MSFT_2021-10-29")'
IC.query_data(query2)

确保根据需要更改查询开头处的bucket名称。在我的情况下,我的bucket名称是TestBucket

更新数据

与写入和查询API不同,InfluxDB没有更新API。以下语句摘自他们关于如何处理重复数据点的文档。

对于具有相同测量名称、标签集和时间的点,InfluxDB会创建旧字段集和新字段集的并集。对于任何匹配的字段键,InfluxDB使用新点的字段值。

要更新数据点,您需要具有名称、标签集和时间,然后简单地进行写入操作。

删除数据

您可以使用delete_api来删除数据。以下是一些演示如何删除数据的代码。

def delete_data(self,measurement):
    delete_api = self._client.delete_api()
    start = "1970-01-01T00:00:00Z"
    stop = "2021-10-30T00:00:00Z"
    delete_api.delete(start, stop, f'_measurement="{measurement}"', bucket=self._bucket, org=self._org)

删除函数需要数据点的测量值。以下代码显示了删除函数的一个简单用例。

'''
  Delete Data Point with measurement = 2021-10-29
'''
IC.delete_data("MSFT_2021-10-29")

'''
    Return the High Value for the MSFT stock on 2021-10-29
'''
query2 = 'from(bucket: "TestBucket")\
|> range(start: 1633124983)\
|> filter(fn: (r) => r._field == "High")\
|> filter(fn: (r) => r._measurement == "MSFT_2021-10-29")'
IC.query_data(query2)

InfluxDB的文档包括一份关于如何写入数据的最佳实践列表。还有一些关于数据布局和模式设计的最佳实践,您应遵循这些最佳实践以获得最佳结果。

时间序列数据库的一些实际用例

本文探讨了TSDB存储股票价值的简单用例,以便您可以分析历史股票价格并预测未来值。但是,您也可以与物联网设备、销售数据以及任何其他随时间变化的数据系列一起工作。

其他一些实际用例包括

  1. 使用Tensorflow和InfluxDB进行时间序列预测
  2. 将InfluxDB与IFTTT集成以监控您的智能家居
  3. 监控您的网络速度

结论

希望本指南使您能够设置自己的InfluxDB实例。您学习了如何使用InfluxDB的Python客户端库构建一个简单的应用程序以执行CRUD操作,但如果您想深入了解任何内容,您可以在这里找到包含整个源代码的repo。

查看InfluxDB的开源TSDB。它为Python、C++和JavaScript等十种编程语言提供了客户端库,并且它还包含许多内置的可视化工具,您可以查看您的数据究竟在做什么。

关于作者

Rahul Banerjee

Rahul是一名喜欢摆弄不同库和API的计算机工程学生。