PID 控制器和 InfluxDB:第 2 部分 - 数字孪生
作者:Anais Dotis-Georgiou / 开发者
2024 年 8 月 29 日
导航至
在之前的文章中,我们描述了 CSTR 和 PID 控制器。这篇文章将介绍此项目仓库中数字孪生的代码和架构。该项目利用 Kafka 进行数据流处理,Faust 进行数据处理,InfluxDB 用于存储时间序列数据,Telegraf 用于将数据从主题写入 InfluxDB。我们还将介绍此堆栈的优点和缺点。这篇博文的相应仓库可以在这里找到。
此博客中描述的项目架构图。CSTR 的数字孪生生成的数据由 Kafka 收集。Faust 充当 PID 控制器。Telegraf 订阅 Kafka 主题以将传感器和控制数据发送到 InfluxDB。反应器和冷却套由 Grafana 监控。
数字孪生逻辑架构
这里我们将重点关注 cstr_kafka_influxdb 目录中的登录。此目录包含如何使用 InfluxDB、Kafka、Telegraf 和 Python 建模 CSTR 的示例。两个脚本 cstr_model.py
和 pid_controller.py
分别负责模拟 CSTR 和 PID 控制器。高级架构如下所示: 结果是一个 CSTR 数字孪生,它模拟在提供的温度设定点条件下执行反应。我们生成的数据如下所示:
CSTR 数字孪生的结果。顶部的图表显示了反应物 A (Ca) 随时间的浓度,中间的图表显示了反应器的温度 (T) 和设定点(设定点),底部的图表显示了冷却套的温度 (u)。
cstr_model.py 解释
此脚本模拟连续搅拌釜式反应器 (CSTR) 并处理来自 PID 控制器的传入控制信号。该脚本包含以下部分
导入和应用配置
import faust
import numpy as np
from scipy.integrate import odeint
app = faust.App(
'cstr_model',
broker='kafka://localhost:9092',
store='memory://',
value_serializer='json',
web_port=6066
)
cstr_topic = app.topic('cstr')
pid_control_topic = app.topic('pid_control')
定义了两个 Kafka 主题:cstr
(用于发送 CSTR 状态)和 pid_control
(用于接收控制信号)。
初始值和常数
# The process count and iterations is used to stop the number of times values are produced and consumed from the respective topics.
process_count = 0
max_iterations = 300
# Tf is the temperature of the feed. This is assumed to be a constant.
Tf = 350
# Caf is the concentration of the reactant A in the feed. This is assumed to be a constant.
Caf = 1
# ts represents the time duration between points. Its assumed that all time is regular here.
ts = [0,0.03333]
# initial_Ca and initial T are the initial values of the concentration of reactant A and the temperature inside the reactor, respectively.
initial_Ca = 0.87725294608097
initial_T = 324.475443431599
CSTR 模型函数
定义了模拟 CSTR 的微分方程。
def cstr_model_func(x, t, u, Tf, Caf):
Ca, T = x
q = 100
V = 100
rho = 1000
Cp = 0.239
mdelH = 5e4
EoverR = 8750
k0 = 7.2e10
UA = 5e4
rA = k0 * np.exp(-EoverR / T) * Ca
dCadt = q / V * (Caf - Ca) - rA
dTdt = q / V * (Tf - T) + mdelH / (rho * Cp) * rA + UA / V / rho / Cp * (u - T)
return [dCadt, dTdt]`
模拟函数
此函数将用于从 pid_control
主题中消耗 u
值。该函数还使用 odeint
来积分微分方程并在下一个时间戳模拟 CSTR。这些 new_Ca
和 new_T
值被发送到 cstr
主题,以便 PID 控制器可以评估下一个 u
值。
def simulate_cstr(Ca, T, ts, u, Tf, Caf):
x0 = [Ca, T]
y = odeint(cstr_model_func, x0, ts, args=(u, Tf, Caf))
new_Ca = y[-1][0]
new_T = y[-1][1]
return new_Ca, new_T
定义 Faust 代理
接下来,我们定义代理和函数,它们监听来自主题的值,并发送 new_Ca 和 new_T 值以及我们的初始值
@app.agent(cstr_topic)
async def cstr(cstr):
global process_count
async for Ca_T_values in cstr:
Ca = Ca_T_values.get('Ca')
T = Ca_T_values.get('T')
print(f"[model] cstr func")
print(f"[model] Received Ca cstr: {Ca}, T: {T}")
@app.agent(pid_control_topic)
async def consume_u(events):
global process_count
print("[model] Starting PID control loop")
initial_values = {
'Ca': initial_Ca,
'T': initial_T`
}`
print(f"[model] Sending initial values: Ca: {initial_Ca}, T: {initial_T}")
await cstr_topic.send(value=initial_values)
最后,我们处理来自 pid_control
Kafka 主题的传入事件,以获取 u
(冷却套温度)的新值,求解 ODE 以获得下一个 Ca
和 T
值,并将结果发送到 cstr
Kafka 主题。
async for event in events:
print(f"[model] Received event: {event}")
u = event.get('u')
Ca = event.get('Ca')
T = event.get('T')
print(f"[model] Into simulate_cstr u: {u}, Into simulate_cstr Ca: {Ca}, Into simulate_cstr T: {T}")
if u is not None and Ca is not None and T is not None:
new_Ca, new_T = simulate_cstr(Ca, T, ts, u, Tf, Caf)
new_values = {
'Ca': new_Ca,
'T': new_T,
}
print(f"[model] consume sent")
print(f"[model] Received u: {u}, Computed new Ca: {new_Ca}, new T: {new_T}")
await cstr_topic.send(value=new_values)
pid_controller.py 解释
从 Kafka 的角度来看,此脚本的工作方式与 cstr_model.py
脚本大致相同,不同之处在于它监听来自 cstr
主题的值并计算 u
值,然后将其发送到 pid_control
主题。
唯一真正的区别是 PID 控制器的逻辑,它由此函数提供
def pid_control(T_ss, u_ss, ts, Tf, Caf, Ca, T, sp, ie_previous):
"""Compute the u value based on PID control."""
delta_t = ts[1] - ts[0]
e = sp - T
if process_count >= 1:
ie = ie_previous + e * delta_t
else:
ie = 0.0
P = Kc * e
I = Kc / tauI * ie
print(f"[pid] delta_t: {delta_t}, e: {e}, ie_previous: {ie_previous}, ie: {ie}") # Debugging print
op = u_ss + P + I
# Upper and Lower limits on OP
op_hi = 350.0
op_lo = 250.0
if op > op_hi:
op = op_hi
ie = ie - e * delta_t
if op < op_lo:
op = op_lo
ie = ie - e * delta_t
u = op
return u, ie
此函数返回冷却套温度以及误差项的积分。IE 是 PID 控制器的关键组件。它是 PID 算法中积分控制动作的一部分,有助于消除稳态误差并提高系统随时间的精度。
使用 Telegraf 将数据写入 InfluxDB v3
Kafka Consumer Input Plugin 用于从正确的主题获取数据并将其发送到 InfluxDB。配置文件如下所示
TOML or bash
[agent]
interval = "10s"
round_interval = true
debug = true
[[inputs.kafka_consumer]]
brokers = ["kafka:9092"]
topics = ["pid_control"]
data_format = "json_v2"
[[inputs.kafka_consumer.json_v2]]
measurement_name = "CSTR_data"
[[inputs.kafka_consumer.json_v2.field]]
path = "Ca"
type = "float"
[[inputs.kafka_consumer.json_v2.field]]
path = "T"
type = "float"
[[inputs.kafka_consumer.json_v2.field]]
path = "u"
type = "float"
[[inputs.kafka_consumer.json_v2.field]]
path = "setpoint"
type = "float"
[[inputs.kafka_consumer.json_v2.field]]
path = "ie"
type = "float"
[[outputs.file]]
files = ["stdout"]
[[outputs.influxdb_v2]]
urls = ["https://us-east-1-1.aws.cloud2.influxdata.com/"]
token = "xxx"
organization = "xxx"
bucket = "CSTR"`
本质上,我们只是订阅 pid_control
主题并收集所有在那里写入的计算值。
使用 Grafana 监控化学反应器
既然我们的数字孪生正在将反应器温度、浓度、冷却套温度和设定点数据写入 InfluxDB,我们可以使用 Grafana 构建仪表板和警报来监控我们的 CSTR。您可以使用 Grafana InfluxDB v3 数据源来制作这样的可视化效果
Grafana 中 CSTR 和冷却套数据的仪表板。
反应器设定点和温度的右下图显示,冷却套和控制器成功帮助反应器温度保持接近并达到设定点。
为什么使用 Kafka?
在我当前的项目中,我选择 Kafka 作为 CSTR 和 PID 控制器之间实时数据流处理的骨干。尽管每个主题 (cstr
和 pid_control
) 仅使用单个分区,但 Kafka 提供了高度可靠且可扩展的架构,可以随着项目需求的增长而扩展。Kafka 强大的消息代理功能确保实时数据从 CSTR 无缝流向 PID 控制器,从而促进反应器条件的持续监控和调整。
选择 Kafka 的主要原因之一是其容错能力和持久性,这保证了在流处理过程中不会丢失任何数据。这种可靠性对于维持最佳反应器条件和确保过程的完整性至关重要。此外,Kafka 处理高吞吐量、低延迟数据流的能力使其成为此类实时应用的理想选择,在这些应用中,及时的数据处理至关重要。
展望未来,如果项目需要扩展,Kafka 的架构允许轻松分区主题。通过增加分区数量,我们可以将数据负载分布在多个消费者之间,从而提高系统的吞吐量和容错能力。这种可扩展性意味着,随着过程复杂性的增加——可能通过添加更多 CSTR 或 PID 控制器——基于 Kafka 的基础设施可以无缝适应以处理增加的数据量和处理需求。
Faust 的优点和缺点
Faust 是一个流处理库,为处理实时数据的开发者提供了显着的优势。我选择使用它是因为它的 Pythonic API 特别有吸引力。我不需要使用通常与 Kafka Streams 一起使用的 Java 或 Scala。此外,Faust 与 Kafka 的集成使创建强大、可扩展且容错的数据管道成为可能,这些管道可以高效地处理高吞吐量数据流。Faust 还支持异步编程,这增强了其并发处理数据的能力,使其适用于需要实时分析、监控和事件驱动处理的应用。Faust 专注于简单易用,这使得流处理应用的快速开发和部署成为可能。
然而,Faust 也有一些局限性。主要缺点之一是,在需要高并发的环境中,Faust 的效率不如 Java 或 Scala 的同类产品(如 Kafka Streams)。此外,Faust 完全由社区支持,导致文档不太全面,社区资源较少,并且新功能和错误修复的开发可能较慢。
考虑其他数据处理工具
虽然我选择 Faust 是因为它易于使用,但其他几种知名的数据处理工具,包括 Spark、Kafka Streams、Bytewax 和 Quix.io,也值得考虑。
- Kafka Streams,用 Java 和 Scala 编写,经过高度优化以获得性能,并提供与 Kafka 的紧密集成,使其成为企业级流处理应用的强大选择,但代价是需要 Java/Scala 方面的专业知识。
- Apache Spark,虽然主要以批处理而闻名,但也通过 Structured Streaming 支持流处理,并提供了一个强大的统一引擎,可以处理高性能和可扩展性的批处理和流处理。
- Bytewax,类似于 Faust,专为 Python 开发者设计,并利用及时的数据流进行实时分析,为流处理提供了更现代和 Python 原生的方法。
- Quix.io 以其对高频数据流的关注而脱颖而出,提供了一个全面的平台,包括数据摄取、处理和可视化工具。它还包含快速入门模板和插件,以便轻松与 InfluxDB 集成。
每种工具都有其自身的优势和权衡,因此选择取决于具体的项目需求、现有基础设施和团队的专业知识。
运行示例
要自己运行示例,请按照以下步骤操作
- 在此处创建一个 InfluxDB Cloud v3 帐户
- 创建一个 bucket,名为 “CSTR”
- 创建一个令牌
- 编辑 telegraf.conf 以包含您的 InfluxDB 凭据
- 运行
docker-compose up -d
现在,您应该能够在 InfluxDB 中可视化您的 CSTR 数据。在下面的屏幕截图中,我们使用 SQL 查询您的 Ca 和 T 数据。
未来
此数字孪生模型模拟了一个完美的 CSTR,并为此做了很多假设,包括
- 完全混合:反应器中的内容物完全混合,这意味着浓度和温度在整个反应器中是均匀的。
- 反应器体积恒定。
- 稳态流速:进料和浓度速率恒定。
- 一级化学反应动力学。
- 反应器和冷却套之间瞬时传热。
- 此外,还有许多我不会列出的条件。
这里的含义使我们能够创建我们可以实际求解的 ODE。不幸的是,在现实世界中,这些假设并非总是有效。具体来说,cstr_model.py
中的 simulate_cstr
函数使用接收到的控制输入 u
以及当前的 Ca
和 T
值来预测下一个值。然后,函数 odeint
在一个小的时步内积分微分方程,以计算新的浓度和温度。在现实世界的示例中,我们可能没有奢侈地应用这些简化假设,并使用一系列 ODE 轻松预测下一个值。在这种用例中,可以使用其他预测模型来预测下一个值。
在即将发布的博客文章中,我将重点介绍如何利用一些预测模型来预测 Ca 和 T 值,并将这些预测纳入其中,为 CSTR 和 PID 控制器创建更好的模型。
结论
这篇博客文章描述了我们如何使用 Kafka 和 Faust 作为 PID 控制器来制作 CSTR 的数字孪生,同时将温度数据存储在 InfluxDB 中。我们使用 Telegraf 从 Kafka 主题获取数据,并使用 Grafana 可视化结果。与往常一样,在此处开始使用 InfluxDB v3 Cloud。在下一篇文章中,我们将介绍如何运行该项目,深入研究架构和逻辑,并讨论所选堆栈的一些优点和缺点。如果您需要帮助,请在我们的社区网站或 Slack 频道上联系我们。如果您也在使用 InfluxDB 进行数据处理项目,我很乐意收到您的来信!