首页 > 其他分享 >telegraf解析嵌套json遇到的问题

telegraf解析嵌套json遇到的问题

时间:2024-01-19 16:33:27浏览次数:34  
标签:loc telegraf parameters timestamp v2 kafka 嵌套 json

问题描述

kafka中的数据格式如下:

{
    "customerId": 1652,
    "deviceId": "13011304383",
    "timestamp": 1705637828000,
    "parameters": {
        "acc": 0,
        "locationStatus": 1,
        "altitude": 38.0,
        "loc": {
            "lng": 117.306441,
            "lat": 31.93148
        },
        "latitude": 31.93148,
        "brushState": 0,
        "speed": 0.0,
        "direction": 136.0,
        "height": 38.0,
        "longitude": 117.306441,
        "mileage": 267119.0
    },
    "componentId": 7,
    "entityId": 81495
}

需要通过telegraf把kafka中的数据,同步到influxdb,parameters中的所有key,作为时序数据库的指标存储。

由于influxdb的field value只支持string、int、float等简单类型,不支持对象。而parameters.loc是一个对象,因此呢,需要在telegraf中把对象转换成字符串进行存储。

方案一

使用telegraf中的xpath插件:

data_format = "xpath_json"
  [[inputs.kafka_consumer.xpath]]
      metric_name = "string('device_metric')"
      timestamp = '/timestamp'
      timestamp_format = 'unix_ms'
      timezone = 'Asia/Shanghai'
      field_selection = "/parameters/child::*"
      ### https://github.com/influxdata/telegraf/tree/master/plugins/parsers/json_v2
      [inputs.kafka_consumer.xpath.tags]
        customerId = "/customerId"
        deviceId = "/deviceId"

      ## 优点:可以吧loc作为filed value字符串存储。 可以设置默认把parameters所有key作为指标
      [inputs.kafka_consumer.xpath.fields]
        speed = "number(/parameters/speed)"
        battery = "number(/parameters/battery)"
        brushState = "number(/parameters/brushState)"
        height = "number(/parameters/height)"
        mileage = "number(/parameters/mileage)"
        oil = "number(/parameters/oil)"
        original_oil = "number(/parameters/original_oil)"
        original_waterLevel = "number(/parameters/original_waterLevel)"
        loc = "/parameters/loc"
        lng = "number(/parameters/loc/lng)"
        lat = "number(/parameters/loc/lat)"

      [inputs.kafka_consumer.xpath.fields_int]
        acc = "/parameters/acc"

解析后得到的数据格式如下:

device_metric,customerId=2104,deviceId=18123255038,host=qths-hwpt-03 altitude="83",brushState=0,height=83,locationStatus="1",battery=0,acc=1i,direction="266",mileage=137323,latitude="22.640786",loc="{\"lat\":22.640786,\"lng\":114.200608}",oil=0,lng=114.200608,longitude="114.200608",speed=0,original_oil=0,original_waterLevel=0,lat=22.640786 1705638436000000000

重点关于field key: loc的值:

loc="{\"lat\":22.640786,\"lng\":114.200608}"

以及loc字段的解析配置:

 loc = "/parameters/loc"

"/parameters/loc" 定义的是一个json path, 它的值是一个对象,默认情况下这个插件会把value转换成字符串,因此呢,loc的值就是字符串类型。

这一点能满足我们的需求。

但是xpath这个插件,我认为存在的问题是:它不能根据json value的类型,自定推断出指标的类型。
举例,kafka中,字段altitude的值是整型,而解析后altitude的值类型变成字符串了(我没有显式指定字段altitude的类型,默认就会当作字符串处理)。
我们的业务中有几百个指标,指标随时会新增,每次新增指标都需要在这里显示指定类型,非常不方便。

方案二

使用telegraf中的json_v2插件:

data_format = "json_v2"
  [[inputs.kafka_consumer.json_v2]]
      measurement_name = "device_metric"
      timestamp_path = 'timestamp'
      timestamp_format = 'unix_ms'
      timestamp_timezone = 'Asia/Shanghai'
      [[inputs.kafka_consumer.json_v2.object]]
        path = "parameters"
        tags = ["customerId", "deviceId"]

解析得到的数据格式如下:

device_metric,host=xushengbindeMacBook-Pro.local acc=0,locationStatus=1,altitude=22,loc_lng=118.597916,loc_lat=24.838415,latitude=24.838415,brushState=0,speed=84,direction=150,height=22,longitude=118.597916,mileage=1288 1705651699000000000

相比前面的xpath插件,这个插件的优点是能自动推断数据类型(json中数据类型是啥,解析之后的数据类型还是啥)。

For each field you have the option to define the types. The following rules are in place for this configuration:

  • If a type is explicitly defined, the parser will enforce this type and convert the data to the defined type if possible. If the type can't be converted then the parser will fail.
  • If a type isn't defined, the parser will use the default type defined in the JSON (int, float, string)

但是呢,对于json中的object,官方是这么说的:

  • Array: Every element in an array is treated as a separate line protocol
  • Object: Every key/value in a object is treated as a single line protocol

从结果看,意思就是把loc对象又进行了解析,每一个key算作一个指标。loc_lng=118.597916,loc_lat=24.838415
这一点不符合预期。怎么办呢。 花了好大功能,找到了解决办法。

首先,https://github.com/influxdata/telegraf/tree/master/plugins/parsers/json_v2 中的配置,底层使用的是gjson库,它的语法参考:https://github.com/tidwall/gjson/blob/v1.7.5/SYNTAX.md

查阅这个文档,看到如下介绍:
image
它实际上支持构造新的对象。
并且也支持类型转换(把构造的对象转换成string类型)。https://github.com/tidwall/gjson/blob/v1.7.5/SYNTAX.md
image

顺着这个思路,我修改了配置:

data_format = "json_v2"
  [[inputs.kafka_consumer.json_v2]]
      measurement_name = "device_metric"
      timestamp_path = 'timestamp'
      timestamp_format = 'unix_ms'
      timestamp_timezone = 'Asia/Shanghai'
      [[inputs.kafka_consumer.json_v2.object]]
        excluded_keys = ["loc"]
        path = "parameters"
        tags = ["customerId", "deviceId"]
      [[inputs.kafka_consumer.json_v2.field]]
        path = "{parameters.latitude,parameters.longitude}|@tostr"
        type = "string"
        rename = "loc"

两点调整:
1、默认插件会把parameters中的所有key都解析成指标,因此这里我利用excluded_keys,排除掉loc,默认不解析loc
2、利用"{parameters.latitude,parameters.longitude}|@tostr" 构造了新的loc字符串。

解析后的数据,结果如下:

device_metric,host=xushengbindeMacBook-Pro.local acc=1,locationStatus=1,altitude=77,latitude=31.95914,brushState=0,speed=0,direction=0,height=77,longitude=117.26341,mileage=45322,loc="{\"latitude\":31.95914,\"longitude\":117.26341}" 1705651750000000000

很完美。
1、解析后的类型和json中的值类型保持一致
2、loc的值变成一个字符串,适配influxdb中的field value type。

写到这里,我意识到,实际上可以有更简单的写法:

data_format = "json_v2"
  [[inputs.kafka_consumer.json_v2]]
      measurement_name = "device_metric"
      timestamp_path = 'timestamp'
      timestamp_format = 'unix_ms'
      timestamp_timezone = 'Asia/Shanghai'
      [[inputs.kafka_consumer.json_v2.object]]
        excluded_keys = ["loc"]
        path = "parameters"
        tags = ["customerId", "deviceId"]
      [[inputs.kafka_consumer.json_v2.field]]
        path = "parameters.loc|@tostr"
        type = "string"
        rename = "loc"

解析后的结果和上面一致,符合预期。

telegraf 使用技巧

1、在配置文件最顶层,加入[[processors.printer]] 就可以输出解析后的结果

标签:loc,telegraf,parameters,timestamp,v2,kafka,嵌套,json
From: https://www.cnblogs.com/xushengbin/p/17974981

相关文章

  • Python将JSON以表格数据格式导出
      本文介绍基于Python语言,读取JSON格式的数据,提取其中的指定内容,并将提取到的数据保存到.csv格式或.xlsx格式的表格文件中的方法。  JSON格式的数据在数据信息交换过程中经常使用,但是相对而言并不直观;因此,有时我们希望将JSON格式的数据转换为Excel表格文件数据;这里就介绍一下......
  • net8操作appsettings.json类
    1、添回操作类文件AppSettings.csusingMicrosoft.Extensions.Configuration.Json;namespaceYYApi.Helper{///<summary>///appsettings.json操作类///</summary>publicclassAppSettings{publicstaticIConfigurationConfigu......
  • Json转换工具类(基于google的Gson和阿里的fastjson)
    在项目之中我们经常会涉及到字符串和各种对象的转换,为此特地整理了一下常用的转换方法一、基于com.google.code.gson封装的json转换工具类 1.在pom.xml文件里面引入gson的依赖<dependency><groupId>com.google.code.gson</groupId><arti......
  • 记录 | vscode json美化插件JSON Tools
    安装插件JSONTools原来的json的样子:JSONTools美化/格式化快捷键Ctrl+Alt+M(windows)/Command+Option+M(Mac),然后效果如下:......
  • 记录 | conda报错:conda json.decoder.JSONDecodeError: Expecting value: line 1 colu
    condacreate的时候报错:condajson.decoder.JSONDecodeError:Expectingvalue:line1column1(char0)解决办法:condaclean-i......
  • 数据探索之道:查询Web API数据中的JSON字符串列
    前言在当今数据驱动的时代,对数据进行探索和分析变得愈发关键。WebAPI作为广泛应用的数据源,提供了丰富的信息和资源。然而,面对包含JSON字符串列的WebAPI数据时,我们常常遇到一个挑战:如何高效灵活地处理和查询这些数据?这个问题在数据探索和提取过程中频繁出现。因此小编今天以葡萄......
  • vue3+lottie-web加载json格式动画
    项目中要用动画设计说gif会失真,用json格式动画吧。我虎躯一震,json格式动画什么鬼?lottie库什么鬼。。。。不废话,直接上重点环境:编辑器webstorm,前端技术栈vue3+vite+ts安装lottie-webyarnaddlottie-web引入lottie,引入json格式动画文件.importlottiefrom'lott......
  • 05.接口请求体 - JSON
    目录 接口请求体JSON格式请求体介绍如何构造JSON格式请求体接口请求体简介 进行HTTP请求时,发送给服务器的数据。数据格式类型可以是JSON、XML、文本、图像等格式。请求体的格式和内容取决于服务器端API的设计和开发人员的要求。飞书接口文档常用接......
  • 07.JSON 响应体断言
    什么是JSON响应体 JSON格式的响应体指的是HTTP响应中的消息体(messagebody),它是以JSON格式编码的数据。{"name":"John","age":30,"city":"NewYork"}断言JSON格式响应体使用场景 验证API接口的返回结果是否符合预期。业务场景上是否符合预......
  • vue-element-admin/litemall后端,超过两级嵌套路由无法缓存的问题
    本文主要针对的是litemall后端,vue-element-admin只需稍作修改应该也可以适用。路由扁平的思路主要来源于https://blog.csdn.net/weixin_40119256/article/details/111475571,另外解决面包屑显示问题,此文章作记录以供有需要的同行参考。keep-alive用于缓存不活跃的组件实例,避免重......