如何开始使用 JavaScript 和 InfluxDB
作者:社区 / 产品, 用例, 开发者
2021 年 11 月 03 日
导航至
本文由 Nicolas Bohorquez 撰写,最初发表于 The New Stack。向下滚动查看作者的照片和简介。
Telegraf 是为 InfluxDB 收集数据的首选方式。虽然在某些用例中,客户端库更好,例如在解析服务器端事件流时。在本教程中,您将学习如何读取数据流,将其作为时间序列存储到 InfluxDB 中,并使用 InfluxDB 的 JavaScript 客户端库对数据运行查询。
本教程中的所有代码均可在 GitHub 上的此存储库中免费获得: GitHub。
什么是时间序列数据库?
时间序列数据库 是一种专门类型的数据存储,专注于提供工具来存储和查询具有时间单位维度的量度数据。 好的例子包括每小时特定分钟的温度、股票市场中股票的价格或高峰时段某个区域的汽车数量。
有许多基于时间的 数据集示例。并非所有类型的数据都适合时间序列数据库。一个例子是用于训练机器学习领域分类问题的 Iris 数据集。另一个例子是用于预测的 Titanic 数据集。
时间序列数据库通常用于分析应用程序日志和收集传感器数据。应用程序和传感器不断生成数据流,其属性与基于时间的维度不同。在本教程中,数据源是 Wikimedia 基金会提供的近期更改的 事件流。此数据遵循 Server-Sent Event,可以直接通过 HTTP 消耗。
示例 Node.js CLI 应用程序由两个组件组成。第一个组件消耗流消息并将它们作为数据点写入 InfluxDB 数据库。第二个组件是一个读取器,它查询数据库以将查询结果呈现为简单的折线图。时间在 x 轴上呈现,序列的值在 y 轴上呈现。
教程先决条件
本示例使用 Ubuntu 20.04 和 Node.js v14.17.3 (npm v6.14.13) 进行测试 – 使用 Node Version Manager (NVM) 安装。由于 Node.js 有许多版本,NVM 帮助轻松管理和测试代码。此示例还从本地 InfluxDB 2.0 数据库写入和读取数据。
如果您没有本地安装,可以按照安装指南进行操作,然后创建一个示例组织、存储桶 和 令牌。
创建令牌后,使用本地安装的值设置以下环境变量的值
- INFLUX_ORG
- INFLUX_BUCKET
- INFLUX_TOKEN
- INFLUX_URL
这些变量在 env.js 文件中读取,并带有一些默认值。
安装库
InfluxDB JavaScript 客户端是一个标准的 Node.js 模块,您可以从命令行安装它
npm install @influxdata/influxdb-client
npm install @influxdata/influxdb-client-apis
它也可以作为浏览器上的依赖项使用,只需一行代码
<script type="module">
// import latest release from npm repository
import {InfluxDB} from 'https://unpkg.com/@influxdata/influxdb-client/dist/index.browser.mjs'
<script>
对于本示例,除了 InfluxDB JavaScript 客户端之外,还使用了一些其他库用于渲染目的
- asciichart:纯 JavaScript 中的控制台 ASCII 折线图
- blessed:类似于 curses 的库,具有高级终端界面 API(“curses”是类 Unix 系统的终端控制库)
- chalk:正确的终端字符串样式
- eventsource:EventSource 客户端的纯 JavaScript 实现
建立连接
安装依赖项并设置环境变量后,您可以连接到存储桶。 对于本示例,writer.js 和 reader.js 使用从环境中读取的 url 和令牌实例化 InfluxDB 对象:new InfluxDB({url, token})
此对象提供实例化不同 API 客户端(例如 Writer 或 Query)的方法。插入数据
要在 InfluxDB 中插入数据,您必须遵循行协议。 这定义了构成数据点的四个元素的结构- Measurement:这是您要插入数据的表的名称
- 标签集:以逗号分隔的数据属性键=值集
- 字段集:以逗号分隔的数据维度键=值集,数据可以是浮点型(默认)、整型、无符号整型、字符串、布尔型
- 时间戳(可选):Unix 时间戳
#!/usr/bin/env node
const {InfluxDB, Point, HttpError} = require('@influxdata/influxdb-client');
const {url, token, org, bucket, sseUrl} = require('./env');
var EventSource = require('eventsource');
const {hostname} = require('os');
//Creates a writer with "seconds" as precision
const writeApi = new InfluxDB({url, token}).getWriteApi(org, bucket, 's');
//Sets the common tags for this writer
writeApi.useDefaultTags({location: hostname(), source:'wikimedia', sseUrl:sseUrl });
console.log(`Connecting to EventStreams at ${sseUrl}`);
var eventSource = new EventSource(sseUrl);
eventSource.onmessage = function(event) {
// event.data will be a JSON string containing the message event.
const d = JSON.parse(event.data);
if( d.length != undefined ){
const dataPoint = new Point('edition')
.tag( 'user',d.user )
.tag( 'isBot', d.bot )
.floatField('value', d.length.new)
writeApi.writePoint( dataPoint );
console.log( dataPoint );
writeApi
.flush().then(() => {})
.catch(e => {
console.log('\nFinished ERROR: ' + e);
});
}
};
请注意,在第 9-11 行中,writeApi
使用 organization
和 bucket
环境变量实例化,但也使用精度值。 在本例中,它是 seconds
,它定义了要写入的数据的时间戳维度粒度; 在此处查看其他可能的值。 此外,您可以为要写入的每个数据点设置一些默认标签——在本例中是数据的来源。 另请注意,客户端允许您使用其他值扩展此默认标签集; 对于每个数据点,都会添加 user
和 isBot
标签。 写入的字段是所做更改的长度(第 23 行),writePoint
方法接受 Point 实例,然后刷新 writeApi 实例。 您应始终关闭 writeApi,以便刷新挂起的更改并关闭挂起的计划重试执行。 writer.js 为事件源推送的每个消息无限期运行。查询数据
一旦数据进入存储桶,InfluxDB JavaScript 客户端就会提供另一个 API 客户端来查询数据。 在本示例中,InfluxDB
对象运行一个查询,该查询返回过去 10 秒内按 isBot
标签分组的数据点数const query = '\
from(bucket:"js-sample")\
|> range(start: -10s)\
|> filter(fn:(r) => r._measurement == "edition")\
|> group(columns: ["isBot"])\
|> count()\
';
该查询是用 Flux 函数式数据脚本语言编写的。 这是为查询、分析和处理 InfluxDB 2.0 上的数据而设计的。 先前的语言,即 InfluxDB SQL-like 查询语言,仍在 API 的 /query 兼容性端点上受支持,但建议使用新语言的全部功能。 如您所见,查询定义了存储桶、数据范围、要应用的过滤器、分组列以及要应用的数据函数。 以下函数向您展示了如何针对 QueryApi
运行查询function queryExample(fluxQuery) {
const queryApi = new InfluxDB({url, token}).getQueryApi(org)
queryApi.queryRows(fluxQuery, {
next(row, tableMeta) {
const o = tableMeta.toObject(row);
pushRow(o);
render();
},
complete() {
console.log('FINISHED')
},
error(error) {
console.log('QUERY FAILED', error)
},
});
}
对于返回的每一行,数据都会被解析为一个对象,并传递给一个函数,该函数根据 isBot
属性将其存储在数组中。 然后使用 asciichart 库渲染序列的两个数组,bots 和 humans。 读取器的完整代码如下所示const {InfluxDB} = require('@influxdata/influxdb-client');
const {url, token, org, bucket, chart, screen} = require('./env');
const chalk = require('chalk');
var asciichart = require ('asciichart');
const maxLength = 100;
const bots = [];
const humans = [];
var config = {
height: 18, // any height you want
colors: [
asciichart.blue,
asciichart.red,
asciichart.default, // default color
undefined, // equivalent to default
]
}
const query = `\
from(bucket:"${bucket}")\
|> range(start: -10s)\
|> filter(fn:(r) => r._measurement == "edition")\
|> group(columns: ["isBot"])\
|> count()\
`;
demo();
async function demo() {
for (let index = 0; index < 300; index++) {
queryExample( query );
await sleep(500);
}
}
function queryExample(fluxQuery) {
const queryApi = new InfluxDB({url, token}).getQueryApi(org)
queryApi.queryRows(fluxQuery, {
next(row, tableMeta) {
const o = tableMeta.toObject(row);
pushRow(o);
render();
}, complete() {}
, error(error) {
console.log('QUERY FAILED', error)
}
});
}
function pushRow(row) {
if (bots.length >= maxLength) {
bots.shift ();
}
if (humans.length >= maxLength) {
humans.shift ();
}
row.isBot == 'true' ? bots.push( row['_value']) : humans.push( row['_value']);
}
function render(){
if(bots.length != 0 && humans.length != 0) {
const plt = asciichart.plot ([bots,humans], config).split ('\n');
chart.setLine (0, chalk.blue('Bots: '+bots[bots.length-1]) + ' ' + chalk.red('Humans: '+humans[humans.length-1]));
plt.forEach ((line, i) => {
chart.setLine (i + 1, line);
});
}
screen.render();
}
function sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
附加文档和功能
InfluxDB JavaScript 客户端还提供了附加功能(如 HealthAPI 以及其他包装类),这些功能简化了构建生产就绪管道的过程。 有关更多详细信息,请查看客户端和 InfluxDB API 文档。结论
像 InfluxDB 这样的时间序列数据库提供了专门的功能和工具,用于存储和分析具有高摄取率的数据点,例如物联网 (IoT) 数据。 此示例充当有用的镜头,用于了解 InfluxDB JavaScript 客户端收集高容量流数据的功能。 该客户端是一个简单、标准的 Node.js 模块,可让您轻松地将数据写入或从 InfluxDB 实例读取数据。关于作者
