NATS 和 InfluxDB 集成

强大的性能和简单的集成,由 Telegraf 提供支持,Telegraf 是 InfluxData 构建的开源数据连接器。

50亿+

Telegraf 下载量

#1

时间序列数据库
来源:DB Engines

10亿+

InfluxDB 下载量

2,800+

贡献者

目录

强大性能,无限扩展

收集、组织和处理海量高速数据。当您将其视为时间序列数据时,任何数据都更有价值。借助 InfluxDB,第一的时间序列平台,它与 Telegraf 一起构建以实现扩展。

查看入门方法

输入和输出集成概述

NATS Consumer Input Plugin 实现了从 NATS 消息主题实时数据消费,无缝集成到 Telegraf 数据管道中,用于监控和指标收集。

InfluxDB 插件将指标写入 InfluxDB HTTP 服务,从而可以高效地存储和检索时间序列数据。

集成详情

NATS

NATS Consumer 插件允许 Telegraf 从指定的 NATS 主题读取指标,并基于支持的输入数据格式创建指标。利用队列组允许多个 Telegraf 实例并行地从 NATS 集群读取数据,从而提高吞吐量和可靠性。此插件还支持各种身份验证方法,包括用户名/密码、NATS 凭据文件和 nkey 种子文件,确保与 NATS 服务器的安全通信。由于 JetStream 等功能有助于消费历史消息,因此该插件在数据持久性和消息可靠性至关重要的环境中尤其有用。此外,配置各种操作参数的能力使此插件适用于高吞吐量场景,同时保持性能完整性。

InfluxDB

InfluxDB Telegraf 插件用于将指标发送到 InfluxDB HTTP API,从而以结构化方式促进时间序列数据的存储和查询。此插件与 InfluxDB 无缝集成,提供诸如基于令牌的身份验证和对多个 InfluxDB 集群节点的支持等基本功能,确保可靠且可扩展的数据摄取。通过其可配置性,用户可以指定诸如组织、目标存储桶和 HTTP 特定设置等选项,从而灵活地定制数据的发送和存储方式。该插件还支持敏感数据的密钥管理,从而增强了生产环境中的安全性。在实时分析和时间序列数据存储至关重要的现代可观测性堆栈中,此插件尤其有利。

配置

NATS

[[inputs.nats_consumer]]
  ## urls of NATS servers
  servers = ["nats://localhost:4222"]

  ## subject(s) to consume
  ## If you use jetstream you need to set the subjects
  ## in jetstream_subjects
  subjects = ["telegraf"]

  ## jetstream subjects
  ## jetstream is a streaming technology inside of nats.
  ## With jetstream the nats-server persists messages and
  ## a consumer can consume historical messages. This is
  ## useful when telegraf needs to restart it don't miss a
  ## message. You need to configure the nats-server.
  ## https://docs.nats.io/nats-concepts/jetstream.
  jetstream_subjects = ["js_telegraf"]

  ## name a queue group
  queue_group = "telegraf_consumers"

  ## Optional authentication with username and password credentials
  # username = ""
  # password = ""

  ## Optional authentication with NATS credentials file (NATS 2.0)
  # credentials = "/etc/telegraf/nats.creds"

  ## Optional authentication with nkey seed file (NATS 2.0)
  # nkey_seed = "/etc/telegraf/seed.txt"

  ## Use Transport Layer Security
  # secure = false

  ## Optional TLS Config
  # tls_ca = "/etc/telegraf/ca.pem"
  # tls_cert = "/etc/telegraf/cert.pem"
  # tls_key = "/etc/telegraf/key.pem"
  ## Use TLS but skip chain & host verification
  # insecure_skip_verify = false

  ## Sets the limits for pending msgs and bytes for each subscription
  ## These shouldn't need to be adjusted except in very high throughput scenarios
  # pending_message_limit = 65536
  # pending_bytes_limit = 67108864

  ## Max undelivered messages
  ## This plugin uses tracking metrics, which ensure messages are read to
  ## outputs before acknowledging them to the original broker to ensure data
  ## is not lost. This option sets the maximum messages to read from the
  ## broker that have not been written by an output.
  ##
  ## This value needs to be picked with awareness of the agent's
  ## metric_batch_size value as well. Setting max undelivered messages too high
  ## can result in a constant stream of data batches to the output. While
  ## setting it too low may never flush the broker's messages.
  # max_undelivered_messages = 1000

  ## Data format to consume.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  data_format = "influx"

InfluxDB

[[outputs.influxdb]]
  ## The full HTTP or UDP URL for your InfluxDB instance.
  ##
  ## Multiple URLs can be specified for a single cluster, only ONE of the
  ## urls will be written to each interval.
  # urls = ["unix:///var/run/influxdb.sock"]
  # urls = ["udp://127.0.0.1:8089"]
  # urls = ["http://127.0.0.1:8086"]

  ## Local address to bind when connecting to the server
  ## If empty or not set, the local address is automatically chosen.
  # local_address = ""

  ## The target database for metrics; will be created as needed.
  ## For UDP url endpoint database needs to be configured on server side.
  # database = "telegraf"

  ## The value of this tag will be used to determine the database.  If this
  ## tag is not set the 'database' option is used as the default.
  # database_tag = ""

  ## If true, the 'database_tag' will not be included in the written metric.
  # exclude_database_tag = false

  ## If true, no CREATE DATABASE queries will be sent.  Set to true when using
  ## Telegraf with a user without permissions to create databases or when the
  ## database already exists.
  # skip_database_creation = false

  ## Name of existing retention policy to write to.  Empty string writes to
  ## the default retention policy.  Only takes effect when using HTTP.
  # retention_policy = ""

  ## The value of this tag will be used to determine the retention policy.  If this
  ## tag is not set the 'retention_policy' option is used as the default.
  # retention_policy_tag = ""

  ## If true, the 'retention_policy_tag' will not be included in the written metric.
  # exclude_retention_policy_tag = false

  ## Write consistency (clusters only), can be: "any", "one", "quorum", "all".
  ## Only takes effect when using HTTP.
  # write_consistency = "any"

  ## Timeout for HTTP messages.
  # timeout = "5s"

  ## HTTP Basic Auth
  # username = "telegraf"
  # password = "metricsmetricsmetricsmetrics"

  ## HTTP User-Agent
  # user_agent = "telegraf"

  ## UDP payload size is the maximum packet size to send.
  # udp_payload = "512B"

  ## Optional TLS Config for use on HTTP connections.
  # tls_ca = "/etc/telegraf/ca.pem"
  # tls_cert = "/etc/telegraf/cert.pem"
  # tls_key = "/etc/telegraf/key.pem"
  ## Use TLS but skip chain & host verification
  # insecure_skip_verify = false

  ## HTTP Proxy override, if unset values the standard proxy environment
  ## variables are consulted to determine which proxy, if any, should be used.
  # http_proxy = "http://corporate.proxy:3128"

  ## Additional HTTP headers
  # http_headers = {"X-Special-Header" = "Special-Value"}

  ## HTTP Content-Encoding for write request body, can be set to "gzip" to
  ## compress body or "identity" to apply no encoding.
  # content_encoding = "gzip"

  ## When true, Telegraf will output unsigned integers as unsigned values,
  ## i.e.: "42u".  You will need a version of InfluxDB supporting unsigned
  ## integer values.  Enabling this option will result in field type errors if
  ## existing data has been written.
  # influx_uint_support = false

  ## When true, Telegraf will omit the timestamp on data to allow InfluxDB
  ## to set the timestamp of the data during ingestion. This is generally NOT
  ## what you want as it can lead to data points captured at different times
  ## getting omitted due to similar data.
  # influx_omit_timestamp = false

输入和输出集成示例

NATS

  1. 实时分析仪表板:利用 NATS 插件从各种 NATS 主题实时收集指标,并将它们馈送到集中式分析仪表板。此设置允许立即了解实时应用程序性能,使团队能够快速响应操作问题或性能下降。

  2. 分布式系统监控:在分布式架构中部署配置了 NATS 插件的多个 Telegraf 实例。这种方法允许团队有效地聚合来自各种微服务的指标,提供系统健康和性能的整体视图,同时确保在传输过程中不会丢失任何消息。

  3. 历史消息恢复:利用 NATS JetStream 的功能以及此插件来恢复和处理 Telegraf 重启后的历史消息。此功能对于需要高可靠性的应用程序尤其有益,即使在服务中断的情况下也能确保不会丢失任何关键指标。

  4. 动态负载均衡:实施动态负载均衡场景,其中 Telegraf 实例根据负载从 NATS 集群消费消息。调整队列组设置以控制活动消费者的数量,从而随着需求波动实现更好的资源利用和性能扩展。

InfluxDB

  1. 实时系统监控:利用 InfluxDB 插件捕获和存储来自各种系统组件(例如 CPU 使用率、内存消耗和磁盘 I/O)的指标。通过将这些指标推送到 InfluxDB,您可以创建一个实时仪表板,可视化实时系统性能。此设置不仅有助于识别性能瓶颈,还有助于通过分析长期趋势来进行主动容量规划。

  2. Web 应用程序的性能跟踪:自动收集并将与 Web 应用程序性能相关的指标(例如请求持续时间、错误率和用户交互)推送到 InfluxDB。通过在您的监控堆栈中使用此插件,您可以使用存储的指标生成报告和分析,以帮助了解用户行为和应用程序效率,从而指导开发和优化工作。

  3. 物联网数据聚合:利用 InfluxDB Telegraf 插件从各种 IoT 设备收集传感器数据,并将其存储在集中式 InfluxDB 实例中。此用例使您能够分析环境或机器数据随时间变化的趋势和模式,从而促进更智能的决策和预测性维护策略。通过将 IoT 数据集成到 InfluxDB 中,组织可以利用历史数据分析的力量来推动创新和运营效率。

  4. 分析历史指标以进行预测:设置 InfluxDB 插件以将历史指标数据发送到 InfluxDB,并使用它来驱动预测模型。通过分析过去的性能指标,您可以创建预测未来趋势和需求的预测模型。此应用程序对于商业智能目的尤其有用,有助于组织根据历史使用模式为资源需求的波动做好准备。

反馈

感谢您成为我们社区的一份子!如果您有任何一般性反馈或在这些页面上发现任何错误,我们欢迎并鼓励您提出意见。请在 InfluxDB 社区 Slack 中提交您的反馈。

强大性能,无限扩展

收集、组织和处理海量高速数据。当您将其视为时间序列数据时,任何数据都更有价值。借助 InfluxDB,第一的时间序列平台,它与 Telegraf 一起构建以实现扩展。

查看入门方法

相关集成

HTTP 和 InfluxDB 集成

HTTP 插件从一个或多个 HTTP(S) 端点收集指标。它支持各种身份验证方法和数据格式的配置选项。

查看集成

Kafka 和 InfluxDB 集成

此插件从 Kafka 读取消息,并允许基于这些消息创建指标。它支持各种配置,包括不同的 Kafka 设置和消息处理选项。

查看集成

Kinesis 和 InfluxDB 集成

Kinesis 插件允许从 AWS Kinesis 流读取指标。它支持多种输入数据格式,并提供使用 DynamoDB 的检查点功能,以实现可靠的消息处理。

查看集成