使用 InfluxDB 的多数据中心复制
作者:Dave Patton / 产品, 开发者
2017 年 9 月 29 日
导航至
简介
灾难恢复以及 InfluxDB 和 Kapacitor 的多数据中心复制是两个经常被问及的话题。在这篇文章中,我将介绍一些用于实现此目的的建议模式。此外,我将讨论每种方法的优缺点以及如何将它们结合起来。
一般来说,有两种模式可用于将数据多数据中心复制到 InfluxDB 中。第一种是在数据摄取到 InfluxDB 时将数据复制到第二个数据中心集群。第二种模式是在后端将数据从一个集群复制到另一个集群。
摄取时复制
要讨论的第一组模式是在数据摄取到 InfluxDB 时进行数据复制。这可能是最容易设置的,并且对于所有来自外部来源的新数据最有用。这些模式中的大多数在某种程度上依赖于 Telegraf。
Telegraf 是 TICK stack 的瑞士军刀,可以用作代理、数据聚合器或帮助设置数据摄取管道。Telegraf 使用输入和输出插件。截至本文撰写之时,大约有 100 多个插件。有关 Telegraf 的全面讨论,请参阅 Telegraf 文档。
在大多数情况下,我们建议在几乎所有 Influx 部署中使用 Telegraf。至少 Telegraf 可以用于批量处理所有写入数据库的请求;这是您始终应该确保做的事情。
Telegraf
图 1. Telegraf 复制
要讨论的第一种模式是单独使用 Telegraf 将所有数据在摄取时复制到两个集群。要设置此功能,我们在 Telegraf 配置文件中指定两个集群的 URL。我们将使用 InfluxDB Telegraf 输出插件。在 outputs.influxdb 部分,我们将设置以下内容
## Cluster 1
[[outputs.influxdb]]
urls = ["http://cluster1:8086"] # URL of the cluster load balancer
database = "telegraf" # Name of the DB you want to write to
retention_policy = "myRetentionPolicy"
## Write consistency (clusters only), can be: "any", "one", "quorum", "all"
write_consistency = "any"
timeout = "5s"
content_encoding = "gzip"
## Cluster 2
[[outputs.influxdb]]
urls = ["http://cluster2:8086"] # URL of the cluster load balancer
database = "telegraf" # Name of the DB you want to write to
retention_policy = "myRetentionPolicy"
## Write consistency (clusters only), can be: "any", "one", "quorum", "all"
write_consistency = "any"
timeout = "5s"
content_encoding = "gzip"
此配置指定我们要写入两个独立的 InfluxDB 集群。需要注意的一点是,您指定的 URL 应该是集群前端的负载均衡器的 URL,或者是集群中每个数据节点的 URL 列表。在这种情况下,我们的配置如下所示
## Cluster 1
[[outputs.influxdb]]
urls = ["http://Cluster1DataNode1:8086",
"http://Cluster1DataNode2:8086"] # URLs of the cluster data Nodes
database = "telegraf" # Name of the DB you want to write to
retention_policy = "myRetentionPolicy"
## Write consistency (clusters only), can be: "any", "one", "quorum", "all"
write_consistency = "any"
timeout = "5s"
content_encoding = "gzip"
## Cluster 2
[[outputs.influxdb]]
urls = ["http://Cluster2DataNode1:8086",
"http://Cluster2DataNode2:8086"] # URLs of the cluster data Nodes
database = "telegraf" # Name of the DB you want to write to
retention_policy = "myRetentionPolicy"
## Write consistency (clusters only), can be: "any", "one", "quorum", "all"
write_consistency = "any"
timeout = "5s"
content_encoding = "gzip"
使用数据节点 URL 列表,Telegraf 会将每个批次写入列表中的 URL 之一,而不是全部 URL。数据库将处理将每个批次复制到所需数据节点的过程。
## Cluster 1
[agent]
metric_buffer_limit = 123456789 # Buffer size in bytes
此模式是最容易设置的。在 Telegraf 和集群之间发生网络分区的情况下,Telegraf 将尝试重写失败的写入。此外,它会将要写入的数据存储在内存缓冲区中。此缓冲区的大小可以在 Telegraf 中配置。
设置此值时,您需要确保缓冲区足够大,可以容纳典型中断的数据。一个好的经验法则是将缓冲区设置为容纳一小时的数据,以防发生故障。显然,如果您要写入大量数据,这可能不可行,因此请相应地设置它。此缓冲区不是持久写入队列;如果 Telegraf 发生故障或关闭,则缓冲区将消失,因为它在内存中。那么,如果我们需要持久写入队列,该怎么办呢?
Kafka 和 Telegraf
图 2. 使用 Kafka 的 Telegraf 复制
在这种模式中,我在 Telegraf 实例的前面放置了 Kafka。Kafka 将为我们所有进入集群的数据提供持久写入队列。此外,这将为我们可以对所有传入数据执行的操作增加一些灵活性。例如,我们可能还希望将所有数据发送到 S3 等长期存储中,或者将其发送到其他分析平台以进行其他类型的分析。假设您已经安装了 Kafka。在这种模式下,我们的 Telegraf 配置如下所示
# Read metrics from Kafka topic(s)
[[inputs.kafka_consumer]]
## topic(s) to consume
topics = ["telegraf"]
brokers = ["kafkaBrokerHost:9092"]
## the name of the consumer group
consumer_group = "telegraf_metrics_consumers"
## Offset (must be either "oldest" or "newest")
offset = "oldest"
## Data format to consume.
data_format = "influx"
## Maximum length of a message to consume, in bytes (default 0/unlimited);
## larger messages are dropped
max_message_len = 65536
## Cluster 1
[[outputs.influxdb]]
urls = ["http://clusterLB:8086"] # URL of the cluster load balancer
database = "telegraf" # Name of the DB you want to write to
retention_policy = "myRetentionPolicy"
## Write consistency (clusters only), can be: "any", "one", "quorum", "all"
write_consistency = "any"
timeout = "5s"
content_encoding = "gzip"
上面的示例显示我们正在使用 Kafka Consumer Input Plugin。主题应该是我们数据的正确 Kafka 主题。brokers 列表可以是一个或多个 kafka brokers。每次调用 Kafka 时,Telegraf 将仅将请求发送到其中一个 brokers。consumer group 的使用是可选的,但是如果您有大量数据要从 Kafka 中提取,则可以设置多个 Telegraf 实例,每个实例都从同一个 consumer group 中提取。这将允许您提取更多数据,并且不会从 Kafka 中获得重复数据,因为 consumer group 将跟踪每个 consumer 客户端的主题偏移量。
摄取后复制
要讨论的第二组模式是那些在摄取数据后发生数据复制的模式。这些模式可以分为两个通用类别。第一个类别我称之为“直通”,我们实际上只是复制我们已摄取的数据,并将其发送到另一个 InfluxDB 实例或集群。第二个类别用于派生数据或作为 Kapacitor 作业、连续查询或 InfluxQL 查询结果集的数据。
图 3. 使用订阅进行复制
订阅最初是为将数据发送到 Kapacitor 以进行基于流的 TICK 脚本而构建的。但是,关于订阅的一个鲜为人知的事实是:它们可以通过 HTTP 或 UDP 将数据发送到您想要的任何地方。这使得它们非常方便。当您在 InfluxDB 中设置订阅时,它所做的只是转发与您指定的 database.retentionpolicy 匹配的所有输入数据。
需要注意的一个重要事项是,每个集群上的订阅不应具有相同的数据库名称和保留策略组合。这将设置一个无限循环,最终会在时空连续体中撕开一个洞;或者至少会使您的两个集群崩溃。
您的订阅的目标可以是一个或多个条目。此外,您可以指定订阅将其发送到这些目标中的“ANY”或“ALL”。这意味着除了跨集群复制之外,您还可以让它将数据发送到另一个系统以进行其他类型的分析或原始数据的长期存储。
# On Cluster 1
CREATE SUBSCRIPTION 'mySubscription' ON "myDB"."myRetentionPolicy" DESTINATIONS ALL 'http://cluster2LoadBalancer:port'
# On Cluster 2
CREATE SUBSCRIPTION 'mySubscription' ON "myDB"."myOtherRetentionPolicy" DESTINATIONS ALL 'http://cluster1LoadBalancer:port'
Kapacitor
图 4:使用 Kapacitor 进行复制
另一种与订阅非常相似的后端复制方法是使用 Kapacitor。这种模式更适用于 TICK 脚本的输出数据,我们在其中创建新数据、修饰或转换现有数据。例如,这是用于将数据从一个集群聚合或汇总到另一个集群的模式。假设我有以下 TICK 脚本,该脚本正在执行数据批处理汇总到 5 分钟分辨率
var data = batch
|query('SELECT median(usage_idle) as usage_idle FROM "telegraf"."autogen"."cpu"')
.period(5m)
.every(5m)
.groupBy(*)
我们将此分配给脚本中的变量数据。设置完成后,我们将输出分支到两个集群,如下所示
data
|influxDBOut()
.cluster('localCluster')
.database('telegraf')
.retentionPolicy('rollup_5m')
.measurement('median_cpu_idle')
.precision('s')
data
|influxDBOut()
.cluster('remoteCluster')
.database('telegraf')
.retentionPolicy('rollup_5m')
.measurement('median_cpu_idle')
.precision('s')
注意集群名称?这不是集群的 URL;而是来自我们的 kapacitor.conf 文件的命名变量。有关 kapacitor.conf 文件的更多详细信息,请参见此处。
# Multiple InfluxDB configurations can be defined.
# Exactly one must be marked as the default.
# Each one will be given a name and can be referenced
# in batch queries and InfluxDBOut nodes.
[[influxdb]]
# Connect to an InfluxDB cluster
# Kapacitor can subscribe, query and write to this cluster.
# Using InfluxDB is not required and can be disabled.
enabled = true
default = true
name = "localcluster"
urls = ["http://cluster1LoadBalancer:8086"]
username = ""
password = ""
timeout = 0
[[influxdb]]
# Connect to an InfluxDB cluster
# Kapacitor can subscribe, query and write to this cluster.
# Using InfluxDB is not required and can be disabled.
enabled = true
default = true
name = "remoteCluster"
urls = ["http://cluster2LoadBalancer:8086"]
username = ""
password = ""
timeout = 0
备份和恢复
要讨论的最后一种模式是良好的旧式备份和恢复功能。它应该被认为是一种“运动鞋网络”选项,因为它不是近乎实时的操作。您可能已经猜到,这使用了 InfluxDB Enterprise 的备份和恢复命令。当前,InfluxDB Enterprise 支持备份集群中的所有数据、单个数据库和保留策略或单个分片。
backup 命令创建数据和元存储的完整副本,从而创建拍摄备份时的快照。可以进行两种类型的备份:完整备份或增量备份。使用此模式时,您可能应该使用 cron 作业自动执行此操作,该作业每小时或每天调用一个脚本,该脚本可以执行备份。该脚本可能如下所示
#!/bin/bash
BKUPDIR=[path to backup dir]
BKUPFILE= Backup_$( date +"%Y%m%d%H%M").tar.gz
influx-ctl -bind [metahost]:8091 backup –incremental -db [db-name] $BKUPDIR
tar –cvzf $BACKUPDIR ./$BKUPFILE
scp –i credentialFIle $BKUPFILE root@cluster2DataNode:path/to/save/file
首次使用 backup 命令时,您可能应该执行完整备份。但是,–incremental
标志是默认设置,但是如果没有现有的增量备份,系统将首先执行完整备份。
在目标集群上,您可能希望在后台运行一个脚本,该脚本将监视您的 drop 文件夹,并在备份文件发送过来时执行恢复。该脚本可能如下所示
#!/bin/bash
MONITORDIR="/path/to/the/dir/to/monitor/"
inotifywait -m -r -e create --format '%w%f' "${MONITORDIR}" | while read NEWFILE
do
influx-ctl restore ${NEWFILE}
done
核选项
尽管这不是一种适当的集群到集群复制模式,但我认为仍然值得讨论。什么是“核选项”?如果整个世界,或者更确切地说,您的集群和所有数据都变得非常糟糕,这是最后的选择。我应该承认,我非常喜欢尽可能长时间地保留原始数据。这样做使我们的系统以及我们将来可能想要分析事物的方式具有更大的灵活性。实施此操作应使用摄取时复制模式之一完成,在这种模式下,我们将从输入流中分流并将所有原始数据存储在 S3 或 Glacier 等长期存储中。这样做使我们能够从源重建系统。
因此,核选项。如果我们拥有所有源数据并且需要从头开始重建集群,包括所有派生数据,那么我们现在可以通过重放源数据来做到这一点。显然,关于您拥有的源数据量以及重放它所需的时间仍然有一些考虑因素,但希望您能看到好处。
结论
在这篇文章中,我讨论了几种在两个不同的 InfluxDB Enterprise 集群之间复制数据的模式。这些只是最基本的模式,我认为可能至少存在 20 种以上的模式。我想展示一些您可以作为起点的模式。这篇文章的重点不是提供详尽的列表或说这种方法或那种方法是最佳的使用方法。实际上,没有单一的最佳方法或模式。最适合您的模式是满足您的业务目标并适合您组织的 инфраструктура、流程和实践的模式。现在去复制吧。