R 和 InfluxDB 入门
作者:社区 / 产品, 用例, 开发者
2022 年 4 月 6 日
导航至
本文由 Gourav Singh Bais 撰写,最初发表于 The New Stack。
作为数据专业人士,您可能会遇到一些数据集,其中只有少数几个独立变量(输入变量)。其中一个变量是时间,另一个可以是任何类型的与时间相关的列,例如酒店的预订数量或航班上的乘客数量。
这种类型的数据被称为“时间序列数据”,它具有某种趋势并捕获时间点。有多种存储此类数据的方法,例如关系数据库或文件,如 CSV 或 Excel。但是,这些选项并非旨在有效存储时间序列数据。时间序列数据库应运而生,它们专门设计用于高效快速地存储时间序列数据。
在各种用例中,时间序列数据库 (TSDB) 的性能明显优于其他存储机制。考虑以下几个:
- 存储物联网数据: 时间序列数据库可以轻松地连续且定期地存储物联网数据,通过时间序列分析可以识别其中的季节性模式、平均消耗和低效率,时间序列分析提供带时间戳的数据点。
- 监控应用程序和基础设施: 公司可以存储有关其应用程序和基础设施使用情况的数据,并且他们稍后可以使用它来执行诸如异常检测或基础设施需求预测之类的任务。一些 Web 和移动应用程序会在 TSDB 中跟踪自己的事件,例如单击按钮、播放视频或共享某些内容。他们可以使用这些事件来绘制用户的旅程、突出显示困难或性能瓶颈,并简化更复杂的操作。
- 零售店销售预测: 零售店收集数据来预测销售额,这有助于他们管理供应链。
- 实时分析: 时间序列数据库可用于存储用于实时分析的数据,例如自动驾驶汽车数据。为自动驾驶汽车生成的数据量巨大且与时间相关,因此不可能将其存储在关系数据库中。时间序列数据库提供更快的写入和查询机制,有助于自动驾驶汽车实时执行操作。
此外,对于该数据类型,使用时间序列数据库而不是其他存储机制有几个优势。以下是一些原因:
- 可扩展性: 时间序列数据库专门用于处理大量的写入操作,并具有最终一致性,即使在分布式存储中也是如此,这意味着那些关心数据的人不必过于担心。
- 可用性: 仅将数据存储在 TSDB 中是不够的。必须能够快速访问它才能做出数据驱动的决策。在这里,可以随着时间的推移聚合数据,以使事务更快、更高效。
- 提高生产力: 时间序列数据库以简单的 API 形式轻松访问,并且可以使用不同的编程语言集进行访问。
一种广泛使用的时间序列数据库是 InfluxDB。InfluxData 公司创建了 InfluxDB,这是一种开源时间序列数据库。它用 Go 语言编写,用于存储和检索时间序列数据,适用于任何用例,包括运营监控、应用程序指标、物联网 (IoT) 传感器数据和实时分析。
要了解有关 InfluxDB 优势的更多信息,您可以参考 InfluxData 网站。
在本文中,您将学习使用 R 语言开始使用 InfluxDB 所需的知识,从安装、设置、查询、写入开始,最后,使用 R 构建一个简单的时间序列应用程序。
InfluxDB 客户端库
使用任何编程语言与 InfluxDB 交互的客户端必须能够连接到数据库,以便可以执行不同的数据库操作。influxdb-client-r 库可用于使用 R 连接到 InfluxDB。它是一个支持操作的软件包,例如写入数据、读取数据和获取数据库状态。此客户端库适用于 InfluxDB 2.0 版本。
让我们从使用 2.0 版本设置 InfluxDB 开始。InfluxDB 可在不同的平台上使用,例如 Windows、Linux 和 macOS。您将在本文中看到的示例已针对 macOS Big Sur 进行了测试,尽管在任何平台上安装它都很简单。
或者,您可以使用 InfluxDB 云 在几分钟内快速获得 InfluxDB 的免费实例,而无需在本地计算机上安装任何东西。
可以使用 Homebrew 在 macOS 上安装 InfluxDB
$ brew update
$ brew install influxdb influxdb-cli
或者,也可以在此处手动下载 InfluxDB 此处。
安装 InfluxDB 后,您可以使用以下代码启动它
$ influxd
首次启动 InfluxDB 时,它会要求您设置帐户,可以使用 UI localhost:8086 或命令行界面 (CLI) 执行此操作。对于 UI 设置,您必须打开 localhost URL 并提供初始设置所需的信息。如果您使用的是 CLI,则需要使用 InfluxDB 客户端执行此操作,该客户端可以使用以下代码在终端中启动
$ influx setup
对于初始设置,请注意以下详细信息
用户名: 您可以为初始用户选择任何用户名。
密码: 您需要创建并确认用于数据库访问的密码。
组织名称: 您需要选择初始组织名称。
存储桶名称: 需要初始存储桶名称,您可以创建任意数量的存储桶来使用。
保留期: 您的存储桶在删除数据之前将存储数据的时间段。您可以选择从不或将其留空以获得无限的保留期。
要在其他平台上安装 InfluxDB,请参阅以下链接。
安装 InfluxDB 并完成设置后,您可以登录 localhost:8086。您应该会看到如下屏幕
您可以浏览仪表板中包含的各种模块,但本文将主要关注那些您可以通过其连接到 InfluxDB 客户端的模块。从数据模块开始
在这里,您可以观察不同的部分,例如 Sources、Bucket、Telegraf、Scrapers 和 Tokens。要使用 R 与 InfluxDB 交互,您需要检查 Buckets 和 Tokens 部分。要连接到数据库,您需要生成一个只有您才能访问的私有令牌(密钥),以便您连接到不同的存储桶。
要生成此令牌,请导航到 Tokens 选项卡。在右侧,您将看到一个 Generate Tokens 按钮。此按钮有两个不同的部分
– 读/写令牌: 此令牌提供对不同存储桶的读写访问权限,可以将范围限制为(特定存储桶)或提供给所有可用的存储桶。使用此令牌,您只能在一个组织中读取和写入数据。
– 全访问令牌: 此令牌提供对操作的完全访问权限,例如读取、写入、更新或删除每个存储桶。这将是推荐的令牌,您可以通过它连接到任何可用的存储桶,而无需任何显式配置,并且可以执行所有需要的操作,例如读取、写入、更新和删除。
就本文而言,您需要生成一个 全访问令牌。生成令牌后,您只需登录到 localhost 控制台即可随时访问它。
现在您已经设置好了 InfluxDB,您可以下载 R 和 RStudio 来编写和测试代码。安装 R 非常简单。您可以在此处下载软件包,然后打开并安装它。安装 R 后,您可以下载 RStudio,这将是您用来编写 R 代码的 IDE。您可以在此处下载 RStudio。
在此阶段,您几乎拥有连接到 InfluxDB 所需的所有工具和技术。作为最后一步,您需要安装 R 的 InfluxDB 客户端库,可以使用以下代码行下载该库
install.packages("influxdbclient")
如果您在 RStudio 上安装它,其他依赖项将与基本库一起下载。但是,如果依赖项未自动下载,您可以分别使用以下代码行下载它们
install.packages(c("httr", "bit64", "nanotime", "plyr"))
建立连接
下一步将是导入 R 中的 InfluxDB 客户端库 并创建 InfluxDBClient 的实例,该实例可用于与数据库交互并执行所有操作集。建立数据库连接所需的参数包括以下内容
- 令牌: 这指的是您使用控制台生成的访问令牌。您可以登录到 InfluxDB 仪表板并从那里复制令牌。
- 存储桶: 这需要您将在其上工作的存储桶的名称。您可以选择初始存储桶或使用仪表板创建一个新存储桶。
- 组织: 这是您在 InfluxDB 的初始设置期间命名的组织。
由于此连接将在本地建立,因此连接脚本应如下所示
## import the client library
library(influxdbclient)
# parameters needed to make connection to Database
token = "Paste Your Token Here"
org = "my-org"
bucket = "RInfluxClient"
## make connection to the influxDB bucket
client <- InfluxDBClient$new(url = "http://localhost:8086",
token = token,
org = org)
如果您使用的是云帐户,请确保 URL 参数与您的云帐户所在的区域匹配,而不是使用 localhost。您可以在文档中找到 URL 端点。
插入数据
现在您已经建立了与 InfluxDB 的连接,是时候使用数据执行不同的数据库操作了。为了理解这些操作,让我们看一下 2020 年 1 月至 2020 年 4 月全球 COVID-19 病例 的一些示例数据
此示例数据包含以下字段
- 日期: 采集观察值的日期。
- 病例: 当天活跃的 COVID 病例数。
- 地区: 报告病例地点的标识符。
- 国家/地区: 采集观察值的地点。
要在 R 中读取数据帧,您需要编写以下代码行
data<-read.csv("/Users/Uname/Downloads/covid_data.csv")
让我们首先将此数据插入 InfluxDB。为此,请使用 write()
方法,该方法接受如下参数
client$write(data, bucket, precision, measurementCol,
tagCols, fieldCols, timeCol, object)
注意: 上述方法只是一个函数定义,不是代码的一部分。
此方法采用以下参数
- data: 要存储在数据库中的数据帧或点列表。
- bucket: 您将在其中存储数据的存储桶名称。
- precision: 时间戳的精度。
- measurementCol: InfluxDB 中的度量类似于关系数据库中的表名。
- tagCols: 此列表示与数据相关的元数据。
- fieldCols: 您要存储在数据库中的列名。
- timeCol: 数据中的时间戳列。
- object: 用于调试写入操作的对象。
要使用 write()
方法将 COVID-19 数据存储在 InfluxDB 中,您需要确保您的时间戳列(Date)采用 POSIXct 格式。
## convert date column to POSIXct
data[['Date']] <- as.POSIXct(strptime(data[['Date']], format='%Y-%m-%d'))
## write data in influxDB
response <- client$write(data, bucket = "bucket", precision = "us",
measurementCol = "Cases",
tagCols = c("Region", "Country"),
fieldCols = c("Cases"),
timeCol = "Date")
来自 write()
函数的响应可以是 NULL、True 或错误。要调试 write()
函数并检查数据在数据库中的写入方式,您可以分配一个对象:lp
。
查询数据
现在您已将带时间戳的数据存储在数据库中,让我们尝试读取数据。要使用 R 客户端查询数据,可以使用 read()
函数,该函数需要 Flux 查询。对于查询,您可以使用您为写入数据创建的同一客户端,也可以创建一个新的 InfluxDB 客户端并执行相同的操作。
result <- client$query('from(bucket: "RInfluxClient") |> range(start: -2y) |> drop(columns: ["_start", "_stop"])')
让我们分解上面的查询。从关键字“from”开始,您需要首先指定存储桶名称,然后指定要从中选择数据的时间范围,最后指定一组条件。在上面的查询中,条件指定不包括数据库中的 start 和 stop 列。
结果包含数据库中针对指定期间的每个条目创建的数据帧列表。要检查它的实例,您可以使用以下代码
result[[1]][c("time", "_value")]
现在您已经查询了数据,让我们利用这些数据进行预测。在这里,您将在检索到的数据上训练时间序列模型,并尝试预测未来五天的病例数。让我们从查询后得到的结果创建一个数据帧
## create an empty dataframe to store all the results
df1 = data.frame()
## iterate over each entry and append it in created dataframe
for (r in result){
sub_df = r[c("time", "_value")]
print(sub_df)
df1 = rbind(df1, sub_df)
}
创建数据帧后,需要进行一些更改才能在其上应用时间序列模型。通常,此阶段是数据预处理。
## arrange dataframe on ascending order based on time
df1 = df1[order(df1$time),]
## change the date to YYYY-MM-DD format
df1[['time']] <- as.POSIXct(strptime(df1[['time']], format='%Y-%m-%d'))
## rename column _value to Cases
colnames(df1)[2] <- "Cases"
## convert double values to integer
df1$Cases <- as.integer(df1$Cases)
预处理之后,现在是时候创建数据的时间序列表示了。这将使用以下代码完成
## import library =
library(forecast)
## create time series representation of data
mts <- ts(df1[c("_value")], start = decimal_date(ymd(df1[1, "time"])),
frequency = 365.25 / 7)
## plotting the input data
plot(mts, xlab ="Weekly Data",
ylab ="Total Positive Cases",
main ="COVID-19 Pandemic",
col.main ="darkgreen")
最后,让我们将数据拟合到 预测模型 中,并预测未来五天的病例数
## fit model on the data
fit <- auto.arima(mts)
## make predictions for next 5 days
forecast(fit, 5)
## plot predictions
plot(forecast(fit, 5), xlab ="Weekly Data",
ylab ="Total Positive Cases",
main ="COVID-19 Pandemic", col.main ="darkgreen")
# saving the file
dev.off()
这就是如何访问和使用数据进行时间序列预测的方法,这只是时间戳数据的实际用例之一。完整的实现可以在这里找到。
有关优化 InfluxDB 性能的更多信息和最佳实践,请参阅文档。
结论
阅读本文后,您现在了解了如何在您的系统中设置InfluxDB,以及如何创建客户端,并使用 R 语言为您的时间序列用例写入和读取数据。InfluxDB 的一个主要优势是它几乎支持所有主要的编程语言。
存储时间序列数据有多种选择,但像 InfluxDB 这样的时间序列数据库可以更快、更大规模地完成这项工作。诸如物联网应用、自动驾驶汽车或实时应用分析等多种用例需要一次性插入从数万到数十万条数据条目。时间序列数据库以非常高的速度和实时性执行此任务,使任何从事实时时间序列应用程序的开发人员都可以轻松地采用它们。请务必考虑部署 InfluxDB,以便在您自己的应用程序中使用这些强大的功能。
关于作者
Gourav Singh Bais 是 ValueMomentum Inc. 的应用机器学习工程师。他擅长开发机器学习/深度学习管道、再训练系统以及将数据科学原型转化为生产级解决方案。他在同一领域工作了三年,并为包括财富 500 强公司在内的许多客户提供过服务,这为他提供了积累经验和技能的机会,可以为机器学习社区做出贡献。