PID 控制器和 InfluxDB:第 2 部分 - 数字孪生

导航至

之前的文章中,我们描述了 CSTR 和 PID 控制器。这篇文章将介绍此项目仓库中数字孪生的代码和架构。该项目利用 Kafka 进行数据流处理,Faust 进行数据处理,InfluxDB 用于存储时间序列数据,Telegraf 用于将数据从主题写入 InfluxDB。我们还将介绍此堆栈的优点和缺点。这篇博文的相应仓库可以在这里找到。

2024 年 8 月 28 日上午 11:16:39 屏幕截图 此博客中描述的项目架构图。CSTR 的数字孪生生成的数据由 Kafka 收集。Faust 充当 PID 控制器。Telegraf 订阅 Kafka 主题以将传感器和控制数据发送到 InfluxDB。反应器和冷却套由 Grafana 监控。

数字孪生逻辑架构

这里我们将重点关注 cstr_kafka_influxdb 目录中的登录。此目录包含如何使用 InfluxDB、Kafka、Telegraf 和 Python 建模 CSTR 的示例。两个脚本 cstr_model.pypid_controller.py 分别负责模拟 CSTR 和 PID 控制器。高级架构如下所示: 结果是一个 CSTR 数字孪生,它模拟在提供的温度设定点条件下执行反应。我们生成的数据如下所示:

CSTR 数字孪生的结果。顶部的图表显示了反应物 A (Ca) 随时间的浓度,中间的图表显示了反应器的温度 (T) 和设定点(设定点),底部的图表显示了冷却套的温度 (u)。

cstr_model.py 解释

此脚本模拟连续搅拌釜式反应器 (CSTR) 并处理来自 PID 控制器的传入控制信号。该脚本包含以下部分

导入和应用配置

import faust
import numpy as np
from scipy.integrate import odeint
app = faust.App(
    'cstr_model',
    broker='kafka://localhost:9092',
    store='memory://',
    value_serializer='json',
    web_port=6066
)
cstr_topic = app.topic('cstr')
pid_control_topic = app.topic('pid_control')

定义了两个 Kafka 主题:cstr(用于发送 CSTR 状态)和 pid_control(用于接收控制信号)。

初始值和常数

# The process count and iterations is used to stop the number of times values are produced and consumed from the respective topics. 
process_count = 0
max_iterations = 300
# Tf is the temperature of the feed. This is assumed to be a constant. 
Tf = 350
# Caf is the concentration of the reactant A in the feed. This is assumed to be a constant. 
Caf = 1
# ts represents the time duration between points. Its assumed that all time is regular here. 
ts = [0,0.03333]  
# initial_Ca and initial T are the initial values of the concentration of reactant A and the temperature inside the reactor, respectively. 
initial_Ca = 0.87725294608097
initial_T = 324.475443431599

CSTR 模型函数

定义了模拟 CSTR 的微分方程。

def cstr_model_func(x, t, u, Tf, Caf):
    Ca, T = x
    q = 100
    V = 100
    rho = 1000
    Cp = 0.239
    mdelH = 5e4
    EoverR = 8750
    k0 = 7.2e10
    UA = 5e4
    rA = k0 * np.exp(-EoverR / T) * Ca
    dCadt = q / V * (Caf - Ca) - rA
    dTdt = q / V * (Tf - T) + mdelH / (rho * Cp) * rA + UA / V / rho / Cp * (u - T)
    return [dCadt, dTdt]`

模拟函数

此函数将用于从 pid_control 主题中消耗 u 值。该函数还使用 odeint 来积分微分方程并在下一个时间戳模拟 CSTR。这些 new_Canew_T 值被发送到 cstr 主题,以便 PID 控制器可以评估下一个 u 值。

def simulate_cstr(Ca, T, ts, u, Tf, Caf):
    x0 = [Ca, T]
    y = odeint(cstr_model_func, x0, ts, args=(u, Tf, Caf))
    new_Ca = y[-1][0]
    new_T = y[-1][1]
    return new_Ca, new_T

定义 Faust 代理

接下来,我们定义代理和函数,它们监听来自主题的值,并发送 new_Ca 和 new_T 值以及我们的初始值

@app.agent(cstr_topic)
async def cstr(cstr):
    global process_count
    async for Ca_T_values in cstr:
        Ca = Ca_T_values.get('Ca')
        T = Ca_T_values.get('T')
        print(f"[model] cstr func")
        print(f"[model] Received Ca cstr: {Ca}, T: {T}")

@app.agent(pid_control_topic)
async def consume_u(events):
    global process_count
    print("[model] Starting PID control loop")
    initial_values = {
        'Ca': initial_Ca,
        'T': initial_T`
    }`
    print(f"[model] Sending initial values: Ca: {initial_Ca}, T: {initial_T}")
    await cstr_topic.send(value=initial_values)

最后,我们处理来自 pid_control Kafka 主题的传入事件,以获取 u(冷却套温度)的新值,求解 ODE 以获得下一个 CaT 值,并将结果发送到 cstr Kafka 主题。

  async for event in events:
        print(f"[model] Received event: {event}")
        u = event.get('u')
        Ca = event.get('Ca')
        T = event.get('T')
        print(f"[model] Into simulate_cstr u: {u}, Into simulate_cstr Ca: {Ca}, Into simulate_cstr T: {T}")
        if u is not None and Ca is not None and T is not None:
            new_Ca, new_T = simulate_cstr(Ca, T, ts, u, Tf, Caf)
            new_values = {
                'Ca': new_Ca,
                'T': new_T,
            }
            print(f"[model] consume sent")
            print(f"[model] Received u: {u}, Computed new Ca: {new_Ca}, new T: {new_T}")
            await cstr_topic.send(value=new_values)

pid_controller.py 解释

从 Kafka 的角度来看,此脚本的工作方式与 cstr_model.py 脚本大致相同,不同之处在于它监听来自 cstr 主题的值并计算 u 值,然后将其发送到 pid_control 主题。

唯一真正的区别是 PID 控制器的逻辑,它由此函数提供

def pid_control(T_ss, u_ss, ts, Tf, Caf, Ca, T, sp, ie_previous):
    """Compute the u value based on PID control."""
    delta_t = ts[1] - ts[0]
    e = sp - T
    if process_count >= 1:
        ie = ie_previous + e * delta_t
    else:
        ie = 0.0
    P = Kc * e
    I = Kc / tauI * ie
    print(f"[pid] delta_t: {delta_t}, e: {e}, ie_previous: {ie_previous}, ie: {ie}")  # Debugging print
    op = u_ss + P + I
    # Upper and Lower limits on OP
    op_hi = 350.0
    op_lo = 250.0
    if op > op_hi:
        op = op_hi
        ie = ie - e * delta_t
    if op < op_lo:
        op = op_lo
        ie = ie - e * delta_t
    u = op
    return u, ie

此函数返回冷却套温度以及误差项的积分。IE 是 PID 控制器的关键组件。它是 PID 算法中积分控制动作的一部分,有助于消除稳态误差并提高系统随时间的精度。

使用 Telegraf 将数据写入 InfluxDB v3

Kafka Consumer Input Plugin 用于从正确的主题获取数据并将其发送到 InfluxDB。配置文件如下所示

TOML or bash
[agent]
  interval = "10s"
  round_interval = true
  debug = true
[[inputs.kafka_consumer]]
  brokers = ["kafka:9092"]
  topics = ["pid_control"]
  data_format = "json_v2"
  [[inputs.kafka_consumer.json_v2]]
    measurement_name = "CSTR_data"
    [[inputs.kafka_consumer.json_v2.field]]
      path = "Ca"
      type = "float"
    [[inputs.kafka_consumer.json_v2.field]]
      path = "T"
      type = "float"
    [[inputs.kafka_consumer.json_v2.field]]
      path = "u"
      type = "float"
    [[inputs.kafka_consumer.json_v2.field]]
      path = "setpoint"
      type = "float"
    [[inputs.kafka_consumer.json_v2.field]]
    path = "ie"
    type = "float"
[[outputs.file]]
  files = ["stdout"]
[[outputs.influxdb_v2]]
  urls = ["https://us-east-1-1.aws.cloud2.influxdata.com/"]
  token = "xxx"
  organization = "xxx"
  bucket = "CSTR"`

本质上,我们只是订阅 pid_control 主题并收集所有在那里写入的计算值。

使用 Grafana 监控化学反应器

既然我们的数字孪生正在将反应器温度、浓度、冷却套温度和设定点数据写入 InfluxDB,我们可以使用 Grafana 构建仪表板和警报来监控我们的 CSTR。您可以使用 Grafana InfluxDB v3 数据源来制作这样的可视化效果

Grafana 中 CSTR 和冷却套数据的仪表板。

反应器设定点和温度的右下图显示,冷却套和控制器成功帮助反应器温度保持接近并达到设定点。

为什么使用 Kafka?

在我当前的项目中,我选择 Kafka 作为 CSTR 和 PID 控制器之间实时数据流处理的骨干。尽管每个主题 (cstrpid_control) 仅使用单个分区,但 Kafka 提供了高度可靠且可扩展的架构,可以随着项目需求的增长而扩展。Kafka 强大的消息代理功能确保实时数据从 CSTR 无缝流向 PID 控制器,从而促进反应器条件的持续监控和调整。

选择 Kafka 的主要原因之一是其容错能力和持久性,这保证了在流处理过程中不会丢失任何数据。这种可靠性对于维持最佳反应器条件和确保过程的完整性至关重要。此外,Kafka 处理高吞吐量、低延迟数据流的能力使其成为此类实时应用的理想选择,在这些应用中,及时的数据处理至关重要。

展望未来,如果项目需要扩展,Kafka 的架构允许轻松分区主题。通过增加分区数量,我们可以将数据负载分布在多个消费者之间,从而提高系统的吞吐量和容错能力。这种可扩展性意味着,随着过程复杂性的增加——可能通过添加更多 CSTR 或 PID 控制器——基于 Kafka 的基础设施可以无缝适应以处理增加的数据量和处理需求。

Faust 的优点和缺点

Faust 是一个流处理库,为处理实时数据的开发者提供了显着的优势。我选择使用它是因为它的 Pythonic API 特别有吸引力。我不需要使用通常与 Kafka Streams 一起使用的 Java 或 Scala。此外,Faust 与 Kafka 的集成使创建强大、可扩展且容错的数据管道成为可能,这些管道可以高效地处理高吞吐量数据流。Faust 还支持异步编程,这增强了其并发处理数据的能力,使其适用于需要实时分析、监控和事件驱动处理的应用。Faust 专注于简单易用,这使得流处理应用的快速开发和部署成为可能。

然而,Faust 也有一些局限性。主要缺点之一是,在需要高并发的环境中,Faust 的效率不如 Java 或 Scala 的同类产品(如 Kafka Streams)。此外,Faust 完全由社区支持,导致文档不太全面,社区资源较少,并且新功能和错误修复的开发可能较慢。

考虑其他数据处理工具

虽然我选择 Faust 是因为它易于使用,但其他几种知名的数据处理工具,包括 Spark、Kafka Streams、Bytewax 和 Quix.io,也值得考虑。

  • Kafka Streams,用 Java 和 Scala 编写,经过高度优化以获得性能,并提供与 Kafka 的紧密集成,使其成为企业级流处理应用的强大选择,但代价是需要 Java/Scala 方面的专业知识。
  • Apache Spark,虽然主要以批处理而闻名,但也通过 Structured Streaming 支持流处理,并提供了一个强大的统一引擎,可以处理高性能和可扩展性的批处理和流处理。
  • Bytewax,类似于 Faust,专为 Python 开发者设计,并利用及时的数据流进行实时分析,为流处理提供了更现代和 Python 原生的方法。
  • Quix.io 以其对高频数据流的关注而脱颖而出,提供了一个全面的平台,包括数据摄取、处理和可视化工具。它还包含快速入门模板和插件,以便轻松与 InfluxDB 集成。

每种工具都有其自身的优势和权衡,因此选择取决于具体的项目需求、现有基础设施和团队的专业知识。

运行示例

要自己运行示例,请按照以下步骤操作

  1. 在此处创建一个 InfluxDB Cloud v3 帐户
  2. 创建一个 bucket,名为 “CSTR”
  3. 创建一个令牌
  4. 编辑 telegraf.conf 以包含您的 InfluxDB 凭据
  5. 运行 docker-compose up -d

现在,您应该能够在 InfluxDB 中可视化您的 CSTR 数据。在下面的屏幕截图中,我们使用 SQL 查询您的 Ca 和 T 数据。

未来

此数字孪生模型模拟了一个完美的 CSTR,并为此做了很多假设,包括

  • 完全混合:反应器中的内容物完全混合,这意味着浓度和温度在整个反应器中是均匀的。
  • 反应器体积恒定。
  • 稳态流速:进料和浓度速率恒定。
  • 一级化学反应动力学。
  • 反应器和冷却套之间瞬时传热。
  • 此外,还有许多我不会列出的条件。

这里的含义使我们能够创建我们可以实际求解的 ODE。不幸的是,在现实世界中,这些假设并非总是有效。具体来说,cstr_model.py 中的 simulate_cstr 函数使用接收到的控制输入 u 以及当前的 CaT 值来预测下一个值。然后,函数 odeint 在一个小的时步内积分微分方程,以计算新的浓度和温度。在现实世界的示例中,我们可能没有奢侈地应用这些简化假设,并使用一系列 ODE 轻松预测下一个值。在这种用例中,可以使用其他预测模型来预测下一个值。

在即将发布的博客文章中,我将重点介绍如何利用一些预测模型来预测 Ca 和 T 值,并将这些预测纳入其中,为 CSTR 和 PID 控制器创建更好的模型。

结论

这篇博客文章描述了我们如何使用 Kafka 和 Faust 作为 PID 控制器来制作 CSTR 的数字孪生,同时将温度数据存储在 InfluxDB 中。我们使用 Telegraf 从 Kafka 主题获取数据,并使用 Grafana 可视化结果。与往常一样,在此处开始使用 InfluxDB v3 Cloud。在下一篇文章中,我们将介绍如何运行该项目,深入研究架构和逻辑,并讨论所选堆栈的一些优点和缺点。如果您需要帮助,请在我们的社区网站Slack 频道上联系我们。如果您也在使用 InfluxDB 进行数据处理项目,我很乐意收到您的来信!