开始使用 Python 和 InfluxDB

导航至

本文由 Pravin Rahul Banerjee 撰写,最初由 The New Stack 发布。向下滚动查看作者的照片和简介。 

尽管时间序列数据可以存储在 MySQL 或 PostgreSQL 数据库中,但这效率不高。如果您想存储每分钟都在变化的数据(每年超过 50 万个数据点!),这些数据可能来自数千个不同的传感器、服务器、容器或设备,那么您不可避免地会遇到可扩展性问题。在使用关系数据库时,查询或对这些数据执行聚合也会导致性能问题。

另一方面,时间序列数据库 (TSDB) 经过优化,可以存储时间序列数据点。这在以下情况下尤其有用:

  • 分析股票价格的金融趋势。
  • 销售预测。
  • 监控 API 或 Web 服务的日志和指标。
  • 出于安全目的,监控来自汽车或飞机的传感器数据。
  • 跟踪物联网设备(如智能电网)中的功耗。
  • 跟踪运动员在比赛期间的生命体征和表现。

InfluxDB 创建了一个开源的 时间序列数据库 ,使开发人员更容易处理 时间序列数据。本文将向您展示如何使用 Python 设置 InfluxDB,并使用 Yahoo Finance API 获取股票数据。

您可以在 此存储库中访问本教程中编写的所有代码。

为什么使用 InfluxDB?

InfluxDB 附带一个预构建的仪表板,您可以在其中分析时间序列数据,而无需太多准备工作。而且,别忘了它 性能优于 Elasticsearch 和 Cassandra

它有一个免费的开源版本,您可以在本地运行,还有一个云版本,它 支持主要的云服务 ,如 AWS、GCP 和 Azure。

使用 Python 设置 InfluxDB

开始之前,请确保您的计算机上已安装 Python 3.6 或更高版本 。您还需要一个虚拟环境。本文使用 venv,但您也可以使用 conda、pipenv 或 pyenv。

最后,需要一些 Flux 查询的经验。

本指南使用模块 influxdb-client-python 与 InfluxDB 交互。该库仅支持 InfluxDB 2.x 和 InfluxDB 1.8+,并且需要 Python 3.6 或更高版本。

都准备好了吗?让我们开始安装和连接客户端库。

如果您的计算机上安装了 Docker,您可以简单地使用以下命令运行 InfluxDB 的 Docker 镜像

docker run --name influxdb -p 8086:8086 influxdb:2.1.0

如果您没有 Docker,请在此处下载适用于您操作系统的软件并安装它。如果您在 Mac 上运行 InfluxDB,可以使用 Homebrew 安装它

brew install influxdb

如果您正在运行 Docker 镜像,您可以直接访问 localhost 8086。但是,如果您下载并安装了该软件,则需要在命令行中输入以下内容

influxd

您应该在 localhost 8086 上看到以下屏幕

You should see the following screen

单击开始使用,这将重定向到以下页面

Click Get Started

在本教程中,选择快速启动并在本页上输入您的信息

choose Quick Start

您也可以稍后创建组织和存储桶,但现在,只需为这些字段选择一个简单的名称即可。

注册后,您应该会进入仪表板页面。单击加载您的数据,然后选择 Python 客户端库。

Load your data and choose Python client library

您现在应该看到以下屏幕

Python screen

令牌下,应该已经列出了一个令牌。但是,如果您愿意,您可以为此教程生成一个新令牌。单击生成令牌并选择所有访问令牌,因为您稍后将在教程中更新和删除数据。

请注意,InfluxDB 此时会发出警告,但您现在可以忽略它。

ignore warning

现在,您必须设置一个 Python 虚拟环境。为教程创建一个新文件夹

mkdir influxDB-Tutorial

然后将您的目录更改为新文件夹

cd influxDB-Tutorial

创建一个虚拟环境

python3 -m venv venv

激活它。

source venv/bin/activate

最后,安装 InfluxDB 的客户端库

pip install influxdb-client

创建一个名为 __init.py__ 的新文件,然后返回到 InfluxDB UI

Python - create new filename

选择适当的令牌和存储桶,然后复制初始化客户端下的代码片段,并将其粘贴到您的 Python 文件中。如果您更改令牌/存储桶选择,代码片段将自动更新。

接下来,运行您的 Python 文件

python3 __init__.py

如果在终端中未显示错误消息,则您已成功连接到 InfluxDB。

为了遵循最佳实践,您可以将凭据存储在 .env 文件中。创建一个名为 .env 的文件并存储以下信息

TOKEN = 'YOUR TOKEN'
ORG = 'YOUR ORG NAME'
BUCKET = 'YOUR BUCKET NAME'

然后安装 python-dotenv 模块以读取 .env 变量

pip3 install python-dotenv

最后,更新您的 Python 文件以从 .env 文件加载数据

from datetime import datetime
from dotenv import load_dotenv, main
import os
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS

load_dotenv()
# You can generate a Token from the "Tokens Tab" in the UI
token = os.getenv('TOKEN')
org = os.getenv('ORG')
bucket = os.getenv('BUCKET')

client = InfluxDBClient(url="http://localhost:8086", token=token)

请注意,如果您使用的是 InfluxDB Cloud 帐户,则需要更改 url 参数。URL 将取决于您选择的云区域。云 URL 可以在此处的文档中找到。

导入 DateTime 模块和 InfluxDB 库的行将在本教程的后面部分中需要。最好将所有导入语句放在开头。但是,如果您选择这样做,也可以在必要时导入它们。

或者,您可以将凭据存储在扩展名为 .ini.toml 的文件中,并使用 from_config_file 函数连接到 InfluxDB

使用 influxdb-client-python 进行 CRUD 操作

本文使用了 Python 中的 yfinance 模块来收集一些历史股票数据。使用以下命令安装它

pip install yfinance

您可以使用以下代码片段来获取数据

import yfinance as yf
data = yf.download("MSFT", start="2021-01-01", end="2021-10-30")
print(data.to_csv())

确保将文件名参数传递给 to_csv 方法;这将把 CSV 本地存储,以便您稍后可以读取数据。

或者,您可以从 GitHub 存储库获取 CSV 文件。

接下来,创建一个类并将 CRUD 操作添加为其方法

class InfluxClient:
    def __init__(self,token,org,bucket): 
        self._org=org 
        self._bucket = bucket
        self._client = InfluxDBClient(url="http://localhost:8086", token=token)

如果您使用的是 InfluxDB 的云实例,您将需要将 URL 参数替换为正确的 云区域。

要创建类的实例,请使用此命令

IC = InfluxClient(token,org,bucket)

写入数据

InfluxDBClient 有一个名为 write_api 的方法,用于将数据写入数据库。以下是此方法的代码片段

from influxdb_client.client.write_api import SYNCHRONOUS, ASYNCHRONOUS

def write_data(self,data,write_option=SYNCHRONOUS):
    write_api = self._client.write_api(write_option)
    write_api.write(self._bucket, self._org , data,write_precision='s')

InfluxDBClient 支持异步和同步写入,您可以根据需要指定写入类型。

有关异步写入的更多信息,请参阅 如何在 influxdb-client 中使用 Asyncio。

data 参数可以通过三种不同的方式写入,如下所示

行协议字符串

# Data Write Method 1
IC.write_data(["MSFT,stock=MSFT Open=62.79,High=63.84,Low=62.13"])

请注意,字符串必须遵循特定格式

measurementName,tagKey=tagValue fieldKey1="fieldValue1",fieldKey2=fieldValue2 timestamp

在 tagValue 和第一个 fieldKey 之间有一个空格,在最后一个 fieldValue 和 timeStamp 之间有另一个空格。在解析时,这些空格用作分隔符;因此,您必须按照上面显示的方式对其进行格式化。另请注意,在这种情况下,我假设第一个字段值 fieldValue1 是一个字符串,而 fieldValue2 是一个数字。因此,fieldValue1 应该用引号引起来。

另请注意,时间戳是可选的。如果未提供时间戳,InfluxDB 将使用其主机系统的系统时间 (UTC)。您可以在此处阅读有关 行协议的更多信息

数据点结构

# Data Write Method 2
IC.write_data(
[
Point('MSFT')
.tag("stock","MSFT")
.field("Open",62.79)
.field("High",63.38)
.field("Low",62.13)
.time(int(datetime.strptime('2021-11-07','%Y-%m-%d').timestamp()))
],
)

如果您不想处理行协议字符串中的格式,可以使用 Point() 类。这可确保您的数据正确序列化为行协议。

字典样式

# Data Write Method 3
IC.write_data([
{
"measurement": "MSFT",
"tags": {"stock": "MSFT"},
"fields": {
"Open": 62.79,
"High": 63.38,
"Low": 62.13,
},
"time": int(datetime.strptime('2021-11-07','%Y-%m-%d').timestamp())
},
{
"measurement": "MSFT_DATE",
"tags": {"stock": "MSFT"},
"fields": {
"Open": 62.79,
"High": 63.38,
"Low": 62.13,
},
}
],write_option=ASYNCHRONOUS)

在这种方法中,您传递两个数据点并将写入选项设置为 ASYNCHRONOUS。这对于 Python 来说很友好,因为数据是作为字典传递的。

所有不同的数据写入方式都合并在下面的要点中

# Data Write Method 1
IC.write_data(["MSFT_2021-11-07_Line_Protocol,stock=MSFT Open=62.79,High=63.84,Low=62.13"])

# Data Write Method 2
IC.write_data(
    [
        Point('MSFT_2021-11-07_Point_Class')
        .tag("stock","MSFT")
        .field("Open",65)
        .field("High",63.38)
        .field("Low",62.13)
        .time(int(datetime.strptime('2021-11-07','%Y-%m-%d').timestamp()))
    ],
    )

# Data Write Method 3
IC.write_data([
    {
        "measurement": "MSFT_2021-11-07_Dictionary_Method", 
        "tags": {"stock": "MSFT"}, 
        "fields": {
                "Open": 66,
                "High": 63.38,
                "Low": 62.13,
                }, 
        "time": int(datetime.strptime('2021-11-07','%Y-%m-%d').timestamp())
    },
    {
        "measurement": "MSFT_DATE", 
        "tags": {"stock": "MSFT"}, 
        "fields": {
                "Open": 67,
                "High": 63.38,
                "Low": 62.13,
                }, 
    }
],write_option=ASYNCHRONOUS)

接下来,插入 MSFT 股票和 AAPL 股票的所有数据。由于数据存储在 CSV 文件中,因此您可以使用第一种方法——行协议字符串——来写入数据

import csv
MSFT_file = open('Data/MSFT.csv')
csvreader = csv.reader(MSFT_file)
header = next(csvreader)
rows = []
for row in csvreader:
        date,open,high,low = row[0],row[1],row[2],row[3]
        line_protocol_string = ''
        line_protocol_string+=f'MSFT_{date},'
        line_protocol_string+=f'stock=MSFT '
        line_protocol_string+=f'Open={open},High={high},Low={low} '
        line_protocol_string+=str(int(datetime.strptime(date,'%Y-%m-%d').timestamp()))
        rows.append(line_protocol_string)

IC.write_data(rows)

您可以通过更改文件路径并将字符串从 MSFT 更改为 AAPL 来插入 AAPL 股票的数据

AAPL_file = open('Data/AAPL.csv')
csvreader = csv.reader(AAPL_file)

读取数据

InfluxDBClient 还有一个名为 query_api 的方法,可用于读取数据。您可以使用查询来实现各种目的,例如根据特定日期过滤数据、在时间范围内聚合数据、查找时间范围内的最高/最低值等等。它们类似于您在 SQL 中使用的查询。从 InfluxDB 读取数据时,您需要使用查询。

以下代码是我们类的读取方法

def query_data(self,query):
    query_api = self._client.query_api()
    result = query_api.query(org=self._org, query=query)
    results = []
    for table in result:
        for record in table.records:
            results.append((record.get_value(), record.get_field()))
    print(results)
    return results

在这里,它接受一个查询,然后执行它。查询的返回值是与您的查询匹配的 Flux 对象集合。Flux 对象具有以下方法

.get_measurement()
.get_field()
.values.get("<your tags>")
.get_time()

下面显示了两个查询示例,演示了 query_data 函数的实际应用。第一个查询返回自 2021 年 10 月 1 日以来 MSFT 股票的最高值,第二个查询返回 2021-10-29 MSFT 股票的最高值。

'''
    Return the High Value for MSFT stock for since 1st October,2021
'''
query1 = 'from(bucket: "TestBucket")\
|> range(start: 1633124983)\
|> filter(fn: (r) => r._field == "High")\
|> filter(fn: (r) => r.stock == "MSFT")'
IC.query_data(query1)

'''
    Return the High Value for the MSFT stock on 2021-10-29
'''
query2 = 'from(bucket: "TestBucket")\
|> range(start: 1633124983)\
|> filter(fn: (r) => r._field == "High")\
|> filter(fn: (r) => r._measurement == "MSFT_2021-10-29")'
IC.query_data(query2)

请确保根据需要在查询开头更改存储桶名称。在我的例子中,我的存储桶名称是TestBucket

更新数据

与 Write 和 Query API 不同,InfluxDB 没有 Update API。以下声明取自他们关于如何处理重复数据点的文档。

对于具有相同测量名称、标签集和时间戳的点,InfluxDB 创建旧字段集和新字段集的并集。对于任何匹配的字段键,InfluxDB 使用新点的字段值

要更新数据点,您需要具有名称、标签集和时间戳,并简单地执行写入操作。

删除数据

您可以使用 delete_api 删除数据。以下是一些演示如何删除数据的代码

def delete_data(self,measurement):
    delete_api = self._client.delete_api()
    start = "1970-01-01T00:00:00Z"
    stop = "2021-10-30T00:00:00Z"
    delete_api.delete(start, stop, f'_measurement="{measurement}"', bucket=self._bucket, org=self._org)

删除函数需要数据点的测量值。以下代码显示了删除函数的简单用例

'''
  Delete Data Point with measurement = 2021-10-29
'''
IC.delete_data("MSFT_2021-10-29")

'''
    Return the High Value for the MSFT stock on 2021-10-29
'''
query2 = 'from(bucket: "TestBucket")\
|> range(start: 1633124983)\
|> filter(fn: (r) => r._field == "High")\
|> filter(fn: (r) => r._measurement == "MSFT_2021-10-29")'
IC.query_data(query2)

InfluxDB 的文档包含一份写入数据的最佳实践列表。还有一些数据布局和模式设计最佳实践,您应该遵循这些实践以获得最佳结果。

时间序列数据库的一些实际用例

本文研究了 TSDB 的一个简单用例,即存储股票值,以便您可以分析历史股价并预测未来值。但是,您也可以处理物联网设备、销售数据以及任何其他随时间变化的数据序列。

其他一些实际用例包括

  1. 使用 Tensorflow 和 InfluxDB 进行时间序列预测
  2. 将 InfluxDB 与 IFTTT 集成以监控您的智能家居
  3. 监控您的互联网速度

结论

希望本指南使您能够设置自己的 InfluxDB 实例。您学习了如何构建一个简单的应用程序,以使用 InfluxDB 的 Python 客户端库执行 CRUD 操作,但如果您想仔细查看任何内容,可以在此处找到包含完整源代码的存储库。

查看 InfluxDB 的开源 TSDB。它拥有包括 Python、C++ 和 JavaScript 在内的十种编程语言的客户端库,并且还拥有许多内置的可视化工具,因此您可以准确了解您的数据在做什么。

关于作者

Rahul Banerjee

Rahul 是一名计算机工程专业的学生,他喜欢摆弄不同的库/API。