C# 和 InfluxDB 入门

导航至

这篇文章由 James Hickey 撰写。向下滚动查看完整的作者简介和图片。 

时间序列数据库 (TSDB) 可以改变您处理实时数据流或物联网应用的方式。在本教程中,您将学习如何在 C# 应用程序中设置一个。

关系数据库有其用武之地。它们在数据规范化、避免重复、对特定数据点(如列)进行索引以及处理模式的原子更改方面非常出色。

但是,当涉及到存储、操作和查询针对时间戳或基于时间的查询的大型数据集时,TSDB 表现出色 —— 其中 InfluxDB 是最受欢迎的。

您需要 TSDB 来处理大量基于时间的数据。这包括传感器数据、服务器指标、应用程序事件、网络数据、通用物联网数据,或者只是您的业务领域生成的基于时间的数据。TSDB 不仅针对高效写入大量时间戳数据进行了优化,而且还可以在毫秒内运行复杂的、基于时间的查询。

在本文中,您将学习如何开始使用 InfluxDB 和 C#。您将学习如何在您的 PC 上配置 InfluxDB,将其连接到 .NET 应用程序,以及在 InfluxDB 中读取/写入一些数据。

要先查看所有代码,请查看此 GitHub 存储库中的示例代码:https://github.com/jamesmh/influxdb-csharp-client-sample

让我们开始吧!

使用 Docker 安装 InfluxDB

在继续之前,请确保您已安装 Docker

接下来,从终端执行以下 Docker 命令

docker pull influxdb:2.0.7

注意:版本 2.0.7 是当前 InfluxDB 的最新版本。有关最新的 docker pull 命令,请参阅 InfluxDB 的文档

使用此命令运行 InfluxDB 的新运行实例

docker run --name influxdb -p 8086:8086 influxdb:2.0.7

接下来,在您的本地计算机上导航到 http://localhost:8086/。单击“Get Started”并填写以下值

  • 用户:admin
  • 密码:adminadmin
  • 组织:organization
  • 存储桶:bucket

接下来,导航到 InfluxDB 登录页面 http://localhost:8086/。

通过单击“Data”,然后单击“Buckets”来创建一个新的存储桶。单击“Create Bucket”,并将其命名为 test-bucket。这将创建一个空的存储桶来写入您的数据。

InfluxDB create bucket

您需要捕获用户的身份验证令牌。登录后,单击“Data”,然后单击“Tokens”,并选择“admin’s Token”。

InfluxDB load data tokens

复制显示的令牌 —— 您很快就需要它。

创建 .NET 控制台应用程序

您将创建一个将与 InfluxDB 交互的 MVC .NET Web 应用程序。让我们通过在终端中运行此命令来创建它

dotnet new mvc -o app

注意:如果您没有安装 .NET CLI,您可以在此处下载最新的 .NET SDK

接下来,cd app 进入 MVC 应用程序文件夹。然后,通过在终端中执行此命令来安装 InfluxDB C# SDK

dotnet add package InfluxDB.Client

您还需要 Coravel 包来轻松调度后台作业

dotnet add package Coravel

构建 InfluxDB 应用程序服务

打开 appsettings.json 并将其替换为以下内容

{
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft": "Warning",
      "Microsoft.Hosting.Lifetime": "Information"
    }
  },
  "AllowedHosts": "*",

  "InfluxDB": {
    "Token": "Your_Token_Goes_Here"
  }
}

接下来,创建一个新文件 —— Services/InfluxDBService.cs —— 并将其内容替换为以下内容

using System;
using System.Threading.Tasks;
using InfluxDB.Client;
using Microsoft.Extensions.Configuration;

namespace app.Services
{
    public class InfluxDBService
    {
        private readonly string _token;

        public InfluxDBService(IConfiguration configuration)
        {
            _token = configuration.GetValue<string>("InfluxDB:Token");
        }

        public void Write(Action<WriteApi> action)
        {
            using var client = InfluxDBClientFactory.Create("http://localhost:8086", _token);
            using var write = client.GetWriteApi();
            action(write);
        }

        public async Task<T> QueryAsync<T>(Func<QueryApi, Task<T>> action)
        {
            using var client = InfluxDBClientFactory.Create("http://localhost:8086", _token);
            var query = client.GetQueryApi();
            return await action(query);
        }
    }
}

使用通用的 .NET 和 C# 约定,这个包装器类将允许您以可靠的方式使用 InfluxDB C# 客户端;当不再需要时,它将被正确地处理掉。

您的令牌由 IConfiguration 服务从 appsettings.json 文件中获取。有两个便捷方法:一个用于写入,一个用于从 InfluxDB 读取。

Startup.csConfigureServices() 方法中,添加以下行

services.AddSingleton<InfluxDBService>();

您还需要将命名空间引用添加到文件顶部

using app.Services;

现在您的服务将可以从 .NET 的内置依赖注入提供程序中使用。

写入 InfluxDB

现在,让我们模拟一架飞机每五秒钟将其当前高度发送回中央位置。要开始,您将使用 Coravel 的任务调度程序

再次打开 Startup.cs ,并将以下引用添加到文件顶部

using Coravel;
using app.Invocables; // You'll create this namespace soon :)

接下来,将以下行添加到 ConfigureServices 方法

// You'll create this class soon :)
services.AddTransient<WriteRandomPlaneAltitudeInvocable>();
services.AddScheduler();

然后,将以下行添加到 Configure 方法的末尾

app.ApplicationServices.UseScheduler(scheduler =>
{
    scheduler
        .Schedule<WriteRandomPlaneAltitudeInvocable>()
        .EveryFiveSeconds();
});

现在,创建一个名为 Invocables/WriteRandomPlaneAltitudeInvocable.cs 的文件。将其内容替换为以下内容

using System;
using System.Threading.Tasks;
using app.Services;
using Coravel.Invocable;
using InfluxDB.Client.Api.Domain;
using InfluxDB.Client.Writes;

namespace app.Invocables
{
    public class WriteRandomPlaneAltitudeInvocable : IInvocable
    {
        private readonly InfluxDBService _service;
        private static readonly Random _random = new Random();

        public WriteRandomPlaneAltitudeInvocable(InfluxDBService service)
        {
            _service = service;
        }

        public Task Invoke()
        {
            _service.Write(write =>
            {
                var point = PointData.Measurement("altitude")
                    .Tag("plane", "test-plane")
                    .Field("value", _random.Next(1000, 5000))
                    .Timestamp(DateTime.UtcNow, WritePrecision.Ns);

                write.WritePoint("test-bucket", "organization", point);
            });

            return Task.CompletedTask;
        }
    }
}

此类计划每五秒钟执行一次。同样,它模拟一架飞机将其当前高度发送回中央位置以进行数据存储和报告。

从 InfluxDB 读取

当您运行应用程序时,它将每五秒钟写入一个随机高度,您需要查看这些值。

首先,让我们创建一个控制器将传递给其视图的模型。创建文件 Models/AltitudeModel.cs,并将其替换为以下内容

namespace app.Models
{
    public class AltitudeModel
    {
        public string Time { get; init; }
        public int Altitude { get; init; }
        public string DisplayText => $"Plane was at altitude {Altitude} ft. at {Time}.";
    }
}

接下来,打开 Controllers/HomeController.cs,并将 Index 方法替换为以下内容

public async Task<IActionResult> Index([FromServices] InfluxDBService service)
{
    var results = await service.QueryAsync(async query =>
    {
        var flux = "from(bucket:\"test-bucket\") |> range(start: 0)";
        var tables = await query.QueryAsync(flux, "organization");
        return tables.SelectMany(table =>
            table.Records.Select(record =>
                new AltitudeModel
                {
                    Time = record.GetTime().ToString(),
                    Altitude = int.Parse(record.GetValue().ToString())
                }));
    });

    return View(results);
}

不要忘记将所需的引用添加到文件顶部

using app.Models;
using app.Services;

接下来,打开 Views/Home/Index.cshtml,并将其替换为以下内容

@model IEnumerable<app.Models.AltitudeModel>

@{
    ViewData["Title"] = "Home Page";
}

<ul>
    @foreach (var record in Model)
    {
        <li>@record.DisplayText</li>
    }
</ul>

现在,通过在终端中执行 dotnet run 来运行应用程序。每隔五秒钟,当您刷新页面时,您应该会看到写入的新值。

application results screenshot

使用 Flux 过滤您的查询

为了演示对您的数据执行更复杂的查询有多么容易,请再次打开 Controllers/HomeController.cs。在 Index 方法中,将您已有的 Flux 查询替换为以下行

var flux = "from(bucket:\"test-bucket\") " +
           "|> range(start: 0) " +
           "|> filter(fn: (r) => " +
           "r._measurement == \"altitude\" and " +
           "r._value > 3500)";

再次运行应用程序。您将看到显示的唯一数据是海拔高于 3500 英尺的数据。

查看 Flux 文档以执行更高级的查询,例如计算总和、分组数据、获取平均值等等。

结论

在本文中,您学习了如何在本地计算机上安装和配置 InfluxDB,创建一个新的 .NET Web 应用程序,以及在 InfluxDB 中读取/写入数据。您还学习了如何从定期发送其当前状态以存储在特定时间点的源收集和查看数据。

通过使用 InfluxDB 的 Flux 查询语言或类似 SQL 的 InfluxQL 查询语言,您可以使用更高级的查询(如 C# 客户端)进行数据处理。

下次您收集需要根据其时间戳进行查询和聚合的数据时,请查看 InfluxDB 和可用的各种编程语言 SDK

关于作者

james hickey

James 是一位 Microsoft MVP,拥有在金融科技和保险行业构建 Web 和移动应用程序的背景。他是《Refactoring TypeScript》的作者,也是名为 Coravel 的 .NET 开源工具的创建者。他住在加拿大,离世界最高潮汐仅十分钟路程。