如何开始使用 JavaScript 和 InfluxDB

导航至

本文由 Nicolas Bohorquez 撰写,最初发表于 The New Stack。向下滚动查看作者的照片和简介。

Telegraf 是为 InfluxDB 收集数据的首选方式。虽然在某些用例中,客户端库更好,例如在解析服务器端事件流时。在本教程中,您将学习如何读取数据流,将其作为时间序列存储到 InfluxDB 中,并使用 InfluxDB 的 JavaScript 客户端库对数据运行查询。

本教程中的所有代码均可在 GitHub 上的此存储库中免费获得: GitHub

什么是时间序列数据库?

时间序列数据库 是一种专门类型的数据存储,专注于提供工具来存储和查询具有时间单位维度的量度数据。 好的例子包括每小时特定分钟的温度、股票市场中股票的价格或高峰时段某个区域的汽车数量。

有许多基于时间的 数据集示例。并非所有类型的数据都适合时间序列数据库。一个例子是用于训练机器学习领域分类问题的 Iris 数据集。另一个例子是用于预测的 Titanic 数据集。

时间序列数据库通常用于分析应用程序日志和收集传感器数据。应用程序和传感器不断生成数据流,其属性与基于时间的维度不同。在本教程中,数据源是 Wikimedia 基金会提供的近期更改的 事件流。此数据遵循 Server-Sent Event,可以直接通过 HTTP 消耗。

示例 Node.js CLI 应用程序由两个组件组成。第一个组件消耗流消息并将它们作为数据点写入 InfluxDB 数据库。第二个组件是一个读取器,它查询数据库以将查询结果呈现为简单的折线图。时间在 x 轴上呈现,序列的值在 y 轴上呈现。

Node.js CLI application InfluxDB - Time is rendered in the x-axis, and the value of the series in the y-axis.

教程先决条件

本示例使用 Ubuntu 20.04 和 Node.js v14.17.3 (npm v6.14.13) 进行测试 – 使用 Node Version Manager (NVM) 安装。由于 Node.js 有许多版本,NVM 帮助轻松管理和测试代码。此示例还从本地 InfluxDB 2.0 数据库写入和读取数据。

如果您没有本地安装,可以按照安装指南进行操作,然后创建一个示例组织存储桶 和 令牌

创建令牌后,使用本地安装的值设置以下环境变量的值

  • INFLUX_ORG
  • INFLUX_BUCKET
  • INFLUX_TOKEN
  • INFLUX_URL

这些变量在 env.js 文件中读取,并带有一些默认值。

Variables read in the env.js file, with some default values

安装库

InfluxDB JavaScript 客户端是一个标准的 Node.js 模块,您可以从命令行安装它

npm install @influxdata/influxdb-client
npm install @influxdata/influxdb-client-apis

它也可以作为浏览器上的依赖项使用,只需一行代码

<script type="module">
    // import latest release from npm repository 
    import {InfluxDB} from 'https://unpkg.com/@influxdata/influxdb-client/dist/index.browser.mjs'
<script>

对于本示例,除了 InfluxDB JavaScript 客户端之外,还使用了一些其他库用于渲染目的

  • asciichart:纯 JavaScript 中的控制台 ASCII 折线图
  • blessed:类似于 curses 的库,具有高级终端界面 API(“curses”是类 Unix 系统的终端控制库)
  • chalk:正确的终端字符串样式
  • eventsource:EventSource 客户端的纯 JavaScript 实现
这些都不是使用 InfluxDB JavaScript 客户端所必需的。

建立连接

安装依赖项并设置环境变量后,您可以连接到存储桶。 对于本示例,writer.js 和 reader.js 使用从环境中读取的 url 和令牌实例化 InfluxDB 对象:new InfluxDB({url, token}) 此对象提供实例化不同 API 客户端(例如 Writer 或 Query)的方法。

插入数据

要在 InfluxDB 中插入数据,您必须遵循行协议。 这定义了构成数据点的四个元素的结构
  • Measurement:这是您要插入数据的表的名称
  • 标签集:以逗号分隔的数据属性键=值集
  • 字段集:以逗号分隔的数据维度键=值集,数据可以是浮点型(默认)、整型、无符号整型、字符串、布尔型
  • 时间戳(可选):Unix 时间戳
幸运的是,InfluxDB JavaScript 客户端提供了一个包装类,与 Writer API 一起使用,用于插入一个或多个数据点。 以下代码显示了如何连接到事件源,以及如何为每个消息实例化一个要添加到存储桶的点
#!/usr/bin/env node

const {InfluxDB, Point, HttpError} = require('@influxdata/influxdb-client');
const {url, token, org, bucket, sseUrl} = require('./env');
var EventSource = require('eventsource');
const {hostname} = require('os');

//Creates a writer with "seconds" as precision
const writeApi = new InfluxDB({url, token}).getWriteApi(org, bucket, 's');
//Sets the common tags for this writer
writeApi.useDefaultTags({location: hostname(), source:'wikimedia', sseUrl:sseUrl });

console.log(`Connecting to EventStreams at ${sseUrl}`);
var eventSource = new EventSource(sseUrl);

eventSource.onmessage = function(event) {
    // event.data will be a JSON string containing the message event.
    const d = JSON.parse(event.data);
    if( d.length != undefined ){
        const dataPoint = new Point('edition')
            .tag( 'user',d.user )
            .tag( 'isBot', d.bot )
            .floatField('value', d.length.new)
        writeApi.writePoint( dataPoint );
        console.log( dataPoint );
        writeApi
        .flush().then(() => {})
        .catch(e => {
            console.log('\nFinished ERROR: ' + e);
        });
    }
};
请注意,在第 9-11 行中,writeApi 使用 organizationbucket 环境变量实例化,但也使用精度值。 在本例中,它是 seconds,它定义了要写入的数据的时间戳维度粒度; 在此处查看其他可能的值。 此外,您可以为要写入的每个数据点设置一些默认标签——在本例中是数据的来源。 另请注意,客户端允许您使用其他值扩展此默认标签集; 对于每个数据点,都会添加 userisBot 标签。 写入的字段是所做更改的长度(第 23 行),writePoint 方法接受 Point 实例,然后刷新 writeApi 实例。 您应始终关闭 writeApi,以便刷新挂起的更改并关闭挂起的计划重试执行。 writer.js 为事件源推送的每个消息无限期运行。

查询数据

一旦数据进入存储桶,InfluxDB JavaScript 客户端就会提供另一个 API 客户端来查询数据。 在本示例中,InfluxDB 对象运行一个查询,该查询返回过去 10 秒内按 isBot 标签分组的数据点数
const query = '\
from(bucket:"js-sample")\
|> range(start: -10s)\
|> filter(fn:(r) => r._measurement == "edition")\
|> group(columns: ["isBot"])\
|> count()\
';
该查询是用 Flux 函数式数据脚本语言编写的。 这是为查询、分析和处理 InfluxDB 2.0 上的数据而设计的。 先前的语言,即 InfluxDB SQL-like 查询语言,仍在 API 的 /query 兼容性端点上受支持,但建议使用新语言的全部功能。 如您所见,查询定义了存储桶、数据范围、要应用的过滤器、分组列以及要应用的数据函数。 以下函数向您展示了如何针对 QueryApi 运行查询
function queryExample(fluxQuery) {
    const queryApi = new InfluxDB({url, token}).getQueryApi(org)
    queryApi.queryRows(fluxQuery, {
        next(row, tableMeta) {
            const o = tableMeta.toObject(row);
            pushRow(o);
            render();
        },
        complete() { 
            console.log('FINISHED')
        },
        error(error) {
            console.log('QUERY FAILED', error)
        },
    });
}
对于返回的每一行,数据都会被解析为一个对象,并传递给一个函数,该函数根据 isBot 属性将其存储在数组中。 然后使用 asciichart 库渲染序列的两个数组,bots 和 humans。 读取器的完整代码如下所示
const {InfluxDB} = require('@influxdata/influxdb-client');
const {url, token, org, bucket, chart, screen} = require('./env');
const chalk = require('chalk');
var asciichart = require ('asciichart');
const maxLength = 100;
const bots = [];
const humans = [];
var config = {
    height:  18,         // any height you want
    colors: [
        asciichart.blue,
        asciichart.red,
        asciichart.default, // default color
        undefined, // equivalent to default
    ]
}

const query = `\
from(bucket:"${bucket}")\
|> range(start: -10s)\
|> filter(fn:(r) => r._measurement == "edition")\
|> group(columns: ["isBot"])\
|> count()\
`;

demo();

async function demo() {
    for (let index = 0; index < 300; index++) {
        queryExample( query );
        await sleep(500);
    }
}

function queryExample(fluxQuery) {
    const queryApi = new InfluxDB({url, token}).getQueryApi(org)
    queryApi.queryRows(fluxQuery, {
        next(row, tableMeta) {
            const o = tableMeta.toObject(row);
            pushRow(o);
            render();
        }, complete() {}
        , error(error) {
            console.log('QUERY FAILED', error)
        }
    });
}

function pushRow(row) {
    if (bots.length >= maxLength) {
        bots.shift ();
    }
    if (humans.length >= maxLength) {
        humans.shift ();
    }

    row.isBot == 'true' ? bots.push( row['_value']) : humans.push( row['_value']);
}

function render(){
    if(bots.length != 0 && humans.length != 0) {
        const plt = asciichart.plot ([bots,humans], config).split ('\n');
        chart.setLine (0, chalk.blue('Bots: '+bots[bots.length-1]) + ' ' + chalk.red('Humans: '+humans[humans.length-1]));
        plt.forEach ((line, i) => {
            chart.setLine (i + 1, line);
        });
    }
    screen.render();
}

function sleep(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
}

附加文档和功能

InfluxDB JavaScript 客户端还提供了附加功能(如 HealthAPI 以及其他包装类),这些功能简化了构建生产就绪管道的过程。 有关更多详细信息,请查看客户端和 InfluxDB API 文档。

结论

InfluxDB 这样的时间序列数据库提供了专门的功能和工具,用于存储和分析具有高摄取率的数据点,例如物联网 (IoT) 数据。 此示例充当有用的镜头,用于了解 InfluxDB JavaScript 客户端收集高容量流数据的功能。 该客户端是一个简单、标准的 Node.js 模块,可让您轻松地将数据写入或从 InfluxDB 实例读取数据。

关于作者

Nicolas Bohorquez 是 Merqueo 的数据架构师,曾是多家初创公司开发团队的成员,并在美洲创立了三家公司。 他热衷于复杂性建模和使用数据科学来改善世界。