C# 和 InfluxDB 入门

导航到

本文由 James Hickey 撰写。向下滚动以查看本文之后的完整简介和图片。

时序数据库(TSDB)可以改变您处理实时数据或物联网应用中的数据流的方式。在本教程中,您将学习如何在 C# 应用程序中设置一个。

关系数据库有其位置。它们擅长数据规范化、避免重复、对特定数据点(如列)进行索引以及处理模式的原子更改。

但是,当涉及到存储、操作和针对基于时间戳或基于时间查询的大型数据集进行查询时,TSDB 闪耀——其中 InfluxDB 是最受欢迎的。

您需要一个 TSDB 来消费大量基于时间的数据。这包括传感器数据、服务器指标、应用程序事件、网络数据、通用物联网数据,或者仅由您的业务领域生成的时间数据。TSDB 不仅优化了高效写入大量时间戳数据,还可以以毫秒级运行复杂的基于时间的查询。

在本篇文章中,您将学习如何开始使用 InfluxDB 和 C#。您将学习如何配置 PC 上的 InfluxDB,将其连接到 .NET 应用程序,并将一些数据写入/读取到 InfluxDB。

要首先查看所有代码,请查看来自 此 GitHub 存储库 的示例代码。

让我们开始吧!

使用 Docker 安装 InfluxDB

在您继续之前,请确保您已安装 Docker

接下来,从终端执行以下 Docker 命令

docker pull influxdb:2.0.7

注意:2.0.7 版本是目前 InfluxDB 的最新版本。请参阅 InfluxDB 文档 了解可用的最新 docker pull 命令。

使用此命令运行一个新的 InfluxDB 运行实例

docker run --name influxdb -p 8086:8086 influxdb:2.0.7

接下来,在您的本地计算机上导航到 https://127.0.0.1:8086/。点击 开始使用 并填写以下值

  • 用户:admin
  • 密码:adminadmin
  • 组织:organization
  • 桶:bucket

接下来,导航到 InfluxDB 登录页面 https://127.0.0.1:8086/。

通过点击 数据,然后 创建一个新桶。将其命名为 test-bucket。这将创建一个空的桶以写入您的数据。

InfluxDB create bucket

您需要捕获您的用户身份验证令牌。登录后,点击 数据,然后 令牌,并选择 admin 令牌

InfluxDB load data tokens

复制显示的令牌——您很快就会需要它。

创建一个.NET控制台应用程序

您将创建一个与InfluxDB交互的MVC .NET Web应用程序。让我们通过在您的终端中运行以下命令来创建它

dotnet new mvc -o app

注意:如果您还没有安装.NET CLI,您可以在此处下载最新的.NET SDK:https://dotnet.microsoft.com/download

接下来,进入MVC应用程序文件夹中的app目录。然后,在终端中执行以下命令来安装InfluxDB C# SDK

dotnet add package InfluxDB.Client

您还需要Coravel包来轻松地安排后台作业https://github.com/jamesmh/coravel

dotnet add package Coravel

构建InfluxDB应用程序服务

打开appsettings.json,并将其替换为以下内容

{
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft": "Warning",
      "Microsoft.Hosting.Lifetime": "Information"
    }
  },
  "AllowedHosts": "*",

  "InfluxDB": {
    "Token": "Your_Token_Goes_Here"
  }
}

接下来,创建一个新的文件 —— Services/InfluxDBService.cs —— 并将内容替换为以下

using System;
using System.Threading.Tasks;
using InfluxDB.Client;
using Microsoft.Extensions.Configuration;

namespace app.Services
{
    public class InfluxDBService
    {
        private readonly string _token;

        public InfluxDBService(IConfiguration configuration)
        {
            _token = configuration.GetValue<string>("InfluxDB:Token");
        }

        public void Write(Action<WriteApi> action)
        {
            using var client = InfluxDBClientFactory.Create("https://127.0.0.1:8086", _token);
            using var write = client.GetWriteApi();
            action(write);
        }

        public async Task<T> QueryAsync<T>(Func<QueryApi, Task<T>> action)
        {
            using var client = InfluxDBClientFactory.Create("https://127.0.0.1:8086", _token);
            var query = client.GetQueryApi();
            return await action(query);
        }
    }
}

使用通用的.NET和C#约定,这个包装类将允许您以可靠的方式使用InfluxDB C#客户端;当不再需要时,它将被正确释放。

您的token通过IConfiguration服务从appsettings.json文件中获取。有两个便利方法:一个用于写入,一个用于从InfluxDB读取。

Startup.cs中的ConfigureServices()方法中,添加以下行

services.AddSingleton<InfluxDBService>();

您还必须在文件的顶部添加命名空间引用

using app.Services;

现在,您的服务将从.NET内置的依赖注入提供程序中可用。

向InfluxDB写入

现在,让我们模拟一架飞机每五秒钟向中央位置发送其当前高度。要开始,您将使用Coravel的任务调度器

再次打开Startup.cs,并在文件的顶部添加以下引用

using Coravel;
using app.Invocables; // You'll create this namespace soon :)

接下来,在ConfigureServices方法中添加以下行

// You'll create this class soon :)
services.AddTransient<WriteRandomPlaneAltitudeInvocable>();
services.AddScheduler();

然后,在Configure方法的末尾添加以下行

app.ApplicationServices.UseScheduler(scheduler =>
{
    scheduler
        .Schedule<WriteRandomPlaneAltitudeInvocable>()
        .EveryFiveSeconds();
});

现在,创建一个名为Invocables/WriteRandomPlaneAltitudeInvocable.cs的文件。替换其内容为以下

using System;
using System.Threading.Tasks;
using app.Services;
using Coravel.Invocable;
using InfluxDB.Client.Api.Domain;
using InfluxDB.Client.Writes;

namespace app.Invocables
{
    public class WriteRandomPlaneAltitudeInvocable : IInvocable
    {
        private readonly InfluxDBService _service;
        private static readonly Random _random = new Random();

        public WriteRandomPlaneAltitudeInvocable(InfluxDBService service)
        {
            _service = service;
        }

        public Task Invoke()
        {
            _service.Write(write =>
            {
                var point = PointData.Measurement("altitude")
                    .Tag("plane", "test-plane")
                    .Field("value", _random.Next(1000, 5000))
                    .Timestamp(DateTime.UtcNow, WritePrecision.Ns);

                write.WritePoint("test-bucket", "organization", point);
            });

            return Task.CompletedTask;
        }
    }
}

此类被安排每五秒钟执行一次。同样,它模拟一架飞机将其当前高度发送回中央位置以进行数据存储和报告。

从InfluxDB读取

当您运行应用程序时,它将每五秒钟写入一个随机高度,您需要看到这些值。

首先,让我们创建一个模型,控制器将将其传递到其视图中。创建文件Models/AltitudeModel.cs,并将其替换为以下

namespace app.Models
{
    public class AltitudeModel
    {
        public string Time { get; init; }
        public int Altitude { get; init; }
        public string DisplayText => $"Plane was at altitude {Altitude} ft. at {Time}.";
    }
}

接下来,打开Controllers/HomeController.cs,并将Index方法替换为以下

public async Task<IActionResult> Index([FromServices] InfluxDBService service)
{
    var results = await service.QueryAsync(async query =>
    {
        var flux = "from(bucket:\"test-bucket\") |> range(start: 0)";
        var tables = await query.QueryAsync(flux, "organization");
        return tables.SelectMany(table =>
            table.Records.Select(record =>
                new AltitudeModel
                {
                    Time = record.GetTime().ToString(),
                    Altitude = int.Parse(record.GetValue().ToString())
                }));
    });

    return View(results);
}

别忘了在文件的顶部添加所需的引用

using app.Models;
using app.Services;

接下来,打开Views/Home/Index.cshtml,并将其替换为以下

@model IEnumerable<app.Models.AltitudeModel>

@{
    ViewData["Title"] = "Home Page";
}

<ul>
    @foreach (var record in Model)
    {
        <li>@record.DisplayText</li>
    }
</ul>

现在,通过在终端中执行dotnet run来运行应用程序。每次刷新页面后,您应该会看到一个新的值被写入。

application results screenshot

使用Flux来过滤查询

为了演示如何轻松地执行更复杂的数据查询,再次打开Controllers/HomeController.cs。在Index方法中,将您已有的Flux查询替换为以下行

var flux = "from(bucket:\"test-bucket\") " +
           "|> range(start: 0) " +
           "|> filter(fn: (r) => " +
           "r._measurement == \"altitude\" and " +
           "r._value > 3500)";

再次运行应用程序。您将看到只显示高度超过3500英尺的数据。

查看Flux文档以执行更高级的查询,如计算总和、分组数据、获取平均值等。

结论

在这篇文章中,您学习了如何在本地机器上安装和配置InfluxDB,创建一个新的.NET Web应用程序,并读写InfluxDB。您还学习了如何从定期发送其当前状态以存储在特定时间点的数据源收集和查看数据。

通过使用InfluxDB的Flux查询语言或类似SQL的InfluxQL查询语言,您可以使用C#客户端(C#客户端)执行更复杂的查询来处理数据。

下次您收集需要根据时间戳进行查询和聚合的数据时,请考虑使用InfluxDB以及可用的各种编程语言SDK

关于作者

james hickey

詹姆斯是一位微软MVP,在金融科技和保险行业拥有构建Web和移动应用程序的背景。他是《重构TypeScript》一书的作者,也是名为Coravel的.NET开源工具的创造者。他住在加拿大,距离世界上最高的潮汐仅十分钟路程。