PID 控制器和 InfluxDB:第二部分 - 数字孪生

导航至

在之前的文章中,我们描述了一个 CSTR 和 PID 控制器。本文将介绍来自项目仓库的数字孪生的代码和架构。该项目利用 Kafka 进行数据流处理,Faust 进行数据处理,InfluxDB 存储时间序列数据,Telegraf 将数据从主题写入 InfluxDB。我们还将讨论此堆栈的优缺点。本文对应的仓库可以在这里找到。

2024-08-28 上午 11.16.39 的屏幕截图本文描述的项目架构图。CSTR 的数字孪生产生数据,由 Kafka 收集。Faust 充当 PID 控制器。Telegraf 订阅 Kafka 主题,将传感器和控制数据发送到 InfluxDB。反应器和冷却夹套由 Grafana 监控。

数字孪生逻辑架构

我们将重点关注 cstr_kafka_influxdb 目录中的登录。此目录包含使用 InfluxDB、Kafka、Telegraf 和 Python 模型 CSTR 的示例。两个脚本 cstr_model.pypid_controller.py 分别负责模拟 CSTR 和 PID 控制器。高级架构如下所示: 结果是一个 CSTR 数字孪生,模拟在设定的温度设定点条件下执行反应。我们生成以下数据:

CSTR 数字孪生的结果。顶部图表显示了反应物 A (Ca) 随时间的浓度,中间图表显示了反应器温度 (T) 和设定点,底部图表显示了冷却夹套的温度 (u)。

cstr_model.py 解释

此脚本模拟连续搅拌罐反应器 (CSTR) 并处理来自 PID 控制器的输入控制信号。脚本包含以下部分

导入和应用程序配置

import faust
import numpy as np
from scipy.integrate import odeint
app = faust.App(
    'cstr_model',
    broker='kafka://127.0.0.1:9092',
    store='memory://',
    value_serializer='json',
    web_port=6066
)
cstr_topic = app.topic('cstr')
pid_control_topic = app.topic('pid_control')

定义了两个 Kafka 主题:cstr(用于发送 CSTR 状态)和 pid_control(用于接收控制信号)。

初始值和常量

# The process count and iterations is used to stop the number of times values are produced and consumed from the respective topics. 
process_count = 0
max_iterations = 300
# Tf is the temperature of the feed. This is assumed to be a constant. 
Tf = 350
# Caf is the concentration of the reactant A in the feed. This is assumed to be a constant. 
Caf = 1
# ts represents the time duration between points. Its assumed that all time is regular here. 
ts = [0,0.03333]  
# initial_Ca and initial T are the initial values of the concentration of reactant A and the temperature inside the reactor, respectively. 
initial_Ca = 0.87725294608097
initial_T = 324.475443431599

CSTR 模型函数

定义了模型 CSTR 的微分方程。

def cstr_model_func(x, t, u, Tf, Caf):
    Ca, T = x
    q = 100
    V = 100
    rho = 1000
    Cp = 0.239
    mdelH = 5e4
    EoverR = 8750
    k0 = 7.2e10
    UA = 5e4
    rA = k0 * np.exp(-EoverR / T) * Ca
    dCadt = q / V * (Caf - Ca) - rA
    dTdt = q / V * (Tf - T) + mdelH / (rho * Cp) * rA + UA / V / rho / Cp * (u - T)
    return [dCadt, dTdt]`

模拟函数

此函数将用于从 pid_control 主题中消费 u 值。该函数还使用 odeint 对微分方程进行积分并模拟下一时间戳的CSTR。这些 new_Canew_T 值将被发送到 cstr 主题,以便PID控制器可以评估下一个 u 值。

def simulate_cstr(Ca, T, ts, u, Tf, Caf):
    x0 = [Ca, T]
    y = odeint(cstr_model_func, x0, ts, args=(u, Tf, Caf))
    new_Ca = y[-1][0]
    new_T = y[-1][1]
    return new_Ca, new_T

定义Faust代理

接下来,我们定义代理和函数,这些代理和函数用于监听主题中的值,并发送 new_Ca 和 new_T 值以及我们的初始值。

@app.agent(cstr_topic)
async def cstr(cstr):
    global process_count
    async for Ca_T_values in cstr:
        Ca = Ca_T_values.get('Ca')
        T = Ca_T_values.get('T')
        print(f"[model] cstr func")
        print(f"[model] Received Ca cstr: {Ca}, T: {T}")

@app.agent(pid_control_topic)
async def consume_u(events):
    global process_count
    print("[model] Starting PID control loop")
    initial_values = {
        'Ca': initial_Ca,
        'T': initial_T`
    }`
    print(f"[model] Sending initial values: Ca: {initial_Ca}, T: {initial_T}")
    await cstr_topic.send(value=initial_values)

最后,我们处理来自 pid_control Kafka 主题的传入事件,以获取 u(冷却套温度)的新值,解算常微分方程以获得下一个 CaT 值,并将结果发送到 cstr Kafka 主题。

  async for event in events:
        print(f"[model] Received event: {event}")
        u = event.get('u')
        Ca = event.get('Ca')
        T = event.get('T')
        print(f"[model] Into simulate_cstr u: {u}, Into simulate_cstr Ca: {Ca}, Into simulate_cstr T: {T}")
        if u is not None and Ca is not None and T is not None:
            new_Ca, new_T = simulate_cstr(Ca, T, ts, u, Tf, Caf)
            new_values = {
                'Ca': new_Ca,
                'T': new_T,
            }
            print(f"[model] consume sent")
            print(f"[model] Received u: {u}, Computed new Ca: {new_Ca}, new T: {new_T}")
            await cstr_topic.send(value=new_values)

pid_controller.py 说明

从 Kafka 的角度来看,此脚本基本上与 cstr_model.py 脚本相同,只是它监听来自 cstr 主题的值,并计算要发送到 pid_control 主题的 u 值。

唯一的真正区别是PID控制器的逻辑,该逻辑由该函数提供

def pid_control(T_ss, u_ss, ts, Tf, Caf, Ca, T, sp, ie_previous):
    """Compute the u value based on PID control."""
    delta_t = ts[1] - ts[0]
    e = sp - T
    if process_count >= 1:
        ie = ie_previous + e * delta_t
    else:
        ie = 0.0
    P = Kc * e
    I = Kc / tauI * ie
    print(f"[pid] delta_t: {delta_t}, e: {e}, ie_previous: {ie_previous}, ie: {ie}")  # Debugging print
    op = u_ss + P + I
    # Upper and Lower limits on OP
    op_hi = 350.0
    op_lo = 250.0
    if op > op_hi:
        op = op_hi
        ie = ie - e * delta_t
    if op < op_lo:
        op = op_lo
        ie = ie - e * delta_t
    u = op
    return u, ie

此函数返回冷却套温度以及误差项的积分。IE是PID控制器的一个关键组件。它是PID算法中的积分控制动作的一部分,有助于消除稳态误差并随着时间的推移提高系统精度。

使用 Telegraf 将数据写入 InfluxDB v3

使用 Kafka Consumer 输入插件 从正确的主题获取数据并将其发送到 InfluxDB。配置文件如下所示

TOML or bash
[agent]
  interval = "10s"
  round_interval = true
  debug = true
[[inputs.kafka_consumer]]
  brokers = ["kafka:9092"]
  topics = ["pid_control"]
  data_format = "json_v2"
  [[inputs.kafka_consumer.json_v2]]
    measurement_name = "CSTR_data"
    [[inputs.kafka_consumer.json_v2.field]]
      path = "Ca"
      type = "float"
    [[inputs.kafka_consumer.json_v2.field]]
      path = "T"
      type = "float"
    [[inputs.kafka_consumer.json_v2.field]]
      path = "u"
      type = "float"
    [[inputs.kafka_consumer.json_v2.field]]
      path = "setpoint"
      type = "float"
    [[inputs.kafka_consumer.json_v2.field]]
    path = "ie"
    type = "float"
[[outputs.file]]
  files = ["stdout"]
[[outputs.influxdb_v2]]
  urls = ["https://us-east-1-1.aws.cloud2.influxdata.com/"]
  token = "xxx"
  organization = "xxx"
  bucket = "CSTR"`

本质上,我们只是订阅了 pid_control 主题,并收集了其中写入的所有计算值。

使用 Grafana 监控化学反应器

现在,我们的数字孪生将反应器温度、浓度、冷却套温度和设定点数据写入 InfluxDB,我们可以使用 Grafana 构建仪表板和警报来监控我们的CSTR。您可以使用 Grafana InfluxDB v3 数据源 创建此类可视化

Grafana中的CSTR和冷却套数据仪表板。

右下角的设定点和反应器温度图表显示,冷却套和控制器成功帮助反应器温度保持在附近并达到设定点。

为什么使用 Kafka?

在我的当前项目中,我选择 Kafka 作为 CSTR 和 PID 控制器之间实时数据流和处理的骨干。尽管每个主题(cstrpid_control)仅使用一个分区,但 Kafka 提供了一个高度可靠和可扩展的架构,可以根据项目需求扩展。Kafka 的强大消息代理功能确保实时数据从 CSTR 无缝流向 PID 控制器,从而促进对反应器条件的持续监控和调整。

选择 Kafka 的主要原因之一是其容错性和持久性,这保证了在流过程中不会丢失任何数据。这种可靠性对于维持最佳反应器条件并确保工艺的完整性至关重要。此外,Kafka 处理高吞吐量、低延迟数据流的能力使其非常适合此类实时应用,其中及时数据处理至关重要。

展望未来,如果项目需要扩展,Kafka的架构允许轻松地对主题进行分区。通过增加分区数量,我们可以将数据负载分配到多个消费者,提高系统的吞吐量和容错性。这种可扩展性意味着随着流程复杂性的增长——比如增加更多的CSTR或PID控制器——基于Kafka的基础设施可以无缝适应处理增加的数据量和处理需求。

Faust的优缺点

Faust是一个流处理库,为与实时数据工作的开发者提供了显著的优势。我选择使用它,因为它的Pythonic API使其特别吸引人。我无需使用Java或Scala,这两者通常与Kafka Streams一起使用。此外,Faust与Kafka的集成使得创建健壮、可扩展和容错的数据管道成为可能,这些管道可以高效地处理高吞吐量的数据流。Faust还支持异步编程,这增强了其并行处理数据的能力,使其适用于需要实时分析、监控和事件驱动处理的应用。Faust注重简单易用,这有助于快速开发和部署流应用程序。

然而,Faust也有一些局限性。主要缺点是,在需要高并发的环境中,Faust比Java或Scala的对应产品(如Kafka Streams)效率较低。此外,Faust完全是社区支持的,导致文档较少、社区资源较少,并且新特性和错误修复的开发可能较慢。

考虑其他数据处理工具

虽然我选择Faust因为它易于使用,但还有其他一些知名的数据处理工具,包括Spark、Kafka Streams、Bytewax和Quix.io,也值得考虑。

  • Kafka Streams是用Java和Scala编写的,针对性能进行了高度优化,并与Kafka紧密集成,这使得它成为企业级流应用的稳健选择,但代价是需要Java/Scala的专家知识。
  • Apache Spark虽然主要以其批处理功能而闻名,但也支持通过Structured Streaming进行流处理,并提供了一个强大的、统一的引擎,它可以以高性能和可扩展性处理批处理和流处理。
  • Bytewax类似于Faust,是为Python开发者设计的,利用及时数据流进行实时分析,为流处理提供了一种更现代、更原生的方法。
  • Quix.io专注于高频数据流,提供了一个综合平台,包括数据摄取、处理和可视化工具。它还包含快速入门模板和插件,便于与InfluxDB集成。

每个工具都有其自身优点和权衡,因此选择取决于具体的项目需求、现有基础设施以及团队的专业知识。

运行示例

要自己运行示例,请按照以下步骤操作

  1. 在此处创建InfluxDB Cloud v3账户
  2. 创建一个名为“CSTR”的桶
  3. 创建一个令牌
  4. 编辑telegraf.conf以包含您的InfluxDB凭据
  5. 运行docker-compose up -d

现在,您应该能够在InfluxDB中可视化您的CSTR数据。在下图截图中,我们使用SQL查询您的Ca和T数据。

未来

这个数字孪生模型了一个完美的CSTR,并为此做出了很多假设,包括

  • 完美的混合:反应器的内容被完全混合,这意味着浓度和温度在整个反应器中是均匀的。
  • 反应器的体积恒定。
  • 稳态流速:进料和浓度率是恒定的。
  • 一级化学反应动力学。
  • 反应器和冷却夹套之间的瞬间传热。
  • 还有更多条件,这里不一一列举。

这些假设的影响使我们能够创建可以解决的常微分方程(ODE)。不幸的是,在现实世界中,这些假设并不总是有效的。具体来说,simulate_cstr函数在cstr_model.py中使用接收到的控制输入u和当前值CaT来预测下一个值。然后,函数odeint在微小的时间步长上积分微分方程,以计算新的浓度和温度。在现实世界的例子中,我们可能会有应用这些简化假设并使用一系列ODE轻松预测下一个值的奢侈。在这种情况下,可以使用其他预测模型来预测下一个值。

在即将发表的博客文章中,我将重点介绍如何利用一些预测模型来预测CaT值,并将这些预测纳入创建更好的CSTR和PID控制器的模型中。

结论

本博客文章描述了如何使用Kafka和Faust作为PID控制器创建CSTR的数字孪生,同时将温度数据存储在InfluxDB中。我们使用Telegraf从Kafka主题获取数据,并使用Grafana可视化结果。一如既往,您可以在InfluxDB v3 Cloud开始使用。在下一篇文章中,我们将介绍如何运行项目,深入了解架构和逻辑,并讨论所选堆栈的一些优缺点。如果您需要帮助,请通过我们的社区网站Slack频道联系我们。如果您也在使用InfluxDB进行数据处理项目,我非常乐意听到您的声音!