Google Cloud PubSub 和 PostgreSQL 集成

强大的性能和简单的集成,由 Telegraf 提供支持,Telegraf 是 InfluxData 构建的开源数据连接器。

info

对于大规模实时查询,这不是推荐的配置。 为了查询和压缩优化、高速摄取和高可用性,您可能需要考虑 Google Cloud PubSub 和 InfluxDB

50 亿+

Telegraf 下载量

#1

时序数据库
来源:DB Engines

10 亿+

InfluxDB 下载量

2,800+

贡献者

目录

强大的性能,无限的扩展能力

收集、组织和处理海量高速数据。 当您将任何数据视为时序数据时,它会更有价值。 使用 InfluxDB,第一时序平台,旨在与 Telegraf 一起扩展。

查看入门方法

输入和输出集成概述

此插件从 Google Cloud PubSub 摄取指标,从而实现实时数据处理和集成到监控设置中。

Telegraf PostgreSQL 插件允许您高效地将指标写入 PostgreSQL 数据库,同时自动管理数据库架构。

集成详情

Google Cloud PubSub

Google Cloud PubSub 输入插件旨在从 Google Cloud PubSub 摄取指标,Google Cloud PubSub 是一种消息传递服务,可促进不同系统之间的实时通信。 它允许用户通过从 Google Cloud 项目中的指定订阅中提取消息来创建和处理指标。 此插件的关键功能之一是它能够作为服务输入运行,主动侦听传入消息,而不是仅仅按设定的时间间隔轮询指标。 通过各种配置选项,用户可以自定义消息摄取的行为,例如处理凭据、管理消息大小以及调整确认设置,以确保仅在成功处理后才确认消息。 通过利用 Google PubSub 的优势,此插件与云原生架构无缝集成,使用户能够构建强大且可扩展的应用程序,这些应用程序可以实时响应事件。

PostgreSQL

PostgreSQL 插件使用户能够将指标写入 PostgreSQL 数据库或兼容的数据库,通过自动更新缺失的列,为架构管理提供强大的支持。 该插件旨在促进与监控解决方案的集成,使用户能够高效地存储和管理时序数据。 它为连接设置、并发和错误处理提供了可配置的选项,并支持高级功能,例如用于标签和字段的 JSONB 存储、外键标记、模板化架构修改以及通过 pguint 扩展对无符号整数数据类型的支持。

配置

Google Cloud PubSub

[[inputs.cloud_pubsub]]
  project = "my-project"
  subscription = "my-subscription"
  data_format = "influx"
  # credentials_file = "path/to/my/creds.json"
  # retry_delay_seconds = 5
  # max_message_len = 1000000
  # max_undelivered_messages = 1000
  # max_extension = 0
  # max_outstanding_messages = 0
  # max_outstanding_bytes = 0
  # max_receiver_go_routines = 0
  # base64_data = false
  # content_encoding = "identity"
  # max_decompression_size = "500MB"

PostgreSQL

# Publishes metrics to a postgresql database
[[outputs.postgresql]]
  ## Specify connection address via the standard libpq connection string:
  ##   host=... user=... password=... sslmode=... dbname=...
  ## Or a URL:
  ##   postgres://[user[:password]]@localhost[/dbname]?sslmode=[disable|verify-ca|verify-full]
  ## See https://postgresql.ac.cn/docs/current/libpq-connect.html#LIBPQ-CONNSTRING
  ##
  ## All connection parameters are optional. Environment vars are also supported.
  ## e.g. PGPASSWORD, PGHOST, PGUSER, PGDATABASE
  ## All supported vars can be found here:
  ##  https://postgresql.ac.cn/docs/current/libpq-envars.html
  ##
  ## Non-standard parameters:
  ##   pool_max_conns (default: 1) - Maximum size of connection pool for parallel (per-batch per-table) inserts.
  ##   pool_min_conns (default: 0) - Minimum size of connection pool.
  ##   pool_max_conn_lifetime (default: 0s) - Maximum age of a connection before closing.
  ##   pool_max_conn_idle_time (default: 0s) - Maximum idle time of a connection before closing.
  ##   pool_health_check_period (default: 0s) - Duration between health checks on idle connections.
  # connection = ""

  ## Postgres schema to use.
  # schema = "public"

  ## Store tags as foreign keys in the metrics table. Default is false.
  # tags_as_foreign_keys = false

  ## Suffix to append to table name (measurement name) for the foreign tag table.
  # tag_table_suffix = "_tag"

  ## Deny inserting metrics if the foreign tag can't be inserted.
  # foreign_tag_constraint = false

  ## Store all tags as a JSONB object in a single 'tags' column.
  # tags_as_jsonb = false

  ## Store all fields as a JSONB object in a single 'fields' column.
  # fields_as_jsonb = false

  ## Name of the timestamp column
  ## NOTE: Some tools (e.g. Grafana) require the default name so be careful!
  # timestamp_column_name = "time"

  ## Type of the timestamp column
  ## Currently, "timestamp without time zone" and "timestamp with time zone"
  ## are supported
  # timestamp_column_type = "timestamp without time zone"

  ## Templated statements to execute when creating a new table.
  # create_templates = [
  #   '''CREATE TABLE {{ .table }} ({{ .columns }})''',
  # ]

  ## Templated statements to execute when adding columns to a table.
  ## Set to an empty list to disable. Points containing tags for which there is no column will be skipped. Points
  ## containing fields for which there is no column will have the field omitted.
  # add_column_templates = [
  #   '''ALTER TABLE {{ .table }} ADD COLUMN IF NOT EXISTS {{ .columns|join ", ADD COLUMN IF NOT EXISTS " }}''',
  # ]

  ## Templated statements to execute when creating a new tag table.
  # tag_table_create_templates = [
  #   '''CREATE TABLE {{ .table }} ({{ .columns }}, PRIMARY KEY (tag_id))''',
  # ]

  ## Templated statements to execute when adding columns to a tag table.
  ## Set to an empty list to disable. Points containing tags for which there is no column will be skipped.
  # tag_table_add_column_templates = [
  #   '''ALTER TABLE {{ .table }} ADD COLUMN IF NOT EXISTS {{ .columns|join ", ADD COLUMN IF NOT EXISTS " }}''',
  # ]

  ## The postgres data type to use for storing unsigned 64-bit integer values (Postgres does not have a native
  ## unsigned 64-bit integer type).
  ## The value can be one of:
  ##   numeric - Uses the PostgreSQL "numeric" data type.
  ##   uint8 - Requires pguint extension (https://github.com/petere/pguint)
  # uint64_type = "numeric"

  ## When using pool_max_conns>1, and a temporary error occurs, the query is retried with an incremental backoff. This
  ## controls the maximum backoff duration.
  # retry_max_backoff = "15s"

  ## Approximate number of tag IDs to store in in-memory cache (when using tags_as_foreign_keys).
  ## This is an optimization to skip inserting known tag IDs.
  ## Each entry consumes approximately 34 bytes of memory.
  # tag_cache_size = 100000

  ## Enable & set the log level for the Postgres driver.
  # log_level = "warn" # trace, debug, info, warn, error, none

输入和输出集成示例

Google Cloud PubSub

  1. 物联网设备实时分析:利用 Google Cloud PubSub 插件聚合来自分布在不同位置的物联网设备的指标。 通过将来自设备的数据流式传输到 Google PubSub 并使用此插件摄取指标,组织可以创建一个集中式仪表板,用于实时监控和警报。 此设置可以立即深入了解设备性能,从而促进主动维护和运营效率。

  2. 动态日志处理和监控:通过 Google Cloud PubSub 将来自多个来源的日志摄取到 Telegraf 管道中,利用该插件解析和分析日志消息。 这可以帮助团队快速识别日志中的异常或模式,并简化跨分布式系统的问题排查过程。 通过整合日志数据,组织可以增强其可观察性和响应能力。

  3. 事件驱动的工作流集成:使用 Google Cloud PubSub 插件连接各种云函数或服务。 每次新消息被推送到订阅时,都可以在云架构的其他部分触发操作,例如启动数据处理作业、通知,甚至更新报告。 这种事件驱动的方法允许构建更具反应性的系统架构,以适应不断变化的业务需求。

PostgreSQL

  1. 使用复杂查询进行实时分析:利用 PostgreSQL 插件将来自各种来源的指标存储在 PostgreSQL 数据库中,从而使用复杂查询实现实时分析。 这种设置可以帮助数据科学家和分析师发现模式和趋势,因为他们在利用 PostgreSQL 强大的查询优化功能的同时,跨多个表操作关系数据。 具体来说,用户可以使用跨不同指标表的 JOIN 操作创建复杂的报告,从而揭示通常在嵌入式系统中隐藏的见解。

  2. 与 TimescaleDB 集成以进行时序数据分析:在 TimescaleDB 实例中使用 PostgreSQL 插件来高效地处理和分析时序数据。 通过实施超表,用户可以在时间维度上实现更高的性能和主题分区。 这种集成允许用户对大量的时序数据运行分析查询,同时保留 PostgreSQL SQL 查询的全部功能,从而确保指标分析的可靠性和效率。

  3. 数据版本控制和历史分析:实施使用 PostgreSQL 插件的策略,以维护指标在不同时间点的不同版本。 用户可以设置不可变的数据表结构,其中保留旧版本的表,从而轻松进行历史分析。 这种方法不仅提供了对数据演变的深入了解,而且有助于遵守数据保留策略,确保数据集的历史完整性保持不变。

  4. 针对不断变化的指标的动态架构管理:使用插件的模板功能创建动态变化的架构,以响应指标变化。 此用例允许组织在指标演变时调整其数据结构,添加必要的字段并确保遵守数据完整性策略。 通过利用模板化的 SQL 命令,用户无需手动干预即可扩展其数据库,从而促进敏捷数据管理实践。

反馈

感谢您成为我们社区的一份子! 如果您有任何一般性反馈或在这些页面上发现任何错误,我们欢迎并鼓励您提出意见。 请在 InfluxDB 社区 Slack 中提交您的反馈。

强大的性能,无限的扩展能力

收集、组织和处理海量高速数据。 当您将任何数据视为时序数据时,它会更有价值。 使用 InfluxDB,第一时序平台,旨在与 Telegraf 一起扩展。

查看入门方法

相关集成

HTTP 和 InfluxDB 集成

HTTP 插件从一个或多个 HTTP(S) 端点收集指标。 它支持各种身份验证方法和数据格式的配置选项。

查看集成

Kafka 和 InfluxDB 集成

此插件从 Kafka 读取消息,并允许根据这些消息创建指标。 它支持各种配置,包括不同的 Kafka 设置和消息处理选项。

查看集成

Kinesis 和 InfluxDB 集成

Kinesis 插件允许从 AWS Kinesis 流中读取指标。 它支持多种输入数据格式,并提供与 DynamoDB 的检查点功能,以实现可靠的消息处理。

查看集成