NSQ 和 Elasticsearch 集成

强大的性能和简单的集成,由 Telegraf 提供支持,Telegraf 是 InfluxData 构建的开源数据连接器。

info

这不是大规模实时查询的推荐配置。为了优化查询和压缩、高速摄取和高可用性,您可能需要考虑 NSQ 和 InfluxDB

50 亿+

Telegraf 下载量

#1

时序数据库
来源:DB-Engines

10 亿+

InfluxDB 下载量

2,800+

贡献者

目录

强大的性能,无限的扩展

收集、组织和处理海量高速数据。当您将任何数据视为时序数据时,它都会更有价值。使用 InfluxDB,排名第一的时序平台,旨在通过 Telegraf 进行扩展。

查看入门方法

输入和输出集成概述

NSQ Telegraf 插件从 NSQD 消息传递系统读取指标,从而实现实时数据处理和监控。

Telegraf Elasticsearch 插件无缝地将指标发送到 Elasticsearch 服务器。该插件处理模板创建和动态索引管理,并支持各种 Elasticsearch 特有功能,以确保数据格式正确,以便存储和检索。

集成详情

NSQ

NSQ 插件与 NSQ(一个实时消息传递平台)接口,从而能够从 NSQD 读取消息。此插件被归类为服务插件,这意味着它主动侦听指标和事件,而不是以固定的时间间隔轮询它们。为了强调可靠性,它会跟踪未传递的消息,直到输出确认收到为止,从而防止数据丢失。该插件允许进行配置,例如指定 NSQLookupd 端点、主题和通道,并且它支持多种数据格式,以实现数据处理的灵活性。

Elasticsearch

此插件将指标写入 Elasticsearch,Elasticsearch 是一个分布式、RESTful 搜索和分析引擎,能够以近乎实时的速度存储大量数据。它旨在处理 Elasticsearch 5.x 到 7.x 版本,并利用其动态模板功能来正确管理数据类型映射。该插件支持高级功能,例如模板管理、动态索引命名以及与 OpenSearch 的集成。它还允许配置 Elasticsearch 节点的身份验证和运行状况监控。

配置

NSQ

# Read metrics from NSQD topic(s)
[[inputs.nsq_consumer]]
  ## Server option still works but is deprecated, we just prepend it to the nsqd array.
  # server = "localhost:4150"

  ## An array representing the NSQD TCP HTTP Endpoints
  nsqd = ["localhost:4150"]

  ## An array representing the NSQLookupd HTTP Endpoints
  nsqlookupd = ["localhost:4161"]
  topic = "telegraf"
  channel = "consumer"
  max_in_flight = 100

  ## Max undelivered messages
  ## This plugin uses tracking metrics, which ensure messages are read to
  ## outputs before acknowledging them to the original broker to ensure data
  ## is not lost. This option sets the maximum messages to read from the
  ## broker that have not been written by an output.
  ##
  ## This value needs to be picked with awareness of the agent's
  ## metric_batch_size value as well. Setting max undelivered messages too high
  ## can result in a constant stream of data batches to the output. While
  ## setting it too low may never flush the broker's messages.
  # max_undelivered_messages = 1000

  ## Data format to consume.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  data_format = "influx"

Elasticsearch


[[outputs.elasticsearch]]
  ## The full HTTP endpoint URL for your Elasticsearch instance
  ## Multiple urls can be specified as part of the same cluster,
  ## this means that only ONE of the urls will be written to each interval
  urls = [ "http://node1.es.example.com:9200" ] # required.
  ## Elasticsearch client timeout, defaults to "5s" if not set.
  timeout = "5s"
  ## Set to true to ask Elasticsearch a list of all cluster nodes,
  ## thus it is not necessary to list all nodes in the urls config option
  enable_sniffer = false
  ## Set to true to enable gzip compression
  enable_gzip = false
  ## Set the interval to check if the Elasticsearch nodes are available
  ## Setting to "0s" will disable the health check (not recommended in production)
  health_check_interval = "10s"
  ## Set the timeout for periodic health checks.
  # health_check_timeout = "1s"
  ## HTTP basic authentication details.
  ## HTTP basic authentication details
  # username = "telegraf"
  # password = "mypassword"
  ## HTTP bearer token authentication details
  # auth_bearer_token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9"

  ## Index Config
  ## The target index for metrics (Elasticsearch will create if it not exists).
  ## You can use the date specifiers below to create indexes per time frame.
  ## The metric timestamp will be used to decide the destination index name
  # %Y - year (2016)
  # %y - last two digits of year (00..99)
  # %m - month (01..12)
  # %d - day of month (e.g., 01)
  # %H - hour (00..23)
  # %V - week of the year (ISO week) (01..53)
  ## Additionally, you can specify a tag name using the notation {{tag_name}}
  ## which will be used as part of the index name. If the tag does not exist,
  ## the default tag value will be used.
  # index_name = "telegraf-{{host}}-%Y.%m.%d"
  # default_tag_value = "none"
  index_name = "telegraf-%Y.%m.%d" # required.

  ## Optional Index Config
  ## Set to true if Telegraf should use the "create" OpType while indexing
  # use_optype_create = false

  ## Optional TLS Config
  # tls_ca = "/etc/telegraf/ca.pem"
  # tls_cert = "/etc/telegraf/cert.pem"
  # tls_key = "/etc/telegraf/key.pem"
  ## Use TLS but skip chain & host verification
  # insecure_skip_verify = false

  ## Template Config
  ## Set to true if you want telegraf to manage its index template.
  ## If enabled it will create a recommended index template for telegraf indexes
  manage_template = true
  ## The template name used for telegraf indexes
  template_name = "telegraf"
  ## Set to true if you want telegraf to overwrite an existing template
  overwrite_template = false
  ## If set to true a unique ID hash will be sent as sha256(concat(timestamp,measurement,series-hash)) string
  ## it will enable data resend and update metric points avoiding duplicated metrics with different id's
  force_document_id = false

  ## Specifies the handling of NaN and Inf values.
  ## This option can have the following values:
  ##    none    -- do not modify field-values (default); will produce an error if NaNs or infs are encountered
  ##    drop    -- drop fields containing NaNs or infs
  ##    replace -- replace with the value in "float_replacement_value" (default: 0.0)
  ##               NaNs and inf will be replaced with the given number, -inf with the negative of that number
  # float_handling = "none"
  # float_replacement_value = 0.0

  ## Pipeline Config
  ## To use a ingest pipeline, set this to the name of the pipeline you want to use.
  # use_pipeline = "my_pipeline"
  ## Additionally, you can specify a tag name using the notation {{tag_name}}
  ## which will be used as part of the pipeline name. If the tag does not exist,
  ## the default pipeline will be used as the pipeline. If no default pipeline is set,
  ## no pipeline is used for the metric.
  # use_pipeline = "{{es_pipeline}}"
  # default_pipeline = "my_pipeline"
  #
  # Custom HTTP headers
  # To pass custom HTTP headers please define it in a given below section
  # [outputs.elasticsearch.headers]
  #    "X-Custom-Header" = "custom-value"

  ## Template Index Settings
  ## Overrides the template settings.index section with any provided options.
  ## Defaults provided here in the config
  # template_index_settings = {
  #   refresh_interval = "10s",
  #   mapping.total_fields.limit = 5000,
  #   auto_expand_replicas = "0-1",
  #   codec = "best_compression"
  # }

输入和输出集成示例

NSQ

  1. 实时分析仪表板:将此插件与可视化工具集成,以创建仪表板,显示来自 NSQ 中各种主题的实时指标。通过订阅特定主题,用户可以动态监控系统运行状况和应用程序性能,从而立即获得洞察力,并及时响应任何异常情况。

  2. 事件驱动的自动化:将 NSQ 与无服务器架构结合使用,以基于传入消息触发自动化工作流程。此用例可能涉及处理机器学习模型的数据或响应应用程序中的用户操作,从而通过快速处理来简化操作并增强用户体验。

  3. 多服务通信中心:在分布式架构中,使用 NSQ 插件充当不同微服务之间的集中式消息传递中心。通过使服务能够通过 NSQ 进行通信,开发人员可以确保可靠的消息传递,同时保持服务交互解耦,从而显着提高可扩展性和弹性。

  4. 用于增强监控的指标聚合:实施 NSQ 插件以聚合来自多个来源的指标,然后再将其发送到分析工具。此设置使企业能够整合来自各种应用程序和服务的数据,从而创建统一的视图,以便更好地进行决策和战略规划。

Elasticsearch

  1. 基于时间的索引:使用此插件将指标存储在 Elasticsearch 中,以根据收集时间为每个指标编制索引。例如,CPU 指标可以存储在名为telegraf-2023.01.01的每日索引中,从而可以轻松进行基于时间的查询和保留策略。

  2. 动态模板管理:利用模板管理功能自动创建针对您的指标量身定制的自定义模板。这使您可以定义如何索引和分析不同的字段,而无需手动配置 Elasticsearch,从而确保用于查询的最佳数据结构。

  3. OpenSearch 兼容性:如果您正在使用 AWS OpenSearch,则可以通过激活兼容模式来配置此插件以实现无缝工作,从而确保您现有的 Elasticsearch 客户端保持功能正常,并且与较新的集群设置兼容。

反馈

感谢您成为我们社区的一份子!如果您有任何一般性反馈或在这些页面上发现了任何错误,我们欢迎并鼓励您提出意见。请在 InfluxDB 社区 Slack 中提交您的反馈。

强大的性能,无限的扩展

收集、组织和处理海量高速数据。当您将任何数据视为时序数据时,它都会更有价值。使用 InfluxDB,排名第一的时序平台,旨在通过 Telegraf 进行扩展。

查看入门方法

相关集成

HTTP 和 InfluxDB 集成

HTTP 插件从一个或多个 HTTP(S) 端点收集指标。它支持各种身份验证方法和数据格式的配置选项。

查看集成

Kafka 和 InfluxDB 集成

此插件从 Kafka 读取消息,并允许根据这些消息创建指标。它支持各种配置,包括不同的 Kafka 设置和消息处理选项。

查看集成

Kinesis 和 InfluxDB 集成

Kinesis 插件允许从 AWS Kinesis 流中读取指标。它支持多种输入数据格式,并提供与 DynamoDB 的检查点功能,以实现可靠的消息处理。

查看集成