How to Get Started with JavaScript and InfluxDB
By Community / Product, Use Cases, Developer / Nov 03, 2021
This article was written by Nicolas Bohorquez and was originally published in The New Stack.
Telegraf is the preferred way to collect data for InfluxDB. In some use cases, however, client libraries are a better fit, such as when parsing a stream of server-sent events. In this tutorial, you'll learn how to read a data stream, store it as a time series in InfluxDB and run queries over the data using InfluxDB's JavaScript client library.
All the code in this tutorial is available for free in this repo on GitHub.
What is a time series database?
A time series database is a specialized type of data store focused on providing tools to store and query data that has a dimension measured as a time unit. Good examples include the temperature at a particular minute of every hour, the price of a stock in the stock market or the number of cars in one area during rush hour.
There are many examples of time-based datasets, but not all data is suitable for a time series database. The Iris dataset, used for training classification models in machine learning, has no time dimension. Neither does the Titanic dataset, which is commonly used for survival-prediction exercises.
Time series databases are often used to analyze application logs and collect sensor data. Applications and sensors produce streams of data constantly, with attributes in addition to the time-based dimension. In this tutorial, the data source is the event stream of recent changes provided by the Wikimedia Foundation. This data follows the Server-Sent Events (SSE) protocol and can be consumed directly via HTTP.
The sample Node.js CLI application consists of two components. The first, a writer, consumes stream messages and writes them as data points in the InfluxDB database. The second, a reader, queries the database and renders the results as a simple line chart, with time on the x-axis and the value of the series on the y-axis.
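To make the Server-Sent Events format more concrete, here is a minimal sketch of how a chunk of SSE text breaks down into events. The parseSseChunk helper and the sample payload are hypothetical, written for illustration only; the tutorial itself relies on the eventsource library, which also handles reconnection and partial frames.

```javascript
// Parses a chunk of SSE text into events. Simplified: it handles only the
// `event`, `id` and `data` fields and assumes the chunk holds complete frames.
function parseSseChunk(text) {
  const events = [];
  // Events are separated by a blank line.
  for (const frame of text.split(/\r?\n\r?\n/)) {
    const event = { event: 'message', id: undefined, data: [] };
    for (const line of frame.split(/\r?\n/)) {
      if (line === '' || line.startsWith(':')) continue; // skip comment lines
      const sep = line.indexOf(':');
      const field = sep === -1 ? line : line.slice(0, sep);
      let value = sep === -1 ? '' : line.slice(sep + 1);
      // A single space after the colon is not part of the value.
      if (value.startsWith(' ')) value = value.slice(1);
      if (field === 'data') event.data.push(value);
      else if (field === 'event') event.event = value;
      else if (field === 'id') event.id = value;
    }
    // Frames without data (for example, keep-alive comments) are dropped.
    if (event.data.length > 0) {
      events.push({ ...event, data: event.data.join('\n') });
    }
  }
  return events;
}

// Hypothetical payload, shaped like the fields the writer reads later on.
const sample =
  'event: message\n' +
  'id: 1\n' +
  'data: {"user":"ExampleUser","bot":false,"length":{"old":100,"new":120}}\n\n';

const [first] = parseSseChunk(sample);
const change = JSON.parse(first.data);
console.log(change.user, change.bot, change.length.new); // ExampleUser false 120
```

Each event carries its JSON payload in the data field, which is why the writer later calls JSON.parse on event.data.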
Tutorial prerequisites
This example is tested using Ubuntu 20.04 and Node.js v14.17.3 (npm v6.14.13) – installed using the Node Version Manager (NVM). With the many versions of Node.js available, NVM helps to manage and test the code easily. This example also writes and reads data from a local InfluxDB 2.0 database.
If you don’t have a local installation, you can follow the installation guide, then create a sample organization, bucket and token.
Once you have created the token, set the following environment variables to match your local installation:
- INFLUX_ORG
- INFLUX_BUCKET
- INFLUX_TOKEN
- INFLUX_URL
Those variables are read in the env.js file, with some default values.
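As a rough idea of what such a module can look like, the sketch below reads the four variables and falls back to defaults. It is not the env.js from the repository: the loadConfig name, every default value and the SSE_URL variable are assumptions, and the real file also exports the chart and screen objects used by the reader.

```javascript
// Sketch of an env.js-style configuration loader. All default values are
// placeholders for a local setup, not the ones used in the tutorial repo.
function loadConfig(env = process.env) {
  return {
    url: env.INFLUX_URL || 'http://localhost:8086', // default InfluxDB port
    token: env.INFLUX_TOKEN || 'my-token',          // placeholder token
    org: env.INFLUX_ORG || 'my-org',                // placeholder organization
    bucket: env.INFLUX_BUCKET || 'js-sample',
    // SSE_URL is a hypothetical variable; the default is the Wikimedia
    // recent-changes stream.
    sseUrl: env.SSE_URL || 'https://stream.wikimedia.org/v2/stream/recentchange',
  };
}

const config = loadConfig();
console.log(`Using bucket "${config.bucket}" at ${config.url}`);
```

Passing the environment as a parameter keeps the function easy to exercise with a plain object, for example loadConfig({INFLUX_ORG: 'acme'}).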
Installing the Library
The InfluxDB JavaScript client is a standard Node.js module that you can install from the command line:
npm install @influxdata/influxdb-client
npm install @influxdata/influxdb-client-apis
It can also be loaded directly in the browser as an ES module:
<script type="module">
// import latest release from npm repository
import {InfluxDB} from 'https://unpkg.com/@influxdata/influxdb-client/dist/index.browser.mjs'
</script>
For this example, some other libraries are used for rendering purposes, besides the InfluxDB JavaScript client:
- asciichart: Console ASCII line charts in pure JavaScript
- blessed: A curses-like library with a high-level terminal interface API ("curses" is a terminal control library for Unix-like systems)
- chalk: Terminal string styling done right
- eventsource: A pure JavaScript implementation of an EventSource client
Making a connection
Once dependencies are installed and environment variables are set, you can make a connection to the bucket. For this example, writer.js and reader.js each instantiate an InfluxDB object with the url and token read from the environment:
new InfluxDB({url, token})
This object provides methods to instantiate the different API clients, such as the write API (getWriteApi) and the query API (getQueryApi).
Inserting data
To insert data in InfluxDB, you have to follow the line protocol. This defines a structure for the four elements that constitute a data point:
- Measurement: The name of the measurement (comparable to a table) into which you are going to insert the data
- Tag set: A comma-separated set of key=value pairs describing properties of the data
- Field set: A comma-separated set of key=value pairs holding the measured values; a value can be a float (default), integer, unsigned integer, string or Boolean
- Timestamp (optional): A Unix timestamp
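The four elements above can be seen in a single serialized line. The following is a minimal sketch under simplifying assumptions: toLine and its helpers are hypothetical names, only float, string and Boolean field values are handled, and escaping is reduced to the common cases. In the tutorial itself the client's Point class takes care of serialization.

```javascript
// Hypothetical helpers, for illustration only.
// Tag keys, tag values and field keys escape commas, equals signs and spaces.
const escapeKey = (s) => String(s).replace(/([,= ])/g, '\\$1');
// Measurement names escape commas and spaces.
const escapeMeasurement = (s) => String(s).replace(/([, ])/g, '\\$1');

function formatField(value) {
  if (typeof value === 'string') {
    // String fields are double-quoted; quotes and backslashes are escaped.
    return '"' + value.replace(/(["\\])/g, '\\$1') + '"';
  }
  if (typeof value === 'boolean') return value ? 'true' : 'false';
  return String(value); // plain numbers are read as floats
}

// Builds: measurement[,tag=value...] field=value[,field=value...] [timestamp]
function toLine(measurement, tags, fields, timestamp) {
  const tagSet = Object.keys(tags)
    .sort() // sorting tag keys is optional but recommended
    .map((k) => `,${escapeKey(k)}=${escapeKey(tags[k])}`)
    .join('');
  const fieldSet = Object.keys(fields)
    .map((k) => `${escapeKey(k)}=${formatField(fields[k])}`)
    .join(',');
  const time = timestamp === undefined ? '' : ` ${timestamp}`;
  return `${escapeMeasurement(measurement)}${tagSet} ${fieldSet}${time}`;
}

console.log(
  toLine('edition', { user: 'Example User', isBot: 'false' }, { value: 120 }, 1635897600)
);
// prints: edition,isBot=false,user=Example\ User value=120 1635897600
```

Reading the output from left to right gives the measurement, the tag set, the field set and the timestamp, in that order.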
The complete writer.js looks like this:
#!/usr/bin/env node
const {InfluxDB, Point} = require('@influxdata/influxdb-client');
const {url, token, org, bucket, sseUrl} = require('./env');
const EventSource = require('eventsource');
const {hostname} = require('os');

// Creates a writer with "seconds" as precision
const writeApi = new InfluxDB({url, token}).getWriteApi(org, bucket, 's');
// Sets the common tags for this writer
writeApi.useDefaultTags({location: hostname(), source: 'wikimedia', sseUrl: sseUrl});

console.log(`Connecting to EventStreams at ${sseUrl}`);
const eventSource = new EventSource(sseUrl);

eventSource.onmessage = function (event) {
  // event.data is a JSON string containing the message event
  const d = JSON.parse(event.data);
  // Skip events that carry no length information
  if (d.length != undefined) {
    const dataPoint = new Point('edition')
      .tag('user', d.user)
      .tag('isBot', String(d.bot)) // tag values are strings
      .floatField('value', d.length.new);
    writeApi.writePoint(dataPoint);
    console.log(dataPoint);
    // Flush right away so each point is sent as soon as it arrives
    writeApi.flush().catch((e) => {
      console.log('\nFinished ERROR: ' + e);
    });
  }
};
Notice that the writeApi is instantiated with the organization and bucket environment variables, but also with a precision value. In this case it's seconds ('s'), which defines the granularity of the timestamps of the data to be written; the client also accepts nanoseconds ('ns'), microseconds ('us') and milliseconds ('ms').
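To illustrate what the precision changes, the sketch below converts the same instant to a Unix timestamp at several precisions. The toTimestamp helper is hypothetical and written only for this illustration; when a point has no explicit timestamp, the client fills it in for you.

```javascript
// Hypothetical helper: converts a Date to a Unix timestamp string at the
// given precision. BigInt keeps nanosecond values exact, since they exceed
// Number.MAX_SAFE_INTEGER.
function toTimestamp(date, precision = 's') {
  const ms = BigInt(date.getTime());
  switch (precision) {
    case 's':
      return String(ms / 1000n); // BigInt division truncates the fraction
    case 'ms':
      return String(ms);
    case 'us':
      return String(ms * 1000n);
    case 'ns':
      return String(ms * 1000000n);
    default:
      throw new Error(`Unsupported precision: ${precision}`);
  }
}

const when = new Date('2021-11-03T00:00:00.250Z');
console.log(toTimestamp(when, 's'));  // 1635897600
console.log(toTimestamp(when, 'ms')); // 1635897600250
console.log(toTimestamp(when, 'ns')); // 1635897600250000000
```

With a precision of seconds, the 250 ms fraction is dropped, so two edits arriving within the same second with identical tags would share a timestamp.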
You can also set default tags that are applied to every data point written, such as the source of the data in this case. The client lets you extend this default tag set with other values: for each data point, the user and isBot tags are added.
The field written is the new length of the change made. The writePoint method accepts the Point instance, and then the writeApi instance is flushed. You should always close the writeApi in order to flush pending changes and cancel pending scheduled retries. writer.js runs indefinitely, handling each message pushed by the event source.
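Because the writer never stops on its own, one way to follow the advice about closing is to do it when the process is interrupted. The sketch below is an assumption about how that could be wired up, not part of the tutorial's code: closeOnSignal is a hypothetical helper, and it relies only on the write API's close() method returning a promise.

```javascript
// Hypothetical helper: closes the write API when the process receives a
// signal (Ctrl+C by default), then exits.
function closeOnSignal(writeApi, proc = process, signal = 'SIGINT') {
  proc.once(signal, () => {
    writeApi
      .close() // flushes buffered points and cancels scheduled retries
      .then(() => proc.exit(0))
      .catch((e) => {
        console.error('Failed to close the writer:', e);
        proc.exit(1);
      });
  });
}
```

In writer.js this would be called once, right after the writeApi is created, as closeOnSignal(writeApi). The explicit exit is needed because registering a SIGINT handler disables Node's default behavior of terminating on Ctrl+C.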
Querying data
Once the data is in the bucket, the InfluxDB JavaScript client provides another API client to query it. In this example, the InfluxDB object runs a query that returns the number of data points written in the last 10 seconds, grouped by the isBot tag:
const query = '\
from(bucket:"js-sample")\
|> range(start: -10s)\
|> filter(fn:(r) => r._measurement == "edition")\
|> group(columns: ["isBot"])\
|> count()\
';
The query is written in Flux, a functional data scripting language designed for querying, analyzing and acting on data in InfluxDB 2.0. The previous SQL-like query language, InfluxQL, is still supported at the /query compatibility endpoint of the API, but the recommendation is to use the full power of the new language.
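The query above hard-codes the bucket name. As a minimal sketch, the same pipeline can be assembled from parameters; buildCountQuery is a hypothetical helper, and quoting names with JSON.stringify is an assumption that is adequate only for simple names. The client library also exports a flux tagged template intended for safely embedding values in queries.

```javascript
// Hypothetical helper: builds the count-per-isBot query for a given bucket,
// measurement and look-back window.
function buildCountQuery(bucket, measurement, window = '10s') {
  // Accept only simple durations such as 10s, 5m or 1h.
  if (!/^\d+(ns|us|ms|s|m|h|d|w)$/.test(window)) {
    throw new Error(`Invalid duration: ${window}`);
  }
  // JSON.stringify wraps the names in double quotes and escapes quotes.
  return [
    `from(bucket: ${JSON.stringify(bucket)})`,
    `|> range(start: -${window})`,
    `|> filter(fn: (r) => r._measurement == ${JSON.stringify(measurement)})`,
    '|> group(columns: ["isBot"])',
    '|> count()',
  ].join('\n  ');
}

console.log(buildCountQuery('js-sample', 'edition'));
```

Each element of the array is one stage of the pipeline: the source bucket, the time range, the filter, the grouping and the aggregate function.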
As you can see, the query defines the bucket, range of data, filters to be applied, grouping columns and data functions to be applied. The following function shows you how to run the query against the QueryApi:
function queryExample(fluxQuery) {
  const queryApi = new InfluxDB({url, token}).getQueryApi(org);
  queryApi.queryRows(fluxQuery, {
    // Called once for every row returned
    next(row, tableMeta) {
      const o = tableMeta.toObject(row);
      pushRow(o);
      render();
    },
    complete() {
      console.log('FINISHED');
    },
    error(error) {
      console.log('QUERY FAILED', error);
    },
  });
}
For each row returned, the data is parsed into an object and passed to a function that stores it in an array based on the isBot attribute. The two arrays of the series, bots and humans, are then rendered using the asciichart library. The full code for the reader is shown below:
const {InfluxDB} = require('@influxdata/influxdb-client');
const {url, token, org, bucket, chart, screen} = require('./env');
const chalk = require('chalk');
const asciichart = require('asciichart');

const maxLength = 100; // maximum number of points kept per series
const bots = [];
const humans = [];

const config = {
  height: 18, // any height you want
  colors: [
    asciichart.blue,
    asciichart.red,
    asciichart.default, // default color
    undefined, // equivalent to default
  ],
};

const query = `\
from(bucket:"${bucket}")\
|> range(start: -10s)\
|> filter(fn:(r) => r._measurement == "edition")\
|> group(columns: ["isBot"])\
|> count()\
`;

demo();

// Runs the query 300 times, pausing 500 ms between runs
async function demo() {
  for (let index = 0; index < 300; index++) {
    queryExample(query);
    await sleep(500);
  }
}

function queryExample(fluxQuery) {
  const queryApi = new InfluxDB({url, token}).getQueryApi(org);
  queryApi.queryRows(fluxQuery, {
    next(row, tableMeta) {
      const o = tableMeta.toObject(row);
      pushRow(o);
      render();
    },
    complete() {},
    error(error) {
      console.log('QUERY FAILED', error);
    },
  });
}

// Stores the count in the matching series, dropping the oldest value
// once a series reaches maxLength
function pushRow(row) {
  if (bots.length >= maxLength) {
    bots.shift();
  }
  if (humans.length >= maxLength) {
    humans.shift();
  }
  if (row.isBot == 'true') {
    bots.push(row['_value']);
  } else {
    humans.push(row['_value']);
  }
}

// Draws both series once each has at least one value
function render() {
  if (bots.length != 0 && humans.length != 0) {
    const plt = asciichart.plot([bots, humans], config).split('\n');
    chart.setLine(0, chalk.blue('Bots: ' + bots[bots.length - 1]) + ' ' + chalk.red('Humans: ' + humans[humans.length - 1]));
    plt.forEach((line, i) => {
      chart.setLine(i + 1, line);
    });
  }
  screen.render();
}

function sleep(ms) {
  return new Promise((resolve) => setTimeout(resolve, ms));
}