目录
输入和输出集成概述
NATS Consumer 输入插件支持从 NATS 消息主题实时数据消费,无缝集成到 Telegraf 数据管道中,用于监控和指标收集。
Telegraf PostgreSQL 插件允许您高效地将指标写入 PostgreSQL 数据库,同时自动管理数据库模式。
集成详情
NATS
NATS Consumer 插件允许 Telegraf 从指定的 NATS 主题读取指标,并根据支持的输入数据格式创建指标。利用队列组允许多个 Telegraf 实例并行地从 NATS 集群读取数据,从而提高吞吐量和可靠性。此插件还支持多种身份验证方法,包括用户名/密码、NATS 凭据文件和 nkey 种子文件,确保与 NATS 服务器的安全通信。由于 JetStream 等功能有助于消费历史消息,因此在数据持久性和消息可靠性至关重要的环境中,此插件尤其有用。此外,配置各种操作参数的能力使此插件适用于高吞吐量场景,同时保持性能完整性。
PostgreSQL
PostgreSQL 插件使用户能够将指标写入 PostgreSQL 数据库或兼容数据库,通过自动更新缺失的列来提供对模式管理的强大支持。该插件旨在促进与监控解决方案的集成,允许用户高效地存储和管理时序数据。它为连接设置、并发和错误处理提供了可配置的选项,并支持高级功能,例如用于标签和字段的 JSONB 存储、外键标记、模板化模式修改以及通过 pguint 扩展对无符号整数数据类型的支持。
配置
NATS
[[inputs.nats_consumer]]
## urls of NATS servers
servers = ["nats://localhost:4222"]
## subject(s) to consume
## If you use jetstream you need to set the subjects
## in jetstream_subjects
subjects = ["telegraf"]
## jetstream subjects
## jetstream is a streaming technology inside of nats.
## With jetstream the nats-server persists messages and
## a consumer can consume historical messages. This is
## useful when telegraf needs to restart it don't miss a
## message. You need to configure the nats-server.
## https://docs.nats.io/nats-concepts/jetstream.
jetstream_subjects = ["js_telegraf"]
## name a queue group
queue_group = "telegraf_consumers"
## Optional authentication with username and password credentials
# username = ""
# password = ""
## Optional authentication with NATS credentials file (NATS 2.0)
# credentials = "/etc/telegraf/nats.creds"
## Optional authentication with nkey seed file (NATS 2.0)
# nkey_seed = "/etc/telegraf/seed.txt"
## Use Transport Layer Security
# secure = false
## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
## Sets the limits for pending msgs and bytes for each subscription
## These shouldn't need to be adjusted except in very high throughput scenarios
# pending_message_limit = 65536
# pending_bytes_limit = 67108864
## Max undelivered messages
## This plugin uses tracking metrics, which ensure messages are read to
## outputs before acknowledging them to the original broker to ensure data
## is not lost. This option sets the maximum messages to read from the
## broker that have not been written by an output.
##
## This value needs to be picked with awareness of the agent's
## metric_batch_size value as well. Setting max undelivered messages too high
## can result in a constant stream of data batches to the output. While
## setting it too low may never flush the broker's messages.
# max_undelivered_messages = 1000
## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
PostgreSQL
# Publishes metrics to a postgresql database
[[outputs.postgresql]]
## Specify connection address via the standard libpq connection string:
## host=... user=... password=... sslmode=... dbname=...
## Or a URL:
## postgres://[user[:password]]@localhost[/dbname]?sslmode=[disable|verify-ca|verify-full]
## See https://postgresql.ac.cn/docs/current/libpq-connect.html#LIBPQ-CONNSTRING
##
## All connection parameters are optional. Environment vars are also supported.
## e.g. PGPASSWORD, PGHOST, PGUSER, PGDATABASE
## All supported vars can be found here:
## https://postgresql.ac.cn/docs/current/libpq-envars.html
##
## Non-standard parameters:
## pool_max_conns (default: 1) - Maximum size of connection pool for parallel (per-batch per-table) inserts.
## pool_min_conns (default: 0) - Minimum size of connection pool.
## pool_max_conn_lifetime (default: 0s) - Maximum age of a connection before closing.
## pool_max_conn_idle_time (default: 0s) - Maximum idle time of a connection before closing.
## pool_health_check_period (default: 0s) - Duration between health checks on idle connections.
# connection = ""
## Postgres schema to use.
# schema = "public"
## Store tags as foreign keys in the metrics table. Default is false.
# tags_as_foreign_keys = false
## Suffix to append to table name (measurement name) for the foreign tag table.
# tag_table_suffix = "_tag"
## Deny inserting metrics if the foreign tag can't be inserted.
# foreign_tag_constraint = false
## Store all tags as a JSONB object in a single 'tags' column.
# tags_as_jsonb = false
## Store all fields as a JSONB object in a single 'fields' column.
# fields_as_jsonb = false
## Name of the timestamp column
## NOTE: Some tools (e.g. Grafana) require the default name so be careful!
# timestamp_column_name = "time"
## Type of the timestamp column
## Currently, "timestamp without time zone" and "timestamp with time zone"
## are supported
# timestamp_column_type = "timestamp without time zone"
## Templated statements to execute when creating a new table.
# create_templates = [
# '''CREATE TABLE {{ .table }} ({{ .columns }})''',
# ]
## Templated statements to execute when adding columns to a table.
## Set to an empty list to disable. Points containing tags for which there is no column will be skipped. Points
## containing fields for which there is no column will have the field omitted.
# add_column_templates = [
# '''ALTER TABLE {{ .table }} ADD COLUMN IF NOT EXISTS {{ .columns|join ", ADD COLUMN IF NOT EXISTS " }}''',
# ]
## Templated statements to execute when creating a new tag table.
# tag_table_create_templates = [
# '''CREATE TABLE {{ .table }} ({{ .columns }}, PRIMARY KEY (tag_id))''',
# ]
## Templated statements to execute when adding columns to a tag table.
## Set to an empty list to disable. Points containing tags for which there is no column will be skipped.
# tag_table_add_column_templates = [
# '''ALTER TABLE {{ .table }} ADD COLUMN IF NOT EXISTS {{ .columns|join ", ADD COLUMN IF NOT EXISTS " }}''',
# ]
## The postgres data type to use for storing unsigned 64-bit integer values (Postgres does not have a native
## unsigned 64-bit integer type).
## The value can be one of:
## numeric - Uses the PostgreSQL "numeric" data type.
## uint8 - Requires pguint extension (https://github.com/petere/pguint)
# uint64_type = "numeric"
## When using pool_max_conns>1, and a temporary error occurs, the query is retried with an incremental backoff. This
## controls the maximum backoff duration.
# retry_max_backoff = "15s"
## Approximate number of tag IDs to store in in-memory cache (when using tags_as_foreign_keys).
## This is an optimization to skip inserting known tag IDs.
## Each entry consumes approximately 34 bytes of memory.
# tag_cache_size = 100000
## Enable & set the log level for the Postgres driver.
# log_level = "warn" # trace, debug, info, warn, error, none
输入和输出集成示例
NATS
-
实时分析仪表板:使用 NATS 插件从各种 NATS 主题实时收集指标,并将它们馈送到集中的分析仪表板。此设置允许立即查看实时应用程序性能,使团队能够快速响应操作问题或性能下降。
-
分布式系统监控:在分布式架构中部署配置了 NATS 插件的多个 Telegraf 实例。此方法允许团队有效地聚合来自各种微服务的指标,提供系统健康和性能的整体视图,同时确保在传输过程中不会丢失任何消息。
-
历史消息恢复:利用 NATS JetStream 的功能以及此插件来恢复和处理 Telegraf 重新启动后的历史消息。此功能对于需要高可靠性的应用程序尤其有益,确保即使在服务中断的情况下也不会丢失任何关键指标。
-
动态负载均衡:实施动态负载均衡场景,其中 Telegraf 实例根据负载从 NATS 集群消费消息。调整队列组设置以控制活动消费者的数量,从而在需求波动发生时实现更好的资源利用率和性能扩展。
PostgreSQL
-
使用复杂查询进行实时分析:利用 PostgreSQL 插件将来自各种来源的指标存储在 PostgreSQL 数据库中,从而可以使用复杂查询进行实时分析。此设置可以帮助数据科学家和分析师发现模式和趋势,因为他们可以跨多个表操作关系数据,同时利用 PostgreSQL 强大的查询优化功能。具体来说,用户可以使用跨不同指标表的 JOIN 操作创建复杂的报告,从而揭示通常在嵌入式系统中隐藏的见解。
-
与 TimescaleDB 集成以进行时序数据处理:在 TimescaleDB 实例中使用 PostgreSQL 插件,以高效地处理和分析时序数据。通过实施超表,用户可以在时间维度上实现更高的性能和主题分区。此集成允许用户对大量的时序数据运行分析查询,同时保留 PostgreSQL SQL 查询的全部功能,从而确保指标分析的可靠性和效率。
-
数据版本控制和历史分析:实施使用 PostgreSQL 插件的策略,以维护指标随时间的不同版本。用户可以设置不可变的数据表结构,其中保留旧版本的表,从而轻松进行历史分析。这种方法不仅提供了对数据演变的见解,还有助于遵守数据保留策略,确保数据集的历史完整性保持不变。
-
动态模式管理以适应不断变化的指标:使用插件的模板功能来创建动态变化的模式,以响应指标变化。此用例允许组织在指标演变时调整其数据结构,添加必要的字段并确保遵守数据完整性策略。通过利用模板化的 SQL 命令,用户无需手动干预即可扩展其数据库,从而促进敏捷的数据管理实践。
反馈
感谢您成为我们社区的一份子!如果您有任何一般性反馈或在这些页面上发现任何错误,我们欢迎并鼓励您提出意见。请在 InfluxDB 社区 Slack 中提交您的反馈。