Zeppelin、Spark 和 InfluxDB 用于大数据时间序列场景
作者:Anais Dotis-Georgiou / 产品, 用例, 开发者
2020年3月13日
导航至
您正在使用 InfluxDB 和 Telegraf。也许您正在写入每秒超过一百万个指标点。也许您已经使用 Flux 进行了一些数据探索。但是,您现在发现自己有点陷入困境。您需要处理和分析大量数据,并且您希望使用您喜欢的语言和您常用的库来完成这项工作。 InfluxData 一直在致力于一个解决方案,使您能够轻松解决数据分析问题——一个用于 Apache Zeppelin 的 InfluxDB 解释器,它支持 Apache Spark。
什么是 Apache Zeppelin?
如果您熟悉 Jupyter Notebook,那么您已经了解了一半。Apache Zeppelin 也是一个基于 Web 的笔记本,支持协作数据科学和数据分析。但是,Zeppelin 更进一步,因为它
- 支持快速简易的语言切换。您可以编写 SQL、Scala、Python,然后是 Markdown,以无缝地执行数据分析。
- 提供内置的可视化和动态表单,以轻松展示您的结果并创建输入表单。
- 允许您轻松运行 cron 作业。
- 支持带有依赖加载器的 Spark、PySpark、Spark R、Spark SQL。
- 允许您连接任何 JDBC 数据源,包括 PostgreSQL、MySQL、MariaDB、Redshift、Apache Hive 等。
- 支持 Python,集成 Matplotlib、Conda、Pandas SQL 和 PySpark。
什么是 Apache Spark?
Apache Spark 是一个“用于大数据的统一分析引擎”。更具体地说,正如维基百科所描述的,它是一个“开源分布式通用集群计算框架。Spark 提供了一个接口,用于对整个集群进行编程,具有隐式数据并行性和容错能力。”
使用 Zeppelin 和 Spark 探索 InfluxDB
既然我们已经解释了各个组件,让我们通过一个快速教程来展示一起使用这些技术的强大功能。
在本教程中,我们将介绍如何
- 使用 Java 客户端查询您的 InfluxDB 实例
- 将其转换为 dataframe
- 将 dataframe 并行化为可以并行操作的分布式数据集
- 将并行化的 dataframe 保存为临时视图,以便我们可以在 SparkSQL 中查询和可视化数据
最后,如果您担心在使用客户端查询数据后再进行任何类型的分析,我建议您阅读本教程后面的部分。在其中,我将涵盖有关大数据时间序列管道的一些常见疑虑和问题。
步骤一:导入
我们首先导入我们的依赖项。
import sqlContext.implicits._
import java.sql.Timestamp
import com.influxdb.annotations.{Column, Measurement}
import com.influxdb.client.InfluxDBClientFactory
import com.influxdb.query.FluxRecord
println("Spark version:" + sc.version)
println("Scala version:" + util.Properties.versionString)
步骤二:生成 Flux 查询
我们定义一个 case class,编写一个 Flux 查询并将其存储为常量。
case class Cpu(num: String, usage: Double, time: Timestamp)
val fluxQuery = ("from(bucket: \"my-bucket\")\n"
+ " |> range(start: -1h)"
+ " |> filter(fn: (r) => (r[\"_measurement\"] == \"cpu\" and r[\"_field\"] == \"usage_system\"))")
步骤三和四:使用 Java 客户端查询您的 InfluxDB 实例并将结果并行化到 Spark
我们使用 Java InfluxDB v2.0 客户端 来查询我们的 InfluxDB 实例。首先,我们必须提供适当的身份验证参数。接下来,我们将查询的输出映射到我们的 case class。最后,我们并行化结果。调用 parallelize 方法会将现有集合复制以形成可以并行操作的分布式数据集。换句话说,该方法处理从 Scala Seq 到 Spark 分布式数据集的转换。然后使用 toDF 将结果转换为 dataframe。
val cpu_frame1 = sc.parallelize(
InfluxDBClientFactory.create("http://localhost:9999", "my-token".toCharArray)
.getQueryApi.query(fluxQuery, "my-org").get(1)
.getRecords.toArray(Array.ofDim[FluxRecord](0))
.map(r => Cpu(r.getValueByKey("cpu").toString, r.getValue.toString.toDouble, Timestamp.from(r.getTime)))).toDF()
步骤五:将 DataFrame 注册为临时视图
现在我们可以创建一个 dataframe 的临时视图。临时视图在 SparkSQL 中。将 Spark dataframe 注册为临时视图允许我们使用 SparkSQL 查询 dataframe。
cpu_frame1.createOrReplaceTempView("cpu1")
步骤六:查询和可视化
使用 SQL 语句查询我们的数据,并利用内置的可视化功能。
%sql
select * from cpu1
大数据管道问题和疑虑
通常,当我建议使用客户端执行数据分析或数据科学时,我会遇到相同的反应:叹息、坐立不安、眼神呆滞。我可以读懂人们脸上的疑问:- 如果反正我要把数据拉出来,我为什么还要费心将数据存储在 InfluxDB 中呢?
- 为什么你们没有工具让我可以在服务器端完成这项工作呢?
- 我真的必须做额外的这些工作吗?
- InfluxDB 是一个时间序列数据湖。您可以在其中存储所有原始时间序列数据。许多平台根本无法处理 InfluxDB 提供的这种数据摄取速率。但是,在某些用例中,将数据集移动到数据仓库非常有意义。将特定数据移动到数据仓库为您提供了更结构化的数据分析管道。将唯一数据集保存在不同的仓库中,使您可以将特定的分析与隔离的那些单独数据集配对。
- 简短回答:您确定您真的想要那样做吗?冗长回答:Flux 支持服务器端的数据探索、准备和初步分析。但是,事实证明,大多数数据科学家更喜欢使用他们喜欢的语言和熟悉的库进行繁重的数据分析。此外,InfluxDB v1.x 确实允许您在服务器端完成这项工作。InfluxDB v1.x 具有 Kapacitor,一个数据处理引擎,允许您创建 UDF。但是,事实证明,学习和使用 Kapacitor 独特的脚本语言对于大多数数据科学家来说很麻烦。我建议使用 Flux 来评估哪些数据需要移动到数据仓库,以便您可以利用复杂的数据科学库和分布式计算。
- 我希望在回顾本教程后,您确信使用客户端查询数据并集成 Spark 并不是额外的努力。它实际上为我节省了时间。
大数据时间序列的考虑事项和后续步骤
InfluxDB 解释器 PR 仍在 Apache Zeppelin 社区的审核阶段。但是,与此同时,如果您想在 PR 合并之前试用,您仍然可以从 source 构建带有 InfluxDB 解释器的 Zeppelin。我希望您喜欢本教程,并和我一样对这项工作感到兴奋。与往常一样,请在评论区、我们的社区网站或我们的 Slack 频道中分享您的想法、疑虑或问题。我们很乐意获得您的反馈,并帮助您解决遇到的任何问题!