我们如何使用 InfluxDB 进行安全监控
作者:Darin Fisher / 产品, 用例, 开发者
2021 年 1 月 29 日
导航至
在 InfluxData,我们认为使用时间序列数据库进行安全监控是有道理的。 总结来说,这是因为安全调查不可避免地是面向时间的——您想要监控和警报谁在什么时间从哪里访问了什么——而像 InfluxDB 这样的时间序列数据库在查询执行此操作所需的数据方面非常高效。
在这篇文章中,我们想向您展示我们如何开始使用 InfluxDB 进行安全监控,以便您可以将这些模式应用于您自己的组织。
我们在 InfluxData 的安全事件清单
第一个问题:从哪里开始我们的安全事件监控? 由于大多数安全漏洞都与帐户泄露有关,我们决定从那里开始关注。
为了验证对我们服务的地理位置适当的访问,我们需要收集 100 多个云服务的数据。 但我们遇到的第一个障碍之一是访问数据(谁登录了)和活动数据(登录后他们做了什么)的可用性。 在这 100 多项服务中,有几十项使用单点登录 (SSO),使用 Google Workspace(以前称为 G Suite)。 由于我们能够获取这些服务的访问数据,我们决定从那里开始。
我们正在寻找的模式
安全监控完全是关于异常检测——什么是与正常的偏差。 我们决定关注以下几点
- 唯一帐户总数
- 身份验证尝试总数
- 成功身份验证尝试总数
- 不成功的身份验证尝试总数
- 每个帐户的平均 IP 地址数
- 每个 IP 地址的平均帐户数
- 身份验证事件列表,包括时间、用户名、应用、IP 地址以及是否成功
通过存储在 InfluxDB 中的事件数据,可以轻松查看上述任何时间段的信息,例如过去一小时、一天、一周、一个月等; 一组特定的时段或天数; 一天中的某些时段(工作时间和非工作时间); 或一周中的某些天(工作日与周末)。
我们希望保持所需数据点的简单性。 毕竟,我们有很多服务需要跟踪。 我们使用的云服务列表——因此 攻击面 ——正在不断变化。 并且使用模式会随着时间而变化,无论是假期放缓,还是后疫情世界恢复更多旅行。
身份验证事件
我们正在从 Google Workspace (GWs) 审核日志服务收集身份验证事件,登录审核日志。 GCP Cloud Logging 文档 描述了自动收集的方法。 稍后,我们将转向使用 Telegraf PubSub 插件 收集这些事件,因为它是一种更简单、更干净的集成。 (请从我的错误中吸取教训!)
数据收集
每项服务都可能需要单独的收集过程。 方法和数据模型可能相似,但每种方法都是独一无二的。 这将通过多种方式和各种工具来完成。 (详细信息超出了我们今天的范围。)Google Workspace 的收集服务目前每 5 分钟运行一个 NodeJS 轮询程序。 同样,这正在迁移到更简单的 PubSub Telegraf 监听器。
数据存储
指标存储是 InfluxDB Cloud 服务。
数据模型
每项服务将以不同于其他服务的模式传递数据,我们需要对每项服务进行规范化以供我们使用。
我们将需要的基本信息
- 身份验证时间戳
- 公司帐户 ID
- 用户名
- 用户 ID
- 用户域
- 身份验证类型
- 身份验证结果
公司 ID 用于管理单独的公司帐户。 用户名通常是电子邮件地址,但也可能与用户 ID 匹配。
我们的密钥映射到静态值或 GWs 事件字段。
GWs == Google Workspace 事件记录
- time: GWs.id.time
- service_source: "G Suite"
- service_domain: "influxdata.com"
- source_address: GWs.ipAddress
- email_address: GWs.actor.email
- saas_account_id: GWs.actor.profileId
- customer_id: GWs.id.customerId
- application: GWs.id.applicationName
- auth_results: GWs.events[X].name
- login_type: GWs.events[X].parameters[Y].value
可视化
此初始 仪表板可视化 由通用使用指标、成功与失败计数、帐户和地址基数、随时间变化的图表以及身份验证事件详细信息列表组成。 可视化和仪表板 的创建在 InfluxDB 文档 中得到了很好的介绍。
仪表板元素
上述仪表板的每个单元格使用的 Flux 查询 如下
唯一帐户
这构建了在给定时间段内尝试身份验证的唯一帐户列表。
from(bucket: v.bucket)
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) =>
r._measurement == "auth_activity"
and r._field == "auth_result"
)
|> keep(columns:["email_address"])
|> group()
|> unique(column: "email_address")
|> count(column: "email_address")
身份验证尝试
在我们的请求期间尝试了多少次身份验证。
from(bucket: v.bucket)
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) =>
r._measurement == "auth_activity"
and r._field == "auth_result"
and (r._value == "login_success" or r._value == "login_failure")
)
|> keep(columns:["_time","email_address"])
|> group()
|> count(column: "email_address")
成功
在此时间段内,有多少次身份验证尝试成功。
from(bucket: v.bucket)
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) =>
r._measurement == "auth_activity"
and r._value == "login_success"
)
|> group()
|> count()
失败
演示在此期间有多少次身份验证尝试失败。
from(bucket: v.bucket)
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) =>
r._measurement == "auth_activity"
and r._value == "login_failure"
)
|> group()
|> count()
每个帐户的平均地址基数
在给定的时间段内,每个用户 ID 使用的互联网地址的平均数量是多少。
from(bucket: v.bucket)
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) =>
r._measurement == "auth_activity"
and r._field == "auth_result"
)
|> keep(columns:["email_address","source_address"])
|> group(columns: ["email_address"])
|> unique(column: "source_address")
|> count(column: "source_address")
|> group()
|> mean(column: "source_address")
每个地址的帐户总基数
在同一时间段内,每个互联网地址使用了多少个帐户。
addresses = from(bucket: v.bucket)
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) =>
r._measurement == "auth_activity"
and r._field == "auth_result"
)
|> keep(columns:["source_address"])
|> map(fn: (r) => ({ r with field: "x1" }))
|> group(columns:["field"])
|> rename(columns: {source_address: "_value"})
|> unique()
|> count()
accounts = from(bucket: v.bucket)
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) =>
r._measurement == "auth_activity"
and r._field == "auth_result"
)
|> keep(columns:["email_address"])
|> map(fn: (r) => ({ r with field: "x1" }))
|> group(columns:["field"])
|> rename(columns: {email_address: "_value"})
|> unique()
|> count()
join(tables: { d1: addresses, d2: accounts }, on: ["field"])
|> map(fn: (r) => ({
r with _value: float(v: r._value_d1) / float(v: r._value_d2)
}))
|> keep(columns:["_value"])
身份验证结果
整个时间段内发生的身份验证尝试成功和失败的摘要。
from(bucket: v.bucket)
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) =>
r._measurement == "auth_activity"
and (r._field == "auth_result")
)
|> keep(columns: ["_start","_stop","_time","_value"])
|> map(fn: (r) => ({ r with res: r._value }))
|> group(columns: ["res"])
|> aggregateWindow(every: v.windowPeriod, fn: count )
最新身份验证事件
时间段内身份验证事件和详细信息的完整列表。
from(bucket: v.bucket)
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter( fn: (r) =>
r._measurement == "auth_activity"
and r._field == "auth_result"
)
|> duplicate(column: "_value", as: "auth_result")
|> drop(columns:[
"_start","_stop","_field","_measurement","application",
"customer_id", "service_source","saas_account_id","_value",
"service_domain"
])
|> group()
|> sort(columns:["_time"], desc: true)
向我们的云软件供应商同仁发出请求
请允许我站到我的肥皂箱上说几句。
我们注意到的一件事是,许多云服务和 SaaS 应用程序不提供对安全事件(如登录)的访问权限。 而对于那些确实提供的服务,许多服务会为此额外收费。 例如,这是 AWS CloudTrail 的定价,它允许您记录和监控您的 AWS 帐户活动。
作为一个行业,我们云和 SaaS 供应商正在损害我们自己,因为这些做法降低了我们的客户发现安全漏洞的可能性。 我们越能通过 API 广泛提供安全事件——并使这些 API 免费——我们就越能建立对我们所有人提供的产品的信任。
想想汽车行业——他们不会对更高级的安全带、防抱死制动系统或安全气囊额外收费。 这些都是标配,因为汽车供应商知道,他们制造的汽车越安全,人们就越会信任它们作为交通工具。 我们需要开始像那样思考。
因此,如果您是云软件开发人员,请通过免费 API 提供您的安全事件,尤其是身份验证事件。 具体来说,提供程序化访问,通过拉取(REST API 调用)或推送(Web 套接字、MQTT、AMQP 等)以下信息
- 访问: 谁(尝试)登录,在什么时间,以及从哪里登录,以 IP 地址或 完全限定域名 (FQDN) 的形式。 更好的是:确定登录的纬度和经度。 这样,客户可以计算登录会话之间的距离,以查看它们是否表明帐户遭到泄露。
- 使用情况: 某人的会话持续了多长时间。
- 活动: 这是特定于领域的,应允许跟踪应用程序或云服务中的至少添加、更改和删除操作。
InfluxDB OSS 和 Enterprise 产品生成详细的授权和活动日志。 这可以通过 AWS、Azure 和 Google Cloud 市场获得,以简化安装和购买。
现在,要完全透明,该信息仅适用于 InfluxDB Cloud 服务,需要向支持团队提出请求。 这些功能已在我们的产品路线图上,我们正在努力纠正这一点。
有关 InfluxData 和安全的更多信息
关于安全的 InfluxDB 内容
- 使用 InfluxDB 监控端点安全状态
- 使用 InfluxDB 和 Telegraf 监控 Fail2ban
- 自动化 SSL 证书过期监控
- InfluxDB OSS 2.0 安全和授权
- 管理 InfluxDB 安全性 | InfluxDB OSS 1.8 文档
- InfluxData 的安全性
- 使用 Suricata 和 Telegraf 进行网络安全监控 | 博客
- InfluxDB 中的身份验证和授权 | InfluxDB OSS 1.8 文档
我们有以下安全监控模板
将日志数据发送到 InfluxDB
- 将 Syslog 数据发送到 InfluxDB 和 Telegraf Syslog 输入插件
- Telegraf Docker 日志插件
- Telegraf Graylog 输入插件
- Telegraf Logparser 插件
- Telegraf Logstash 插件
- Telegraf Tail 插件
- fluentd 和 InfluxDB 的缓冲输出插件
从 InfluxDB 发送日志数据
结论
随着我们将运营和服务从自托管迁移到云端,与安全相关的事件更难收集和关联,但监控它们变得更加重要。 我们的工具和方法必须不断发展,以跟上不断变化的攻击面,而 InfluxData 平台可以很好地用于此目的。
我们将继续构建和演示各种改进我们安全态势的方法,敬请期待更多信息!
如果您正在使用时间序列数据库进行安全监控,我们很乐意听取您的意见。 请在 我们的 Slack 或 我们的社区网站 上告知我们。 如果您想亲自试用 InfluxDB,请 在此处获取。
感谢 Al Sargent 和 Peter Albert 在本文中提供的帮助和贡献。