使用 Python 将数据写入 InfluxDB

导航至

我们经常从使用 InfluxDB 的 Python 开发者那里看到一个问题,即如何提高使用 InfluxDB 客户端库的程序的写入性能。通常,他们试图导入或传输大量数据,并希望确保数据可以足够快地插入数据库以满足其业务需求。在这篇文章中,我们将介绍人们常遇到的常见陷阱,以及一些性能最佳实践。

脚本

让我们看一个将数据写入 InfluxDB 的 Python 脚本示例。您可以在这里找到它。脚本的第一部分(直到第 56 行)用于创建我们将用来填充测量值的数据;有一个位置列表、一个水果列表和一个生成 1000 个 UUID 的函数。

我们创建一个数组,并使用我们的数据将第一个值附加到它,如下所示

data = []
data.append("{measurement},location={location},fruit={fruit},id={id} x={x},y={y},z={z}i {timestamp}"
            .format(measurement=measurement_name,
                    location=random.choice(location_tags),
                    fruit=random.choice(fruit_tags),
                    id=random.choice(id_tags),
                    x=round(random.random(),4),
                    y=round(random.random(),4),
                    z=random.randint(0,50),
                    timestamp=data_start_time))

这将附加一个看起来像这样的字符串

m1,location=location,fruit=fruit,id=id x=10,y=1,z=42i 1562458785618

在脚本的开头,我们定义了一个变量,其中包含我们要写入的点数:在本例中为 250,000。一旦数据生成并加载到数组中,我们就可以将其写入数据库,如下所示

client.write_points(data, database='python-throughput', time_precision='ms', batch_size=10000, protocol='line')

测量性能

大多数 Unix 系统都有某种版本的 time 命令,该命令测量进程执行所需的时间,然后将其打印到命令行。

$ time echo "hello world"
hello world

real	0m0.000s
user	0m0.000s
sys	0m0.000s

这是一个许多 Linux 用户熟悉的工具,但以这种方式测量写入性能的问题在于,您最终会计时整个程序。如果您像我们一样生成随机数据或解析 CSV,这可能是一段有意义的时间。为了测量程序的各个部分,我们将使用 time 模块perf_counter() 函数。

time.perf_counter() accesses the CPU's hardware performance counter and returns a value in fractional seconds. The performance counter is the highest resolution timer available for your system. It starts when the CPU is powered on, and runs continuously, which means that to measure the elapsed time of something you can get the time of the counter before and after an event occur and then subtract the start value from the end value.

由于我们想了解写入数据的性能,我们使用 time.perf_counter() 来计时 write_points() 函数。

client_write_start_time = time.perf_counter()

client.write_points(data, database='python-throughput', time_precision='ms', batch_size=10000, protocol='line')

client_write_end_time = time.perf_counter()

print("Client Library Write: {time}s".format(time=client_write_end_time - client_write_start_time))

这非常适合我们的用例,但如果您的需求不同,Python 还提供了一个名为 timeit 的模块,该模块“提供了一种简单的方法来计时小段 Python 代码”。对于更大、更复杂的应用程序,您可能需要 考虑使用性能分析器,它将为您提供有关程序执行的详细信息。

让我们运行脚本!我们将继续使用 time 函数来计时整个脚本的执行,以便我们可以将其与 write_points() 函数的执行进行比较。

$ time python3 write_test.py 
Client Library Write: 3.4734167899999995s

real	0m6.060s
user	0m2.387s
sys	0m0.114s

我运行了该脚本 5 次,在每次运行之间删除数据库并重新开始:在我的 2018 年双核 MacBook Pro(它有一个公认的快速 SSD)上,写入 250,000 个点的平均写入时间为 3.81 秒,而整个脚本的平均经过时间(包括生成数据以及初始化和退出程序所需的时间)为 6.35 秒。

优先使用 Line Protocol 而不是 JSON

Python 客户端库的 write_points 函数 可以接受 JSON 或 InfluxDB Line Protocol 格式的数据。查看代码,我们发现以 JSON 格式提供的任何数据在发送到 InfluxDB 之前都会转换为 Line Protocol

if protocol == 'json':
  data = make_lines(data, precision).encode('utf-8')

make_lines() 函数 可以在这里找到,它完成了从 JSON 到 Line Protocol 的转换工作。

如果我们能够通过一开始就以 Line Protocol 格式提供数据来避免这项工作,我们将节省一些时间。让我们运行一些测试,看看数据是什么样的。您可以在此处找到 JSON 版本的脚本。我运行了 5 次,再次对结果取平均值。使用 JSON 而不是 Line Protocol 格式的数据,插入所有数据平均花费 7.65 秒,比直接使用 Line Protocol 慢大约两倍。

优化 2:批量处理您的点

当数据以批处理方式写入数据库时,InfluxDB 性能最佳。这有助于通过一次传输更多数据来最大限度地减少打开和关闭 HTTP 连接的网络开销。InfluxDB 的理想批量大小为 5,000-10,000 个点。

默认情况下,write_points() 函数将传输传递给该函数数组中的所有点,但您也可以提供一个参数来定义批量大小。在示例脚本中,我们预先生成 250,000 个点,然后以 10,000 个点的批次写入这些点

client.write_points(data, database='python-throughput', time_precision='ms', batch_size=10000, protocol='line')

为了比较,我运行了批量大小为 50 的脚本;使用该设置,我们代码的 write_points 部分平均约为 29 秒!高出一个数量级!

后续步骤:测量更多内容!

如果您的程序执行速度仍然太慢,那么始终值得检查运行代码系统的其他属性。您的资源看起来如何?RAM、CPU、磁盘空间?您的网络连接或磁盘 I/O 的吞吐量是多少?有很多外部因素会影响代码的执行:有条不紊地测试它们并收集数据以排除原因。

如果您有更多关于使用 Python 库优化写入性能的想法,我们很乐意听到!打开一个拉取请求,在我们的社区论坛上发表评论,或直接通过 [email protected] 与我联系。