NVIDIA Jetson 系列 - 第 2 部分 (视觉 AI 管道)
作者:Jay Clifford / 产品, 用例, 开发者
2021 年 11 月 30 日
导航至
希望您已经阅读了本系列的第一部分。如果您没有阅读,请不要阅读下面的任何剧透,并查看此处的第一部分。都看完了?很好!让我们继续。
在第 2 部分中,我们的目标是构建一个视觉 AI 管道,它可以作为 DIY 家庭安全系统的一部分使用。这是计划
NVIDIA DeepStream
那么什么是 NVIDIA DeepStream?
最简单的形式,NVIDIA DeepStream 本质上是一系列 GStreamer (单击此处查看我对 Gstreamer 的总结) 元素,这些元素经过优化,可以利用 NVIDIA GPU 加速。
除了 GPU 加速之外,NVIDIA 还提供了一个视觉分析 (Tensor RT) 和物联网插件 (MQTT、Kafka) 生态系统,使您无需学习基础库和系统。但这并不是说开发视觉 AI 解决方案是一件轻而易举的事情。希望本指南将展示如何使用 InfluxDB 和 Telegraf 加速您对此类复杂解决方案的开发。
视觉 AI 管道
既然您已经掌握了足够的背景知识,可以开始“冒险”了,那么让我们看一下我们的视觉管道。我将把它分解为两个图表
对于此解决方案,我们将使用连接到 NVIDIA Jetson 的 USB 网络摄像头。我们的视觉 AI 管道将摄取 USB 网络摄像头生成的原始帧,并通过对象检测模型馈送它们。该模型经过训练,可以检测四个类别:人、汽车、自行车和道路标志。生成的推理结果然后在两个并行输出插件中使用
- RTSP(实时流协议)服务器: 这允许我们将视频帧发送到网络可访问的端点,这意味着我们可以远程监控来自网络摄像头的输出。RTSP 协议通常在 CCTV(闭路电视)架构中使用。
- MQTT 客户端: 简而言之,MQTT 客户端允许我们将检测结果(也称为推理结果)发送到 Telegraf 和 InfluxDB 以进行进一步分析。我们将在稍后更详细地讨论这一点。
那么我们的视觉 AI 管道在 GStreamer 组件中是什么样的呢?
感觉很多,对吧?如果您想了解更多信息,请查看我的深入文档此处。
推理引擎和模型
那么什么是推理引擎和模型?
- 推理引擎 - 为在我们选择的设备(CPU、GPU 等)上运行神经网络提供必要的功能和优化。推理引擎的 API 提供了简化的方法来与您的模型交互,例如输入帧以进行处理并生成输出张量。
- 模型 - 我们的模型本质上是一个预训练的神经网络。在传统编程中,我们提供算法规则来作用于生成答案(如果它有四条腿并且会吠叫,那么就是狗)。神经网络被提供输入数据和答案,并推导出自己的规则来实现答案。
在我们的解决方案中,NVinfer 插件在模型部署方面做了大部分繁重的工作。NVinfer 功能包括
- 优化 - NVinfer 将使用 TensorRT 库自动优化模型框架(Caffe、UFF 和 ONNX)。
- 转换 - NVinfer 确保根据模型的输入要求适当缩放和转换帧。
- 处理 - NVinfer 部署 TensorRT 推理引擎,该引擎运行我们优化的模型。这将生成一个包含模型预测结果的元数据。TensorRT 引擎支持多类对象检测、多标签分类和分割)。
如果您想了解有关 TensorRT 引擎的更多信息,我强烈建议您查看这篇Medium 上的博客。
管道代码
代码相当长,因此我不会在本博客中全部介绍。有关完整代码,请查看我的存储库。
注意:代码的很大一部分来自 NVIDIA DeepStream 示例,可以在此处找到。我的演示基于 5.1 版本,因此请注意,当 6.0 版本发布时,它可能包含重大更改。
我的目标是提高代码的可读性和整体模块化,因为我们将扩展以包含我们自己的 Telegraf 和 InfluxDB 数据模型。我创建了一个类图,并突出显示了管道的一些核心机制此处。
Telegraf 和 InfluxDB
我们的管道目前正在处理来自网络摄像头的图像帧,通过我们的检测模型运行它们,并生成结果的网络流。如果您可以每天 24 小时观看流,那就太好了。那么我们如何收集推理结果并在更广泛的解决方案中使用它们呢?Telegraf 和 InfluxDB 几乎提供了我们开箱即用所需的一切。让我们来看看
MQTT 客户端
这里我可以选择两种方案
- 使用 Python 客户端库将数据直接写入 InfluxDB
- 将数据写入通用协议,并使用 Telegraf 摄取数据
方案 1 的优点是我们减少了一个需要维护的解决方案组件。我们还可以强烈控制何时以及如何将数据发送到 InfluxDB。缺点是我们降低了解决方案的互操作性。Telegraf 为将我们的推理结果发送到如此多的其他系统提供了巨大的范围。我选择了方案 2,我们已经使用 Telegraf 来收集我们的 Jetson 统计信息,因此我们只需向此 Telegraf 实例添加另一个输入插件。
实施
我选择使用 Paho MQTT 客户端,因为该库非常易于配置,并且是一个受良好支持的 Eclipse 项目。我决定创建一个 mqtt_client 类来进行初始化
class mqtt_client:
def __init__(self, address, port, clientID) -> None:
self.mqttBroker = address
self.port = port
self.clientID = clientID
self.client = None
def connect_client(self):
MQTT_KEEPALIVE_INTERVAL = 45
self.client = mqtt.Client(self.clientID)
self.client.connect(host=self.mqttBroker,port=self.port, keepalive=MQTT_KEEPALIVE_INTERVAL)
def connect_client_secure(self, username, password):
MQTT_KEEPALIVE_INTERVAL = 60
self.client = mqtt.Client(self.clientID)
self.client.connect(host=self.mqttBroker,port=self.port, keepalive=MQTT_KEEPALIVE_INTERVAL)
self.client = mqtt.Client(self.clientID)
self.client.connect(self.mqttBroker)
self.client.tls_set() # <--- even without arguments
self.client.username_pw_set(username=username, password=password)
def publish_to_topic(self, topic: str, data: dict):
message = json.dumps(data)
self.client.publish(topic, message)
然后我们在 osd_sink_pad_buffer_probe 函数 (inference_results.py) 中初始化 MQTT 连接
try:
mqttClient = mqtt_client("localhost", 1883 , "Inference_results")
mqttClient.connect_client()
except:
print("could not connect to MQTT Broker")
BROKER_CONNECT = False
推理结果在 l_frame 循环期间发送。请注意以下几点
- 我们有条件地仅每 30 帧发送一次结果。由于我们的网络摄像头每秒生成 30 帧,因此报告每帧没有意义,因为大多数对象在 1 秒内不会发生变化。
- 我们的元数据基于我们创建的 INFERENCE 数据结构保存。我们将在接下来讨论这一点。
注意:您可能想知道为什么我没有决定使用 nvmsgbroker 插件。这是一个个人偏好,因为我希望对我们提供给 Telegraf 的数据结构有更多控制权。可以实现相同的结果,但在 Starlark 中需要更多工作。
Broker
为了保持简单,我在我的 Jetson 设备上直接安装了 Eclipse Mosquitto MQTT broker。以下是安装说明
sudo apt-get update
sudo apt-get install mosquitto
sudo systemctl enable mosquitto
sudo systemctl start mosquitto
这会将 Mosquitto 作为服务安装在您的设备上。然后我们使用 systemctl enable 来确保 broker 在设备启动时启动。
注意:这会将 broker 配置为默认的不安全状态,端口为 1883。有关配置的更多信息,请参阅 Mosquitto 文档。
数据模型
如前所述,编写我们自己的 MQTT 客户端可以灵活地控制我们交付的数据模型。这是一个使用自定义模型的示例
{
"detection_frame_number":2280,
"total_num_obj":1,
"inference_results":[
{
"classID":2,
"label":"Person",
"confidence":0.67747300267219543,
"coor_left":1303.7781982421875,
"coor_top":139.6199951171875,
"box_width":467.4754943847656,
"box_height":937.4452514648438,
"objectID":18446744073709551615,
"unique_com_id":"OBJ1"
}
]
}
让我们分解一下
- detection_frame _number - 告诉我们进行检测的帧号。如果应用程序重新启动,帧号将重置。
- total_num_obj - 模型检测到的对象总数
- Inference_results - 一个数组,其中包含当前帧中每个检测的对象。每个对象包含以下内容
- classID - 检测分类的唯一标识符(1 = 汽车,2 = 人)。
- label - classID 的人类可读版本
- confidence - 介于 0 和 1 之间的数值,包含模型对其检测的置信度百分比 (0.6 = 60%)
- coor_left - 帧的左边缘与边界框的左上角之间的距离。
- coor_top - 帧的顶部边缘与边界框的左上角之间的距离。
- box_width - 从左点到右点的边界框宽度。
- box_height - 从顶点到底点的边界框长度。
- objectID - 如果您引入跟踪器,这将变得很重要。每个对象的唯一标识符。
- Unique_com_id - 由于我们没有使用跟踪器,因此为所有检测生成的 objectID 是等效的。我引入此字段是为了区分检测。
所以您可能想知道在当前数据结构中,我们如何区分给定时间间隔的不同并行检测。InfluxDB 只会覆盖该字段。请继续阅读 Telegraf 部分,了解如何实现。
Telegraf
因此,我们的 MQTT 客户端正在根据自定义数据模型将样本发布到 MQTT Broker。下一步是将数据导入 InfluxDB
输入插件:MQTT_Consumer
我们的摄取插件的配置非常简单
alias = "vision_pipeline"
interval = "5s"
name_override = "BW_Inference_results"
servers = ["tcp://localhost:1883"]
topics = [
"inference"
]
qos = 2
connection_timeout = "30s"
client_id = "1"
data_format = "json_v2"
[[inputs.mqtt_consumer.json_v2]]
[[inputs.mqtt_consumer.json_v2.object]]
path = "@this"
disable_prepend_keys = true
以上大多数插件设置都不言自明。如果您想了解有关它们的更多信息,请查看插件自述文件。我想介绍的是 json_v2 解析器。我已经在社区中多次看到它。在本例中,@this 表示我们的 JSON 对象的顶层。 disable_prepend_keys 将阻止任何字段名称在其父名称之前添加前缀。这是一个将解析后的 JSON 转换为行协议的示例
JSON
{'detection_frame_number': 0, 'total_num_obj': 2, 'inference_results': [{'classID': 0, 'label': 'Car', 'confidence': 0.40576171875, 'coor_left': 650.953125, 'coor_top': 361.28204345703125, 'box_width': 171.03515625, 'box_height': 190.78953552246094, 'objectID': 18446744073709551615, 'unique_com_id': 'OBJ1'}, {'classID': 0, 'label': 'Car', 'confidence': 0.2666015625, 'coor_left': 977.8388671875, 'coor_top': 512.3947143554688, 'box_width': 249.580078125, 'box_height': 259.200927734375, 'objectID': 18446744073709551615, 'unique_com_id': 'OBJ2'}]}
行协议
BW_Inference_results,host=jetson-desktop,topic=inference detection_frame_number=0,total_num_obj=2,classID=0,label="Car",confidence=0.40576171875,coor_left=650.953125,coor_top=361.28204345703125,box_width=171.03515625,box_height=190.78953552246094,objectID=18446744073709552000,unique_com_id="OBJ1" 1635940856224413650
BW_Inference_results,host=jetson-desktop,topic=inference detection_frame_number=0,total_num_obj=2,classID=0,label="Car",confidence=0.2666015625,coor_left=977.8388671875,coor_top=512.3947143554688,box_width=249.580078125,box_height=259.200927734375,objectID=18446744073709552000,unique_com_id="OBJ2" 1635940856224413650
正如您所见,上面的 JSON 示例生成了两个行协议条目,时间戳完全相同。一个用于 OBJ1,第二个用于 OBJ2。这会带来一个问题,因为 OBJ2 在写入 InfluxDB2 时会覆盖 OBJ1。我们可以使用处理器插件来解决此问题。
处理器插件:Starlark
Starlark 是一种类似 Python 的脚本语言,可让您实时操作指标。这可以是任何事情,从执行自定义计算到转换时间戳(查看这篇精彩的博客以获取更多信息)。在我们的例子中,我们将使用 Starlark 向我们的指标添加一个标签,该标签将充当唯一标识符
[[processors.starlark]]
namepass = ["BW_Inference_results"]
source = '''
def apply(metric):
v = metric.fields.get('unique_com_id')
if v == None:
return metric
metric.tags['obj'] = v
return metric
'''
分解
- namepass - 我们将在路由中介绍这一点。
- source - 此参数需要一个 Starlark 脚本。我们的脚本将传入的指标作为输入,并将 unique_com_id 字段中的值存储在 v 中(OBJ1、OBJ2 等)。如果返回的值为 none(未检测到),我们只需返回指标,因为这是一个有效的结果。否则,我们创建一个名为 obj 的新标签,并在其中分配存储在 v 中的值。然后我们返回修改后的指标。
这是我们的指标在通过我们的脚本后的样子
BW_Inference_results,host=jetson-desktop,obj=OBJ1,topic=inference detection_frame_number=0,total_num_obj=2,classID=0,label="Car",confidence=0.40576171875,coor_left=650.953125,coor_top=361.28204345703125,box_width=171.03515625,box_height=190.78953552246094,objectID=18446744073709552000,unique_com_id="OBJ1" 1635940856224413650
BW_Inference_results,host=jetson-desktop,obj=OBJ2,topic=inference detection_frame_number=0,total_num_obj=2,classID=0,label="Car",confidence=0.2666015625,coor_left=977.8388671875,coor_top=512.3947143554688,box_width=249.580078125,box_height=259.200927734375,objectID=18446744073709552000,unique_com_id="OBJ2" 1635940856224413650
正如您所见,新标签已应用于每个指标,从而使它们成为唯一的条目。
输出插件:InfluxDB2
我们在Jetson 系列第 1 部分中讨论了配置。需要考虑的主要点是架构更改。由于我们将至少每秒接收一个样本,我认为最好运行 InfluxDB 的边缘实例来存储我的推理结果。它直接位于我的 Jetson 设备上。从那里我们可以对数据进行降采样并将其发送到云端以进行更密集的计算
[[outputs.influxdb_v2]]
alias = "edge_inference"
namepass = ["BW_Inference_results"]
urls = ["http://localhost:8086"]
token = ""
organization = "Jetson"
bucket = "edge_inference"
路由
我们知道,目前有两个唯一的数据流正在通过 Telegraf 进行处理
- Jetson 统计信息
- 推理结果
问题是我们不希望我们的 Jetson 统计信息结果通过我们的 Starlark 插件进行处理,并且我们不希望我们的原始推理样本发送到云端。
幸运的是,您可以在 Telegraf 中部署名为 pass 和 drop 的路由功能。Pass 和 drop 可以应用于测量 (name)、标签 (tag) 或字段级别 (tag)。在我们的例子中,我们只需要来自最高级别(测量)的路由,因此我们使用 namepass 和 namedrop。
Namepass
由于我们只期望一种类型的测量 (BW_Inference_results) 通过我们的 Starlark 插件,因此 namepass 插件对我们来说是一个不错的选择
[[processors.starlark]]
namepass = ["BW_Inference_results"]
上面的配置将只允许名称为 BW_Inference_results 的测量进入 Starlark 插件。我们还将相同的 namepass 应用于我们的 InfluxDB_v2 (OSS) 插件。
Namedrop
现在,可能我们只想阻止特定测量 (BW_Inference_results) 通过插件,并让其余部分正常处理。我们可以使用 namedrop 来阻止 BW_Inference_results 到达云端
[[outputs.influxdb_v2]]
namedrop = ["BW_Inference_results"]
上面的配置将在名称为 BW_Inference_results 的所有测量到达 InfluxDB_v2 (Cloud) 插件之前将其丢弃。
InfluxDB
我们现在正在将推理指标摄取到 InfluxDB 中。我们要实现两个目标
- 用于原始推理数据的简单仪表板
- 降采样和传输到我们的云实例
仪表板
正如我们在本系列第 1 部分中所做的那样,我们将通过在此处找到的模板安装仪表板。如果您需要有关如何执行此操作的提醒,请回顾第 1 部分中的 InfluxDB Cloud 部分。
导入后,您的仪表板将类似于上面的仪表板。让我们快速浏览一下
- 帧号和当前总检测数 - 单个值单元格,显示上次已知的帧号和检测到的对象总数。
- 平均置信度 - 这对所有样本执行平均聚合(不考虑标签)。从那里我执行一个 map() 函数,将平均值乘以 100,以得出人类可读的百分比。
- 时间线 - 获取检测标签并对标签列(OBJ 1、OBJ2 等)执行 pivot()。这为我们提供了在什么时间检测到什么对象的时间线。
- 检测状态更改 - 这利用了我们的马赛克图。它通过检测对象为某种类型的时间长短来跟踪对象状态更改。正如我们所见,大多数汽车检测 OBJ 1 由紫色条表示。
- 边界框宽度和高度 - 这些图表并没有告诉我们太多信息,除了对象在一段时间内的平均高度和宽度。我选择保留它们是为了表明并非所有原始数据都适合作为良好的可视化数据。我们将在云端进一步处理这些数据,以提供更有趣的见解。
降采样和 RemoteTo
我们列表上的最后一项任务是将我们的解决方案从边缘和云解决方案转换为两者的混合。我们为什么要这样做?
答案: 我们的边缘设备根本无法处理额外的工作负载。它已经在运行我们的管道、broker、Telegraf 和本地 InfluxDB 实例。对于入门级 arm 处理器来说,这已经相当多的工作了(不包括在 GPU 上运行的管道组件)。此外,除非我们将此端点暴露给更大的 WAN,否则我们对本地仪表板的可见性只能在我们的本地网络上访问。
为了实现我们的混合模式,我们需要通过任务部署两个 Flux 功能(我不会深入介绍任务,但查看文档以了解更多信息)
- 降采样
- RemoteTo(正式名称为 experimental.to() 函数,并增加了远程支持功能)
这是任务
import "experimental"
import "influxdata/influxdb/secrets"
option task = {
name: "edge_to_cloud",
every: 1h,
offset: 0s,
}
from(bucket: "edge_inference")
|> range(start: -1h)
|> filter(fn: (r) => r["_measurement"] == "BW_Inference_results")
|> aggregateWindow(every: 10m, fn: last, createEmpty: false)
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> drop(columns: ["_start", "_stop"])
|> group(columns: ["_measurement", "obj"], mode: "by")
|> experimental.to(
bucketID: "13dd5e3bc3998a75",
orgID: "05ea551cd21fb6e4",
host: "https://us-east-1-1.aws.cloud2.influxdata.com",
token: secrets.get(key: "cloud"),
)
上述任务每小时运行一次,并执行以下操作
- 查询 edge_inference bucket 以获取过去一小时的数据。
- 将数据聚合到 10 分钟的间隔,并选择该间隔的最后一个非空值。
- 然后我们按 _field 列进行透视。这会将我们的字段根据时间条目转换为列。要了解有关透视的更多信息,请查看这篇 博客。
- 然后我们按我们的测量和标签列进行分组。这是 experimental.to() 的结构要求。
- 现在到了有趣的部分。我们使用 experimental.to() 将下游表发送到云实例。为此,您需要以下参数:bucket/bucketID、org/orgID、host 和 token(请查看密钥存储文档)。请注意,当使用 ID 而不是名称时,参数名称会有所不同。
搞定了!我们已经将下采样数据卸载到云实例以进行进一步处理。请耐心等待第 3 部分,其中将介绍一些有趣的 Flux 转换。
结论
内容很多!但是,正如我们大多数人所知,从头开始构建和创建 IoT 项目是一项艰巨的任务,许多公司都以此为基础构建整个业务。我们在这里学到的是,通过结合三个开源项目(NVIDIA DeepStream、Telegraf 和 InfluxDB),您可以加速实现卓越成果!
我希望这篇博客能让您有信心深入探索并开始尝试 Vision AI。只有更多具有领域特定知识的才华横溢的人参与进来,这个领域才会不断进步。
一旦您启动并运行,请在 InfluxData Slack 和社区论坛上与我联系(请务必标记我 @Jay Clifford)。让我们继续交流!