PHP 和 InfluxDB 入门

导航至

本文由 Cameron Pavey 撰写,他是一位在墨尔本生活和工作的全栈开发人员。向下滚动查看他的照片和简介。

作为一名开发人员,您很可能会最终遇到传统关系数据库的文档存储不太够用的情况。如果您需要存储随时间变化的数据点,则可能需要时间序列数据库。

时间序列数据库非常适合存储系统监控数据、分析报告数据以及来自传感器(例如心率传感器和温度传感器)的数据。任何时候您想要查看数据随时间变化的趋势,时间序列数据库都能满足您的需求。

InfluxDB 是一个强大、开源的时间序列平台,具有适用于多种语言的各种客户端库,可以轻松地为您的应用程序带来由时间序列数据驱动的功能。

在本指南中,您将学习如何使用 PHP 应用程序设置 InfluxDB,连接到数据库、写入数据、读取数据以及在所有设置完成后可以执行的操作的基础知识。

InfluxDB 是一个强大、开源的时间序列平台,具有适用于多种语言的各种客户端库,可以轻松地为您的应用程序带来由时间序列数据驱动的功能。

InfluxDB 客户端库

在本指南中,您将使用官方 PHP 库连接到您的 InfluxDB 实例。此库适用于 InfluxDB 2.x 和 1.8+ 版本。对于 1.7 及更早版本,您需要使用旧的 InfluxDB 客户端库;但是,它与本指南不兼容。

要开始使用,您需要满足一些先决条件。自然地,您需要有一个正在运行的 InfluxDB 实例来指向您的应用程序。本指南是使用 InfluxDB v2.0 和通过 Homebrew 安装在 macOS 系统上的 PHP 8 编写的。

不过,这些步骤也应适用于其他操作系统和安装来源,例如 Docker。您可以参考官方文档以获取 InfluxDBPHP 的正确安装选项。

如果您尚未创建目录来存放本指南的代码,请立即创建一个。有了目录后,您可以使用 composer 通过从终端执行以下命令来安装 InfluxDB 客户端

$ composer require influxdata/influxdb-client-php

您还需要在 InfluxDB 中创建一个存储桶来存储您的数据。本指南将假定您的组织和存储桶都名为“influx”,但您可以随意命名它们。

如果您以前没有使用过 InfluxDB,安装完成后,请导航到 [localhost:8086],系统将提示您设置初始用户详细信息。然后,您可以在本指南的其余部分中使用这些详细信息。

建立连接

要在您的 PHP 应用程序中使用您的 InfluxDB 实例,您需要使用您通过 Composer 下载的客户端库建立连接。

首先,您需要创建客户端的实例,并为其提供您的实例的详细信息。为此,您可以创建一个如下所示的变量

# First, let's make use of Composer so that we can access the Library
require __DIR__ . '/vendor/autoload.php';

# You can generate a Token from the "Tokens Tab" in the UI
$token = '...';
$org = 'influx';
$bucket = 'influx';

# Next, we will instantiate the client and establish a connection
$client = new InfluxDB2\Client([
	"url" => "http://localhost:8086", // url and port of your instance
	"token" => $token,
	"bucket" => $bucket,
	"org" => $org,
	"precision" => InfluxDB2\Model\WritePrecision::NS,
]);

$writeApi = $client->createWriteApi();

要检查 InfluxDB 服务器的状态以确保一切正常运行,您可以使用 $client->health();。此函数将执行 influx ping 并返回一个描述服务器状态以及它是否已准备好接收查询的对象。

在本指南中,您将看到在读取和写入 InfluxDB 数据时可用的各种选项。为了帮助构建这些示例,请参阅以下 JSON 对象。这是我们想要添加到数据库的数据点的粗略估计。

{
  "measurement": "temp_c",
  "tags": [
	"location": "Melbourne"
  ],
  "fields": [
	"degrees": 14,
	"humidity": 0.57
  ]
}

插入数据

设置好实例并将初始化代码添加到您的 PHP 应用程序后,就可以开始插入数据了。作为时间序列数据库,InfluxDB 的行为方式与关系数据库略有不同。您可以更流畅地插入数据,而不是遵循严格预定义的模式。数据被分隔到存储桶中。在给定的存储桶中,您有充当数据点的度量,对于给定的度量,您可以拥有字段值和标签。

值得注意的是,度量和标签是索引的,而字段值没有索引,因此最好以这样一种方式构建您的数据,即您的常用查询元数据编码在标签中。围绕设计有效的数据布局有很多细微之处,因此建议您在设计模式之前阅读 InfluxData 官方文档中关于该主题的内容

不过,就目前而言,我们并不真正关心这些更精细的细节。我们的目的是了解如何将 InfluxDB 的强大功能引入我们的 PHP 应用程序。因此,我们将坚持使用简单的模式。

当通过 PHP 客户端插入数据时,主要有三种方法可以做到这一点。

使用 InfluxDB Line Protocol

Line Protocol 是一种由字符串表示的标准数据格式。Line Protocol 由一些离散元素组成,您可以使用这些元素来提供您希望插入的数据。

measurementName,tagKey=tagValue fieldKey="fieldValue" 1465839830100400200
--------------- --------------- --------------------- -------------------
   	|           	|              	|                	|
  Measurement   	Tag set       	Field set        	Timestamp

在这里您可以看到 Line Protocol 的元素。第一个值是度量,后跟逗号分隔的标签键值对列表,然后是逗号分隔的字段键值对列表,最后是数据点的时间戳。

$data = "temp_c,location=Melbourne degrees=14,humidity=0.57";

$writeApi->write($data, WritePrecision::S, $bucket, $org);

这种方法在概念上最简单,当您已经将数据处理成 Line Protocol 格式时(例如来自另一个 API 或文本文件),它可能很有用。

使用数据点

如果您不想手动构建 Line Protocol 字符串,您的下一个选择是使用客户端库提供的数据点构建器,以 PHP 原生方式正确序列化您的数据。这是一种简单、声明式的方式来构建单个数据点,如果您拥有的标签和字段数量有限,这是一个插入一些数据的好方法。

$point = Point::measurement('temp_c')
  ->addTag('location', 'Melbourne')
  ->addField('degrees', 14)
  ->addField('humidity', 0.57)
  ->time(microtime(true));

$writeApi->write($point, WritePrecision::S, $bucket, $org);

使用数组结构

您还可以使用数组结构插入数据。这与之前使用数据点的选项类似,但更有利于添加多个标签和字段。这是一个不错的选择,特别是当与 PHP 的数组函数结合使用时,PHP 的数组函数允许您轻松地从任何来源提取和转换数据,并将其映射到更适合在 InfluxDB 中存储的内容。

$dataArray = ['name' => 'temp_c',
  'tags' => ['location' => 'Melbourne'],
  'fields' => ['degrees' => 14, 'humidity' => 0.57],
  'time' => microtime(true)];

$writeApi->write($dataArray, WritePrecision::S, $bucket, $org);

查询数据

现在您已经在 InfluxDB 中拥有了一些数据,是时候将其查询出来了。执行此操作的两种主要方法是使用表结构或流。无论哪种方式,我们都必须首先实例化一个查询客户端。

$queryApi = $client->createQueryApi();

InfluxDB 查询是使用 Flux 查询语言编写的。如果您不熟悉它或需要复习,官方文档是一个很好的起点。

表结构

您可以查询数据的第一种方式是作为表结构。

$query = "from(bucket: \"$bucket\")
	|> range(start: -1h)
	|> filter(fn: (r) => r._measurement == \"temp_c\")";

$records = $queryApi->query($query, $org);

这将返回 InfluxTable 对象数组,表示查询数据的 CSV 格式。

queryStream 方法

您的另一个选择是使用 queryStream 方法,它将返回一个对象,允许您使用 生成器模式访问 CSV 数据。这非常适合由于资源有限、数据集庞大或仅仅是偏好而导致内存成为问题的情况。

$query = "from(bucket: \"$bucket\")
	|> range(start: -1h)
	|> filter(fn: (r) => r._measurement == \"temp_c\")";

$parser = $queryApi->queryStream($query, $org);

foreach($parser->each() as $record) {
  ...
}

将所有内容组合在一起

现在您已经了解了如何设置客户端库、写入数据,然后将数据读回,是时候将所有部分组合在一起了。在脚本中尝试一下,该脚本写入一些数据点,将其读回,并对其执行一些逻辑。

<?php
use InfluxDB2\Model\WritePrecision;

require __DIR__ . '/vendor/autoload.php';

# You can generate a Token from the "Tokens Tab" in the UI
$token = '...'; # your token here
$org = 'influx';
$bucket = 'influx';

$client = new InfluxDB2\Client([
	"url" => "http://localhost:8086", // url and port of your instance
	"token" => $token,
]);

// The base data we will be inserting
$rawData = [
	["temp" => 8, 'humidity' => .57],
	["temp" => 8, 'humidity' => .59],
	["temp" => 7, 'humidity' => .60],
	["temp" => 7, 'humidity' => .58],
	["temp" => 7, 'humidity' => .54],
	["temp" => 9, 'humidity' => .53],
	["temp" => 10, 'humidity' => .55],
	["temp" => 10, 'humidity' => .59],
	["temp" => 11, 'humidity' => .60],
];

$writeApi = $client->createWriteApi();
foreach ($rawData as $index => $datum) {
	$dataArray = [
    	'name' => 'temp_c',
    	'tags' => ['location' => 'Melbourne'],
    	'fields' => ['degrees' => $datum['temp'], 'humidity' => $datum['humidity']],
    	'time' => microtime(true) - (7200 * $index), // we will populate data going back by the hour
	];
	$writeApi->write($dataArray, WritePrecision::S, $bucket, $org);
}

$queryApi = $client->createQueryApi();
$query = "from(bucket: \"$bucket\")
	|> range(start: -12h)
	|> filter(fn: (r) => r._measurement == \"temp_c\")";

$tables = $queryApi->query($query, $org);

$records = [];
foreach ($tables as $table) {
	foreach ($table->records as $record) {
    	// because we will have multiple fields at the same second in time, we need to merge the data into a single array after we query it out
    	$row = key_exists($record->getTime(), $records) ? $records[$record->getTime()] : [];
    	$records[$record->getTime()] = array_merge($row, [$record->getField() => $record->getValue()]);
	}
}

结论

希望本文能让您对将 InfluxDB 与 PHP 应用程序或脚本一起使用有所了解。InfluxDB 和 PHP 客户端还有很多功能可以实现,因此如果您有兴趣了解更多信息,请参阅官方文档。

关于作者

Cmeraon Pavey shows how to get started with InfluxDB and PHP

Cameron 是一位在墨尔本生活和工作的全栈开发人员。他致力于永无止境的旅程,理解高质量代码、开发者生产力和工作满意度的复杂性。