时序数据、InfluxDB和向量数据库

导航至

将时序数据与向量数据库的强大功能相结合,为分析和机器学习应用开辟了新的领域。时序数据以其顺序性和时间戳为特征,在金融市场到物联网设备等各个领域的监控和预测中发挥着关键作用。InfluxDB,作为领先的时序数据库,在高效和可扩展地处理此类数据方面表现出色。

在另一端,是Milvus,这是一种高度通用的向量数据库,旨在管理和查询高维向量数据,实现对于AI驱动的洞察至关重要的高级相似度搜索。

本博客文章探讨了InfluxDB和Milvus之间的无缝协同作用,引导您通过从InfluxDB查询数据、规范化数据、将其转换为向量并将它们写入Milvus的过程。

最后,我们对时间序列进行相似性搜索,以确定监控条件是否熟悉。通过利用数据库的优势,我们解锁了一种全面的方法来管理和利用复杂数据集,以应用于前沿技术。

需求

为了运行此示例,您需要以下内容

  • Docker
  • Jupyter Notebooks
  • 免费的InfluxDB Cloud实例

您需要从您的InfluxDB实例获取以下凭证

  • 桶/数据库
  • 令牌
  • URL

要运行此示例,请克隆以下存储库并遵循README.md中的说明。在此步骤中,您将构建并启动Milvus容器,并运行Jupyter Notebook。

什么是向量数据库?

向量数据库是专为高效处理向量数据(表示复杂数据点如图像、文本或音频特征的数字数组)而设计的专用存储系统。这些数据库在存储、索引和查询高维向量方面表现出色,通常由机器学习模型或算法生成。它们之所以受欢迎,主要原因是它们在执行相似性搜索方面的卓越能力;它们可以快速识别与查询向量最相似的向量,这对于推荐系统、图像和语音识别以及自然语言处理等应用至关重要。

人工智能和机器学习的兴起推动了高效相似性搜索机制的需求,使向量数据库成为焦点。它们使用先进的索引技术来管理维度灾难——在传统数据库方法难以应对性能的高维空间中的一项挑战。这种效率即使在大量数据集中也能实现实时查询,提高了用户体验和机器学习模型的有效性。

此外,向量数据库促进了数据的细致理解和处理,超越了简单的关键词搜索,并拥抱了现实世界数据的复杂性。这种能力使应用程序能够提供更准确和相关的结果,推动了向量数据库在各个行业的广泛应用,这些行业旨在利用人工智能和机器学习的创新来获得竞争优势。

TSDB和向量数据库场景

为了更好地理解时间序列数据库(TSDB)和向量数据库之间的关系,将它们的使用置于一个示例场景中是很有帮助的。想象您正在开发一个平台,该平台提供实时交通监控和模式识别,以改善城市管理。具体来说,您想要创建一个解决方案,用于识别交通条件是否异常,并确定存在的交通异常类型,如事故、拥堵或施工。InfluxDB会存储传感器的时序数据,而Zilliz会存储图像数据并管理复杂查询的搜索索引。在此场景中,您可以在InfluxDB中持续监控如平均速度和车辆数量的交通数据。您也可能在Milvus中收集交通图像数据。当交通条件超出常规时,您将异常交通数据发送到Milvus。然后,在Milvus中进行相似性搜索以确定异常类型。

本帖将突出以下内容

  1. 生成假交通数据
  2. 将其写入InfluxDB v3
  3. 从InfluxDB v3中查询它
  4. 对数据进行归一化和向量化(本节的许多代码来自以下教程https://github.com/KxSystems/kdbai-samples/blob/main/pattern_matching/pattern_matching.ipynb
  5. 将其插入到Milvus
  6. 使用新的时间序列执行相似性搜索,以尝试确定其是否与过去的时序相似

开始步骤

首先,我们将生成一些假时间序列交通数据

def generate_sensor_data(anomaly_type, speed_limit, vehicle_count_range, avg_speed_range, rows=500):
    """
    Generate a DataFrame with sensor data including vehicle count, avg speed, anomaly type, and timestamp.

    Parameters:
    - anomaly_type: string, the type of traffic anomaly (traffic, accident, road work)
    - vehicle_count_range: tuple, the range (min, max) of vehicle count
    - avg_speed_range: tuple, the range (min, max) of average speed
    - rows: int, number of rows to generate

    Returns:
    - df: pandas DataFrame

    # Generate data
    np.random.seed(42)  # For reproducible results
    vehicle_counts = np.random.randint(vehicle_count_range[0], vehicle_count_range[1], size=rows)
    avg_speeds = np.random.uniform(avg_speed_range[0], avg_speed_range[1], size=rows)

    # Generate timestamps
    start_time = datetime.now()
    timestamps = [start_time + timedelta(seconds=5*i) for i in range(rows)]

    # Create DataFrame
    df = pd.DataFrame({
        'Timestamp': timestamps,
        'Vehicle Count': vehicle_counts,
        'Average Speed': avg_speeds,
        'Anomaly Type': anomaly_type,
        'Speed Limit': speed_limit
    })

    return df

# Example usage
vehicle_count_range = (20, 40)  # Configure the min and max vehicle count here
avg_speed_range = (20.0, 45.0)  # Configure the min and max average speed here
df = generate_sensor_data("accident", 45, vehicle_count_range, avg_speed_range)

# Display the DataFrame
df.head()  # Showing the first 5 rows for brevity

我们的数据看起来像这样:

接下来,我们可以使用 InfluxDB v3 Python 客户端库来编写和查询 DataFrame 到 InfluxDB。请参见这个示例了解如何进行操作。在实际应用中,我们可能会从 MQTT 代理收集数据,并订阅主题以将数据写入 InfluxDB。然后,我们可以使用批量任务来识别数据是否超出正常范围。

从 InfluxDB 编写和查询 Pandas DataFrame

本教程的目标不是突出如何从 InfluxDB 查询 Pandas DataFrame。假设您已经在 InfluxDB 实例中存储了一些数据,您偶尔会查询这些数据,将其插入到 Milvus 中,并进行相似性搜索以帮助对其进行分类。

然而,您可以使用以下方式查询和将 df 写入 InfluxDB

from influxdb_client_3 import InfluxDBClient3
client = InfluxDBClient3(token="DATABASE_TOKEN",
                         host="HOST",
                         database="DATABASE_NAME")

client.write(bucket="DATABASE_NA<E", record=df, data_frame_measurement_name='generated_data', data_frame_tag_columns=['Anomaly Type', 'Speed Limit'], data_frame_timestamp_column='Timestamp')

query = "SELECT * FROM generated_data WHERE time >= now() - INTERVAL '90 days'"
pd = client.query(query=query, mode="pandas")

请参阅InfluxDB v3 Python 客户端库以获取更多详细信息。

窗口、归一化和向量化时间序列数据

归一化时间序列数据是数据分析中一个关键的前处理步骤,特别是在处理来自不同来源和尺度的时序数据,或者准备数据用于机器学习模型时。归一化将不同的时序数据集带到共同尺度,而不会扭曲值范围的差异或丢失信息。本质上,它使您能够在相似性搜索中保留并比较时序数据的形状,而不将比较限制在某个范围内。有几种归一化方法,如 Min-Max 归一化、Z-Score 归一化(标准化)、小数缩放等。在本例中,我们将使用 Z-Score 归一化,该方法涉及对数据进行重新缩放,使其具有 0 均值和 1 标准差。

归一化的目标包括

  • 一致性:归一化将数据带到共同尺度,使得比较和组合来自不同来源或具有不同单位的时间序列数据变得更容易。这在您想在向量中包含多个相关时间序列时特别有用。例如,如果您正在监控天气模式,您可能希望创建一个包含湿度时间序列数据和温度时间序列数据的多维向量。
  • 改进模型性能:归一化数据通常会导致机器学习模型或相似性搜索的准确性提高,因为它确保了数据尺度不会偏置学习过程。
  • 兼容性:向量化并归一化的数据与一系列分析工具和算法兼容,使得分析和效果更加直接和有效。

在归一化我们的时序数据之后,我们可以将其向量化。向量化是一个将时序转换为适合机器学习模型、统计分析或其他计算过程(如存储在向量数据库中)的格式的过程。这可能包括特征提取、分段/窗口、编码、重塑和填充等步骤。在本教程中,我们将专注于分段/窗口,因为它使您能够捕获数据中的时间依赖性,并且可能是最重要的向量化类型。窗口涉及将时序数据分段为更小、固定大小的窗口或序列。您可以将其每个部分视为一个向量。

以下是归一化我们平均速度数据后的相应代码和 DataFrame 输出

# Set the window size (number of rows in each window)

window_size = 24
step_size = 10
# define windows
windows = [
    df.iloc[i : i + window_size]
    for i in range(0, len(df) - window_size + 1, step_size)
]
# Iterate through the windows & extract column values
start_times = [w["Timestamp"].iloc[0] for w in windows]
end_times = [w["Timestamp"].iloc[-1] for w in windows]
avg_speed_values = [w["Average Speed"].tolist() for w in windows]
vehicle_count_values = [w["Vehicle Count"].tolist() for w in windows]

# Create a new DataFrame from the collected data
embedding_df = pd.DataFrame(
    {"start_time": start_times, "end_time": end_times, "vectors": avg_speed_values}
)

# Function to normalize the sensor column
def normalize_vector(vectors: list) -> list:
    min_val = min(vectors)
    max_val = max(vectors)
    return (
        [0.0] * len(vectors)
        if max_val == min_val
        else [(v - min_val) / (max_val - min_val) for v in vectors]
    )
embedding_df["vectors"] = embedding_df["vectors"].apply(normalize_vector)

# Apply a lambda function to convert timestamps to Unix timestamp format. 
embedding_df['start_time'] = embedding_df['start_time'].apply(lambda x: pd.Timestamp(x).timestamp()).astype(int)
embedding_df['end_time'] = embedding_df['end_time'].apply(lambda x: pd.Timestamp(x).timestamp()).astype(int)

embedding_df.head()


创建集合并将实体插入到 Milvus 中

在Milvus的语境下,嵌入是存储实体的核心组件。您使用嵌入来执行相似性搜索,可以根据各种距离度量(例如,欧几里得距离、余弦相似度)找到与给定查询嵌入最接近的实体。一个实体是一个记录,包含一个或多个嵌入(向量)以及可能的其他标量字段。

现在,我们已经创建了窗口化和归一化平均速度交通数据的嵌入,我们最终可以将数据或实体插入到Milvus中。让我们首先创建一个实体。

# Data to insert from DataFrame into Milvus
data_to_insert = [
    {"vector_field": v, "start_time_field": start, "stop_time_field": stop, "anomaly_type_field": anomaly_type}
    for v, start, stop, anomaly_type in zip(embedding_df["vectors"], embedding_df["start_time"], embedding_df["end_time"], embedding_df["anomaly_type"])
]
# Including output of the data_to_insert or entities to understand the dictionary structure. 

其中data_to_insert[1]看起来像

{'vector_field': [0.4152889075137363,
  0.0,
  0.5160786959257689,
  0.9339400228588051,
  0.9594734908121221,
  0.005125718915339806,
  1.0,
  0.5317701130450202,
  0.1734560938192518,
  0.7041917323428484,
  0.3683492576103222,
  0.2146284560942277,
  0.6996001992375626,
  0.022975347039706807,
  0.14385381806643827,
  0.29834547817214996,
  0.03032638050008518,
  0.25497817191418676,
  0.5015006484653414,
  0.7528119491160616,
  0.7916886403499066,
  0.8741175973964925,
  0.7576124843394686,
  0.6639498473288101],
 'start_time_field': 1710977934,
 'stop_time_field': 1710978049,
 'anomaly_type_field': 'accident'}

现在,我们已准备好将数据插入到Milvuz中。首先,我们导入客户端并建立连接。然后,我们可以定义一个模式并创建一个索引。Milvus支持多种索引,每种索引针对不同类型的向量搜索操作进行了优化。索引的选择取决于数据集的大小、向量的维度、查询延迟要求以及可接受的搜索精度和速度之间的权衡。在这个上下文中,索引的目的是优化与查询向量相似的向量的搜索。这被称为相似性搜索或最近邻搜索。

  • 平面索引:这是最简单的索引形式,本质上是一种穷举搜索,其中查询向量与数据库中的每个向量进行比较,以找到最接近的匹配项。虽然准确,但计算量也最大。
  • IVF(倒排文件)索引:这种方法涉及根据相似性将向量聚类成组,然后在查询期间只搜索最有希望的簇,显著减少了所需的比较次数。
  • HNSW(分层可导航小世界)索引:这创建了一个分层图结构,其中每个节点是一个向量,边连接在向量空间中彼此接近的节点。搜索从顶层向下导航以有效地找到邻近的节点。
  • PQ(乘积量化)索引:这种方法通过将向量分成更小的块并对每个块进行量化(将每个块量化为有限数量的质心)来压缩向量。它允许通过近似原始向量来实现更快的搜索和更低的存储。
  • ANNOY(近似最近邻哦耶)索引:ANNOY构建了一棵二叉树森林,其中每棵树都是通过使用随机选择的超平面将数据集分成两部分来构建的。这种结构允许进行高效的近似搜索。

对于这个教程,我们将使用平面索引,因为我们既不插入也不在大型数据集上搜索。虽然平面索引对于大型数据集来说效率不高,但它提供了最高的精度,并且可以适用于较小数据集的实时搜索或极端精度至关重要的情况。

from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection, utility

import pandas as pd

# Establish a connection to Milvus server (adjust host and port as necessary)
connections.connect("default", host="127.0.0.1", port="19530")

In [17]:
# Define the schema for your collection
pk = FieldSchema(name="pk", dtype=DataType.INT64, is_primary=True, auto_id=True)
vector_field = FieldSchema(name="vector_field", dtype=DataType.FLOAT_VECTOR, dim=24)
start_time_field = FieldSchema(name="start_time_field", dtype=DataType.INT64)
stop_time_field = FieldSchema(name="stop_time_field", dtype=DataType.INT64)
anomaly_type_field = FieldSchema(name="anomaly_type_field", dtype=DataType.VARCHAR, max_length=100)

# Create a collection schema
schema = CollectionSchema(fields=[pk, vector_field, start_time_field, stop_time_field, anomaly_type_field], description="test collection")

# Define your collection name
collection_name = "example_collection"

# Fix: Correctly check if the collection exists
if not utility.has_collection(collection_name):
    collection = Collection(name=collection_name, schema=schema)
    print(f"Collection {collection_name} created.")
else:
    collection = Collection(name=collection_name)
    print(f"Collection {collection_name} already exists.")

# Insert the data into Milvus
insert_result = collection.insert(data_to_insert)
print("Insertion is successful, IDs assigned to the inserted entities:", insert_result.primary_keys)

# Create an index for better search performance 
index_params = {
    "index_type": "IVF_FLAT",
    "metric_type": "L2",
    "params": {"nlist": 128}
}
collection.create_index(field_name="vector_field", index_params=index_params)
print("Index created.")

# Load the collection into memory (before searching)
collection.load()

现在,我们已准备好执行相似性搜索以找到与我们第一个嵌入或时间序列窗口最接近的其他时间序列。搜索将返回结果以及距离。我们可以从Milvus提供的许多不同的相似性度量中选择。相似性度量(也称为距离度量或相似性度量)是一个数学函数,用于量化两个嵌入之间的相似度或差异度。

我们选择欧几里得距离(L2),因为它提供了向量之间相似度的直观和直接度量。然而,请记住,我们正在进行基于先前插入的时间序列嵌入的相似性搜索,并且我们的数据是随机的。因此,我们可以预计我们的嵌入结果将包括一个与距离为0相同的嵌入。我们也可以预计其余的搜索结果将返回不同的嵌入,因为我们插入了随机数据。

# Prepare for search

vectors_to_search = [data_to_insert[1]["vector_field"]]  # Using the second vector for search
search_params = {
    "metric_type": "L2",
    "params": {"L2": 10},
}

result = collection.search(vectors_to_search, "vector_field", search_params, limit=3, output_fields=["vector_field", "anomaly_type_field"])

ids = []
# Display search results
for hits in result:
    for hit in hits:
        ids.append(hit.id)
        print(f"ID: {hit.id}, Distance: {hit.distance}")

``

我们得到了以下结果,这证实了我们的假设:


注意向量1和搜索向量是如何重叠的,因为具有最小差异的向量将是相同的向量 data_to_insert[1]。我们还执行了MAE(平均绝对误差)来量化相似度。请注意,在这个上下文中,MAE没有太多意义,因为我们生成了随机数据。在现实世界的例子中,你会比较表示事件并遵循一般模式或行为的时序数据。你也会在更多数据中搜索。

# Calculate MAE
mae = np.mean(np.abs(np.array(vectors_to_search[0]) - np.array(data_to_plot[1]["Vectors"])))
print(f"MAE: {mae}")
# A MAE of 0.3 indicates moderate to high dissimilarity which is expected when comparing random data. 
MAE: 0.3145208857923114

最后思考

希望这篇教程可以作为结合时序数据库(如InfluxDB)和Milvus以识别时序数据中相似模式的模板。

在此处开始使用 InfluxDB Cloud 3.0。如果您需要帮助,请通过我们的 社区网站Slack频道 联系。