InfluxDB的多个数据中心复制
作者:Dave Patton / 产品,开发者
2017年9月29日
导航到
简介
InfluxDB和Kapacitor的灾难恢复以及多数据中心复制是两个经常被询问的话题。在这篇文章中,我将介绍一些实现这一目标的建议模式。此外,我还会讨论每种方法的优缺点以及它们如何组合。
通常,有几种模式可以用于将数据复制到InfluxDB的多数据中心。第一种是在数据进入InfluxDB时将其复制到第二个数据中心集群。第二种模式是从一个集群复制数据到后端另一个集群。
数据导入时的复制
要讨论的第一组模式是在InfluxDB中导入数据时的数据复制。这可能是设置最简单,对所有从外部来源进入我们集群的新数据最有用。大多数这些模式都以某种形式依赖于Telegraf。
Telegraf是TICK堆栈的多用途瑞士军刀,可以用作代理、数据聚合器或帮助设置数据导入管道。Telegraf使用输入和输出插件。截至本文撰写时,大约有100多个插件。有关Telegraf的完整讨论,请参阅Telegraf文档。
在大多数情况下,我们建议在几乎所有的Influx部署中使用Telegraf。至少,Telegraf可以用作批量写入数据库的所有请求;这是一定要做的。
Telegraf
图1. Telegraf复制
要讨论的第一个模式是使用Telegraf本身在两个集群上复制所有导入的数据。为此,我们在Telegraf配置文件中指定两个集群的URL。我们将使用InfluxDB Telegraf输出插件。在outputs.influxdb部分,我们将设置以下内容
## Cluster 1
[[outputs.influxdb]]
urls = ["http://cluster1:8086"] # URL of the cluster load balancer
database = "telegraf" # Name of the DB you want to write to
retention_policy = "myRetentionPolicy"
## Write consistency (clusters only), can be: "any", "one", "quorum", "all"
write_consistency = "any"
timeout = "5s"
content_encoding = "gzip"
## Cluster 2
[[outputs.influxdb]]
urls = ["http://cluster2:8086"] # URL of the cluster load balancer
database = "telegraf" # Name of the DB you want to write to
retention_policy = "myRetentionPolicy"
## Write consistency (clusters only), can be: "any", "one", "quorum", "all"
write_consistency = "any"
timeout = "5s"
content_encoding = "gzip"
此配置指定我们想要写入两个不同的InfluxDB集群。需要注意的是,您指定的URL应该是集群前面的负载均衡器的URL或集群中每个数据节点的URL列表。在这种情况下,我们的配置如下所示
## Cluster 1
[[outputs.influxdb]]
urls = ["http://Cluster1DataNode1:8086",
"http://Cluster1DataNode2:8086"] # URLs of the cluster data Nodes
database = "telegraf" # Name of the DB you want to write to
retention_policy = "myRetentionPolicy"
## Write consistency (clusters only), can be: "any", "one", "quorum", "all"
write_consistency = "any"
timeout = "5s"
content_encoding = "gzip"
## Cluster 2
[[outputs.influxdb]]
urls = ["http://Cluster2DataNode1:8086",
"http://Cluster2DataNode2:8086"] # URLs of the cluster data Nodes
database = "telegraf" # Name of the DB you want to write to
retention_policy = "myRetentionPolicy"
## Write consistency (clusters only), can be: "any", "one", "quorum", "all"
write_consistency = "any"
timeout = "5s"
content_encoding = "gzip"
使用数据节点URL列表,Telegraf将每个批次写入列表中的一个URL,而不是所有URL。数据库将处理将每个批次复制到所需的数据节点。
## Cluster 1
[agent]
metric_buffer_limit = 123456789 # Buffer size in bytes
这种模式是最容易设置的。在Telegraf和集群之间的网络分区的情况下,Telegraf将尝试重写失败的写入。此外,它会在内存缓冲区中存储要写入的数据。此缓冲区的大小可以在Telegraf中配置。
在设置此值时,您要确保缓冲区足够大,可以存储典型故障期间的数据。一个很好的经验法则是将缓冲区设置为在故障情况下存储一小时的值得的数据。显然,如果您正在写入大量数据,这可能不可行,因此相应地设置它。这个缓冲区不是一个持久的写入队列;如果Telegraf失败或关闭,缓冲区就会消失,因为它在内存中。那么,如果我们需要持久的写入队列怎么办呢?
Kafka和Telegraf
图2. 使用Kafka的Telegraf复制
在这个模式中,我在Telegraf实例前面放置了Kafka。Kafka将为所有进入集群的数据提供持久的写入队列。此外,这还将为我们可以对进入的所有数据做什么添加一些灵活性。例如,我们可能还想将所有数据发送到长期存储,例如S3,或者将其发送到其他分析平台进行其他类型的分析。假设您已经安装了Kafka。在这个模式中,我们的Telegraf配置将如下所示:
# Read metrics from Kafka topic(s)
[[inputs.kafka_consumer]]
## topic(s) to consume
topics = ["telegraf"]
brokers = ["kafkaBrokerHost:9092"]
## the name of the consumer group
consumer_group = "telegraf_metrics_consumers"
## Offset (must be either "oldest" or "newest")
offset = "oldest"
## Data format to consume.
data_format = "influx"
## Maximum length of a message to consume, in bytes (default 0/unlimited);
## larger messages are dropped
max_message_len = 65536
## Cluster 1
[[outputs.influxdb]]
urls = ["http://clusterLB:8086"] # URL of the cluster load balancer
database = "telegraf" # Name of the DB you want to write to
retention_policy = "myRetentionPolicy"
## Write consistency (clusters only), can be: "any", "one", "quorum", "all"
write_consistency = "any"
timeout = "5s"
content_encoding = "gzip"
上述示例显示我们正在使用Kafka消费者输入插件。主题应该是我们的数据对应的正确Kafka主题。代理列表可以是一个或多个Kafka代理。在每次调用Kafka时,Telegraf将只向其中一个代理发送请求。消费者组的使用是可选的,但如果您要从Kafka中提取大量数据,则可以设置多个Telegraf实例,每个实例都从相同的消费者组提取。这将允许您提取更多数据,并且不会有重复的数据从Kafka中,因为消费者组将跟踪每个消费者客户端的主题偏移量。
摄入后复制
接下来要讨论的第二组模式是在我们摄入数据后发生数据复制的模式。这些模式可以分为两个一般类别。第一个类别是所谓的“透明传输”,我们只是复制我们摄入的数据并将其发送到另一个InfluxDB实例或集群。第二个类别是派生数据或类似Kapacitor作业、连续查询或InfluxQL查询结果集的数据。
图3. 通过订阅复制
订阅最初是为了向Kapacitor发送基于流的TICK脚本而构建的。但这里有一个鲜为人知的事实关于订阅:它们可以通过HTTP或UDP将数据发送到任何地方。这使得它们非常有用。当您在InfluxDB中设置订阅时,它所做的只是转发所有匹配您指定的database.retentionpolicy的输入数据。
需要注意的是,每个集群上的订阅不应具有相同的数据库名称和保留策略的组合。这将设置一个无限循环,最终会在时空连续体中撕开一个洞;或者至少会导致您的两个集群崩溃。
您的订阅目的地可以是一个或多个条目。此外,您可以指定订阅将数据发送到“ANY”或“ALL”这些目的地之一。这意味着除了跨集群复制之外,您还可以将其发送到另一个系统进行其他类型的分析或长期存储原始数据。
# On Cluster 1
CREATE SUBSCRIPTION 'mySubscription' ON "myDB"."myRetentionPolicy" DESTINATIONS ALL 'http://cluster2LoadBalancer:port'
# On Cluster 2
CREATE SUBSCRIPTION 'mySubscription' ON "myDB"."myOtherRetentionPolicy" DESTINATIONS ALL 'http://cluster1LoadBalancer:port'
Kapacitor
图4:与Kapacitor复制
对后端复制的另一种看法与订阅非常相似,那就是使用Kapacitor。这种模式更适用于输出TICK脚本的数据,其中我们正在创建新数据、装饰或转换现有数据。例如,这是用于将一个集群的数据聚合或汇总到另一个集群的模式。假设我有一个以下TICK脚本来执行批量汇总到5分钟分辨率的操作:
var data = batch
|query('SELECT median(usage_idle) as usage_idle FROM "telegraf"."autogen"."cpu"')
.period(5m)
.every(5m)
.groupBy(*)
我们在脚本中将此分配给变量data。一旦设置好,我们就将输出分支到两个集群,如下所示:
data
|influxDBOut()
.cluster('localCluster')
.database('telegraf')
.retentionPolicy('rollup_5m')
.measurement('median_cpu_idle')
.precision('s')
data
|influxDBOut()
.cluster('remoteCluster')
.database('telegraf')
.retentionPolicy('rollup_5m')
.measurement('median_cpu_idle')
.precision('s')
注意集群名称吗?这并不是集群的URL;而是一个来自我们kapacitor.conf文件的命名变量。有关kapacitor.conf文件的更多详细信息,请在此处查看。
# Multiple InfluxDB configurations can be defined.
# Exactly one must be marked as the default.
# Each one will be given a name and can be referenced
# in batch queries and InfluxDBOut nodes.
[[influxdb]]
# Connect to an InfluxDB cluster
# Kapacitor can subscribe, query and write to this cluster.
# Using InfluxDB is not required and can be disabled.
enabled = true
default = true
name = "localcluster"
urls = ["http://cluster1LoadBalancer:8086"]
username = ""
password = ""
timeout = 0
[[influxdb]]
# Connect to an InfluxDB cluster
# Kapacitor can subscribe, query and write to this cluster.
# Using InfluxDB is not required and can be disabled.
enabled = true
default = true
name = "remoteCluster"
urls = ["http://cluster2LoadBalancer:8086"]
username = ""
password = ""
timeout = 0
备份和恢复
最后要讨论的模式是老旧的备份和恢复功能。它应该被视为一种“跑鞋网络”选项,因为它不是在近实时运行的。正如你可能猜到的,这使用了InfluxDB Enterprise的备份和恢复命令。目前,InfluxDB Enterprise支持备份集群中的所有数据,单个数据库和保留策略,或者单个分片。
备份命令会创建数据和元存储的完整副本,从而在备份时创建一个快照。可以采取两种类型的备份:完整备份或增量备份。当使用此模式时,你可能需要使用cron作业自动化,每小时或每天调用一个脚本来执行备份。这个脚本可能看起来像这样
#!/bin/bash
BKUPDIR=[path to backup dir]
BKUPFILE= Backup_$( date +"%Y%m%d%H%M").tar.gz
influx-ctl -bind [metahost]:8091 backup –incremental -db [db-name] $BKUPDIR
tar –cvzf $BACKUPDIR ./$BKUPFILE
scp –i credentialFIle $BKUPFILE root@cluster2DataNode:path/to/save/file
第一次使用备份命令时,你可能需要执行完整备份。然而,–incremental标志是默认的,但没有现有的增量备份时,系统将首先执行完整备份。
在目标集群上,你可能需要在后台运行一个脚本,该脚本将监视你的丢弃文件夹,并在备份文件发送过来时执行恢复。这个脚本可能看起来像这样
#!/bin/bash
MONITORDIR="/path/to/the/dir/to/monitor/"
inotifywait -m -r -e create --format '%w%f' "${MONITORDIR}" | while read NEWFILE
do
influx-ctl restore ${NEWFILE}
done
核选项
尽管这不是一个真正的集群间复制模式,但我认为它仍然值得讨论。什么是“核选项”?这是当整个世界(或者说你的集群和所有数据)都变得非常糟糕时的最后手段选项。我应该承认,我是一个喜欢尽可能长时间保留原始数据的粉丝。这样做使我们能够在系统中有更多的灵活性,并可能在未来的某个时间点分析事物。实施此方法应使用复制在摄取模式之一,我们将从输入流中提取并存储所有原始数据在长期存储如S3或Glacier。这样做使我们能够从源重建我们的系统。
因此,核选项。如果我们有所有源数据,需要从头开始重建我们的集群,包括所有衍生数据,现在我们可以通过重新播放我们的源数据来实现这一点。显然,仍然有一些关于你有多少源数据以及重新播放它所需时间的考虑,但希望你能看到它的好处。
结论
在这篇文章中,我讨论了在InfluxDB Enterprise的两个不同集群之间复制数据的一些模式。这些只是最基本的模式,我认为可能至少存在另外20多种。我想提出一些你可以用作起点模式。这篇文章的目的不是提供一个详尽的列表,也不是说这个方法或那个方法是最佳使用方法。实际上,没有一种单一的最佳方法或模式。对你来说最好的模式是满足你的商业目标,并适应你组织的基础设施、流程和实践的模式。现在去复制吧。