TL;DR InfluxDB 技术提示:处理 JSON 对象和映射数组
作者:Anais Dotis-Georgiou / 产品, 用例, 开发者
2022 年 5 月 11 日
导航至
有多种方法可以使用 Flux 从各种不同的数据源引入数据,包括 SQL 数据库、其他 InfluxDB Cloud 账户、来自 URL 的带注释 CSV 和 JSON。但是,以前您只能使用 Flux 手动构建 JSON 对象表,如第一个示例中所述。
我们将描述如何使用三个示例来处理日益复杂的 JSON 类型。首先,我们将描述如何使用元语法示例来处理这些 JSON 类型。然后,我们将介绍一个使用 OpenWeatherMap API 的真实示例。
在使用 Flux 处理 JSON 对象时,务必注意以下函数和注意事项
- display() 函数:使用此函数返回任何值的 Flux 表示形式作为字符串。这允许您在 InfluxDB UI 中可视化您的 JSON 对象。此功能也在最新的 OSS 版本中发布,因此请确保您已更新。
- array.from() 函数:将此函数与 display() 函数结合使用,以在 InfluxDB UI 中将 JSON 对象视为字符串。我们将在以下部分讨论如何将这两个函数一起使用。
- json.parse() 函数:使用此函数将 JSON 对象转换为 Flux 值。请参阅此处的类型转换。在使用 http.post() 时,您必须将此函数应用于来自 API 调用的 JSON 对象响应。
- time() 函数:使用此函数将字符串时间戳转换为时间值。
- array.map() 函数:使用此函数迭代数组以处理复杂的 JSON 类型。
JSON 示例 1
第一个示例:从键中提取顶层值。
- 将 JSON 转换为类似
JSON = {"a": 1, "b": 2, "c": 3, "time": timestamp_1}
- 以下 Flux 表
time a b c timestamp_1 1 2 3 - 使用以下 Flux
jsonData = json.parse(data: JSON) // uncomment the next line to display the JSON // array.from(rows: [{data: display(v: jsonData)}]) // convert the timestamp to a time type with the time() function // use the array.from() function to create a table array.from(rows: [{_time: time(v: int(v: jsonData.timestamp_1)) , a: jsonData.a, b: jsonData.b, c: jsonData.c }])
- 在此处查找运行此示例的代码。
JSON 示例 2
第二个示例:跨复杂类型数组进行映射。
- 将 JSON 转换为类似
JSON = {"a": 1, "list": [{"b": 1.1, "time": timestamp_1}, {"b": 1.2, "time": timestamp_2} ] }
- 以下 Flux 表
time a b timestamp_1 1 1.1 timestamp_2 1 1.2 - 使用以下 Flux
jsonData = json.parse(data: JSON) // uncomment the next line to display the JSON // array.from(rows: [{data: display(v: jsonData)}]) // extract the list that we want to map across with array.map listData = jsonData.list // gather any top level values that you want to include in your table as constants a = jsonData.a // map across each complex type in the array named "list" list = array.map( arr: listData, fn: (x) => ({ // convert the timestamp to a time type with the time() function "_time": time(v::x.time), "b": x.b, "city": a }) ) // finally convert that flattened list into a table with array.from array.from(rows: list)
- 在此处查找运行此示例的代码。
JSON 示例 3
第三个示例:跨复杂类型数组进行映射,该数组包含带有复杂类型的嵌套数组。
- 将 JSON 转换为类似
JSON = {"list": [{"b": 1.1, "nestedList": [{"alpha": "foo1", "beta": "bar1"}, {"alpha": "foo2", "beta": "foo2"}], "time": 1596632400000000000}, {"b": 1.2, "nestedList": [{"alpha": "foo1", "beta": "bar1"}, {"alpha": "foo2", "beta": "foo2"}], "time": 1596632500000000000} ] }
- 以下 Flux 表
time alpha b timestamp_1 foo1 1.1 timestamp_2 foo1 1.2 - 使用以下 Flux
jsonData = json.parse(data: JSON) // uncomment the next line to display the JSON // array.from(rows: [{data: display(v: jsonData)}]) // extract the list that we want to map across with array.map listData = jsonData.list // map across each complex type in the array named "list" list = listData |> array.map(fn:(x) => { pendingList = x.nestedList return { id: x.b, _time: time(v: int(v: x.time)), alpha: pendingList[0].alpha, } }) array.from(rows: list)
- 在此处查找运行此示例的代码。
JSON 示例 4
第四个示例:跨复杂类型数组进行映射,该数组包含嵌套数组并评估值是否存在。此示例与示例 3 类似,但它可以用于处理可能存在空值的情况。
- 将 JSON 转换为类似
JSON = {"list": [{"b": 1.1, "nestedList": [{"alpha": "foo1", "beta": "bar1"}, {"alpha": "foo2", "beta": "foo2"}], "time": 1596632400000000000}, {"b": 1.2, "nestedList": [{"alpha": "null", "beta": "bar1"}, {"alpha": "foo2", "beta": "foo2"}], "time": 1596632500000000000} ] }
- 以下 Flux 表
time alpha b timestamp_1 foo1 1.1 timestamp_2 foo1 1.2 - 使用以下 Flux
jsonData = json.parse(data: JSON) // uncomment the next line to display the JSON // array.from(rows: [{data: display(v: jsonData)}]) // extract the list that we want to map across with array.map listData = jsonData.list // map across each complex type in the array named "list" list = listData |> array.map(fn:(x) => { pendingList = x.nestedList // filter for the value |> array.filter(fn: (x) => x.alpha == "foo1") // handle null values pendingAlpha = if length(arr: pendingList) == 1 then pendingList[0].alpha else "foo1" return { id: x.b, _time: time(v: int(v: x.time)), alpha: pendingAlpha, } }) array.from(rows: list)
- 在此处查找运行此示例的代码。
OpenWeatherMap API 示例:带有复杂类型的数组
在此示例中,我们将学习如何跨嵌套 JSON 对象数组进行映射,以构建一个表,从而使在 Flux 中使用 JSON 更加容易。最后,我们将使用 Flux 将表写入 InfluxDB,这包括以下步骤
- 使用 http.get() 函数向 API 提交 GET 请求并返回 JSON 对象。
- 使用 json.encode() 函数将对象转换为字节。
- 使用 json.parse() 函数将 JSON 对象或数组转换为 Flux 记录或数组。
- 使用 array.map() 函数跨数组中的元素进行映射。
- 使用 array.from() 函数构造表。
- 使用 to() 函数将数据写入 InfluxDB。
在本节中,我们将使用 OpenWeatherMap 当前天气数据 API,使用 hourly forecast endpoint 收集过去 4 天伦敦每小时的天气数据。我们将返回的 JSON 响应包含 96 个时间戳的数据,如下所示
{
"cod": "200",
"message": 0.0179,
"cnt": 96,
"list": [
{
"dt": 1596632400,
"main": {
"temp": 289.16,
"feels_like": 288.41,
"temp_min": 289.16,
"temp_max": 289.16,
"pressure": 1013,
"sea_level": 1013,
"grnd_level": 1010,
"humidity": 78,
"temp_kf": 0
},
"weather": [
{
"id": 804,
"main": "Clouds",
"description": "overcast clouds",
"icon": "04n"
}
],
"clouds": {
"all": 100
},
"wind": {
"speed": 2.03,
"deg": 252,
"gust":5.46
},
"visibility": 10000,
"pop": 0.04,
"sys": {
"pod": "n"
},
"dt_txt": "2020-08-05 13:00:00"
},
.....
],
"city": {
"id": 2643743,
"name": "London",
"coord": {
"lat": 51.5085,
"lon": -0.1258
},
"country": "GB",
"timezone": 0,
"sunrise": 1568958164,
"sunset": 1569002733
}
}
以下 Flux 代码
- 导入必要的库
- 返回请求对象并将其存储在变量 resp 中。
- 解析响应的 JSON 正文,并将 list 属性存储在变量 nestedListData 中。此属性包含一个数组列表,其中包含伦敦的时间戳和温度。
- 跨对象列表进行映射以提取日期时间和温度。温度作为嵌套数组存在于主属性中。Unix 日期时间值通过将该值乘以 1000000000 转换为纳秒精度。time() 函数用于将 Unix 时间戳转换为 RFC3339 格式。执行此时间戳转换是为了让您可以更好地可视化您的数据。
- 使用 array.from() 函数构造表,并传入提取的时间和温度值列表。
- 分别使用 set() 函数创建 _measurement 和 _field 列,其值分别为“weather”和“temp”。这是为使用 to() 函数做准备,该函数要求您的表具有以下列
- _time
- _measurement
- _field
- _value
- 使用 to() 函数将数据写入名为“bucket”的存储桶。
import "experimental/json"
import "experimental/http"
import "experimental/array"
weatherURL = "https://pro.openweathermap.org/data/2.5/forecast/hourly?lat=35&lon=139&appid=<yourAPIkey>
// http.get() returns the response from the API
resp = http.get(url: weatherURL)
// json.parse function returns a json object with lists, records, strings, booleans, and floats
jsonData = json.parse(data: resp.body)
// get the list object
listData = jsonData.list
// extract the city name just once
city = jsonData.city.name
list = array.map(
arr: listData,
fn: (x) => ({
// convert the timestamp to a time type with the time() function
"_time": time(v::x.dt*1000000000),
"_value": x.main.temp,
"city": city
})
)
array.from(rows: list)
|> set(key: "_field", value: "temp")
|> set(key: "_measurement", value: "weather")
|> yield()
|> to(bucket: "bucket")
关于使用 Flux 处理 JSON 对象的最终思考
我希望这篇 InfluxDB 技术提示文章能启发您利用 Flux 发出请求并操作 JSON 响应。现在您可以使用 array.map() 函数跨 JSON 中的嵌套数组进行映射以构造行。与往常一样,您可以使用 array.from() 函数从这些行构造表。在准备输出以满足 to() 函数的数据要求后,我们最终能够写入表。此功能为数据摄取到 InfluxDB 提供了另一种方法。您可以在计划任务中执行此逻辑,以定期写入来自 HTTP 请求的数据。
如果您正在使用 Flux 并且需要帮助,请在我们的社区站点或 Slack 频道中寻求帮助。如果您正在 InfluxDB 之上开发一个很酷的物联网应用程序,我们很乐意听到它,因此请务必分享您的故事!此外,请在评论部分分享您的想法、疑虑或问题。我们很乐意获得您的反馈并帮助您解决遇到的任何问题!