PID 控制器和 InfluxDB:第二部分 - 数字孪生
作者 Anais Dotis-Georgiou / 开发者
2024 年 8 月 29 日
导航至
在之前的文章中,我们描述了一个 CSTR 和 PID 控制器。本文将介绍来自项目仓库的数字孪生的代码和架构。该项目利用 Kafka 进行数据流处理,Faust 进行数据处理,InfluxDB 存储时间序列数据,Telegraf 将数据从主题写入 InfluxDB。我们还将讨论此堆栈的优缺点。本文对应的仓库可以在这里找到。
本文描述的项目架构图。CSTR 的数字孪生产生数据,由 Kafka 收集。Faust 充当 PID 控制器。Telegraf 订阅 Kafka 主题,将传感器和控制数据发送到 InfluxDB。反应器和冷却夹套由 Grafana 监控。
数字孪生逻辑架构
我们将重点关注 cstr_kafka_influxdb 目录中的登录。此目录包含使用 InfluxDB、Kafka、Telegraf 和 Python 模型 CSTR 的示例。两个脚本 cstr_model.py
和 pid_controller.py
分别负责模拟 CSTR 和 PID 控制器。高级架构如下所示: 结果是一个 CSTR 数字孪生,模拟在设定的温度设定点条件下执行反应。我们生成以下数据:
CSTR 数字孪生的结果。顶部图表显示了反应物 A (Ca) 随时间的浓度,中间图表显示了反应器温度 (T) 和设定点,底部图表显示了冷却夹套的温度 (u)。
cstr_model.py 解释
此脚本模拟连续搅拌罐反应器 (CSTR) 并处理来自 PID 控制器的输入控制信号。脚本包含以下部分
导入和应用程序配置
import faust
import numpy as np
from scipy.integrate import odeint
app = faust.App(
'cstr_model',
broker='kafka://127.0.0.1:9092',
store='memory://',
value_serializer='json',
web_port=6066
)
cstr_topic = app.topic('cstr')
pid_control_topic = app.topic('pid_control')
定义了两个 Kafka 主题:cstr
(用于发送 CSTR 状态)和 pid_control
(用于接收控制信号)。
初始值和常量
# The process count and iterations is used to stop the number of times values are produced and consumed from the respective topics.
process_count = 0
max_iterations = 300
# Tf is the temperature of the feed. This is assumed to be a constant.
Tf = 350
# Caf is the concentration of the reactant A in the feed. This is assumed to be a constant.
Caf = 1
# ts represents the time duration between points. Its assumed that all time is regular here.
ts = [0,0.03333]
# initial_Ca and initial T are the initial values of the concentration of reactant A and the temperature inside the reactor, respectively.
initial_Ca = 0.87725294608097
initial_T = 324.475443431599
CSTR 模型函数
定义了模型 CSTR 的微分方程。
def cstr_model_func(x, t, u, Tf, Caf):
Ca, T = x
q = 100
V = 100
rho = 1000
Cp = 0.239
mdelH = 5e4
EoverR = 8750
k0 = 7.2e10
UA = 5e4
rA = k0 * np.exp(-EoverR / T) * Ca
dCadt = q / V * (Caf - Ca) - rA
dTdt = q / V * (Tf - T) + mdelH / (rho * Cp) * rA + UA / V / rho / Cp * (u - T)
return [dCadt, dTdt]`
模拟函数
此函数将用于从 pid_control
主题中消费 u
值。该函数还使用 odeint
对微分方程进行积分并模拟下一时间戳的CSTR。这些 new_Ca
和 new_T
值将被发送到 cstr
主题,以便PID控制器可以评估下一个 u
值。
def simulate_cstr(Ca, T, ts, u, Tf, Caf):
x0 = [Ca, T]
y = odeint(cstr_model_func, x0, ts, args=(u, Tf, Caf))
new_Ca = y[-1][0]
new_T = y[-1][1]
return new_Ca, new_T
定义Faust代理
接下来,我们定义代理和函数,这些代理和函数用于监听主题中的值,并发送 new_Ca 和 new_T 值以及我们的初始值。
@app.agent(cstr_topic)
async def cstr(cstr):
global process_count
async for Ca_T_values in cstr:
Ca = Ca_T_values.get('Ca')
T = Ca_T_values.get('T')
print(f"[model] cstr func")
print(f"[model] Received Ca cstr: {Ca}, T: {T}")
@app.agent(pid_control_topic)
async def consume_u(events):
global process_count
print("[model] Starting PID control loop")
initial_values = {
'Ca': initial_Ca,
'T': initial_T`
}`
print(f"[model] Sending initial values: Ca: {initial_Ca}, T: {initial_T}")
await cstr_topic.send(value=initial_values)
最后,我们处理来自 pid_control
Kafka 主题的传入事件,以获取 u
(冷却套温度)的新值,解算常微分方程以获得下一个 Ca
和 T
值,并将结果发送到 cstr
Kafka 主题。
async for event in events:
print(f"[model] Received event: {event}")
u = event.get('u')
Ca = event.get('Ca')
T = event.get('T')
print(f"[model] Into simulate_cstr u: {u}, Into simulate_cstr Ca: {Ca}, Into simulate_cstr T: {T}")
if u is not None and Ca is not None and T is not None:
new_Ca, new_T = simulate_cstr(Ca, T, ts, u, Tf, Caf)
new_values = {
'Ca': new_Ca,
'T': new_T,
}
print(f"[model] consume sent")
print(f"[model] Received u: {u}, Computed new Ca: {new_Ca}, new T: {new_T}")
await cstr_topic.send(value=new_values)
pid_controller.py 说明
从 Kafka 的角度来看,此脚本基本上与 cstr_model.py
脚本相同,只是它监听来自 cstr
主题的值,并计算要发送到 pid_control
主题的 u
值。
唯一的真正区别是PID控制器的逻辑,该逻辑由该函数提供
def pid_control(T_ss, u_ss, ts, Tf, Caf, Ca, T, sp, ie_previous):
"""Compute the u value based on PID control."""
delta_t = ts[1] - ts[0]
e = sp - T
if process_count >= 1:
ie = ie_previous + e * delta_t
else:
ie = 0.0
P = Kc * e
I = Kc / tauI * ie
print(f"[pid] delta_t: {delta_t}, e: {e}, ie_previous: {ie_previous}, ie: {ie}") # Debugging print
op = u_ss + P + I
# Upper and Lower limits on OP
op_hi = 350.0
op_lo = 250.0
if op > op_hi:
op = op_hi
ie = ie - e * delta_t
if op < op_lo:
op = op_lo
ie = ie - e * delta_t
u = op
return u, ie
此函数返回冷却套温度以及误差项的积分。IE是PID控制器的一个关键组件。它是PID算法中的积分控制动作的一部分,有助于消除稳态误差并随着时间的推移提高系统精度。
使用 Telegraf 将数据写入 InfluxDB v3
使用 Kafka Consumer 输入插件 从正确的主题获取数据并将其发送到 InfluxDB。配置文件如下所示
TOML or bash
[agent]
interval = "10s"
round_interval = true
debug = true
[[inputs.kafka_consumer]]
brokers = ["kafka:9092"]
topics = ["pid_control"]
data_format = "json_v2"
[[inputs.kafka_consumer.json_v2]]
measurement_name = "CSTR_data"
[[inputs.kafka_consumer.json_v2.field]]
path = "Ca"
type = "float"
[[inputs.kafka_consumer.json_v2.field]]
path = "T"
type = "float"
[[inputs.kafka_consumer.json_v2.field]]
path = "u"
type = "float"
[[inputs.kafka_consumer.json_v2.field]]
path = "setpoint"
type = "float"
[[inputs.kafka_consumer.json_v2.field]]
path = "ie"
type = "float"
[[outputs.file]]
files = ["stdout"]
[[outputs.influxdb_v2]]
urls = ["https://us-east-1-1.aws.cloud2.influxdata.com/"]
token = "xxx"
organization = "xxx"
bucket = "CSTR"`
本质上,我们只是订阅了 pid_control
主题,并收集了其中写入的所有计算值。
使用 Grafana 监控化学反应器
现在,我们的数字孪生将反应器温度、浓度、冷却套温度和设定点数据写入 InfluxDB,我们可以使用 Grafana 构建仪表板和警报来监控我们的CSTR。您可以使用 Grafana InfluxDB v3 数据源 创建此类可视化
Grafana中的CSTR和冷却套数据仪表板。
右下角的设定点和反应器温度图表显示,冷却套和控制器成功帮助反应器温度保持在附近并达到设定点。
为什么使用 Kafka?
在我的当前项目中,我选择 Kafka 作为 CSTR 和 PID 控制器之间实时数据流和处理的骨干。尽管每个主题(cstr
和 pid_control
)仅使用一个分区,但 Kafka 提供了一个高度可靠和可扩展的架构,可以根据项目需求扩展。Kafka 的强大消息代理功能确保实时数据从 CSTR 无缝流向 PID 控制器,从而促进对反应器条件的持续监控和调整。
选择 Kafka 的主要原因之一是其容错性和持久性,这保证了在流过程中不会丢失任何数据。这种可靠性对于维持最佳反应器条件并确保工艺的完整性至关重要。此外,Kafka 处理高吞吐量、低延迟数据流的能力使其非常适合此类实时应用,其中及时数据处理至关重要。
展望未来,如果项目需要扩展,Kafka的架构允许轻松地对主题进行分区。通过增加分区数量,我们可以将数据负载分配到多个消费者,提高系统的吞吐量和容错性。这种可扩展性意味着随着流程复杂性的增长——比如增加更多的CSTR或PID控制器——基于Kafka的基础设施可以无缝适应处理增加的数据量和处理需求。
Faust的优缺点
Faust是一个流处理库,为与实时数据工作的开发者提供了显著的优势。我选择使用它,因为它的Pythonic API使其特别吸引人。我无需使用Java或Scala,这两者通常与Kafka Streams一起使用。此外,Faust与Kafka的集成使得创建健壮、可扩展和容错的数据管道成为可能,这些管道可以高效地处理高吞吐量的数据流。Faust还支持异步编程,这增强了其并行处理数据的能力,使其适用于需要实时分析、监控和事件驱动处理的应用。Faust注重简单易用,这有助于快速开发和部署流应用程序。
然而,Faust也有一些局限性。主要缺点是,在需要高并发的环境中,Faust比Java或Scala的对应产品(如Kafka Streams)效率较低。此外,Faust完全是社区支持的,导致文档较少、社区资源较少,并且新特性和错误修复的开发可能较慢。
考虑其他数据处理工具
虽然我选择Faust因为它易于使用,但还有其他一些知名的数据处理工具,包括Spark、Kafka Streams、Bytewax和Quix.io,也值得考虑。
- Kafka Streams是用Java和Scala编写的,针对性能进行了高度优化,并与Kafka紧密集成,这使得它成为企业级流应用的稳健选择,但代价是需要Java/Scala的专家知识。
- Apache Spark虽然主要以其批处理功能而闻名,但也支持通过Structured Streaming进行流处理,并提供了一个强大的、统一的引擎,它可以以高性能和可扩展性处理批处理和流处理。
- Bytewax类似于Faust,是为Python开发者设计的,利用及时数据流进行实时分析,为流处理提供了一种更现代、更原生的方法。
- Quix.io专注于高频数据流,提供了一个综合平台,包括数据摄取、处理和可视化工具。它还包含快速入门模板和插件,便于与InfluxDB集成。
每个工具都有其自身优点和权衡,因此选择取决于具体的项目需求、现有基础设施以及团队的专业知识。
运行示例
要自己运行示例,请按照以下步骤操作
- 在此处创建InfluxDB Cloud v3账户
- 创建一个名为“CSTR”的桶
- 创建一个令牌
- 编辑telegraf.conf以包含您的InfluxDB凭据
- 运行
docker-compose up -d
现在,您应该能够在InfluxDB中可视化您的CSTR数据。在下图截图中,我们使用SQL查询您的Ca和T数据。
未来
这个数字孪生模型了一个完美的CSTR,并为此做出了很多假设,包括
- 完美的混合:反应器的内容被完全混合,这意味着浓度和温度在整个反应器中是均匀的。
- 反应器的体积恒定。
- 稳态流速:进料和浓度率是恒定的。
- 一级化学反应动力学。
- 反应器和冷却夹套之间的瞬间传热。
- 还有更多条件,这里不一一列举。
这些假设的影响使我们能够创建可以解决的常微分方程(ODE)。不幸的是,在现实世界中,这些假设并不总是有效的。具体来说,simulate_cstr
函数在cstr_model.py
中使用接收到的控制输入u
和当前值Ca
和T
来预测下一个值。然后,函数odeint
在微小的时间步长上积分微分方程,以计算新的浓度和温度。在现实世界的例子中,我们可能会有应用这些简化假设并使用一系列ODE轻松预测下一个值的奢侈。在这种情况下,可以使用其他预测模型来预测下一个值。
在即将发表的博客文章中,我将重点介绍如何利用一些预测模型来预测Ca
和T
值,并将这些预测纳入创建更好的CSTR和PID控制器的模型中。
结论
本博客文章描述了如何使用Kafka和Faust作为PID控制器创建CSTR的数字孪生,同时将温度数据存储在InfluxDB中。我们使用Telegraf从Kafka主题获取数据,并使用Grafana可视化结果。一如既往,您可以在InfluxDB v3 Cloud开始使用。在下一篇文章中,我们将介绍如何运行项目,深入了解架构和逻辑,并讨论所选堆栈的一些优缺点。如果您需要帮助,请通过我们的社区网站或Slack频道联系我们。如果您也在使用InfluxDB进行数据处理项目,我非常乐意听到您的声音!