MQTT 和 TimescaleDB 集成

强大的性能和简单的集成,由 InfluxData 构建的开源数据连接器 Telegraf 提供支持。

info

对于大规模实时查询,这不是推荐的配置。为了进行查询和压缩优化、高速摄取和高可用性,您可能需要考虑MQTT 和 InfluxDB

50 亿+

Telegraf 下载量

#1

时间序列数据库
来源:DB Engines

10 亿+

InfluxDB 下载量

2,800+

贡献者

目录

强大性能,无限扩展

收集、组织和处理海量高速数据。当您将任何数据视为时间序列数据时,它都更有价值。使用 InfluxDB,这是排名第一的时间序列平台,旨在与 Telegraf 一起扩展。

查看入门方法

输入和输出集成概述

MQTT Telegraf 插件旨在从指定的 MQTT 主题读取数据并创建指标,使用户能够利用 MQTT 进行实时数据收集和监控。

此输出插件提供了一种可靠而高效的机制,用于将 Telegraf 收集的指标直接路由到 TimescaleDB 中。通过利用 PostgreSQL 的强大生态系统以及 TimescaleDB 的时间序列优化,它支持高性能数据摄取和高级查询功能。

集成详情

MQTT

MQTT 插件允许从指定的 MQTT 主题读取指标,并使用支持的输入数据格式创建指标。此插件作为服务输入运行,它监听传入的指标或事件,而不是像普通插件那样按设定的间隔收集它们。该插件的灵活性通过支持各种代理 URL、主题和连接功能(包括服务质量 (QoS) 级别和持久会话)得到增强。其配置选项包含全局设置,可修改指标并有效处理启动错误。它还支持密钥存储配置,以保护用户名和密码选项,确保与 MQTT 服务器的安全连接。

TimescaleDB

TimescaleDB 是一个开源时间序列数据库,构建为 PostgreSQL 的扩展,旨在高效处理大规模、面向时间的数据。TimescaleDB 于 2017 年推出,是为了响应对能够管理海量数据、具有高插入率和复杂查询的强大、可扩展解决方案日益增长的需求。通过利用 PostgreSQL 熟悉的 SQL 接口并通过专门的时间序列功能对其进行增强,TimescaleDB 迅速在希望将时间序列功能集成到现有关系数据库中的开发人员中流行起来。其混合方法允许用户受益于 PostgreSQL 的灵活性、可靠性和生态系统,同时为时间序列数据提供优化的性能。

该数据库在需要快速摄取数据点并对历史时期进行复杂分析查询的环境中尤其有效。TimescaleDB 具有许多创新功能,例如将数据透明地分区为可管理块的超表和内置的连续聚合。这些功能可以显着提高查询速度和资源效率。

配置

MQTT


[[inputs.mqtt_consumer]]
  servers = ["tcp://127.0.0.1:1883"]
  topics = [
    "telegraf/host01/cpu",
    "telegraf/+/mem",
    "sensors/#",
  ]
  # topic_tag = "topic"
  # qos = 0
  # connection_timeout = "30s"
  # keepalive = "60s"
  # ping_timeout = "10s"
  # max_undelivered_messages = 1000
  # persistent_session = false
  # client_id = ""
  # username = "telegraf"
  # password = "metricsmetricsmetricsmetrics"
  # tls_ca = "/etc/telegraf/ca.pem"
  # tls_cert = "/etc/telegraf/cert.pem"
  # tls_key = "/etc/telegraf/key.pem"
  # insecure_skip_verify = false
  # client_trace = false
  data_format = "influx"
  # [[inputs.mqtt_consumer.topic_parsing]]
  #   topic = ""
  #   measurement = ""
  #   tags = ""
  #   fields = ""
  #   [inputs.mqtt_consumer.topic_parsing.types]
  #      key = type

TimescaleDB

# Publishes metrics to a TimescaleDB database
[[outputs.postgresql]]
  ## Specify connection address via the standard libpq connection string:
  ##   host=... user=... password=... sslmode=... dbname=...
  ## Or a URL:
  ##   postgres://[user[:password]]@localhost[/dbname]?sslmode=[disable|verify-ca|verify-full]
  ## See https://postgresql.ac.cn/docs/current/libpq-connect.html#LIBPQ-CONNSTRING
  ##
  ## All connection parameters are optional. Environment vars are also supported.
  ## e.g. PGPASSWORD, PGHOST, PGUSER, PGDATABASE
  ## All supported vars can be found here:
  ##  https://postgresql.ac.cn/docs/current/libpq-envars.html
  ##
  ## Non-standard parameters:
  ##   pool_max_conns (default: 1) - Maximum size of connection pool for parallel (per-batch per-table) inserts.
  ##   pool_min_conns (default: 0) - Minimum size of connection pool.
  ##   pool_max_conn_lifetime (default: 0s) - Maximum connection age before closing.
  ##   pool_max_conn_idle_time (default: 0s) - Maximum idle time of a connection before closing.
  ##   pool_health_check_period (default: 0s) - Duration between health checks on idle connections.
  # connection = ""

  ## Postgres schema to use.
  # schema = "public"

  ## Store tags as foreign keys in the metrics table. Default is false.
  # tags_as_foreign_keys = false

  ## Suffix to append to table name (measurement name) for the foreign tag table.
  # tag_table_suffix = "_tag"

  ## Deny inserting metrics if the foreign tag can't be inserted.
  # foreign_tag_constraint = false

  ## Store all tags as a JSONB object in a single 'tags' column.
  # tags_as_jsonb = false

  ## Store all fields as a JSONB object in a single 'fields' column.
  # fields_as_jsonb = false

  ## Name of the timestamp column
  ## NOTE: Some tools (e.g. Grafana) require the default name so be careful!
  # timestamp_column_name = "time"

  ## Type of the timestamp column
  ## Currently, "timestamp without time zone" and "timestamp with time zone"
  ## are supported
  # timestamp_column_type = "timestamp without time zone"

  ## Templated statements to execute when creating a new table.
  # create_templates = [
  #   '''CREATE TABLE {{ .table }} ({{ .columns }})''',
  # ]

  ## Templated statements to execute when adding columns to a table.
  ## Set to an empty list to disable. Points containing tags for which there is
  ## no column will be skipped. Points containing fields for which there is no
  ## column will have the field omitted.
  # add_column_templates = [
  #   '''ALTER TABLE {{ .table }} ADD COLUMN IF NOT EXISTS {{ .columns|join ", ADD COLUMN IF NOT EXISTS " }}''',
  # ]

  ## Templated statements to execute when creating a new tag table.
  # tag_table_create_templates = [
  #   '''CREATE TABLE {{ .table }} ({{ .columns }}, PRIMARY KEY (tag_id))''',
  # ]

  ## Templated statements to execute when adding columns to a tag table.
  ## Set to an empty list to disable. Points containing tags for which there is
  ## no column will be skipped.
  # tag_table_add_column_templates = [
  #   '''ALTER TABLE {{ .table }} ADD COLUMN IF NOT EXISTS {{ .columns|join ", ADD COLUMN IF NOT EXISTS " }}''',
  # ]

  ## The postgres data type to use for storing unsigned 64-bit integer values
  ## (Postgres does not have a native unsigned 64-bit integer type).
  ## The value can be one of:
  ##   numeric - Uses the PostgreSQL "numeric" data type.
  ##   uint8 - Requires pguint extension (https://github.com/petere/pguint)
  # uint64_type = "numeric"

  ## When using pool_max_conns > 1, and a temporary error occurs, the query is
  ## retried with an incremental backoff. This controls the maximum duration.
  # retry_max_backoff = "15s"

  ## Approximate number of tag IDs to store in in-memory cache (when using
  ## tags_as_foreign_keys). This is an optimization to skip inserting known
  ## tag IDs. Each entry consumes approximately 34 bytes of memory.
  # tag_cache_size = 100000

  ## Cut column names at the given length to not exceed PostgreSQL's
  ## 'identifier length' limit (default: no limit)
  ## (see https://postgresql.ac.cn/docs/current/limits.html)
  ## Be careful to not create duplicate column names!
  # column_name_length_limit = 0

  ## Enable & set the log level for the Postgres driver.
  # log_level = "warn" # trace, debug, info, warn, error, none

输入和输出集成示例

MQTT

  1. 智能家居监控:使用 MQTT Consumer 插件监控智能家居设置中的各种传感器。在这种情况下,可以将插件配置为订阅不同设备的主题,例如温度、湿度和能耗。通过聚合这些数据,房主可以可视化趋势并接收异常模式的警报,从而提高家庭自动化系统的整体质量和效率。

  2. 物联网环境传感:部署 MQTT Consumer 以收集来自分布在不同位置的传感器的环境数据。例如,这可以包括来自空气质量传感器、温度传感器和噪声水平计的读数。可以将插件配置为从 MQTT 主题中提取相关的标签和字段,从而可以对大规模的环境条件进行详细分析和报告,从而为城市规划或环境倡议提供更好的决策支持。

  3. 实时车辆跟踪和遥测:在车辆遥测系统中集成 MQTT Consumer 插件,该系统实时收集来自各种传感器的数据。借助该插件,与车辆性能、位置和燃油消耗相关的指标可以发送到中央监控仪表板。这种实时遥测数据使车队管理人员能够通过主动数据分析来优化路线、降低燃油成本并改进车辆维护计划。

  4. 农业监控系统:利用此插件收集来自农业传感器的数据,这些传感器监控土壤湿度、作物健康状况和天气状况。MQTT Consumer 可以订阅与农业设备和环境传感器相关的多个主题,使农民能够做出数据驱动的决策,以提高作物产量,同时节约资源,从而提高农业的可持续性。

TimescaleDB

  1. 实时物联网数据摄取:使用该插件实时收集和存储来自数千个物联网设备的传感器数据。此设置有助于立即分析,帮助组织监控运营效率并快速响应不断变化的条件。

  2. 云应用程序性能监控:利用该插件将来自分布式云应用程序的详细性能指标馈送到 TimescaleDB 中。此集成支持实时仪表板和警报,使团队能够快速识别和缓解性能瓶颈。

  3. 历史数据分析和报告:实施一个系统,将长期指标存储在 TimescaleDB 中,以进行全面的历史分析。这种方法使企业能够执行趋势分析、生成详细报告并根据存档的时间序列数据做出数据驱动的决策。

  4. 自适应警报和异常检测:将插件与自动化异常检测工作流程集成。通过将指标持续流式传输到 TimescaleDB,机器学习模型可以分析数据模式,并在发生异常时触发警报,从而提高系统可靠性和主动维护能力。

反馈

感谢您成为我们社区的一份子!如果您有任何一般性反馈或在这些页面上发现任何错误,我们欢迎并鼓励您提供意见。请在 InfluxDB 社区 Slack 中提交您的反馈。

强大性能,无限扩展

收集、组织和处理海量高速数据。当您将任何数据视为时间序列数据时,它都更有价值。使用 InfluxDB,这是排名第一的时间序列平台,旨在与 Telegraf 一起扩展。

查看入门方法

相关集成

HTTP 和 InfluxDB 集成

HTTP 插件从一个或多个 HTTP(S) 端点收集指标。它支持各种身份验证方法和数据格式的配置选项。

查看集成

Kafka 和 InfluxDB 集成

此插件从 Kafka 读取消息,并允许根据这些消息创建指标。它支持各种配置,包括不同的 Kafka 设置和消息处理选项。

查看集成

Kinesis 和 InfluxDB 集成

Kinesis 插件允许从 AWS Kinesis 流中读取指标。它支持多种输入数据格式,并提供带有 DynamoDB 的检查点功能,以实现可靠的消息处理。

查看集成