NSQ 和 TimescaleDB 集成

强大的性能和简单的集成,由 InfluxData 构建的开源数据连接器 Telegraf 提供支持。

info

这不是大规模实时查询的推荐配置。 为了进行查询和压缩优化、高速摄取和高可用性,您可能需要考虑NSQ 和 InfluxDB

50 亿以上

Telegraf 下载量

#1

时间序列数据库
来源:DB Engines

10 亿以上

InfluxDB 下载量

2,800+

贡献者

目录

强大的性能,无限的扩展能力

收集、组织和处理海量高速数据。 当您将任何数据视为时间序列数据时,它都更有价值。 借助 InfluxDB,第一的时间序列平台旨在与 Telegraf 一起扩展。

查看入门方法

输入和输出集成概述

NSQ Telegraf 插件从 NSQD 消息传递系统读取指标,从而实现实时数据处理和监控。

此输出插件提供了一种可靠且高效的机制,用于将 Telegraf 收集的指标直接路由到 TimescaleDB 中。 通过利用 PostgreSQL 的强大生态系统以及 TimescaleDB 的时间序列优化,它支持高性能数据摄取和高级查询功能。

集成详细信息

NSQ

NSQ 插件与实时消息传递平台 NSQ 接口,从而可以从 NSQD 读取消息。 此插件被归类为服务插件,这意味着它主动侦听指标和事件,而不是按固定的时间间隔轮询它们。 该插件侧重于可靠性,通过跟踪未传递的消息直到输出确认它们为止,从而防止数据丢失。 该插件允许进行配置,例如指定 NSQLookupd 端点、主题和通道,并且它支持多种数据格式,以实现数据处理的灵活性。

TimescaleDB

TimescaleDB 是一个开源时间序列数据库,构建为 PostgreSQL 的扩展,旨在高效处理大规模、面向时间的数据。 TimescaleDB 于 2017 年推出,是为了响应对稳健、可扩展的解决方案日益增长的需求而出现的,该解决方案可以管理大量具有高插入率和复杂查询的数据。 通过利用 PostgreSQL 熟悉的 SQL 接口并通过专门的时间序列功能对其进行增强,TimescaleDB 迅速在希望将时间序列功能集成到现有关系数据库中的开发人员中流行起来。 它的混合方法使用户可以受益于 PostgreSQL 的灵活性、可靠性和生态系统,同时为时间序列数据提供优化的性能。

该数据库在需要快速摄取数据点以及对历史时期进行复杂分析查询的环境中尤其有效。 TimescaleDB 具有许多创新功能,如透明地将数据划分为可管理区块的超表和内置的连续聚合。 这些功能可以显着提高查询速度和资源效率。

配置

NSQ

# Read metrics from NSQD topic(s)
[[inputs.nsq_consumer]]
  ## Server option still works but is deprecated, we just prepend it to the nsqd array.
  # server = "localhost:4150"

  ## An array representing the NSQD TCP HTTP Endpoints
  nsqd = ["localhost:4150"]

  ## An array representing the NSQLookupd HTTP Endpoints
  nsqlookupd = ["localhost:4161"]
  topic = "telegraf"
  channel = "consumer"
  max_in_flight = 100

  ## Max undelivered messages
  ## This plugin uses tracking metrics, which ensure messages are read to
  ## outputs before acknowledging them to the original broker to ensure data
  ## is not lost. This option sets the maximum messages to read from the
  ## broker that have not been written by an output.
  ##
  ## This value needs to be picked with awareness of the agent's
  ## metric_batch_size value as well. Setting max undelivered messages too high
  ## can result in a constant stream of data batches to the output. While
  ## setting it too low may never flush the broker's messages.
  # max_undelivered_messages = 1000

  ## Data format to consume.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  data_format = "influx"

TimescaleDB

# Publishes metrics to a TimescaleDB database
[[outputs.postgresql]]
  ## Specify connection address via the standard libpq connection string:
  ##   host=... user=... password=... sslmode=... dbname=...
  ## Or a URL:
  ##   postgres://[user[:password]]@localhost[/dbname]?sslmode=[disable|verify-ca|verify-full]
  ## See https://postgresql.ac.cn/docs/current/libpq-connect.html#LIBPQ-CONNSTRING
  ##
  ## All connection parameters are optional. Environment vars are also supported.
  ## e.g. PGPASSWORD, PGHOST, PGUSER, PGDATABASE
  ## All supported vars can be found here:
  ##  https://postgresql.ac.cn/docs/current/libpq-envars.html
  ##
  ## Non-standard parameters:
  ##   pool_max_conns (default: 1) - Maximum size of connection pool for parallel (per-batch per-table) inserts.
  ##   pool_min_conns (default: 0) - Minimum size of connection pool.
  ##   pool_max_conn_lifetime (default: 0s) - Maximum connection age before closing.
  ##   pool_max_conn_idle_time (default: 0s) - Maximum idle time of a connection before closing.
  ##   pool_health_check_period (default: 0s) - Duration between health checks on idle connections.
  # connection = ""

  ## Postgres schema to use.
  # schema = "public"

  ## Store tags as foreign keys in the metrics table. Default is false.
  # tags_as_foreign_keys = false

  ## Suffix to append to table name (measurement name) for the foreign tag table.
  # tag_table_suffix = "_tag"

  ## Deny inserting metrics if the foreign tag can't be inserted.
  # foreign_tag_constraint = false

  ## Store all tags as a JSONB object in a single 'tags' column.
  # tags_as_jsonb = false

  ## Store all fields as a JSONB object in a single 'fields' column.
  # fields_as_jsonb = false

  ## Name of the timestamp column
  ## NOTE: Some tools (e.g. Grafana) require the default name so be careful!
  # timestamp_column_name = "time"

  ## Type of the timestamp column
  ## Currently, "timestamp without time zone" and "timestamp with time zone"
  ## are supported
  # timestamp_column_type = "timestamp without time zone"

  ## Templated statements to execute when creating a new table.
  # create_templates = [
  #   '''CREATE TABLE {{ .table }} ({{ .columns }})''',
  # ]

  ## Templated statements to execute when adding columns to a table.
  ## Set to an empty list to disable. Points containing tags for which there is
  ## no column will be skipped. Points containing fields for which there is no
  ## column will have the field omitted.
  # add_column_templates = [
  #   '''ALTER TABLE {{ .table }} ADD COLUMN IF NOT EXISTS {{ .columns|join ", ADD COLUMN IF NOT EXISTS " }}''',
  # ]

  ## Templated statements to execute when creating a new tag table.
  # tag_table_create_templates = [
  #   '''CREATE TABLE {{ .table }} ({{ .columns }}, PRIMARY KEY (tag_id))''',
  # ]

  ## Templated statements to execute when adding columns to a tag table.
  ## Set to an empty list to disable. Points containing tags for which there is
  ## no column will be skipped.
  # tag_table_add_column_templates = [
  #   '''ALTER TABLE {{ .table }} ADD COLUMN IF NOT EXISTS {{ .columns|join ", ADD COLUMN IF NOT EXISTS " }}''',
  # ]

  ## The postgres data type to use for storing unsigned 64-bit integer values
  ## (Postgres does not have a native unsigned 64-bit integer type).
  ## The value can be one of:
  ##   numeric - Uses the PostgreSQL "numeric" data type.
  ##   uint8 - Requires pguint extension (https://github.com/petere/pguint)
  # uint64_type = "numeric"

  ## When using pool_max_conns > 1, and a temporary error occurs, the query is
  ## retried with an incremental backoff. This controls the maximum duration.
  # retry_max_backoff = "15s"

  ## Approximate number of tag IDs to store in in-memory cache (when using
  ## tags_as_foreign_keys). This is an optimization to skip inserting known
  ## tag IDs. Each entry consumes approximately 34 bytes of memory.
  # tag_cache_size = 100000

  ## Cut column names at the given length to not exceed PostgreSQL's
  ## 'identifier length' limit (default: no limit)
  ## (see https://postgresql.ac.cn/docs/current/limits.html)
  ## Be careful to not create duplicate column names!
  # column_name_length_limit = 0

  ## Enable & set the log level for the Postgres driver.
  # log_level = "warn" # trace, debug, info, warn, error, none

输入和输出集成示例

NSQ

  1. 实时分析仪表板:将此插件与可视化工具集成,以创建仪表板,显示来自 NSQ 中各种主题的实时指标。 通过订阅特定主题,用户可以动态监控系统健康状况和应用程序性能,从而可以立即获得洞察力并及时响应任何异常。

  2. 事件驱动的自动化:将 NSQ 与无服务器架构相结合,以根据传入的消息触发自动化工作流程。 此用例可能涉及处理机器学习模型的数据或响应应用程序中的用户操作,从而简化操作并通过快速处理增强用户体验。

  3. 多服务通信中心:在分布式架构中使用 NSQ 插件充当不同微服务之间的集中式消息传递中心。 通过使服务能够通过 NSQ 进行通信,开发人员可以确保可靠的消息传递,同时保持解耦的服务交互,从而显着提高可扩展性和弹性。

  4. 用于增强监控的指标聚合:实施 NSQ 插件以聚合来自多个来源的指标,然后再将它们发送到分析工具。 这种设置使企业能够整合来自各种应用程序和服务的数据,从而创建统一的视图,以实现更好的决策和战略规划。

TimescaleDB

  1. 实时物联网数据摄取:使用该插件实时收集和存储来自数千个物联网设备的传感器数据。 这种设置有助于立即分析,帮助组织监控运营效率并快速响应不断变化的条件。

  2. 云应用程序性能监控:利用该插件将来自分布式云应用程序的详细性能指标馈送到 TimescaleDB 中。 这种集成支持实时仪表板和警报,使团队能够快速识别和缓解性能瓶颈。

  3. 历史数据分析和报告:实施一个系统,将长期指标存储在 TimescaleDB 中,以便进行全面的历史分析。 这种方法使企业能够执行趋势分析、生成详细报告并根据存档的时间序列数据做出数据驱动的决策。

  4. 自适应警报和异常检测:将插件与自动异常检测工作流程集成。 通过将指标持续流式传输到 TimescaleDB,机器学习模型可以分析数据模式,并在发生异常时触发警报,从而提高系统可靠性和主动维护。

反馈

感谢您成为我们社区的一份子!如果您有任何一般性反馈或在这些页面上发现任何错误,我们欢迎并鼓励您提出意见。 请在 InfluxDB 社区 Slack 中提交您的反馈。

强大的性能,无限的扩展能力

收集、组织和处理海量高速数据。 当您将任何数据视为时间序列数据时,它都更有价值。 借助 InfluxDB,第一的时间序列平台旨在与 Telegraf 一起扩展。

查看入门方法

相关集成

HTTP 和 InfluxDB 集成

HTTP 插件从一个或多个 HTTP(S) 端点收集指标。 它支持各种身份验证方法和数据格式的配置选项。

查看集成

Kafka 和 InfluxDB 集成

此插件从 Kafka 读取消息,并允许基于这些消息创建指标。 它支持各种配置,包括不同的 Kafka 设置和消息处理选项。

查看集成

Kinesis 和 InfluxDB 集成

Kinesis 插件允许从 AWS Kinesis 流中读取指标。 它支持多种输入数据格式,并提供带有 DynamoDB 的检查点功能,以实现可靠的消息处理。

查看集成