AMQP 和 Prometheus 集成

强大的性能和简单的集成,由 InfluxData 构建的开源数据连接器 Telegraf 提供支持。

info

对于大规模实时查询,这不是推荐的配置。 为了进行查询和压缩优化、高速摄取和高可用性,您可能需要考虑 AMQP 和 InfluxDB

50亿+

Telegraf 下载量

#1

时序数据库
来源:DB Engines

10亿+

InfluxDB 下载量

2,800+

贡献者

目录

强大的性能,无限的扩展能力

收集、组织和处理海量高速数据。 当您将任何数据视为时间序列数据时,它会更有价值。 借助 InfluxDB,排名第一的、旨在与 Telegraf 一起扩展的时序平台。

查看入门方法

输入和输出集成概述

AMQP Consumer 输入插件允许您从兼容 AMQP 0-9-1 的消息代理(如 RabbitMQ)中摄取数据,从而实现无缝的数据收集,用于监控和分析目的。

Prometheus 输出插件使 Telegraf 能够在 HTTP 端点公开指标,以供 Prometheus 服务器抓取。此集成允许用户以 Prometheus 可以高效处理的格式从各种来源收集和聚合指标。

集成详情

AMQP

此插件为 AMQP 0-9-1 提供了一个消费者,RabbitMQ 是其一个突出的实现。 AMQP 或高级消息队列协议最初是为了在网络中不同的系统之间实现可靠的、可互操作的消息传递而开发的。 该插件使用配置的队列和绑定键从主题交换中读取指标,从而提供了一种灵活高效的方式来从兼容 AMQP 的消息传递系统中收集数据。 这使用户能够利用现有的 RabbitMQ 实现来有效地监控其应用程序,方法是捕获详细的指标以进行分析和告警。

Prometheus

此插件有助于与 Prometheus 集成,Prometheus 是一种著名的开源监控和告警工具包,专为大规模环境中的可靠性和效率而设计。 通过充当 Prometheus 客户端,它允许用户通过 HTTP 服务器公开一组定义的指标,Prometheus 可以按指定的间隔抓取这些指标。 此插件在监控各种系统中起着至关重要的作用,因为它允许它们以标准化格式发布性能指标,从而可以广泛了解系统健康状况和行为。 主要功能包括支持配置各种端点、启用 TLS 以进行安全通信以及 HTTP 基本身份验证的选项。 该插件还与全局 Telegraf 配置设置无缝集成,支持广泛的自定义以适应特定的监控需求。 这促进了不同系统必须有效通信性能数据的环境中的互操作性。 利用 Prometheus 的指标格式,它可以通过指标过期和收集器控制等高级配置来实现灵活的指标管理,为监控和告警工作流程提供了一个复杂的解决方案。

配置

AMQP

[[inputs.amqp_consumer]]
  ## Brokers to consume from.  If multiple brokers are specified a random broker
  ## will be selected anytime a connection is established.  This can be
  ## helpful for load balancing when not using a dedicated load balancer.
  brokers = ["amqp://localhost:5672/influxdb"]

  ## Authentication credentials for the PLAIN auth_method.
  # username = ""
  # password = ""

  ## Name of the exchange to declare.  If unset, no exchange will be declared.
  exchange = "telegraf"

  ## Exchange type; common types are "direct", "fanout", "topic", "header", "x-consistent-hash".
  # exchange_type = "topic"

  ## If true, exchange will be passively declared.
  # exchange_passive = false

  ## Exchange durability can be either "transient" or "durable".
  # exchange_durability = "durable"

  ## Additional exchange arguments.
  # exchange_arguments = { }
  # exchange_arguments = {"hash_property" = "timestamp"}

  ## AMQP queue name.
  queue = "telegraf"

  ## AMQP queue durability can be "transient" or "durable".
  queue_durability = "durable"

  ## If true, queue will be passively declared.
  # queue_passive = false

  ## Additional arguments when consuming from Queue
  # queue_consume_arguments = { }
  # queue_consume_arguments = {"x-stream-offset" = "first"}

  ## A binding between the exchange and queue using this binding key is
  ## created.  If unset, no binding is created.
  binding_key = "#"

  ## Maximum number of messages server should give to the worker.
  # prefetch_count = 50

  ## Max undelivered messages
  ## This plugin uses tracking metrics, which ensure messages are read to
  ## outputs before acknowledging them to the original broker to ensure data
  ## is not lost. This option sets the maximum messages to read from the
  ## broker that have not been written by an output.
  ##
  ## This value needs to be picked with awareness of the agent's
  ## metric_batch_size value as well. Setting max undelivered messages too high
  ## can result in a constant stream of data batches to the output. While
  ## setting it too low may never flush the broker's messages.
  # max_undelivered_messages = 1000

  ## Timeout for establishing the connection to a broker
  # timeout = "30s"

  ## Auth method. PLAIN and EXTERNAL are supported
  ## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as
  ## described here: https://rabbitmq.cn/plugins.html
  # auth_method = "PLAIN"

  ## Optional TLS Config
  # tls_ca = "/etc/telegraf/ca.pem"
  # tls_cert = "/etc/telegraf/cert.pem"
  # tls_key = "/etc/telegraf/key.pem"
  ## Use TLS but skip chain & host verification
  # insecure_skip_verify = false

  ## Content encoding for message payloads, can be set to
  ## "gzip", "identity" or "auto"
  ## - Use "gzip" to decode gzip
  ## - Use "identity" to apply no encoding
  ## - Use "auto" determine the encoding using the ContentEncoding header
  # content_encoding = "identity"

  ## Maximum size of decoded message.
  ## Acceptable units are B, KiB, KB, MiB, MB...
  ## Without quotes and units, interpreted as size in bytes.
  # max_decompression_size = "500MB"

  ## Data format to consume.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  data_format = "influx"

Prometheus

[[outputs.prometheus_client]]
  ## Address to listen on.
  ##   ex:
  ##     listen = ":9273"
  ##     listen = "vsock://:9273"
  listen = ":9273"

  ## Maximum duration before timing out read of the request
  # read_timeout = "10s"
  ## Maximum duration before timing out write of the response
  # write_timeout = "10s"

  ## Metric version controls the mapping from Prometheus metrics into Telegraf metrics.
  ## See "Metric Format Configuration" in plugins/inputs/prometheus/README.md for details.
  ## Valid options: 1, 2
  # metric_version = 1

  ## Use HTTP Basic Authentication.
  # basic_username = "Foo"
  # basic_password = "Bar"

  ## If set, the IP Ranges which are allowed to access metrics.
  ##   ex: ip_range = ["192.168.0.0/24", "192.168.1.0/30"]
  # ip_range = []

  ## Path to publish the metrics on.
  # path = "/metrics"

  ## Expiration interval for each metric. 0 == no expiration
  # expiration_interval = "60s"

  ## Collectors to enable, valid entries are "gocollector" and "process".
  ## If unset, both are enabled.
  # collectors_exclude = ["gocollector", "process"]

  ## Send string metrics as Prometheus labels.
  ## Unless set to false all string metrics will be sent as labels.
  # string_as_label = true

  ## If set, enable TLS with the given certificate.
  # tls_cert = "/etc/ssl/telegraf.crt"
  # tls_key = "/etc/ssl/telegraf.key"

  ## Set one or more allowed client CA certificate file names to
  ## enable mutually authenticated TLS connections
  # tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"]

  ## Export metric collection time.
  # export_timestamp = false

  ## Specify the metric type explicitly.
  ## This overrides the metric-type of the Telegraf metric. Globbing is allowed.
  # [outputs.prometheus_client.metric_types]
  #   counter = []
  #   gauge = []

输入和输出集成示例

AMQP

  1. 集成应用程序指标与 AMQP:使用 AMQP Consumer 插件来收集发布到 RabbitMQ 交换的应用程序指标。 通过配置插件以侦听特定的队列,团队可以深入了解应用程序性能,跟踪请求速率、错误计数和延迟指标,所有这些都是实时的。 这种设置不仅有助于异常检测,而且还为容量规划和系统优化提供了有价值的数据。

  2. 事件驱动的监控:每当应用程序中满足某些条件时,配置 AMQP Consumer 以触发特定的监控事件。 例如,如果收到指示高错误率的消息,则插件可以将此数据馈送到监控工具中,从而生成告警或扩展事件。 这种集成可以提高对问题的响应速度,并自动化部分操作工作流程。

  3. 跨平台数据聚合:利用 AMQP Consumer 插件来整合来自分布在不同平台上的各种应用程序的指标。 通过利用 RabbitMQ 作为集中式消息代理,组织可以统一其监控数据,从而允许通过 Telegraf 进行全面的分析和仪表板显示,从而在异构环境中保持可见性。

  4. 实时日志处理:扩展 AMQP Consumer 的使用范围,以捕获发送到 RabbitMQ 交换的日志数据,实时处理日志以进行监控和告警。 此应用程序确保通过分析日志模式、趋势和异常情况来及时检测和解决操作问题。

Prometheus

  1. 监控多云部署:利用 Prometheus 插件来收集来自跨多个云提供商运行的应用程序的指标。 这种情况允许团队通过单个 Prometheus 实例集中监控,该实例从不同环境中抓取指标,从而提供跨混合基础设施的统一性能指标视图。 它简化了报告和告警,提高了运营效率,而无需复杂的集成。

  2. 增强微服务可见性:实施该插件以公开 Kubernetes 集群中各种微服务的指标。 通过使用 Prometheus,团队可以实时可视化服务指标、识别瓶颈并维护系统健康检查。 这种设置支持基于从收集的指标生成的洞察进行自适应扩展和资源利用率优化。 它增强了对服务交互进行故障排除的能力,从而显着提高了微服务架构的弹性。

  3. 电子商务中的实时异常检测:通过将此插件与 Prometheus 一起使用,电子商务平台可以监控关键绩效指标,例如响应时间和错误率。 将异常检测算法与抓取的指标集成,可以识别指示潜在问题的意外模式,例如突然的流量高峰或后端服务故障。 这种主动监控增强了业务连续性和运营效率,最大限度地减少了潜在的停机时间,同时确保了服务可靠性。

  4. API 的性能指标报告:利用 Prometheus Output Plugin 收集和报告 API 性能指标,然后可以在 Grafana 仪表板中可视化这些指标。 此用例支持对 API 响应时间、吞吐量和错误率进行详细分析,从而促进 API 服务的持续改进。 通过密切监控这些指标,团队可以快速响应性能下降,确保最佳的 API 性能并保持高水平的服务可用性。

反馈

感谢您成为我们社区的一份子! 如果您有任何一般性反馈或在这些页面上发现了任何错误,我们欢迎并鼓励您提出意见。 请在 InfluxDB 社区 Slack 中提交您的反馈。

强大的性能,无限的扩展能力

收集、组织和处理海量高速数据。 当您将任何数据视为时间序列数据时,它会更有价值。 借助 InfluxDB,排名第一的、旨在与 Telegraf 一起扩展的时序平台。

查看入门方法

相关集成

HTTP 和 InfluxDB 集成

HTTP 插件从一个或多个 HTTP(S) 端点收集指标。 它支持各种身份验证方法和数据格式的配置选项。

查看集成

Kafka 和 InfluxDB 集成

此插件从 Kafka 读取消息,并允许基于这些消息创建指标。 它支持各种配置,包括不同的 Kafka 设置和消息处理选项。

查看集成

Kinesis 和 InfluxDB 集成

Kinesis 插件允许从 AWS Kinesis 流中读取指标。 它支持多种输入数据格式,并提供带有 DynamoDB 的检查点功能,以实现可靠的消息处理。

查看集成