我们如何使用 InfluxDB 进行安全监控

导航到

在 InfluxData,我们相信使用时序数据库进行安全监控是合理的。总结来说,这是因为安全调查不可避免地是面向时间的——你想要监控和警告谁访问了什么,从哪里,在什么时间——而像 InfluxDB 这样的时序数据库在查询进行此类操作所需的数据方面非常高效。

在这篇文章中,我们想向您展示我们如何使用 InfluxDB 进行安全监控的起点,以便您可以将这些模式应用到自己的组织中。

InfluxData 的安全事件库存

第一个问题:我们从哪里开始安全事件监控?由于大多数安全漏洞都与账户被篡改有关,我们决定将重点放在这里。

为了验证对我们服务的地理位置适当的访问,我们需要收集超过 100 个云服务的相关数据。但我们遇到的第一大障碍是访问数据(谁登录了)和活动数据(登录后他们做了什么)。在这 100 多个服务中,有几十个使用基于 Google Workspace(以前称为 G Suite)的单点登录(SSO)。由于我们能够获取这些服务的访问数据,我们决定从这里开始。

 

influxdata google workspace - SSO

我们正在寻找的模式

安全监控完全是关于异常检测——偏离正常的情况。我们决定查看以下内容:

  • 唯一账户总数
  • 身份验证尝试总数
  • 成功身份验证尝试总数
  • 失败身份验证尝试总数
  • 每个账户的平均 IP 地址数
  • 每个 IP 地址的平均账户数
  • 包含时间、用户名、应用程序、IP 地址以及成功与否的认证事件列表

在 InfluxDB 中存储事件数据后,可以轻松地查看任何时间段的信息,例如过去一小时、一天、一周、一个月等;特定的小时或日;一天中的某些小时(工作时间与非工作时间);或一周中的某些日(工作日与周末)。

我们希望所需的数据点尽可能简单。毕竟,我们有许多服务需要跟踪。我们使用的云服务列表——以及因此产生的攻击面——在持续变化。使用模式也会随时间变化,无论是假期放缓,还是随着更多旅行返回后疫情时代。

身份验证事件

我们从Google Workspace(GWs)审计日志服务中收集身份验证事件,登录审计日志。GCP云日志文档描述了自动收集的方法。稍后,我们将转向使用Telegraf PubSub插件收集这些事件,因为它更容易,更干净地集成。(请从我的错误中学习!)

数据收集

每个服务可能需要单独的收集过程。方法和数据模型可以相似,但每个都将具有独特性。这将通过许多方式和各种工具来完成。(详细内容超出了今天的范围。)目前,Google Workspace的收集服务运行一个每5分钟轮询一次的NodeJS程序。同样,这正在迁移到一个更简单的PubSub Telegraf监听器。

数据存储

度量存储是InfluxDB Cloud服务

数据模型

每个服务将以与其他服务不同的模式提供数据,我们需要对每个进行标准化以供我们使用。

我们将需要的基本信息

  • 身份验证时间戳
  • 公司账户ID
  • 用户名
  • 用户ID
  • 用户域
  • 身份验证类型
  • 身份验证结果

公司ID用于管理独立的公司账户。用户名通常是电子邮件地址,但可能与用户ID相匹配。

我们的密钥映射到静态值或GWs事件字段。

GWs == Google Workspace事件记录

  • 时间:GWs.id.time
  • 服务来源:"G Suite"
  • 服务域:"influxdata.com"
  • 源地址:GWs.ipAddress
  • 电子邮件地址:GWs.actor.email
  • saas_account_id:GWs.actor.profileId
  • customer_id:GWs.id.customerId
  • 应用程序:GWs.id.applicationName
  • auth_results:GWs.events[X].name
  • 登录类型:GWs.events[X].parameters[Y].value

可视化

此初始仪表板可视化包括一般使用度量、成功与失败计数、账户和地址基数、随时间变化的结果图表以及身份验证事件详细列表。在InfluxDB文档中有很好的可视化仪表板创建说明。

Google Workspace authentication

仪表板元素

用于上述仪表板每个单元格的Flux查询如下

唯一账户

这将在给定时间段内构建尝试身份验证的唯一账户列表。

from(bucket: v.bucket)
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) =>
    r._measurement == "auth_activity"
    and r._field == "auth_result"
  )
  |> keep(columns:["email_address"])
  |> group()
  |> unique(column: "email_address")
  |> count(column: "email_address")
身份验证尝试

在我们的请求期间,尝试了多少次总共的身份验证。

from(bucket: v.bucket)
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) =>
    r._measurement == "auth_activity"
    and r._field == "auth_result"
    and (r._value == "login_success" or r._value == "login_failure")
  )
  |> keep(columns:["_time","email_address"])
  |> group()
  |> count(column: "email_address")
成功

在此时间段内,有多少次身份验证尝试成功。

from(bucket: v.bucket)
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) =>
    r._measurement == "auth_activity"
    and r._value == "login_success"
  )
  |> group()
  |> count()
失败

展示了在此时间段内有多少次身份验证尝试失败。

from(bucket: v.bucket)
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) =>
    r._measurement == "auth_activity"
    and r._value == "login_failure"
  )
  |> group()
  |> count()
每个账户的平均地址基数

在给定时间段内,每个用户ID使用的互联网地址的平均数量。

from(bucket: v.bucket)
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) =>
    r._measurement == "auth_activity"
    and r._field == "auth_result"
  )
  |> keep(columns:["email_address","source_address"])
  |> group(columns: ["email_address"])
  |> unique(column: "source_address")
  |> count(column: "source_address")
  |> group()
  |> mean(column: "source_address")
每个地址的账户基数

在相同时间段内,每个互联网地址使用了多少个账户。

addresses = from(bucket: v.bucket)
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) =>
    r._measurement == "auth_activity"
    and r._field == "auth_result"
  )
  |> keep(columns:["source_address"])
  |> map(fn: (r) => ({ r with field: "x1" }))
  |> group(columns:["field"])
  |> rename(columns: {source_address: "_value"})
  |> unique()
  |> count()

accounts = from(bucket: v.bucket)
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) =>
    r._measurement == "auth_activity"
    and r._field == "auth_result"
  )
  |> keep(columns:["email_address"])
  |> map(fn: (r) => ({ r with field: "x1" }))
  |> group(columns:["field"])
  |> rename(columns: {email_address: "_value"})
  |> unique()
  |> count()

join(tables: { d1: addresses, d2: accounts }, on: ["field"])
  |> map(fn: (r) => ({
    r with _value: float(v: r._value_d1) / float(v: r._value_d2)
  }))
  |> keep(columns:["_value"])
身份验证结果

总结在整个时间段内发生的身份验证尝试成功和失败。

from(bucket: v.bucket)
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) =>
    r._measurement == "auth_activity"
    and (r._field == "auth_result")
  )
  |> keep(columns: ["_start","_stop","_time","_value"])
  |> map(fn: (r) => ({ r with res: r._value }))
  |> group(columns: ["res"])
  |> aggregateWindow(every: v.windowPeriod, fn: count )
最新的身份验证事件

给定时间段内身份验证事件和详细信息的完整列表。

from(bucket: v.bucket)
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter( fn: (r) =>
    r._measurement == "auth_activity"
    and r._field == "auth_result"
  )
  |> duplicate(column: "_value", as: "auth_result")
  |> drop(columns:[
    "_start","_stop","_field","_measurement","application",
    "customer_id", "service_source","saas_account_id","_value",
    "service_domain"
  ])
  |> group()
  |> sort(columns:["_time"], desc: true)

对我们同行云软件供应商的请求

让我站到我的肥皂箱上。

我们注意到的一个问题是,许多云服务和SaaS应用程序不提供对安全事件(如登录)的访问权限。对于那些提供此类服务的企业,许多还额外收费。例如,以下是AWS CloudTrail的价格,它允许您记录和监控AWS账户活动。

作为云和SaaS供应商,我们正在对自己造成伤害,因为这些做法降低了客户发现安全漏洞的可能性。我们越能通过API广泛提供安全事件,并且让这些API免费,我们就越能增强我们所有人提供的产品信任度。

security events API trust

版权:@sammiechaffin 通过 Unsplash

想想汽车行业——他们不会对更高级的安全带、防抱死制动系统或安全气囊额外收费。这些是标准配置,因为汽车供应商知道,他们使汽车越安全,人们就越信任他们作为交通工具。我们需要开始这样思考。

所以如果你是云软件开发人员,请通过免费API提供你的安全事件,特别是认证事件。具体来说,提供以下信息的程序化访问,无论是通过拉取(REST API调用)还是推送(Web Sockets、MQTT、AMQP等):

  • 访问:谁(尝试)登录,什么时间,从哪里登录,以IP地址或完全限定域名(FQDN)的形式。更好的是:确定登录的经纬度。这样,客户可以计算登录会话之间的距离,以查看它们是否表明账户被入侵。
  • 使用:某人的会话持续了多长时间。
  • 活动:这是特定领域的,应该允许跟踪应用程序或云服务中的至少添加、更改和删除操作。

InfluxDB OSS企业版产品产生详细的授权和活动日志。这些日志可通过AWSAzureGoogle Cloud 市场获取,以简化安装和购买。

现在,为了完全透明,这些信息仅适用于请求支持团队的InfluxDB云服务。这些功能在我们的产品路线图上,我们正在努力解决这个问题。

更多关于InfluxData和安全的详细信息

InfluxDB关于安全的文章

我们有以下安全监控模板

将日志数据发送到InfluxDB

从 InfluxDB 发送日志数据

结论

随着我们将运营和服务从自托管迁移到云端,安全相关事件收集和关联变得更加困难,但观察它们的重要性却更加突出。我们的工具和方法必须不断发展,以跟上不断变化的攻击面,InfluxData 平台可以很好地用于此。

我们将继续构建和展示各种改进我们安全状况的方法,敬请期待更多内容!

如果您使用时间序列数据库进行安全监控,我们很乐意听取您的意见。请在我们Slack社区网站上告诉我们。如果您想自己尝试 InfluxDB,请在此获取

感谢 Al Sargent 和 Peter Albert 对本文的贡献和帮助。