使用NiFi和InfluxDB构建IoT数据流
作者:Craig Hobbs / 产品,用例,开发者
2019年6月19日
导航至
Apache NiFi 是物联网领域的一个变革者,允许您自动从物联网、任何边缘或传感器将数据转换为几乎任何您想要的地方。除了支持具有严格的安全、数据溯源和合规性要求的任务关键型数据量外,NiFi还为开发者提供了一种基于拖放的用户界面进行流式编程,并自动部署新的流程。
通过结合NiFi和InfluxDB,行业可以轻松使他们的物联网数据流安全可用。该解决方案使企业能够在所有设施中实现数据的单一视图,从而提供主动维护、故障检测和其他基于业务的结果。
物联网挑战:基于OPC的数据
对于这个例子,我们将查看来自工厂自动化和流程制造业的 OPC数据。OPC以一致的方式使制造企业中的生产和业务应用程序能够访问实时工厂信息。然而,我们企业客户的一个常见挑战是如何将他们的设施或企业历史记录与现代化工具集和数据分析流程连接起来,同时确保它们之间的数据流连续不断。
这就是NiFi与InfluxDB平台组合能够在几分钟内提供强大解决方案的地方,以安全地收集、观察并采取您的设施数据。
开始使用NiFi-OPCUA-bundle
首先,从Tempus IIoT框架下载以下NiFi服务与处理器bundle,用于工业数据收集和分析。这些处理器和相关控制器服务允许NiFi以只读方式访问OPC UA服务器。
Tempus bundle中包含2个处理器,GetOPCNodeList和GetOPCData。GetNodeIds允许访问OPC UA服务器中当前存在的标签。GetOPCData接受一个标签列表,并查询OPC UA服务器以获取其值。
配置GetNodeList处理器
首先将GetOPCNodeList处理器添加到NiFi画布上
添加后,右键单击GetOPCNodeList处理器并选择配置。在“属性”选项卡中,选择值为OPC UA Service Property
字段的值框,并在随后对话框中选择创建新的服务,并添加一个StandardOPCUAService的新实例。
接下来,配置处理器过滤器,使其只返回您感兴趣抓取的节点列表标签项。使用Node Filter Property 定义一个以管道( |
)分隔的正则表达式节点列表。 |
此外,您可能还想设置Recursive Depth Property
,以确定处理器将浏览OPC服务器命名空间的分支深度。
配置处理器完成后,单击控制器服务右侧的编辑箭头图标。这会将您带到控制器服务配置窗口。
开始编辑StandardOPCUAService 控制器,输入您的OPC UA服务器的端点信息
接下来,更新安全设置握手模式,以匹配您的OPC-服务器上可用的模式之一
设置Authentication Policy Property
来定义NiFi如何与UA服务器进行身份验证。在这里,除了安全策略模式外,我还添加了用户名/密码凭据以进行访问。
您现在有了访问OPC-服务器所需的信息。一旦完成控制器配置,点击应用并启用控制器。
配置GetOPCData处理器
接下来,将GetOPCData OPC 处理器添加到您的NiFi画布上。在处理器上右键单击,从上下文菜单中选择配置。单击“属性”选项卡,并填写以下信息。
记录流模式的Schema Registry服务
GetOPCData 处理器的输出数据本质上产生CSV风格的数据(除标题外)的格式为标签名、时间戳、值和状态。
现在我们将添加AvroSchemaRegistry控制器服务。这告诉基于记录的处理程序如何解释OPC标签项。
该服务将以与我们添加OPCUAClientService?相同的方式添加。再次在GetOPCData处理器上单击编辑箭头图标以进行配置。
接下来,在NiFi流配置屏幕的右上角,单击加号(+)图标以添加新的控制器服务
将AvroSchemaRegistry添加到控制器服务中。每个属性都是一个名称-模式对,该流可以使用它将数据与模式相关联。
通过单击加号图标并命名为opcData来添加属性,然后复制avro到值中。
模式如下面的avro所示
{
"type": "record",
"name": "opcData",
"fields": [
{ "name": "uri", "type": "string" },
{ "name": "ts", "type": "string" },
{ "name": "value", "type": "float" },
{ "name": "status", "type": "int" }
]
}
您现在有了访问OPC-服务器所需的信息。一旦完成控制器配置,点击应用并启用控制器。
您现在拥有了使用动态Avro模式读取OPC-Data记录所需的信息。配置完成后,点击应用并启用控制器。
InfluxDB记录处理器用于Apache NiFi
现在,是时候将InfluxDB处理器连接起来以导入OPC数据记录了。为了提供最佳的数据导入性能,InfluxDB创建了一个基于NiFi记录设计的全新处理器。
入门指南
下载并安装InfluxDB处理器。将适当的nar文件复制到您的NiFi安装的lib目录($NiFi_HOME/lib)中,并重新启动NiFi。
https://github.com/influxdata/nifi-influxdb-bundle
回到NiFi画布,添加PutInfluxDatabasesRecord
处理器。将其连接到之前的GetOPCData
处理器。
右键单击以配置处理器,并选择属性选项卡。从Record Reader Property
开始,选择CSVReader
控制器服务。
添加后,点击编辑箭头图标进行配置。
在CSVReader
的控制器服务属性中,您将设置模式访问策略和模式名称。
我们之前已将名为opcData
的OPC-Data Avro模式注册。点击应用并启用控制器服务。
返回到PutInfluxDatabasesRecord
处理器,回到属性选项卡。您需要在InfluxDB控制器服务属性中创建一个新的StandardInfluxDatabaseService
。
创建服务后,点击编辑箭头图标进行配置。
在服务配置窗口中,定义您的InfluxDB数据库、URL和访问凭证。这将创建一个所有NiFi处理器可共享的连接服务。
完成后,点击应用并启用StandardInfluxDatabaseService
控制器。
最后,回到PutInfluxDatabasesRecord
处理器的属性选项卡,您现在可以设置指定的属性以将记录的内容写入InfluxDB数据库。
由于我们的记录中没有包含具有Measurement
属性值的字段,我们将将其简单地设置为任意名称(opc)。
使用来自我们模式的名称对,将Tags
、Fields
和Timestamp
属性设置为opcData模式中定义的名称。
最终处理器流程和控制器服务
最后,通过启用所有处理器和控制器,实现了一种智能的解决方案,当满足一定条件时(即使下游泵关闭,也会记录高泵压力),在WAN上收集和传输更多数据到数据中心。一旦控制器服务的配置完成,配置流程只需要大约5分钟。
控制器服务
AvroSchemaRegistry
- 用于注册和访问模式的服务CSVReader
- 解析CSV格式的数据,将文件中的每一行作为记录返回StandardInfluxDatabaseService
- 提供到InfluxDB的连接的服务StandardOPCUAService
- 从OPC UA服务器获取响应
处理流程
GetOPCNodeList
- 访问当前在OPC UA服务器中的标签GetOPCData
- 从OPC UA服务器获取值PutInfluxDatabaseRecord
- 将结构化到InfluxDB的NiFi记录写入
现在,只剩下启动您的NiFi流程了。
注意:以上流程示例仅用于测试。在生产环境中,理想情况下应具有更多的错误处理处理器和重定向队列。
在Chronograf中探索您的数据
现在您可以轻松地进入Chronograf并开始创建传感器数据的仪表板。点击数据探索器,在nifi.autogen数据库中找到其他传感器字段。一旦您开始看到数据,就可以开始创建一些仪表板。注意:以上流程示例仅用于测试。在生产环境中,理想情况下应具有更多的错误处理处理器和重定向队列。
结论
如果您受限于传统系统的分析能力,并希望利用基于OPC的数据在更现代的工具集中,NiFi + InfluxDB是一个强大而快速部署的解决方案,同时确保安全性。
使用此组合,管理员可以轻松快速地集成到企业级的自动化和业务系统中。系统集成商可以移除传统专有工厂地面设备和其他制造软件之间的遗留障碍。