NVIDIA Jetson 系列 - 第 2 部分 (视觉 AI 流程)

导航到

希望你已经看过了本系列的第 1 部分。如果没有,请不要阅读下面的任何剧透内容,并查看 第 1 部分。都已经跟上了吗?太好了!让我们继续前进。

在第 2 部分,我们的目标是构建一个可以用于 DIY 家庭安全系统的视觉 AI 流程。以下是计划

Solution-Architecture

解决方案架构

NVIDIA DeepStream

那么 NVIDIA DeepStream 是什么?

在其最简单的形式中,NVIDIA DeepStream 主要是针对 NVIDIA GPU 加速进行优化的 GStreamer (点击此处查看我的 GStreamer 总结) 元素集合。

除了 GPU 加速之外,NVIDIA 还提供了一系列视觉分析 (Tensor RT) 和物联网插件 (MQTT、Kafka),它们可以将您从学习基础库和系统中抽象出来。这并不是说开发视觉 AI 解决方案是一件容易的事。希望这篇指南能展示如何使用 InfluxDB 和 Telegraf 加速您开发此类复杂解决方案。

视觉 AI 流程

现在你已经有了足够的背景知识来冒险了,让我们看看我们的视觉流程。我将把它分解成两个图表

Vision AI Pipeline (High level)

视觉 AI 流程(高级)

对于这个解决方案,我们将使用连接到 NVIDIA Jetson 的 USB 摄像头。我们的视觉 AI 流程将摄取 USB 摄像头产生的原始帧,并通过一个目标检测模型进行传输。该模型被训练来检测四个类别:人、汽车、自行车和道路标志。然后,产生的推理结果被用于两个并行输出插件

  1. RTSP(实时流协议)服务器:这使我们能够将视频帧发送到可网络访问的端点,这意味着我们可以远程监控摄像头的输出。RTSP 协议通常在闭路电视(CCTV)架构中使用。
  2. MQTT 客户端:简而言之,MQTT 客户端允许我们将检测结果(也称为推理结果)发送到 Telegraf 和 InfluxDB 以进行进一步的分析。我们将在后面进一步讨论。

那么我们的视觉AI流程在GStreamer组件中是什么样子的呢?

Vision AI Pipeline (GStreamer)

视觉AI流程(GStreamer)

感觉有很多东西吧?如果您想了解更多,请查看我的深入解析文档这里

推理引擎和模型

那么什么是推理引擎和模型呢?

  • 推理引擎 - 提供在所选设备(CPU、GPU等)上运行神经网络的必要功能和优化。推理引擎的API提供了与您的模型进行交互的简化方法,例如输入帧进行处理并产生输出张量。
  • 模型 - 我们的模型本质上是一个预训练的神经网络。在传统编程中,我们为算法提供规则,使其能够生成答案(如果有四条腿且会吠叫,则等于狗)。神经网络提供输入数据和答案,并推导出实现答案的规则。

在我们的解决方案中,NVinfer插件在模型部署方面做了大部分繁重的工作。NVinfer的功能包括:

  • 优化 - NVinfer将使用TensorRT库自动优化模型框架(Caffe、UFF和ONNX)。
  • 转换 - NVinfer确保帧根据模型的输入要求进行适当的缩放和转换。
  • 处理 - NVinfer部署TensorRT推理引擎,运行我们的优化模型。这产生一个元数据,包含模型的预测结果。TensorRT引擎支持多类目标检测、多标签分类和分割。

如果您想了解更多关于TensorRT引擎的信息,我强烈建议您查看这篇Medium博客

流程代码

代码相当长,所以在这里我不会全部讲解。如果您想查看完整的代码,请查看我的代码库

注意:代码的大部分内容来自NVIDIA DeepStream示例,您可以在这里找到:这里。我的演示基于版本5.1,请注意,当6.0发布时,可能会包含破坏性更改。

我的目标是提高代码的可读性和整体模块化,因为我们将会扩展以包含Telegraf和InfluxDB的自己的数据模型。我已经创建了一个类图并突出了一些流程的核心机制这里

Telegraf和InfluxDB

我们的流程目前正在处理来自网络摄像头的图像帧,运行它们通过我们的检测模型,并产生结果的网络流。如果您可以24小时观看这个流,那就太好了。那么我们如何收集推理结果并在更广泛的应用中使用它们呢?Telegraf和InfluxDB几乎为我们提供了所有需要的东西。让我们来看看。

BPMN - Final Solution img.

BPMN(最终解决方案)

MQTT客户端

这里有几种选择

  1. 使用Python客户端库直接将数据写入InfluxDB
  2. 将数据写入通用协议,并使用Telegraf导入数据

选项1的优点是我们少了一个需要维护的解决方案组件。我们还能够对何时以及如何将数据发送到InfluxDB获得更强的控制。缺点是我们降低了我们解决方案的互操作性。Telegraf提供了大量发送我们的推理结果到其他系统的可能性。我选择了选项2,因为我们已经使用Telegraf来收集我们的Jetson统计信息,所以我们只需简单地向这个Telegraf实例添加另一个输入插件。

实施

我选择了使用Paho MQTT客户端,因为这个库配置起来非常简单,并且是一个受Eclipse项目支持的库。我决定创建一个mqtt_client类进行初始化

class mqtt_client:
    def __init__(self, address, port, clientID) -> None:

        self.mqttBroker = address
        self.port = port
        self.clientID = clientID
        self.client = None

    def connect_client(self):
        MQTT_KEEPALIVE_INTERVAL = 45
        self.client = mqtt.Client(self.clientID)
        self.client.connect(host=self.mqttBroker,port=self.port, keepalive=MQTT_KEEPALIVE_INTERVAL)

    def connect_client_secure(self, username, password):
        MQTT_KEEPALIVE_INTERVAL = 60
        self.client = mqtt.Client(self.clientID)
        self.client.connect(host=self.mqttBroker,port=self.port, keepalive=MQTT_KEEPALIVE_INTERVAL)
        self.client = mqtt.Client(self.clientID)
        self.client.connect(self.mqttBroker)
        self.client.tls_set()  # <--- even without arguments
        self.client.username_pw_set(username=username, password=password)

    def publish_to_topic(self, topic: str, data: dict):
        message = json.dumps(data)
        self.client.publish(topic, message)

然后,我们在osd_sink_pad_buffer_probe函数中初始化MQTT连接(《inference_results.py》文件

try:
        mqttClient = mqtt_client("localhost", 1883 , "Inference_results")
        mqttClient.connect_client()
    except:
        print("could not connect to MQTT Broker")
        BROKER_CONNECT = False

推理结果在l_frame循环期间发送。这里需要注意以下几点

  1. 我们只每隔30帧发送一次结果。由于我们的摄像头每秒产生30帧,所以报告每一帧没有意义,因为大多数物体在1秒内不会发生变化。
  2. 我们的元数据是基于我们创建的INFERENCE数据结构保存的。我们将在下一节讨论这一点。

注意:你可能想知道为什么我没有决定使用nvmsgbroker插件。这是一个个人偏好,因为我想要对我们提供给Telegraf的数据结构有更多的控制。虽然需要更多的Starlark工作,但可以达到相同的结果。

代理

为了简单起见,我直接在我的Jetson设备上安装了Eclipse Mosquitto MQTT代理。以下是安装说明

sudo apt-get update
sudo apt-get install mosquitto
sudo systemctl enable mosquitto
sudo systemctl start mosquitto

这将使Mosquitto作为服务安装到您的设备上。然后我们使用systemctl enable确保代理在设备启动时启动。

注意:这将配置代理以默认不安全状态在端口1883上启动。有关配置的更多信息,请参阅Mosquitto文档

数据模型

如前所述,编写我们自己的MQTT客户端使我们能够灵活地选择我们提供的数据模型。以下是一个使用自定义模型的示例

{
   "detection_frame_number":2280,
   "total_num_obj":1,
   "inference_results":[
      {
         "classID":2,
         "label":"Person",
         "confidence":0.67747300267219543,
         "coor_left":1303.7781982421875,
         "coor_top":139.6199951171875,
         "box_width":467.4754943847656,
         "box_height":937.4452514648438,
         "objectID":18446744073709551615,
         "unique_com_id":"OBJ1"
      }
   ]
}

让我们分解一下

  1. 检测帧编号 - 告诉我们检测所使用的帧编号。如果应用程序重新启动,帧编号将重置。
  2. 总对象数 - 模型检测到的对象总数
  3. 推理结果 - 一个数组,包含当前帧中每个检测的对象。每个对象包含以下内容
    • classID - 检测分类的唯一标识符(1 = 汽车,2 = 人)。
    • 标签 - classID的可读版本
    • 置信度 - 一个介于0和1之间的数值,包含模型对其检测的置信度百分比(0.6 = 60%)
    • coor_left - 帧左侧边缘与边界框左上角之间的距离。
    • coor_top - 帧顶部边缘与边界框左上角之间的距离。
    • box_width - 从左侧点到右侧点的边界框宽度。
    • box_height - 从顶部点到底部点的边界框长度。
    • objectID - 如果您引入跟踪器,这将成为重要的因素。每个对象的唯一标识符。
    • Unique_com_id - 由于我们没有使用跟踪器,生成的objectID对所有检测是等效的。我引入这个字段来区分检测。

因此,您可能想知道在当前的数据结构中,我们如何区分给定时间间隔内的不同并行检测。InfluxDB会简单地覆盖字段。请继续阅读Telegraf部分以了解更多信息。

Telegraf

因此,我们有我们的MQTT客户端根据自定义数据模型向MQTT代理发布样本。下一步是将数据导入InfluxDB。

BPMN - Telegraf

图5 - BPMN(Telegraf)

输入插件:MQTT_Consumer

我们数据导入插件的配置相当简单。

alias = "vision_pipeline"
interval = "5s"
name_override = "BW_Inference_results"
servers = ["tcp://127.0.0.1:1883"]
topics = [
    "inference"
  ]
qos = 2
connection_timeout = "30s"
client_id = "1"
data_format = "json_v2"

 [[inputs.mqtt_consumer.json_v2]]
       [[inputs.mqtt_consumer.json_v2.object]]
           path = "@this"
           disable_prepend_keys = true

上述大多数插件设置都是显而易见的。如果您想了解更多关于它们的信息,请查看插件readme。我想讨论的是json_v2解析器。我在社区中看到它出现得相当频繁。在这种情况下,@this代表我们的JSON对象的最顶层。disable_prepend_keys将防止任何字段名称在它们的父名称之前。以下是将解析的JSON转换为行协议的示例。

JSON

{'detection_frame_number': 0, 'total_num_obj': 2, 'inference_results': [{'classID': 0, 'label': 'Car', 'confidence': 0.40576171875, 'coor_left': 650.953125, 'coor_top': 361.28204345703125, 'box_width': 171.03515625, 'box_height': 190.78953552246094, 'objectID': 18446744073709551615, 'unique_com_id': 'OBJ1'}, {'classID': 0, 'label': 'Car', 'confidence': 0.2666015625, 'coor_left': 977.8388671875, 'coor_top': 512.3947143554688, 'box_width': 249.580078125, 'box_height': 259.200927734375, 'objectID': 18446744073709551615, 'unique_com_id': 'OBJ2'}]}

行协议

BW_Inference_results,host=jetson-desktop,topic=inference detection_frame_number=0,total_num_obj=2,classID=0,label="Car",confidence=0.40576171875,coor_left=650.953125,coor_top=361.28204345703125,box_width=171.03515625,box_height=190.78953552246094,objectID=18446744073709552000,unique_com_id="OBJ1" 1635940856224413650
BW_Inference_results,host=jetson-desktop,topic=inference detection_frame_number=0,total_num_obj=2,classID=0,label="Car",confidence=0.2666015625,coor_left=977.8388671875,coor_top=512.3947143554688,box_width=249.580078125,box_height=259.200927734375,objectID=18446744073709552000,unique_com_id="OBJ2" 1635940856224413650

如您所见,上述JSON示例生成了两个具有相同时间戳的行协议条目。一个是OBJ1,另一个是OBJ2。这会引发问题,因为OBJ2在写入InfluxDB2时会覆盖OBJ1。我们可以通过处理器插件来解决这个问题。

处理器插件:Starlark

Starlark是一种类似于Python的脚本语言,允许您实时操作指标。这可以是执行自定义计算或将时间戳转换为其他格式(查看更多信息的优秀博客)。在我们的案例中,我们将使用Starlark向我们的指标添加一个标签,该标签将作为唯一标识符。

[[processors.starlark]]
  namepass = ["BW_Inference_results"]

  source = '''
def apply(metric):
    v = metric.fields.get('unique_com_id')

    if v == None:
      return metric

    metric.tags['obj'] = v
    return metric
'''

分解

  1. namepass - 我们将在路由中讨论这个问题。
  2. source - 此参数期望一个Starlark脚本。我们的脚本将输入指标作为输入,并将值存储在v(OBJ1、OBJ2等)中的unique_com_id字段中。如果返回值是none(没有检测),我们只需返回指标,因为这是一个有效的结果。否则,我们创建一个新的标签obj,并将其值分配给它。然后我们返回修改后的指标。

以下是经过我们的脚本处理后的指标

BW_Inference_results,host=jetson-desktop,obj=OBJ1,topic=inference detection_frame_number=0,total_num_obj=2,classID=0,label="Car",confidence=0.40576171875,coor_left=650.953125,coor_top=361.28204345703125,box_width=171.03515625,box_height=190.78953552246094,objectID=18446744073709552000,unique_com_id="OBJ1" 1635940856224413650
BW_Inference_results,host=jetson-desktop,obj=OBJ2,topic=inference detection_frame_number=0,total_num_obj=2,classID=0,label="Car",confidence=0.2666015625,coor_left=977.8388671875,coor_top=512.3947143554688,box_width=249.580078125,box_height=259.200927734375,objectID=18446744073709552000,unique_com_id="OBJ2" 1635940856224413650

如您所见,新标签已应用于每个指标,从而使它们成为唯一的条目。

输出插件:InfluxDB2

我们在Jetson系列的第一部分中讨论了配置。需要考虑的主要点是架构更改。由于我们将至少每秒接收一个样本,我认为最好在我的Jetson设备上运行InfluxDB的边缘实例来存储我的推理结果。这位于我的Jetson设备上。从那里我们可以对数据进行下采样,并将其发送到云中进行更复杂的计算。

[[outputs.influxdb_v2]]
alias = "edge_inference"
namepass = ["BW_Inference_results"]
urls = ["https://127.0.0.1:8086"]
token = ""
organization = "Jetson"
bucket = "edge_inference"

路由

正如我们所知,当前有两个唯一的数据流通过Telegraf进行处理

  1. Jetson Stats
  2. 推理结果

问题是,我们不希望我们的Jetson Stat结果通过Starlark插件进行处理,也不希望我们的原始推理样本发送到云中。

Telegraf-without-routing

没有路由的Telegraf

幸运的是,您可以在Telegraf中部署称为pass和drop的路由功能。pass和drop可以在测量(名称)、标签(标签)或字段(标签)级别应用。在我们的案例中,我们只需要从最高级别(测量)进行路由,因此我们使用namepass和namedrop。

名称传递

由于我们只期望一种类型的测量(BW_Inference_results)通过我们的Starlark插件,因此名称传递插件对我们来说是一个很好的选择。

[[processors.starlark]]
  namepass = ["BW_Inference_results"]

上述配置将只允许名称为 BW_Inference_results 的测量进入Starlark插件。我们也对InfluxDB_v2(开源)插件应用了相同的名称传递。

名称丢弃

现在可能的情况是我们只想阻止特定的测量(BW_Inference_results)通过插件,并让其他测量正常处理。我们可以使用名称丢弃来阻止 BW_Inference_results 达到云服务。

[[outputs.influxdb_v2]]
namedrop = ["BW_Inference_results"]

上述配置将在 BW_Inference_results 达到InfluxDB_v2(云)插件之前丢弃所有具有该名称的测量。

Telegraf-with-routing

Telegraf路由

InfluxDB

我们现在正在将推理指标导入InfluxDB。我们希望达到两个目标:

  1. 对原始推理数据执行简单仪表板操作
  2. 对数据进行下采样并将其传输到我们的云实例

InfluxDB-hybrid-architecture

InfluxDB混合架构

仪表板

正如我们在本系列的第1部分中所做的那样,我们将通过此处找到的模板安装仪表板。如果您需要回忆如何进行此操作,请参阅第1部分的InfluxDB Cloud部分。

InfluxDB template dashboard

基本推理仪表板

一旦导入,您的仪表板将类似于上面的仪表板。让我们快速浏览一下:

  1. 帧编号 & 当前总数检测 - 单个值单元格,显示最后一个已知的帧编号和检测到的对象总数。
  2. 平均置信度 - 这对所有样本(不考虑标签)执行平均值聚合。然后我执行一个 map() 函数,将平均值乘以100,以便以人类可读的百分比形式显示。
  3. 时间线 - 对检测标签执行 pivot() 操作,对标签列(OBJ 1、OBJ 2等)进行操作。这为我们提供了检测到的对象在何时被检测到的时间线。
  4. 检测状态更改 - 这使用了我们的 mosaic tile。它通过检测一个对象保持某种状态的时间来跟踪对象状态的变化。正如我们所看到的,大多数汽车检测OBJ 1用紫色条表示。
  5. 边界框宽度和高度 - 这些图表并没有告诉我们太多,除了显示对象随时间变化的平均高度和宽度。我选择保留它们以表明并非所有原始数据都适合作为良好的视觉数据。我们将在云中进行进一步的数据处理,以提供更有趣的见解。

下采样和远程传输

我们列表中的最后一项任务是使我们的解决方案从边缘和云解决方案转变为两者的混合。我们为什么要这样做呢?

答案:我们的边缘设备根本无法处理额外的负载。它已经在运行我们的管道、代理、Telegraf和本地InfluxDB实例。这对入门级ARM处理器来说已经是一大堆工作了(不包括在GPU上运行的管道组件)。此外,我们只能在本地网络上访问本地仪表板,除非我们将此端点暴露给更大的WAN。

为了实现我们的混合,我们需要通过任务部署两个Flux功能(我不会深入介绍任务,但请查看文档以了解更多信息

  1. 下采样
  2. 远程传输(官方名称为实验性的to() 函数,增加了远程支持功能)

以下是任务:

import "experimental"
import "influxdata/influxdb/secrets"

option task = {
    name: "edge_to_cloud",
    every: 1h,
    offset: 0s,
}

from(bucket: "edge_inference")
    |> range(start: -1h)
    |> filter(fn: (r) => r["_measurement"] == "BW_Inference_results")
    |> aggregateWindow(every: 10m, fn: last, createEmpty: false)
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
    |> drop(columns: ["_start", "_stop"])
    |> group(columns: ["_measurement", "obj"], mode: "by")
    |> experimental.to(
        bucketID: "13dd5e3bc3998a75",
        orgID: "05ea551cd21fb6e4",
        host: "https://us-east-1-1.aws.cloud2.influxdata.com",
        token: secrets.get(key: "cloud"),
    )

上述任务每小时运行一次,并执行以下操作:

  1. 查询edge_inference存储桶过去一小时的全部数据。
  2. 将数据聚合到10分钟间隔,并选择该间隔的最后一个非空值。
  3. 然后通过_field列进行pivot。这根据时间条目将字段转换为列。要了解更多关于pivot的信息,请参阅此博客
  4. 然后,我们将测量和标签列进行分组。这是实验性.to()函数的结构要求
  5. 现在到了有趣的部分。我们使用实验性.to()函数将下游表发送到云实例。为此,你需要以下参数:bucket/bucketID、org/orgID、host和token(请参阅密钥存储文档)。注意,当使用ID而不是名称时,参数名称可能不同。

这就是我们所需要的!我们已经将数据下采样,并将其传输到云实例进行进一步处理。我将在第三部分给你带来一些有趣的Flux转换。

结论

这真是太棒了!但正如我们所知,从零开始构建物联网项目是一项巨大的任务,许多公司都是基于这个任务建立其整个业务。在这里我们所学的就是,通过结合三个开源项目(NVIDIA DeepStream、Telegraf和InfluxDB),你可以加速你的进程

我希望这篇博客能给你信心,让你大胆尝试Vision AI。只有更多具有特定领域知识的杰出人才参与其中,这个领域才会不断进步。

一旦你开始运行,请在InfluxData Slack和社区论坛上联系我(确保加上@Jay Clifford标签)。让我们继续聊天!