使用 Python 将数据写入 InfluxDB

导航至

我们经常看到 Python 开发者使用 InfluxDB 时遇到的问题之一是如何提高使用 InfluxDB 客户端库的程序写入性能。通常,他们正在尝试导入或传输大量数据,并确保数据能够以足够快的速度插入到数据库中,以满足他们的业务需求。在这篇文章中,我们将讨论人们经常遇到的常见陷阱,以及一些性能最佳实践。

脚本

让我们来看一个示例 Python 脚本,它将数据写入 InfluxDB。您可以在这里找到它。脚本的前一部分(直到第 56 行)用于创建我们将用于填充测量的数据;有一个位置列表、一个水果列表和一个生成 1000 个 UUID 的函数。

我们创建一个数组,并使用以下方式将第一个值追加到它中

data = []
data.append("{measurement},location={location},fruit={fruit},id={id} x={x},y={y},z={z}i {timestamp}"
            .format(measurement=measurement_name,
                    location=random.choice(location_tags),
                    fruit=random.choice(fruit_tags),
                    id=random.choice(id_tags),
                    x=round(random.random(),4),
                    y=round(random.random(),4),
                    z=random.randint(0,50),
                    timestamp=data_start_time))

这将追加一个看起来像以下字符串的字符串

m1,location=location,fruit=fruit,id=id x=10,y=1,z=42i 1562458785618

在脚本的开头,我们定义了一个变量,其中包含我们想要写入的点数:在本例中,250,000。一旦数据被生成并加载到数组中,我们就可以按照以下方式将其写入数据库

client.write_points(data, database='python-throughput', time_precision='ms', batch_size=10000, protocol='line')

测量性能

大多数 Unix 系统都有一版 time 命令,它测量进程执行所需的时间,并将其打印到命令行。

$ time echo "hello world"
hello world

real	0m0.000s
user	0m0.000s
sys	0m0.000s

这是一个许多 Linux 用户熟悉的工具,但以这种方式测量写入性能的问题是,你最终测量的是整个程序。如果你像我们这样生成随机数据,或者解析 CSV,这可能会是一段有意义的时间。为了测量程序中的各个部分,我们将使用 time 模块perf_counter() 函数。

time.perf_counter() accesses the CPU's hardware performance counter and returns a value in fractional seconds. The performance counter is the highest resolution timer available for your system. It starts when the CPU is powered on, and runs continuously, which means that to measure the elapsed time of something you can get the time of the counter before and after an event occur and then subtract the start value from the end value.

由于我们想要了解写入数据的表现,我们使用 time.perf_counter() 来计时 write_points() 函数。

client_write_start_time = time.perf_counter()

client.write_points(data, database='python-throughput', time_precision='ms', batch_size=10000, protocol='line')

client_write_end_time = time.perf_counter()

print("Client Library Write: {time}s".format(time=client_write_end_time - client_write_start_time))

这对我们的用例来说很完美,但如果你有其他需求,Python 还提供了一个名为 timeit 的模块,它“提供了一个简单的方式来计时 Python 代码的片段”。对于更大、更复杂的应用程序,你可能想 考虑使用分析器,这将为你提供有关程序整个执行过程的详细信息。

让我们运行这个脚本!我们将使用时间函数来计时整个脚本的执行时间,以便我们可以将其与 write_points() 函数的执行时间进行比较。

$ time python3 write_test.py 
Client Library Write: 3.4734167899999995s

real	0m6.060s
user	0m2.387s
sys	0m0.114s

我运行了5次脚本,每次运行之间都删除数据库并重新开始:在2018年双核MacBook Pro(承认其具有快速的SSD)上,250,000个点的平均写入时间为3.81秒,而整个脚本(包括生成数据和初始化和退出程序所需的时间)的平均运行时间为6.35秒。

优先使用行协议而非JSON

Python客户端库的write_points函数可以接受JSON或InfluxDB行协议中的数据。然而,通过查看代码,我们发现任何以JSON格式提供的数据在发送到InfluxDB之前都会被转换为行协议

if protocol == 'json':
  data = make_lines(data, precision).encode('utf-8')

可以在这里找到make_lines()函数,该函数负责将JSON转换为行协议。

如果我们从一开始就提供行协议数据来避免这项工作,我们就可以节省一些时间。让我们进行一些测试,看看数据是什么样的。您可以在这里找到脚本的JSON版本。我运行了5次,并再次计算了平均结果。与直接使用行协议相比,使用JSON格式插入所有数据平均需要7.65秒,慢了两倍。

优化2:批量处理您的点

InfluxDB在批量将数据写入数据库时表现最佳。这有助于通过一次传输更多数据来最小化打开和关闭HTTP连接的网络开销。InfluxDB的理想批量大小为5,000-10,000点。

默认情况下,write_points() 函数将传输传递给函数的数组中的所有点,但您也可以提供一个参数来定义批量大小。在示例脚本中,我们事先生成250,000个点,然后以10,000点的批量写入这些点。

client.write_points(data, database='python-throughput', time_precision='ms', batch_size=10000, protocol='line')

相比之下,我使用50的批量大小运行了脚本;在这种情况下,我们的代码的write_points部分平均约为29秒!这是一个数量级的差距!

下一步:测量更多事物!

如果您的程序运行速度仍然太慢,那么检查您的代码运行所在系统的其他属性总是值得的。资源如何?RAM、CPU、磁盘空间?您的网络连接的吞吐量或磁盘I/O如何?有许多外部因素可以影响代码的执行:有系统地测试它们并收集数据以排除原因。

如果您有关于使用Python库优化写入性能的更多想法,我们非常乐意听取!提交一个pull request、在社区论坛上评论,或直接联系我[email protected]