Kafka 和 Prometheus 集成

强大的性能和简易的集成,由 InfluxData 构建的开源数据连接器 Telegraf 提供支持。

info

这不是大规模实时查询的推荐配置。为了获得查询和压缩优化、高速摄取和高可用性,您可能需要考虑 Kafka 和 InfluxDB

50 亿+

Telegraf 下载量

#1

时序数据库
来源:DB Engines

10 亿+

InfluxDB 下载量

2,800+

贡献者

目录

强大的性能,无限的扩展

收集、组织和处理海量高速数据。当您将任何数据视为时序数据时,它都会变得更有价值。InfluxDB 是第一的时序平台,旨在与 Telegraf 一起扩展。

查看入门方法

输入和输出集成概述

此插件允许您从 Kafka 主题实时收集指标,从而增强 Telegraf 设置中的数据监控和收集能力。

Prometheus 输出插件使 Telegraf 能够在一个 HTTP 端点上公开指标,以供 Prometheus 服务器抓取。此集成允许用户以 Prometheus 可以高效处理的格式从各种来源收集和聚合指标。

集成详情

Kafka

Kafka Telegraf 插件旨在从 Kafka 主题读取数据,并使用支持的输入数据格式创建指标。作为服务输入插件,它持续监听传入的指标和事件,这与以固定间隔运行的标准输入插件不同。此特定插件可以使用来自各种 Kafka 版本的功能,并且能够使用来自指定主题的消息,应用诸如使用 SASL 的安全凭证等配置,以及使用消息偏移和消费者组选项管理消息处理。此插件的灵活性使其能够处理各种消息格式和用例,使其成为依赖 Kafka 进行数据摄取的应用程序的宝贵资产。

Prometheus

此插件促进了与 Prometheus 的集成,Prometheus 是一种著名的开源监控和警报工具包,专为大规模环境中的可靠性和效率而设计。通过充当 Prometheus 客户端,它允许用户通过 Prometheus 可以在指定间隔抓取的 HTTP 服务器公开一组定义的指标。此插件通过允许各种系统以标准化格式发布性能指标,从而在监控各种系统中起着至关重要的作用,从而可以广泛了解系统健康状况和行为。主要功能包括支持配置各种端点、启用 TLS 以进行安全通信以及 HTTP 基本身份验证选项。该插件还与全局 Telegraf 配置设置无缝集成,支持广泛的自定义以适应特定的监控需求。这促进了不同系统必须有效通信性能数据的环境中的互操作性。利用 Prometheus 的指标格式,它可以通过高级配置(例如指标过期和收集器控制)实现灵活的指标管理,为监控和警报工作流程提供了一种复杂的解决方案。

配置

Kafka


[[inputs.kafka_consumer]]
              ## Kafka brokers.
              brokers = ["localhost:9092"]

              ## Set the minimal supported Kafka version. Should be a string contains
              ## 4 digits in case if it is 0 version and 3 digits for versions starting
              ## from 1.0.0 separated by dot. This setting enables the use of new
              ## Kafka features and APIs.  Must be 0.10.2.0(used as default) or greater.
              ## Please, check the list of supported versions at
              ## https://pkg.go.dev/github.com/Shopify/sarama#SupportedVersions
              ##   ex: kafka_version = "2.6.0"
              ##   ex: kafka_version = "0.10.2.0"
              # kafka_version = "0.10.2.0"

              ## Topics to consume.
              topics = ["telegraf"]

              ## Topic regular expressions to consume.  Matches will be added to topics.
              ## Example: topic_regexps = [ "*test", "metric[0-9A-z]*" ]
              # topic_regexps = [ ]

              ## When set this tag will be added to all metrics with the topic as the value.
              # topic_tag = ""

              ## The list of Kafka message headers that should be pass as metric tags
              ## works only for Kafka version 0.11+, on lower versions the message headers
              ## are not available
              # msg_headers_as_tags = []

              ## The name of kafka message header which value should override the metric name.
              ## In case when the same header specified in current option and in msg_headers_as_tags
              ## option, it will be excluded from the msg_headers_as_tags list.
              # msg_header_as_metric_name = ""

              ## Set metric(s) timestamp using the given source.
              ## Available options are:
              ##   metric -- do not modify the metric timestamp
              ##   inner  -- use the inner message timestamp (Kafka v0.10+)
              ##   outer  -- use the outer (compressed) block timestamp (Kafka v0.10+)
              # timestamp_source = "metric"

              ## Optional Client id
              # client_id = "Telegraf"

              ## Optional TLS Config
              # enable_tls = false
              # tls_ca = "/etc/telegraf/ca.pem"
              # tls_cert = "/etc/telegraf/cert.pem"
              # tls_key = "/etc/telegraf/key.pem"
              ## Use TLS but skip chain & host verification
              # insecure_skip_verify = false

              ## Period between keep alive probes.
              ## Defaults to the OS configuration if not specified or zero.
              # keep_alive_period = "15s"

              ## SASL authentication credentials.  These settings should typically be used
              ## with TLS encryption enabled
              # sasl_username = "kafka"
              # sasl_password = "secret"

              ## Optional SASL:
              ## one of: OAUTHBEARER, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI
              ## (defaults to PLAIN)
              # sasl_mechanism = ""

              ## used if sasl_mechanism is GSSAPI
              # sasl_gssapi_service_name = ""
              # ## One of: KRB5_USER_AUTH and KRB5_KEYTAB_AUTH
              # sasl_gssapi_auth_type = "KRB5_USER_AUTH"
              # sasl_gssapi_kerberos_config_path = "/"
              # sasl_gssapi_realm = "realm"
              # sasl_gssapi_key_tab_path = ""
              # sasl_gssapi_disable_pafxfast = false

              ## used if sasl_mechanism is OAUTHBEARER
              # sasl_access_token = ""

              ## SASL protocol version.  When connecting to Azure EventHub set to 0.
              # sasl_version = 1

              # Disable Kafka metadata full fetch
              # metadata_full = false

              ## Name of the consumer group.
              # consumer_group = "telegraf_metrics_consumers"

              ## Compression codec represents the various compression codecs recognized by
              ## Kafka in messages.
              ##  0 : None
              ##  1 : Gzip
              ##  2 : Snappy
              ##  3 : LZ4
              ##  4 : ZSTD
              # compression_codec = 0
              ## Initial offset position; one of "oldest" or "newest".
              # offset = "oldest"

              ## Consumer group partition assignment strategy; one of "range", "roundrobin" or "sticky".
              # balance_strategy = "range"

              ## Maximum number of retries for metadata operations including
              ## connecting. Sets Sarama library's Metadata.Retry.Max config value. If 0 or
              ## unset, use the Sarama default of 3,
              # metadata_retry_max = 0

              ## Type of retry backoff. Valid options: "constant", "exponential"
              # metadata_retry_type = "constant"

              ## Amount of time to wait before retrying. When metadata_retry_type is
              ## "constant", each retry is delayed this amount. When "exponential", the
              ## first retry is delayed this amount, and subsequent delays are doubled. If 0
              ## or unset, use the Sarama default of 250 ms
              # metadata_retry_backoff = 0

              ## Maximum amount of time to wait before retrying when metadata_retry_type is
              ## "exponential". Ignored for other retry types. If 0, there is no backoff
              ## limit.
              # metadata_retry_max_duration = 0

              ## When set to true, this turns each bootstrap broker address into a set of
              ## IPs, then does a reverse lookup on each one to get its canonical hostname.
              ## This list of hostnames then replaces the original address list.
              ## resolve_canonical_bootstrap_servers_only = false

              ## Strategy for making connection to kafka brokers. Valid options: "startup",
              ## "defer". If set to "defer" the plugin is allowed to start before making a
              ## connection. This is useful if the broker may be down when telegraf is
              ## started, but if there are any typos in the broker setting, they will cause
              ## connection failures without warning at startup
              # connection_strategy = "startup"

              ## Maximum length of a message to consume, in bytes (default 0/unlimited);
              ## larger messages are dropped
              max_message_len = 1000000

              ## Max undelivered messages
              ## This plugin uses tracking metrics, which ensure messages are read to
              ## outputs before acknowledging them to the original broker to ensure data
              ## is not lost. This option sets the maximum messages to read from the
              ## broker that have not been written by an output.
              ##
              ## This value needs to be picked with awareness of the agent's
              ## metric_batch_size value as well. Setting max undelivered messages too high
              ## can result in a constant stream of data batches to the output. While
              ## setting it too low may never flush the broker's messages.
              # max_undelivered_messages = 1000

              ## Maximum amount of time the consumer should take to process messages. If
              ## the debug log prints messages from sarama about 'abandoning subscription
              ## to [topic] because consuming was taking too long', increase this value to
              ## longer than the time taken by the output plugin(s).
              ##
              ## Note that the effective timeout could be between 'max_processing_time' and
              ## '2 * max_processing_time'.
              # max_processing_time = "100ms"

              ## The default number of message bytes to fetch from the broker in each
              ## request (default 1MB). This should be larger than the majority of
              ## your messages, or else the consumer will spend a lot of time
              ## negotiating sizes and not actually consuming. Similar to the JVM's
              ## `fetch.message.max.bytes`.
              # consumer_fetch_default = "1MB"

              ## Data format to consume.
              ## Each data format has its own unique set of configuration options, read
              ## more about them here:
              ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
              data_format = "influx"

Prometheus

[[outputs.prometheus_client]]
  ## Address to listen on.
  ##   ex:
  ##     listen = ":9273"
  ##     listen = "vsock://:9273"
  listen = ":9273"

  ## Maximum duration before timing out read of the request
  # read_timeout = "10s"
  ## Maximum duration before timing out write of the response
  # write_timeout = "10s"

  ## Metric version controls the mapping from Prometheus metrics into Telegraf metrics.
  ## See "Metric Format Configuration" in plugins/inputs/prometheus/README.md for details.
  ## Valid options: 1, 2
  # metric_version = 1

  ## Use HTTP Basic Authentication.
  # basic_username = "Foo"
  # basic_password = "Bar"

  ## If set, the IP Ranges which are allowed to access metrics.
  ##   ex: ip_range = ["192.168.0.0/24", "192.168.1.0/30"]
  # ip_range = []

  ## Path to publish the metrics on.
  # path = "/metrics"

  ## Expiration interval for each metric. 0 == no expiration
  # expiration_interval = "60s"

  ## Collectors to enable, valid entries are "gocollector" and "process".
  ## If unset, both are enabled.
  # collectors_exclude = ["gocollector", "process"]

  ## Send string metrics as Prometheus labels.
  ## Unless set to false all string metrics will be sent as labels.
  # string_as_label = true

  ## If set, enable TLS with the given certificate.
  # tls_cert = "/etc/ssl/telegraf.crt"
  # tls_key = "/etc/ssl/telegraf.key"

  ## Set one or more allowed client CA certificate file names to
  ## enable mutually authenticated TLS connections
  # tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"]

  ## Export metric collection time.
  # export_timestamp = false

  ## Specify the metric type explicitly.
  ## This overrides the metric-type of the Telegraf metric. Globbing is allowed.
  # [outputs.prometheus_client.metric_types]
  #   counter = []
  #   gauge = []

输入和输出集成示例

Kafka

  1. 实时数据处理:使用 Kafka 插件将来自 Kafka 主题的实时数据馈送到监控系统。这对于需要即时反馈性能指标或用户活动的应用程序特别有用,使企业能够更快地对其环境中的变化条件做出反应。

  2. 动态指标收集:利用此插件根据 Kafka 内发生的事件动态调整正在捕获的指标。例如,通过与其他服务集成,用户可以让插件即时重新配置自身,确保始终根据业务或应用程序的需求收集相关指标。

  3. 集中式日志记录和监控:实施集中式日志记录系统,使用 Kafka Consumer 插件将来自多个服务的日志聚合到统一的监控仪表板中。此设置可以帮助识别跨不同服务的问题,并提高整体系统可观察性和故障排除能力。

  4. 异常检测系统:将 Kafka 与机器学习算法结合使用,进行实时异常检测。通过不断分析流式数据,此设置可以自动识别异常模式,触发警报并更有效地缓解潜在问题。

Prometheus

  1. 监控多云部署:利用 Prometheus 插件从跨多个云提供商运行的应用程序收集指标。这种情况允许团队通过单个 Prometheus 实例集中监控,该实例从不同环境抓取指标,从而提供跨混合基础设施的统一性能指标视图。它简化了报告和警报,提高了运营效率,而无需复杂的集成。

  2. 增强微服务可见性:实施该插件以公开 Kubernetes 集群内各种微服务的指标。使用 Prometheus,团队可以实时可视化服务指标,识别瓶颈并维护系统健康检查。此设置支持基于从收集的指标生成的见解进行自适应扩展和资源利用率优化。它增强了对服务交互进行故障排除的能力,从而显着提高了微服务架构的弹性。

  3. 电子商务中的实时异常检测:通过将此插件与 Prometheus 一起使用,电子商务平台可以监控关键绩效指标,例如响应时间和错误率。将异常检测算法与抓取的指标集成,可以识别指示潜在问题的意外模式,例如突然的流量高峰或后端服务故障。这种主动监控增强了业务连续性和运营效率,最大限度地减少了潜在的停机时间,同时确保了服务的可靠性。

  4. API 的性能指标报告:利用 Prometheus 输出插件收集和报告 API 性能指标,然后可以在 Grafana 仪表板中可视化这些指标。此用例可以详细分析 API 响应时间、吞吐量和错误率,从而促进 API 服务的持续改进。通过密切监控这些指标,团队可以快速响应性能下降,确保最佳 API 性能并保持高水平的服务可用性。

反馈

感谢您成为我们社区的一份子!如果您有任何一般性反馈或在这些页面上发现了任何错误,我们欢迎并鼓励您提出意见。请在 InfluxDB 社区 Slack 中提交您的反馈。

强大的性能,无限的扩展

收集、组织和处理海量高速数据。当您将任何数据视为时序数据时,它都会变得更有价值。InfluxDB 是第一的时序平台,旨在与 Telegraf 一起扩展。

查看入门方法

相关集成

HTTP 和 InfluxDB 集成

HTTP 插件从一个或多个 HTTP(S) 端点收集指标。它支持各种身份验证方法和数据格式的配置选项。

查看集成

Kafka 和 InfluxDB 集成

此插件从 Kafka 读取消息,并允许基于这些消息创建指标。它支持各种配置,包括不同的 Kafka 设置和消息处理选项。

查看集成

Kinesis 和 InfluxDB 集成

Kinesis 插件允许从 AWS Kinesis 流中读取指标。它支持多种输入数据格式,并提供带有 DynamoDB 的检查点功能,以实现可靠的消息处理。

查看集成