使用 NiFi 和 InfluxDB 构建物联网数据流

导航至

Apache NiFi 已经彻底改变了物联网世界,让您可以自动化来自物联网和任何边缘传感器的数据转换和流动,几乎可以到达您想要的任何地方。除了支持具有严格安全性、数据出处和合规性要求的关键任务数据量外,NiFi 还为开发人员提供了基于流程编程的拖放式用户界面,以及新流程的自动“实时”部署。

通过结合使用 NiFi 和 InfluxDB,各行各业可以轻松地使其物联网数据流安全地访问和使用。该解决方案使企业能够拥有跨所有设施的单一视图数据,从而实现主动维护、故障检测和其他基于业务的成果。

物联网挑战:基于 OPC 的数据

在本示例中,我们将研究来自工厂自动化和过程制造的 OPC 数据。OPC 使整个制造企业的生产和业务应用程序能够以一致的方式访问实时工厂车间信息。然而,我们的企业客户面临的一个常见挑战是如何将他们的设施或企业历史记录与现代工具集和分析管道连接起来,同时还要保证它们之间的数据流 непрерывный。

NiFi 和 InfluxDB 平台组合可以在几分钟内提供强大的解决方案,以安全地收集、观察和处理您的设施数据。

Building data stream for IoT

开始使用 NiFi-OPCUA-bundle

首先从 Tempus IIoT 框架下载以下 NiFi 服务和处理器捆绑包,用于工业数据摄取和分析。这些处理器和相关的控制器服务允许 NiFi 以只读方式访问 OPC UA 服务器。

您将在 Tempus 捆绑包中找到 2 个处理器,GetOPCNodeListGetOPCData。GetNodeIds 允许访问当前在 OPC UA 服务器中的标签。GetOPCData 接受标签列表并查询 OPC UA 服务器以获取值。

配置 GetNodeList 处理器

首先将 GetOPCNodeList 处理器添加到 NiFi 画布

Get node list processor

添加后,右键单击 GetOPCNodeList 处理器并选择配置。在“属性”选项卡上,为 OPC UA Service Property 字段选择值框,并在以下对话框中选择“创建新服务”,以添加 StandardOPCUAService 的新实例。

OPCUA Service Property

接下来,配置处理器过滤器,使其仅返回您感兴趣的节点列表标签项。使用 Node Filter Property 定义管道( ) 分隔的节点正则表达式列表。

此外,您还需要为处理器将浏览到 OPC 服务器命名空间中的“深度”设置 Recursive Depth Property 。

Configure processor

完成处理器配置后,单击控制器服务右侧的编辑箭头图标。这将带您进入控制器服务配置窗口。

通过输入 OPC UA 服务器的端点信息,开始编辑 StandardOPCUAService 控制器

接下来,更新安全设置握手模式,以匹配 OPC 服务器上的可用模式之一

Configure controller service

设置 Authentication Policy Property 以定义 NiFi 应如何使用 UA 服务器进行身份验证。在这里,除了安全策略模式外,我还添加了用户名/密码凭据以进行访问。

Authentication policy property

您现在拥有访问 OPC 服务器所需的信息。控制器配置完成后,单击“应用”并启用控制器。

配置 GetOPCData 处理器

接下来,将 GetOPCData OPC 处理器添加到您的 NiFi 画布。右键单击处理器,然后从上下文菜单中选择“配置”以配置处理器。单击“属性”选项卡,然后填写如下信息。

Get OPC data

用于基于记录流的 Schema Registry 服务

GetOPCData 处理器的输出数据本质上生成 CSV 样式数据(减去标头),格式为标签名称、时间戳、值和状态。

Schema registry service

我们现在将添加 AvroSchemaRegistry 控制器服务。这告诉基于记录的处理器如何解释 OPC 标签项。

该服务的添加方式与我们添加 OPCUAClientService 的方式相同。再次单击 GetOPCData 处理器中的编辑箭头图标进行配置。

接下来,在 NiFi Flow Configuration 屏幕的右上角,单击加号 (+) 图标以添加新的控制器服务

Add new controller service

AvroSchemaRegistry 添加到 Controller Services。每个属性本质上是一个名称 schema 对,流程随后可以使用该名称 schema 对将数据片段与 schema 关联起来。

Building data stream

通过单击加号图标并将其命名为 opcData 来添加属性,然后将 avro 复制到值中。

Schema 看起来像下面的 avro

{
  "type": "record",
  "name": "opcData",
  "fields": [
    { "name": "uri", "type": "string" },
    { "name": "ts", "type": "string" },
    { "name": "value", "type": "float" },
    { "name": "status", "type": "int" }
     ]
  }

您现在拥有访问 OPC 服务器所需的信息。控制器配置完成后,单击“应用”并启用控制器。

您现在拥有使用动态 avro schema 读取 OPC 数据记录所需的信息。配置完成后,单击“应用”并启用控制器。

用于 Apache NiFi 的 InfluxDB 记录处理器

现在是时候连接 InfluxDB 处理器以摄取 OPC 数据记录了。为了提供最佳的摄取性能,InfluxDB 创建了一个新的处理器,该处理器基于 NiFi Record Design。

开始入门

下载并安装 InfluxDB Processors。将相应的 nar 文件复制到 NiFi 安装的 lib 目录 ($NiFi_HOME/lib) 并重启 NiFi。

https://github.com/influxdata/nifi-influxdb-bundle

返回 NiFi 画布并添加 PutInfluxDatabasesRecord 处理器。将其连接到之前的 GetOPCData 处理器

PutInfluxDatabasesRecord

右键单击以配置处理器并选择 PROPERTIES 选项卡。从 Record Reader Property 开始,然后选择 CSVReader 控制器服务。

添加后,单击编辑箭头图标进行配置

CSVReader

CSVReader 的控制器服务属性中,您将设置 Schema Access Strategy 和 Schema Name

Schema access strategy

我们之前已注册了名称为 opcData 的 OPC-Data avro schema。单击“应用”并启用控制器服务。

返回 PutInfluxDatabasesRecord 处理器并返回 properties 选项卡。您将需要为 InfluxDB Controller Service property 创建新的 StandardInfluxDatabaseService

创建服务后,单击编辑箭头图标进行配置

StandardInfluxDatabaseService

从服务配置窗口中,定义您的 InfluxDB 数据库、URL 和访问凭据。这将为所有 NiFi 处理器创建一个可共享的连接服务

NiFi processors

完成后,单击“应用”并启用 StandardInfluxDatabaseService 控制器。

最后,返回 PutInfluxDatabasesRecord 处理器的 properties 选项卡,您现在可以设置指定的属性,以将记录的内容写入 InfluxDB 数据库。

由于我们的记录不包含具有 Measurement 属性值的字段,我们将简单地将其设置为任意名称 (opc)

Measurement tags fields

使用我们 schema 中的名称对,将 TagsFieldsTimestamp 属性设置为 opcData schema 中定义的名称。

最终处理器流程和控制器服务

最后,将所有处理器和控制器结合在一起,实现了一个流程解决方案,该解决方案可以在满足特定条件时(即使下游泵关闭,也记录到高泵压)智能地收集更多数据并通过 WAN 传输到数据中心。控制器服务配置完成后,只需大约 5 分钟即可配置流程。

控制器服务

NiFi flow configuration

  • AvroSchemaRegistry - 用于注册和访问 schema 的服务
  • CSVReader - 解析 CSV 格式的数据,将文件中的每一行作为记录返回
  • StandardInfluxDatabaseService - 提供到 InfluxDB 连接的服务
  • StandardOPCUAService - 从 OPC UA 服务器获取响应

流程流

OPCUA ingestion

  • GetOPCNodeList - 访问 OPC UA 服务器中当前的标签
  • GetOPCData - 从 OPC UA 服务器获取值
  • PutInfluxDatabaseRecord - 将 NiFi 记录结构化写入 InfluxDB

现在剩下的就是启动您的 NiFi 流程。

注意: 上述流程示例仅供测试使用。在生产环境中,您最好有更多的错误处理处理器和重定向队列。

在 Chronograf 中浏览您的数据

您现在可以轻松跳转到 Chronograf 并开始在您的传感器数据上创建仪表板。单击“数据浏览器”,并在 nifi.autogen 数据库中查找其他传感器字段。开始看到数据后,您可以开始创建一些仪表板。注意:上述流程示例仅供测试使用。在生产环境中,您最好有更多的错误处理处理器和重定向队列。

Industrial automation NiFi

结论

如果您受到传统系统分析能力的限制,并且希望在更现代的工具集中利用基于 OPC 的数据,那么 NiFi + InfluxDB 是一个强大的组合,您可以快速安全地部署它。

使用这种组合,管理员可以轻松快速地集成到企业范围的自动化和业务系统中。系统集成商可以消除传统上专有的工厂车间设备与其他制造软件之间的遗留障碍。

Enterprise automation systems