使用C++和InfluxDB入门

导航至

本文由Pravin Kumar Sinha撰写,最初由The New Stack发布。向下滚动查看作者简介。

虽然关系型数据库管理系统(RDBMS)在以电子表格架构存储表、列和主键方面效率较高,但在长时间内接收到大量数据输入时,它们会变得效率低下。

专门设计用于存储时序数据的数据库称为时序数据库(TSDB)。

例如,一个RDBMS可能看起来像这样

------------------------------------------------------------------
  | Id |   Name         | Age  | Phonenumber     |    email        |
  ------------------------------------------------------------------
  |  0 | Jonn           | 26   | xxxxxx          | [email protected]  |
  |  1 | Susan          | 44   | xxxxxx          | [email protected] |
  |  2 | Alice          | 19   | xxxxxx          | [email protected] |
  |  3 | Jennifer       | 32   | xxxxxx          | [email protected] |
  ---------------------------------------------------------------------

在TSDB中,数据按测量值分类。测量值是一个容器,大致相当于RDBMS中的表,包含时间字段数据元组。每个条目都是一个(时间,字段数据)元组,被称为点。点可以包含一个字符串标签。

TSDB可能看起来更像是这样

----------------------------------------------------
  |       Tag           |   Field     |   Timestamp  |
  |city   device name   | temperature |              |
  ------------------------------------------------------------------
  |HUSTON    deviceX    | 23          | 2021-11-20T12-31-18.865954 |
  |MIAMI     deviceX    | 26          | 2021-11-20T12-31-18.965954 |
  |MIAMI     deviceY    | 23          | 2021-11-20T12-31-19.965954 |
  |SEATTLE   deviceY    | 18          | 2021-11-20T12-31-19.965954 |
  |SEATTLE   deviceZ    | 19          | 2021-11-20T12-31-20.814952 |
  |ATLANTA   deviceA    | 20          | 2021-11-20T12-31-21.665954 |
  |DALLAS    deviceC    | 28          | 2021-11-20T12-31-22.485954 |
  ------------------------------------------------------------------

上面显示的示例TSDB存储了在一段时间内通过多个公司制造的各种设备(标签)测量的温度(测量值)和值(字段数据),测量了不同城市的温度。

但TSDB可以促进对各种数据的持续监控,如物联网传感器、市场交易数据和证券交易所数据。本文将向您展示如何使用C++库设置、配置、连接、写入和查询InfluxDB数据库。

InfluxDB是一个开源的时序数据库,旨在处理高写入和读取/查询时间戳数据负载。除了提供服务器度量或应用程序监控等持续数据流外,InfluxDB还提供事件生成、实时分析、自动过期和删除备份存储的数据以及轻松预测分析时序数据的方法。

您将学习如何将真实的物联网传感器数据存储在InfluxDB数据库中,然后分析存储的数据作为时间段的样本块。最后,我们将使用图表来绘制数据。

设置InfluxDB服务器

本教程基于Ubuntu 20.04 LTS设置,但您可以在InfluxDB文档中找到其他安装支持。您可以在此处找到本教程的源代码

sudo apt-get install influxdb
 sudo systemctl start influxdb
 sudo systemctl status influxdb

systemctl status influxdb 应显示运行状态

influxdb.service - InfluxDB is an open-source, distributed, time series database
 Loaded: loaded (/lib/systemd/system/influxdb.service; enabled; vendor preset: enabled)
 Active: active (running) since Sat 2021-11-20 14:24:56 IST; 39s ago
 Docs: man:influxd(1)
 Main PID: 23892 (influxd)
 Tasks: 13 (limit: 4505)
 Memory: 4.5M
 CGroup: /system.slice/influxdb.service
 ??23892 /usr/bin/influxd -config /etc/influxdb/influxdb.conf

本教程的InfluxDB服务器版本为1.6.4。您可以通过运行$influxd获取有关服务器配置的更多信息,并使用/etc/influxdb/influxdb.conf调整您的配置。

设置influxdb-cxx C++客户端库

本教程使用influxdb-cxx客户端库,版本0.6.7,用于连接、填充和查询InfluxDB服务器。InfluxDB服务器正在监听IP地址127.0.0.1和TCP端口8086。

先决条件

如前所述,本教程基于Ubuntu 20.04 LTS设置。以下为其他软件依赖列表

  • C++ 17编译器。您可以通过sudo apt-get install g++安装g++ 9.3.0。
  • CMake 3.12+ 工具。您可以通过以下命令安装 CMake 3.16.3:sudo apt-get install cmake
  • Curl 库。您可以通过以下命令安装 curl4-openssl:sudo apt-get install libcurl4-openssl-dev
  • Boost 1.57+ 库(可选)。通过以下命令安装 Boost 1.71 c:sudo apt-get install libboost1.71-all-dev

获取 influxdb-cxx 客户端库

创建一个新的目录并执行以下命令

$git clone https://github.com/offa/influxdb-cxx

构建 influxdb-cxx 客户端库

$cd influxdb-cxx
 $mkdir build && cd build

如果不需要测试套件,可以在运行 cmake 时将以下选项设置为 OFF

$cmake -D INFLUXCXX_TESTING:BOOL=OFF ..

安装 influxdb-cxx 客户端库

$sudo make install

客户端库构建 libInfluxDB.so 并安装到 /usr/local/lib/libInfluxDB.so。头文件 InfluxDBFactory.hInfluxDB.Point.hTransport.h 安装到 /usr/local/include/

编译和链接源代码时分别需要头文件和库文件。

C++ 应用程序在链接时使用 libInfluxDB.so 以生成可执行二进制文件。

-----------------
 |libInfluxDB.so |
 -----------------
 ^
 | <<link>>
 |
 --------------- <<compile>>
 | C++ program | ----------> InfluxDBFactory.h
 --------------- | InfluxDB.h
 | | Point.h
 |<<generates +--> Transport.h
 | executable>>
 ------------
 |Executable|
 ------------

influxdb-cxx 客户端库类设计

influxdb-cxx 客户端通过类 InfluxDB 与 InfluxDB 服务器交互。所有 influxdb-cxx 类 都定义在全局命名空间 influxdb 下。

--------------------
 | InfluxDBFactory |---o get() [protocol]://[username:password@]host:port[?db=database]
 -------------------- \ --------------
 | \<<connects>> ---->| TSDB |
 |<<returns>> v / |Time-Series |
 | ------------------- / | Database |
 | +---------->| InfluxDB server | -- --------------
 | / -------------------
 | +--+
 | |
 ------------
 | | ----o createDatabaseIfNotExists()
 | | ----o write()
 | InfluxDB | ----o query()
 | | --o batchOf()
 | | --o flushBatch()
 ------------
 |
 v
 <<Measurement>>
 ----------------
 | Point | ----o addField()
 |--------------|
 | +measurement | ----o setTimestamp()|
 ----------------
 |
 o
 addTag()

InfluxDBFactory 类提供了 get() 方法,该方法接受一个包含 IP 地址、端口(可选协议名称)、数据库名称、数据库用户和密码的 URI 字符串。HTTP URI 默认为 tcpInfluxDBFactory 返回一个连接到 InfluxDB 服务器的 InfluxDB 类。

InfluxDB 类提供了创建数据库、写入数据库和查询数据库的方法。InfluxDB 类将点(或点列表)写入数据库——点类代表一个测量容器。

由于测量容器包含 tagfieldtimestamp,因此 point 类提供了添加 tagfieldtimestamp 的方法。在 InfluxDB 类上调用 write() 方法并将 point 作为参数传递,首先根据行协议规则序列化点类数据。

然后将最终的行协议数据发送到 InfluxDB 服务器。InfluxDB 类也支持批量模式,其中 batchOf() 方法指定批量大小。addPointToBatch() 方法接受要添加的点列表,而 flushBatch() 方法最终写入数据库。

建立连接

使用 URI 字符串调用 InfluxDBFactory.get() 方法创建与 InfluxDB 服务器的连接。指定 URI 如下:[protocol]://[username:password@]host:port[?db=database]

例如,您可以通过以下方式调用 get() 方法:influxdb::InfluxDBFactory::Get("https://127.0.0.1:8086?db=temperature_db")

在这里,HTTP (TCP) 是协议,localhost (127.0.0.1) 是 IP 地址,8086 是 TCP 端口,而 temperature_db 是数据库名称。您可以使用 HTTP (TCP)、UDP 和 Unix Socket 作为协议。IP 地址、协议、用户名、密码、端口号和数据库名称也可以通过 配置文件 /etc/influxdb/influxdb.conf 进行配置。

在此示例中,您可以通过指定数据库名称建立连接。如果不存在,则创建数据库。

auto db = influxdb::InfluxDBFactory::Get("https://127.0.0.1:8086?db=temperature_db");
db->createDatabaseIfNotExists();
#include <iostream>
#include <InfluxDBFactory.h>
int main(int argc,char *argv[]) {
 auto db = influxdb::InfluxDBFactory::Get("https://127.0.0.1:8086?db=temperature_db");
 db->createDatabaseIfNotExists();
 for (auto i: db->query("SHOW DATABASES")) std::cout<<i.getTags()<<std::endl;
 return 0;
}

编译、链接和创建可执行文件

在新的目录中,创建一个名为 CMakeLists.txt 的文件。

$mkdir cxx-influxdb-example && cd cxx-influxdb-example

将以下行添加到 CMakeLists.txt 中。

cmake_minimum_required(VERSION 3.12.0)
project(cxx-influxdb-example)
find_package(InfluxDB)
add_executable(example-influx-cxx main.cpp)
target_link_libraries(example-influx-cxx PRIVATE InfluxData::InfluxDB)

cxx-influxdb-example 是项目名称,而 example-influx-cxx 是可执行文件名称。

编写您的 main.cpp 代码,包括 influxdb-cxx 客户端头文件。

$mkdir build && cd build
 $cmake ..
 $make
 $./example-influx-cxx

输出

name=_internal
name=temperature_db

使用 SSL/TLS 连接

默认情况下禁用了 HTTP。您需要购买 SSL 密钥和证书。修改配置文件

/etc/influxdb/influxdb.conf

并修改或添加以下键值对

https-enabled=true
https-certificate=/etc/ssl/<signed-certificate-file>.crt (or to /etc/ssl/<bundled-certificate-file>.pem)
https-private-key=/etc/ssl/<private-key-file>.key (or to /etc/ssl/<bundled-certificate-file>.pem

接下来,运行以下命令

$sudo chown influxdb:influxdb /etc/ssl/<CA-certificate-file>
$sudo chmod 644 /etc/ssl/<CA-certificate-file>
$sudo chmod 600 /etc/ssl/<private-key-file>
$sudo systemctl restart influxdb

您可以在 InfluxDBFactory.get() 的 URI 参数中指定 https 而不是 http 来获取 TLS(SSL) 加密。

influxdb::InfluxDBFactory::Get("https://127.0.0.1:8086?db=temperature_db");

写入 InfluxDB 数据库

InfluxDB 数据库组织成各种度量。如前所述,度量就像一个容器,相当于 RDBMS 中的表。每个度量记录是一个点,是一个 (度量,标签,字段值,时间) 的元组。

Measurement - String
 Tag name,value - String
 Field name - String
 Filed value - String, Integer, Float and Boolean
 Timestamp - Unix nanosecond timestamps

您可以通过指定度量名称、标签名称、可选的值对列表、字段名称、值对列表和可选的时间戳来使用 InfluxDB.write() 方法将点写入数据库。缺少的时间戳将由 InfluxDB 数据库服务器添加。

auto db = influxdb::InfluxDBFactory::Get("https://127.0.0.1:8086?db=temperature_db");
db->write(influxdb::Point{"temperature"}.addTag("city","DALLAS").addTag("device","companyX").addField("value",28));

查询数据

您可以通过调用 InfluxDB.query() 方法并传递 SQL 查询字符串作为参数来查询 InfluxDB。InfluxDB 中的数据库记录以 JSON 格式返回,其中每个记录由 point 类表示。 Point.getName()Point.getTags()Point.getFields()Point.getTimestamp() 分别返回度量、标签、字段和时间戳。

#include <iostream>
#include <InfluxDBFactory.h>
int main(int argc,char *argv[]) {
 auto db = influxdb::InfluxDBFactory::Get("https://127.0.0.1:8086?db=temperature_db");
 for (auto i: db->query("select * from temperature")) {
 std::cout<<i.getName()<<":";
 std::cout<<i.getTags()<<":";
 std::cout<<i.getFields()<<":";
 std::cout<<std::to_string(std::chrono::duration_cast<std::chrono::nanoseconds>(i.getTimestamp().time_since_epoch()).count())<<std::endl;
 }
 return 0;
}

输出

temperature:city=DALLAS,device=companyX:value=28.000000000000000000:1637659246724538477

您可以通过设置 influxdb::Point::floatsPrecision=4 来设置浮点类型精度。

int main(int argc,char *argv[]) {
 auto db = influxdb::InfluxDBFactory::Get("https://127.0.0.1:8086?db=temperature_db");
 influxdb::Point::floatsPrecision=4;

输出

temperature:city=DALLAS,device=companyX:value=28.0000:1637659246724538477

批量写入 InfluxDB

在提及批量大小时,您可以批量写入数据。调用 flushBatch() 方法将数据刷新到数据库。

#include <iostream>
#include <InfluxDBFactory.h>
int main(int argc,char *argv[]) {
 auto db = influxdb::InfluxDBFactory::Get("https://127.0.0.1:8086?db=temperature_db");
 influxdb::Point::floatsPrecision=4;
 db->batchOf(10);
 for (int i=0;i<10;i++)
 db->write(influxdb::Point{"temperature"}.addTag("city","SEATTLE").addTag("device","companyY").addField("value",28+i));
 db->flushBatch();
 return 0;
}

使用 influxdb-client 工具进行调试

您还可以通过 Ubuntu 20.04 上可用的 influxdb-client 工具验证 InfluxDB 中可用的数据,通过 $sudo apt-get install influxdb-client

$ influx
Connected to https://127.0.0.1:8086 version 1.6.4
InfluxDB shell version: 1.6.4
> SHOW DATABASES
name: databases
name
----
_internal
temperature_db
> use temperature_db
Using database temperature_db
> SELECT * FROM temperature
name: temperature
time city device value
---- ---- ------ -----
1637661663965328258 SEATTLE companyY 28
1637661663965352861 SEATTLE companyY 29
1637661663965357288 SEATTLE companyY 30
 .....

完成的应用程序

完整的程序如下

#include <iostream>
#include <InfluxDBFactory.h>
int main(int argc,char *argv[]) {
 auto db = influxdb::InfluxDBFactory::Get("https://127.0.0.1:8086?db=temperature_db");
 influxdb::Point::floatsPrecision=4;
 db->createDatabaseIfNotExists();
 for (auto i: db->query("SHOW DATABASES")) std::cout<<i.getTags()<<std::endl;
 db->write(influxdb::Point{"temperature"}.addTag("city","DALLAS").addTag("device","companyX").addField("value",28));
 db->batchOf(10);
 for (int i=0;i<10;i++)
 db->write(influxdb::Point{"temperature"}.addTag("city","SEATTLE").addTag("device","companyY").addField("value",28+i));
 db->flushBatch();
 for (auto i: db->query("select * from temperature")) {
 std::cout<<i.getName()<<":";
 std::cout<<i.getTags()<<":";
 std::cout<<i.getFields()<<":";
 std::cout<<std::to_string(std::chrono::duration_cast<std::chrono::nanoseconds>(i.getTimestamp().time_since_epoch()).count())<<std::endl;
 }
return 0;

实验性物联网传感器的实时数据

测量功率、温度、湿度、光线、CO2 和灰尘的物联网传感器数据在 IoTSenser 中可用。每个数据样本都记录了时间戳

"2015-08-01 00:00:28",0,32,40,0,973,27.8
"2015-08-01 00:00:58",0,32,40,0,973,27.09
"2015-08-01 00:01:28",0,32,40,0,973,34.5
"2015-08-01 00:01:58",0,32,40,0,973,28.43
"2015-08-01 00:02:28",0,32,40,0,973,27.58
"2015-08-01 00:02:59",0,32,40,0,971,29.35
"2015-08-01 00:03:29",0,32,40,0,971,26.46
...

在示例代码中,使用 iotsensor 作为度量字段创建数据库 iotsensor_db

插入数据库中的数据并绘制图表的完整代码如下。请注意,图表是从 pbPlots 工具绘制的,该工具位于 Cpp 目录中。将 pbPlots.cpppbPlots.hppsupportLib.cppsupportLib.hpp 文件从 cxx-influxdb-example 目录复制并添加到 CMakeLists.txt 文件

cmake_minimum_required(VERSION 3.12.0)
project(cxx-influxdb-example)
find_package(InfluxDB)
add_executable(example-influx-cxx supportLib.cpp pbPlots.cpp demomain.cpp)
target_link_libraries(example-influx-cxx PRIVATE InfluxData::InfluxDB)
#include <iostream>
#include <string>
#include <fstream>
#include <boost/algorithm/string.hpp>
#include <InfluxDBFactory.h>
#include "pbPlots.h"
#include "supportLib.h"
#include <chrono>
std::chrono::system_clock::time_point createDateTime(int year, int month, int day, int hour, int minute, int second) // these are UTC values
{
 tm timeinfo1 = tm();
 timeinfo1.tm_year = year - 1900;
 timeinfo1.tm_mon = month - 1;
 timeinfo1.tm_mday = day;
 timeinfo1.tm_hour = hour;
 timeinfo1.tm_min = minute;
 timeinfo1.tm_sec = second;
 tm timeinfo = timeinfo1;
 time_t tt = timegm(&timeinfo);
 return std::chrono::system_clock::from_time_t(tt);
}
int main(int argc,char *argv[]) {
 auto db = influxdb::InfluxDBFactory::Get("https://127.0.0.1:8086?db=iotsensor_db");
 influxdb::Point::floatsPrecision=4;
 db->createDatabaseIfNotExists();
 for (auto i: db->query("SHOW DATABASES")) std::cout<<i.getTags()<<std::endl;
 std::ifstream file("sensor-data.csv");
 if (!file) {
 std::cout<<"Unable to open file"<<std::endl;
 return -1;
 }
 std::string dataline;
 std::vector<std::string> sensordatavector, linevector, timestampvector;
 while(getline(file,dataline)) {
 dataline.erase(remove( dataline.begin(), dataline.end(), '\"' ),dataline.end());
 sensordatavector.push_back(dataline);
 }
 std::cout<<"number of entries in sensor data "<<sensordatavector.size()<<std::endl;
 auto t_start = std::chrono::high_resolution_clock::now();
 db->batchOf(10000);
 int counter=0;
 for (std::string i : sensordatavector) {
 boost::algorithm::split(linevector, i, boost::is_any_of(","));
 boost::algorithm::split(timestampvector, linevector[0], boost::is_any_of(":- "));
 std::chrono::system_clock::time_point tss=createDateTime(std::stod(timestampvector[0]),std::stod(timestampvector[1]),std::stod(timestampvector[2]),std::stod(timestampvector[3]),std::stod(timestampvector[4]),std::stod(timestampvector[5]));
 db->write(influxdb::Point{"iotsensor"}
 .addField("power",std::stod(linevector[1]))
 .addField("temperature",std::stod(linevector[2]))
 .addField("humidity",std::stod(linevector[3]))
 .addField("light",std::stod(linevector[4]))
 .addField("co2",std::stod(linevector[5]))
 .addField("dust",std::stod(linevector[6]))
 .setTimestamp(tss));
 counter+=1;
 if (counter==10000) {
 db->flushBatch();
 counter=0;
 }
 }
 auto t_end = std::chrono::high_resolution_clock::now();
 std::cout<<"Total time (ms) for write "<<std::chrono::duration<double, std::milli>(t_end-t_start).count()<<std::endl;
 std::cout<<"database record count "<<db->query("select * from iotsensor").size()<<std::endl;
 std::vector<double> doublevector;
 std::vector<double> fieldvector[6];
 std::vector<std::string> fieldnamevector;
 sensordatavector.resize(0);linevector.resize(0);
// for (auto i: db->query("select * from iotsensor")) {
 for (auto i: db->query("select * from iotsensor where time> '2015-08-04 00:00:00' and time<'2015-08-08 01:00:00'")) {
 doublevector.push_back(std::chrono::duration_cast<std::chrono::nanoseconds>(i.getTimestamp().time_since_epoch()).count());
 boost::algorithm::split(sensordatavector, i.getFields(), boost::is_any_of(","));
 for (int j=0;j<6;j++) {
 boost::algorithm::split(linevector, sensordatavector[j], boost::is_any_of("="));
 fieldnamevector.push_back(linevector[0]);
 fieldvector[j].push_back(std::stod(linevector[1]));
 }
 }
 RGBABitmapImageReference *imageref=CreateRGBABitmapImageReference();
 size_t length;
 double *pngdata;
 std::string filename;
 for (int j=0;j<6;j++) {
 DrawScatterPlot(imageref, 600, 400, &doublevector[0], doublevector.size(),&fieldvector[j][0],doublevector.size());
 pngdata=ConvertToPNG(&length,imageref->image);
 filename="plot_partial_"+fieldnamevector[j]+std::to_string(j)+".png";
 WriteToFile(pngdata,length,(char*)filename.c_str());
 }
return 0;
}

输出

name=_internal
name=temperature_db
name=iotsensor_db
number of entries in sensor data 88688
Total time (ms) for write 5874.81
database record count 79992

在具有每个点六个字段的数据库中写入 79992 个点需要 5874.81 毫秒。

plot_co2

plot_dust

plot_humidity

plot_light

plot_power

plot_temperature

记录时间戳在 "2015-08-04 00:00:00" 和 "2015-08-08 01:00:00" 之间。

plot_co2-2

plot_dust-2

plot_humidity-2

plot_light-2

plot_power-2

plot_temperature-2

结论

总结一下,时间序列数据库 (TSDB) 与 RDBMS 不同,因为 TSDB 在备份存储的时间序列数据方面高度专业化。因此,数据的写入和查询以闪电般的速度发生。

为您的应用程序,请查看InfluxDB,这是一个提供时间序列数据指标、事件和数据分析的数据库。