通过无服务器函数扩展InfluxDB

导航至

数据采集和数据分析是时间序列平台中的阴阳。有许多资源可以帮助您采集数据。典型的采集方式包括基于代理通过CSV导入使用客户端库第三方技术。一旦您的时间序列数据到达,分析将画上圆满的句号,并经常导致额外的数据采集,等等。

InfluxDB的本地工具,如任务、仪表板和Flux,通常在平台内提供所需洞察,但某些用例需要扩展平台。对于这些用例,InfluxDB的统一API提供了一套完整的集成端点。这种可扩展性是开发者喜欢InfluxDB的原因之一。

为了说明这一点,我提供了两个简单的示例,它们是在亚马逊网络服务(AWS)的函数即服务提供上实现的——Lambda。无服务器函数,如Lambda,几乎适用于任何架构,因为它们是解耦的工作单元,相对容易连接。

设置

我的示例包含一个Flux任务,用于模拟监控Web应用程序的响应时间。InfluxDB收集数据,并作为服务处理器使用其业务流程的 backend 分析引擎。

InfluxDB AWS Lambda

我使用不同的URL复制了三次模拟监控任务,并将结果存储在以监控公司命名的存储桶中。例如,我的一个任务监控Uber的响应时间,响应时间存储在名为“Uber”的存储桶中。最终结果?三个InfluxDB任务以统一的数据集捕捉网站性能,仅通过存储桶名称进行区分。

使用AWS Lambda查询InfluxDB

创建Lambda时,第一个决定是选择哪种语言。为了便于开发,我使用了NodeJS。AWS的NodeJS集成开发环境支持意味着第一个示例可以完全在浏览器中创建。

要跟进,请注册免费的AWS账户并完成Lambda入门步骤。创建一个NodeJS的hello world Lambda,并将示例1复制粘贴到控制台。在InfluxDB方面,注册免费的InfluxDB Cloud账户创建Flux任务,并获取org-id和token以插入示例1的代码

var https = require('https');

exports.handler = function(event, context) {
    var queryForCount = "from(bucket: \"Pingpire\") |> range(start: -1h) |> filter(fn: (r) => r[\"_measurement\"] == \"PingService\") |> filter(fn: (r) => r[\"_field\"] == \"response_time_ms\") |> filter(fn: (r) => r[\"method\"] == \"GET\") |> count()"

    // An object of options to indicate where to post
    var post_options = {
        host: 'us-west-2-1.aws.cloud2.influxdata.com',
        path: '/api/v2/query?orgID=<your-ord-id>',
        method: 'POST',
        headers: {
            'Content-Type': 'application/vnd.flux',
            'Authorization': 'Token <your-user-token>'
        }
    };

    // Set up the request
    var post_req_uber = https.request(post_options, function(res) {
        res.on('data', function(chunk) {
            console.log('Response: ' + chunk);
            context.succeed();
        });
        res.on('error', function(e) {
            console.log("Got error: " + e.message);
            context.done(null, 'FAILURE');
        });
    });
    // post the data
    post_req_uber.write(queryForCount);
    post_req_uber.end();
}

以下是我测试Lambda时浏览器中AWS IDE输出的截图

InfluxDB query workload - AWS Lambda

这个第一个示例是直接使用NodeJS HTTPS POST请求来演示直接使用InfluxDB API的简单直接方式。使用API功能强大且简单,但许多开发者更愿意使用库而不是直接进行HTTP调用。我的第二个示例通过InfluxDB客户端库调用InfluxDB,以展示这种方法。

设置第二个示例需要更多的工作,因为AWS NodeJS库的基础有限。要继续操作,你需要AWS命令行界面(CLI)来添加node包 - 按照以下操作安装/使用AWS CLI

一旦你有了CLI,你可以使用以下命令更新你的Lambda函数的可用库/包

>npm install @influxdata/influxdb-client

在本地目录中。你可以将这个目录打包成zip文件,然后使用CLI将其导入AWS Lambda,或者像我一样,在代码输入类型下使用NodeJS IDE的“上传zip文件”选项。

一旦Lambda集成了InfluxDB客户端库,复制粘贴示例2的代码

const { InfluxDB } = require('@influxdata/influxdb-client')

const token = '<your-user-token>'
const org = '<your-org-id'

const client = new InfluxDB({ url: 'https://us-west-2-1.aws.cloud2.influxdata.com', token: token })
const queryApi = client.getQueryApi(org)

exports.handler = function(event, context, callback) {
  var customerJSON = JSON.parse(JSON.stringify(event));

  var customers = [];
  var x;
  for (x in customerJSON) {
    customers.push(customerJSON[x]);
  }

  var y = 0;
  for (var i = 0; i < customers.length; i++) {
    var query = `from(bucket: "${customers[i]}" ) |> range(start: -10m) |> filter(fn: (r) => r.method == "GET" ) |> count(column: "_value")`;
    console.log(query)

    queryApi.queryRows(query, {
      next(row, tableMeta) {
        const o = tableMeta.toObject(row);
        console.log(`Customer "${customers[y++]}" consumed ` + o._value + ` pings.`);
      },
      error(error) {
        console.error(error)
        console.log('\nFinished ERROR')
      },
      complete() {
        console.log('\nFinished SUCCESS')
      },
    })
  }
}

如你所见,使用客户端库与API访问没有太大区别。你仍然需要token、org-id和查询,但本地库几乎总是使开发者更加高效。

要执行和测试这个示例,请记住我的InfluxDB有几个用于监控公司网站的作业,结果通过bucket名称区分。我的Lambda接收一个包含公司的JSON列表,并对该列表进行循环以查询InfluxDB的结果。以下是我的测试JSON

{
  "Company1": "Microsoft",
  "Company2": "Uber",
  "Company3": "Pingpire"
}

以下是第二个Lambda的AWS IDE输出的截图

AWS IDE Output - InfluxDB Lambda

探索无服务器集成继续...

这些初始示例展示了如何轻松地将Lambda扩展到InfluxDB。在未来几周内,我将在这一基础上进一步探索使用无服务器与InfluxDB。

如我所述,请今天注册免费的InfluxDB Cloud和AWS账户来尝试以上示例。如常,如果您有任何问题、反馈或想探索这些具体示例,请通过我们的社区论坛Slack联系!