使用R和InfluxDB入门

导航到

本文由Gourav Singh Bais撰写,最初发表在The New Stack上。

作为数据专业人士,你可能会遇到一些具有少量独立变量(输入变量)的数据集。一个变量将是时间,另一个可以是任何类型的时间相关列,例如酒店预订数量或航班乘客数量。

这类数据被称为“时序数据”,它具有某种趋势并捕捉了某个时间点。存储这类数据的方法有很多,如关系数据库或CSV或Excel等文件。然而,这些选项并不是为高效存储时序数据而设计的。这就引入了时序数据库,它们专门设计用于高效快速地存储时序数据

有许多用例中,时序数据库(TSDB)比其他存储机制表现显著更好。考虑以下一些

  1. 存储物联网数据:时序数据库可以轻松存储物联网数据,并按固定时间间隔持续存储,通过时间序列分析可以识别季节性模式、平均消耗和低效,这些分析提供了带时间戳的数据点。
  2. 监控应用程序和基础设施:公司可以存储有关其应用程序和基础设施使用情况的数据,并可以在以后用于异常检测或预测基础设施需求等任务。一些Web和移动应用程序会在TSDB中跟踪自己的事件,例如点击按钮、播放视频或分享某些内容。他们可以绘制用户的旅程图,突出显示困难或性能瓶颈,并利用这些事件简化更复杂的操作。
  3. 零售店销售预测:零售店收集数据以预测销售,这有助于他们管理供应链。
  4. 实时分析:时序数据库可以用于存储用于实时分析的数据,例如自动驾驶汽车数据。为自动驾驶汽车生成的大量数据具有很强的时间和依赖性,无法存储在关系数据库中。时序数据库提供了更快的写入和查询机制,有助于自动驾驶汽车实时执行操作。

此外,使用时序数据库而不是其他存储机制来存储该类型数据具有几个优点。以下是一些原因

  1. 可扩展性:时序数据库专注于更高的写入次数和最终一致性,即使在分布式存储中也是如此,这意味着那些关心数据的人更少焦虑。
  2. 易用性:在TSDB中存储数据还不够。必须能够快速访问它以做出数据驱动的决策。在这里,数据可以按时间聚合,以加快交易速度并提高效率。
  3. 提高生产力:时序数据库可以通过简单的API轻松访问,并可以使用不同的编程语言集访问。

一个广泛使用时序数据库是InfluxDB。InfluxData公司创建了InfluxDB,这是一个开源时序数据库。它使用Go语言编写,用于存储和检索任何用例的时序数据,包括运营监控、应用程序指标、物联网(IoT)传感器数据和实时分析。

要了解更多关于InfluxDB的好处,您可以参考InfluxData的网站

本文将为您介绍如何使用R语言开始使用InfluxDB,从安装、设置、查询、写入数据,最后使用R构建一个简单的时间序列应用程序。

InfluxDB客户端库

使用任何编程语言与InfluxDB交互的客户端必须能够连接到数据库,以便执行不同的数据库操作。可以使用influxdb-client-r库使用R连接到InfluxDB。这是一个支持写入数据、读取数据和获取数据库状态的包。此客户端库与InfluxDB版本2.0兼容。

让我们从使用版本2.0设置InfluxDB开始。InfluxDB可以在不同的平台上使用,例如Windows、Linux和macOS。本文中的示例已在macOS Big Sur上进行测试,尽管在任意平台上安装都很简单。

或者,您可以使用InfluxDB Cloud,在几分钟内快速获得一个免费运行的InfluxDB实例,而不需要在您的机器上本地安装任何东西。

使用Homebrew在macOS上安装InfluxDB

$ brew update
$ brew install influxdb influxdb-cli

或者,您可以在此链接手动下载InfluxDB。

InfluxDB安装完成后,您可以使用以下代码启动它

$ influxd

首次启动InfluxDB时,它将要求您设置账户,这可以通过UI localhost:8086 或命令行界面(CLI)进行。对于UI设置,您需要打开localhost URL并提供初始设置所需的信息。如果您使用CLI,您需要在终端中使用以下代码执行操作

$ influx setup

对于初始设置,请注意以下细节

用户名:您可以为初始用户选择任何用户名。

密码:您需要创建并确认用于数据库访问的密码。

组织名称:您需要选择初始组织名称。

桶名称:需要初始桶名称,您可以根据需要创建任意数量的桶。

保留期:桶在删除之前存储数据的时间段。您可以选择永不或留空以设置无限保留期。

有关在其他平台上安装InfluxDB的说明,请参阅以下链接

安装InfluxDB并完成设置后,您可以登录到localhost:8086。您应该看到如下屏幕

InfluxDB Home

InfluxDB首页

您可以通过仪表板查看包含的各种模块,尽管本文将主要关注您可以通过它们连接到InfluxDB客户端的模块。从数据模块开始

InfluxDB Load Data

InfluxDB加载数据

在这里,您可以观察不同的部分,如Telegraf抓取器令牌。要使用R与InfluxDB交互,您需要检查令牌部分。要连接到数据库,您需要生成一个私有的令牌(密钥),它只能由您访问,允许您连接到不同的桶。

要生成此令牌,请转到“令牌”标签。在右侧,您将看到一个“生成令牌”按钮。此按钮有两个不同的部分

– 读写令牌:此令牌提供对不同桶的读写访问权限,访问范围可以是特定的桶或提供对所有可用桶的访问。使用此令牌,您只能读取和写入组织中的数据。

Generate Read/Write Tokens InfluxDB

在 InfluxDB 中生成读写令牌

– 全访问令牌:此令牌提供对所有操作的完全访问权限,例如读取、写入、更新或删除每个桶。这将是推荐的令牌,通过它可以连接到任何可用的桶而无需任何显式配置,并且可以执行所有需要的操作,如读取、写入、更新和删除。

All-Access Token

全访问令牌

为了本文的目的,您需要生成一个 全访问令牌。生成令牌后,您可以通过简单地登录到 localhost 控制台随时访问它。

现在,您已经将 InfluxDB 设置好了,您可以下载 R 和 RStudio 用于编写和测试代码。安装 R 非常简单。您可以从这里下载软件包,然后打开并安装它。在安装 R 之后,您可以下载 RStudio,它将是您编写 R 代码的 IDE。您可以从这里下载 RStudio。

在这个阶段,您几乎拥有了连接到 InfluxDB 所需的所有工具和技术。作为最后一步,您需要安装 R 的 InfluxDB 客户端库,可以使用以下代码下载

install.packages("influxdbclient")

如果您在 RStudio 中安装它,则会下载与基本库一起的其他依赖项。但是,如果依赖项没有自动下载,您可以使用以下代码单独下载它们

install.packages(c("httr", "bit64", "nanotime", "plyr"))

建立连接

下一步是导入 R 中的 InfluxDB 客户端库 并创建一个 InfluxDBClient 实例,用于与数据库交互并执行所有操作集。建立数据库连接所需的参数包括以下内容

  • 令牌:这指的是您使用控制台生成的访问令牌。您可以从 InfluxDB 仪表板登录并复制该令牌。
  • 桶: 这需要您将要工作的桶的名称。您可以选择初始桶或使用仪表板创建新的桶。
  • 组织: 这是您在 InfluxDB 初始设置期间命名的组织。

由于此连接将在本地进行,连接脚本应如下所示

## import the client library
library(influxdbclient)
# parameters needed to make connection to Database
token = "Paste Your Token Here"
org = "my-org"
bucket = "RInfluxClient"
## make connection to the influxDB bucket
client <- InfluxDBClient$new(url = "https://127.0.0.1:8086",
                             token = token,
                             org = org)

如果您使用的是云账户,请确保 URL 参数与您的云账户所在的区域匹配,而不是使用 localhost。您可以在文档中找到URL 端点

插入数据

现在您已与 InfluxDB 建立了连接,是时候使用数据执行不同的数据库操作了。为了了解这些操作,让我们看一下 2020 年 1 月至 2020 年 4 月全球 COVID-19 病例 的示例数据

Sample data

示例数据

此示例数据包含以下字段

  • 日期:观测数据所取的日期。
  • 病例: 该日期活跃的 COVID 病例数。
  • 区域: 病例报告地点的标识符。
  • 国家: 观测值所在的地点。

要在R中读取数据框,您需要编写以下代码行

data<-read.csv("/Users/Uname/Downloads/covid_data.csv")

让我们首先将此数据插入到InfluxDB中。为此,使用write()方法,它接受如下参数

client$write(data, bucket, precision, measurementCol,
tagCols, fieldCols, timeCol, object)

注意:上述方法仅是函数定义,不是代码的一部分。

此方法接受以下参数

  • data: 要存储在数据库中的数据框或点列表。
  • bucket: 您将存储数据的桶名称。
  • precision: 时间戳的精度。
  • measurementCol: InfluxDB中的测量值类似于关系型数据库中的表名。
  • tagCols: 此列代表与数据相关的元数据。
  • fieldCols: 您想要存储在数据库中的列名。
  • timeCol: 您数据中的时间戳列。
  • object: 用于调试写入操作的实体。

要使用write()方法将COVID-19数据存储在InfluxDB中,您需要确保时间戳列(日期)是POSIXct格式。

## convert date column to POSIXct
data[['Date']] <- as.POSIXct(strptime(data[['Date']], format='%Y-%m-%d'))
## write data in influxDB
response <- client$write(data, bucket = "bucket", precision = "us",
                          measurementCol = "Cases",
                          tagCols = c("Region", "Country"),
                          fieldCols = c("Cases"),
                          timeCol = "Date")

write()函数的响应可以是NULL、True或错误。要调试write()函数并检查数据在数据库中的写入情况,您可以分配一个对象:lp

查询数据

现在,您的时间戳数据已存储在数据库中,让我们尝试读取数据。要使用R客户端查询数据,使用read()函数,它期望一个Flux查询。对于查询,您可以使用您创建用于写入数据的同一客户端,或者创建一个新的InfluxDB客户端并执行相同的操作。

result <- client$query('from(bucket: "RInfluxClient") |> range(start: -2y) |> drop(columns: ["_start", "_stop"])')

让我们分析上述查询。从关键字“from”开始,您需要首先指定桶名称,然后是您想要选择数据的范围,最后是一组条件。在上述查询中,条件指定了不要包括数据库中的起始和结束列。

结果包含每个数据库条目在指定期间创建的数据框列表。要检查其实例,可以使用以下代码

result[[1]][c("time", "_value")]

现在,您已查询到数据,让我们利用这些数据进行预测。在这里,您将对检索到的数据训练时间序列模型,并尝试预测未来五天的病例。让我们从查询结果创建一个数据框

## create an empty dataframe to store all the results
df1 = data.frame()
## iterate over each entry and append it in created dataframe
for (r in result){
    sub_df = r[c("time", "_value")]
    print(sub_df)
    df1 = rbind(df1, sub_df)
}

一旦创建了数据框,就需要进行一些更改才能在该数据框上应用时间序列模型。通常,这一阶段是数据预处理。

## arrange dataframe on ascending order based on time
df1 = df1[order(df1$time),]
## change the date to YYYY-MM-DD format
df1[['time']] <- as.POSIXct(strptime(df1[['time']], format='%Y-%m-%d'))
## rename column _value to Cases
colnames(df1)[2] <- "Cases"
## convert double values to integer
df1$Cases <- as.integer(df1$Cases)

预处理后,现在是创建我们数据的时间序列表示的时候了。这可以通过以下代码完成

## import library =
library(forecast)
## create time series representation of data
mts <- ts(df1[c("_value")], start = decimal_date(ymd(df1[1, "time"])),
                                     frequency = 365.25 / 7)
## plotting the input data
plot(mts, xlab ="Weekly Data",
      ylab ="Total Positive Cases",
      main ="COVID-19 Pandemic",
      col.main ="darkgreen")

Input data

输入数据

最后,让我们将数据拟合到预测模型中,并预测未来五天的病例

## fit model on the data
fit <- auto.arima(mts)
## make predictions for next 5 days
forecast(fit, 5)
## plot predictions
plot(forecast(fit, 5), xlab ="Weekly Data",
      ylab ="Total Positive Cases",
      main ="COVID-19 Pandemic", col.main ="darkgreen")
# saving the file
dev.off()

Model prediction

模型预测

这就是如何访问和使用时间序列数据来进行时间序列预测,这是时间序列数据的一个实际用例。整个实现可以在这里找到。

有关更多信息和InfluxDB性能优化的最佳实践,请参阅文档

结论

阅读完这篇文章后,您现在知道如何在系统中设置InfluxDB,以及如何使用R语言创建客户端,并为您的时间序列用例写入和读取数据。InfluxDB的一个主要优势是它支持几乎所有主要的编程语言。

存储时间序列数据有多种选择,但像InfluxDB这样的时间序列数据库可以在更高速度和更大规模上完成这项任务。一些用例,如物联网应用、自动驾驶汽车或实时应用分析,需要每次插入从数万到数十万条记录。时间序列数据库以非常高的速度和实时性执行此任务,使它们可以轻松地被任何正在开发实时时间序列应用的开发者所采用。务必考虑部署InfluxDB以在您的应用程序中使用这些优秀功能。

关于作者

Gourav-Singh-Bais

戈拉夫·辛格·拜斯是ValueMomentum Inc.的一名应用机器学习工程师。他擅长开发机器学习/深度学习管道、重新训练系统,以及将数据科学原型转化为生产级解决方案。他从事该领域工作已有三年,为包括财富500强公司在内的许多客户提供过服务,这让他有机会积累有助于机器学习社区的宝贵经验和技能。