我们如何使用 InfluxDB 进行安全监控

导航至

在 InfluxData,我们认为使用时间序列数据库进行安全监控是有道理的。 总结来说,这是因为安全调查不可避免地是面向时间的——您想要监控和警报谁在什么时间从哪里访问了什么——而像 InfluxDB 这样的时间序列数据库在查询执行此操作所需的数据方面非常高效。

在这篇文章中,我们想向您展示我们如何开始使用 InfluxDB 进行安全监控,以便您可以将这些模式应用于您自己的组织。

我们在 InfluxData 的安全事件清单

第一个问题:从哪里开始我们的安全事件监控? 由于大多数安全漏洞都与帐户泄露有关,我们决定从那里开始关注。

为了验证对我们服务的地理位置适当的访问,我们需要收集 100 多个云服务的数据。 但我们遇到的第一个障碍之一是访问数据(谁登录了)和活动数据(登录后他们做了什么)的可用性。 在这 100 多项服务中,有几十项使用单点登录 (SSO),使用 Google Workspace(以前称为 G Suite)。 由于我们能够获取这些服务的访问数据,我们决定从那里开始。

 

influxdata google workspace - SSO

我们正在寻找的模式

安全监控完全是关于异常检测——什么是与正常的偏差。 我们决定关注以下几点

  • 唯一帐户总数
  • 身份验证尝试总数
  • 成功身份验证尝试总数
  • 不成功的身份验证尝试总数
  • 每个帐户的平均 IP 地址数
  • 每个 IP 地址的平均帐户数
  • 身份验证事件列表,包括时间、用户名、应用、IP 地址以及是否成功

通过存储在 InfluxDB 中的事件数据,可以轻松查看上述任何时间段的信息,例如过去一小时、一天、一周、一个月等; 一组特定的时段或天数; 一天中的某些时段(工作时间和非工作时间); 或一周中的某些天(工作日与周末)。

我们希望保持所需数据点的简单性。 毕竟,我们有很多服务需要跟踪。 我们使用的云服务列表——因此 攻击面 ——正在不断变化。 并且使用模式会随着时间而变化,无论是假期放缓,还是后疫情世界恢复更多旅行。

身份验证事件

我们正在从 Google Workspace (GWs) 审核日志服务收集身份验证事件,登录审核日志GCP Cloud Logging 文档 描述了自动收集的方法。 稍后,我们将转向使用 Telegraf PubSub 插件 收集这些事件,因为它是一种更简单、更干净的集成。 (请从我的错误中吸取教训!)

数据收集

每项服务都可能需要单独的收集过程。 方法和数据模型可能相似,但每种方法都是独一无二的。 这将通过多种方式和各种工具来完成。 (详细信息超出了我们今天的范围。)Google Workspace 的收集服务目前每 5 分钟运行一个 NodeJS 轮询程序。 同样,这正在迁移到更简单的 PubSub Telegraf 监听器。

数据存储

指标存储是 InfluxDB Cloud 服务

数据模型

每项服务将以不同于其他服务的模式传递数据,我们需要对每项服务进行规范化以供我们使用。

我们将需要的基本信息

  • 身份验证时间戳
  • 公司帐户 ID
  • 用户名
  • 用户 ID
  • 用户域
  • 身份验证类型
  • 身份验证结果

公司 ID 用于管理单独的公司帐户。 用户名通常是电子邮件地址,但也可能与用户 ID 匹配。

我们的密钥映射到静态值或 GWs 事件字段。

GWs == Google Workspace 事件记录

  • time: GWs.id.time
  • service_source: "G Suite"
  • service_domain: "influxdata.com"
  • source_address: GWs.ipAddress
  • email_address: GWs.actor.email
  • saas_account_id: GWs.actor.profileId
  • customer_id: GWs.id.customerId
  • application: GWs.id.applicationName
  • auth_results: GWs.events[X].name
  • login_type: GWs.events[X].parameters[Y].value

可视化

此初始 仪表板可视化 由通用使用指标、成功与失败计数、帐户和地址基数、随时间变化的图表以及身份验证事件详细信息列表组成。 可视化和仪表板 的创建在 InfluxDB 文档 中得到了很好的介绍。

Google Workspace authentication

仪表板元素

上述仪表板的每个单元格使用的 Flux 查询 如下

唯一帐户

这构建了在给定时间段内尝试身份验证的唯一帐户列表。

from(bucket: v.bucket)
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) =>
    r._measurement == "auth_activity"
    and r._field == "auth_result"
  )
  |> keep(columns:["email_address"])
  |> group()
  |> unique(column: "email_address")
  |> count(column: "email_address")
身份验证尝试

在我们的请求期间尝试了多少次身份验证。

from(bucket: v.bucket)
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) =>
    r._measurement == "auth_activity"
    and r._field == "auth_result"
    and (r._value == "login_success" or r._value == "login_failure")
  )
  |> keep(columns:["_time","email_address"])
  |> group()
  |> count(column: "email_address")
成功

在此时间段内,有多少次身份验证尝试成功。

from(bucket: v.bucket)
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) =>
    r._measurement == "auth_activity"
    and r._value == "login_success"
  )
  |> group()
  |> count()
失败

演示在此期间有多少次身份验证尝试失败。

from(bucket: v.bucket)
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) =>
    r._measurement == "auth_activity"
    and r._value == "login_failure"
  )
  |> group()
  |> count()
每个帐户的平均地址基数

在给定的时间段内,每个用户 ID 使用的互联网地址的平均数量是多少。

from(bucket: v.bucket)
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) =>
    r._measurement == "auth_activity"
    and r._field == "auth_result"
  )
  |> keep(columns:["email_address","source_address"])
  |> group(columns: ["email_address"])
  |> unique(column: "source_address")
  |> count(column: "source_address")
  |> group()
  |> mean(column: "source_address")
每个地址的帐户总基数

在同一时间段内,每个互联网地址使用了多少个帐户。

addresses = from(bucket: v.bucket)
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) =>
    r._measurement == "auth_activity"
    and r._field == "auth_result"
  )
  |> keep(columns:["source_address"])
  |> map(fn: (r) => ({ r with field: "x1" }))
  |> group(columns:["field"])
  |> rename(columns: {source_address: "_value"})
  |> unique()
  |> count()

accounts = from(bucket: v.bucket)
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) =>
    r._measurement == "auth_activity"
    and r._field == "auth_result"
  )
  |> keep(columns:["email_address"])
  |> map(fn: (r) => ({ r with field: "x1" }))
  |> group(columns:["field"])
  |> rename(columns: {email_address: "_value"})
  |> unique()
  |> count()

join(tables: { d1: addresses, d2: accounts }, on: ["field"])
  |> map(fn: (r) => ({
    r with _value: float(v: r._value_d1) / float(v: r._value_d2)
  }))
  |> keep(columns:["_value"])
身份验证结果

整个时间段内发生的身份验证尝试成功和失败的摘要。

from(bucket: v.bucket)
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) =>
    r._measurement == "auth_activity"
    and (r._field == "auth_result")
  )
  |> keep(columns: ["_start","_stop","_time","_value"])
  |> map(fn: (r) => ({ r with res: r._value }))
  |> group(columns: ["res"])
  |> aggregateWindow(every: v.windowPeriod, fn: count )
最新身份验证事件

时间段内身份验证事件和详细信息的完整列表。

from(bucket: v.bucket)
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter( fn: (r) =>
    r._measurement == "auth_activity"
    and r._field == "auth_result"
  )
  |> duplicate(column: "_value", as: "auth_result")
  |> drop(columns:[
    "_start","_stop","_field","_measurement","application",
    "customer_id", "service_source","saas_account_id","_value",
    "service_domain"
  ])
  |> group()
  |> sort(columns:["_time"], desc: true)

向我们的云软件供应商同仁发出请求

请允许我站到我的肥皂箱上说几句。

我们注意到的一件事是,许多云服务和 SaaS 应用程序不提供对安全事件(如登录)的访问权限。 而对于那些确实提供的服务,许多服务会为此额外收费。 例如,这是 AWS CloudTrail 的定价,它允许您记录和监控您的 AWS 帐户活动。

作为一个行业,我们云和 SaaS 供应商正在损害我们自己,因为这些做法降低了我们的客户发现安全漏洞的可能性。 我们越能通过 API 广泛提供安全事件——并使这些 API 免费——我们就越能建立对我们所有人提供的产品的信任。

security events API trust

图片来源:@sammiechaffin via Unsplash

想想汽车行业——他们不会对更高级的安全带、防抱死制动系统或安全气囊额外收费。 这些都是标配,因为汽车供应商知道,他们制造的汽车越安全,人们就越会信任它们作为交通工具。 我们需要开始像那样思考。

因此,如果您是云软件开发人员,请通过免费 API 提供您的安全事件,尤其是身份验证事件。 具体来说,提供程序化访问,通过拉取(REST API 调用)或推送(Web 套接字、MQTT、AMQP 等)以下信息

  • 访问: 谁(尝试)登录,在什么时间,以及从哪里登录,以 IP 地址或 完全限定域名 (FQDN) 的形式。 更好的是:确定登录的纬度和经度。 这样,客户可以计算登录会话之间的距离,以查看它们是否表明帐户遭到泄露。
  • 使用情况: 某人的会话持续了多长时间。
  • 活动: 这是特定于领域的,应允许跟踪应用程序或云服务中的至少添加、更改和删除操作。

InfluxDB OSSEnterprise 产品生成详细的授权和活动日志。 这可以通过 AWSAzureGoogle Cloud 市场获得,以简化安装和购买。

现在,要完全透明,该信息仅适用于 InfluxDB Cloud 服务,需要向支持团队提出请求。 这些功能已在我们的产品路线图上,我们正在努力纠正这一点。

有关 InfluxData 和安全的更多信息

关于安全的 InfluxDB 内容

我们有以下安全监控模板

将日志数据发送到 InfluxDB

从 InfluxDB 发送日志数据

结论

随着我们将运营和服务从自托管迁移到云端,与安全相关的事件更难收集和关联,但监控它们变得更加重要。 我们的工具和方法必须不断发展,以跟上不断变化的攻击面,而 InfluxData 平台可以很好地用于此目的。

我们将继续构建和演示各种改进我们安全态势的方法,敬请期待更多信息!

如果您正在使用时间序列数据库进行安全监控,我们很乐意听取您的意见。 请在 我们的 Slack我们的社区网站 上告知我们。 如果您想亲自试用 InfluxDB,请 在此处获取

感谢 Al Sargent 和 Peter Albert 在本文中提供的帮助和贡献。