如何使用nifi的JoltTransformJSON 处理复杂json数据

如何使用nifi的JoltTransformJSON 处理复杂json数据,输入:

{
    "nodeIdStr": "telemetry_testone",
    "subscriptionIdStr": "subscription1",
    "sensorPath": "123",
    "collectionId": "1269",
    "collectionStartTime": "1684297683311",
    "msgTimestamp": "1684297683344",
    "dataGpb": [{
        "timestamp": "1684297683312",
        "content": {
            "interfaces": {
                "interface": [{
                    "name": "GigabitEthernet0/2/28",
                    "mib-statistics": {
                        "receive-byte": "58769780",
                        "send-byte": "1024662",
                        "receive-packet": "533149",
                        "send-packet": "8581",
                        "receive-unicast-packet": "114270",
                        "receive-multicast-packet": "13292",
                        "receive-broad-packet": "405587",
                        "send-unicast-packet": "8581"
                    }
                }]
            }
        }
    }, {
        "timestamp": "1684297683312",
        "content": {
            "interfaces": {
                "interface": [{
                    "name": "GigabitEthernet0/2/2",
                    "mib-statistics": {
                        "receive-byte": "27047302",
                        "send-byte": "32103826",
                        "receive-packet": "163025",
                        "send-packet": "86680",
                        "receive-unicast-packet": "30369",
                        "receive-multicast-packet": "31020",
                        "receive-broad-packet": "101636",
                        "send-unicast-packet": "86680"
                    }
                }]
            }
        }
    }],
    "collectionEndTime": "1684297683312",
    "currentPeriod": 60000,
    "exceptDesc": "OK",
    "productName": "NetEngine 8000 M1A",
    "protoPath": "huawei_ifm.Ifm",
    "softwareVersion": "V800R021C00SPC100"
}

输出:


```bash
[{
        "nodeIdStr": "telemetry_testone",
        "subscriptionIdStr": "subscription1",
        "sensorPath": "123",
        "collectionId": "1269",
        "collectionStartTime": "1684297683311",
        "msgTimestamp": "1684297683344",

        "timestamp": "1684297683312",

        "name": "GigabitEthernet0/2/28",

        "receive-byte": "58769780",
        "send-byte": "1024662",
        "receive-packet": "533149",
        "send-packet": "8581",
        "receive-unicast-packet": "114270",
        "receive-multicast-packet": "13292",
        "receive-broad-packet": "405587",
        "send-unicast-packet": "8581"
        "collectionEndTime": "1684297683312",
        "currentPeriod": 60000,
        "exceptDesc": "OK",
        "productName": "NetEngine 8000 M1A",
        "protoPath": "huawei_ifm.Ifm",
        "softwareVersion": "V800R021C00SPC100"
                
    }, {
        "nodeIdStr": "telemetry_testone",
        "subscriptionIdStr": "subscription1",
        "sensorPath": "123",
        "collectionId": "1269",
        "collectionStartTime": "1684297683311",
        "msgTimestamp": "1684297683344",

        "timestamp": "1684297683312",

        "name": "GigabitEthernet0/2/2",

        "receive-byte": "27047302",
        "send-byte": "32103826",
        "receive-packet": "163025",
        "send-packet": "86680",
        "receive-unicast-packet": "30369",
        "receive-multicast-packet": "31020",
        "receive-broad-packet": "101636",
        "send-unicast-packet": "86680"
        "collectionEndTime": "1684297683312",
        "currentPeriod": 60000,
        "exceptDesc": "OK",
        "productName": "NetEngine 8000 M1A",
        "protoPath": "huawei_ifm.Ifm",
        "softwareVersion": "V800R021C00SPC100"
            
    }
    
]

请问如何配置?

该回答引用GPT与博主@晓码自在合作编写:

要使用NiFi的JoltTransformJSON处理这段复杂的JSON数据,您需要以下几个步骤:

  1. 添加GetFileProcessor,选择要处理的JSON文件作为输入流。
  2. 添加EvaluateJsonPathProcessor,使用JSON路径表达式提取要处理的JSON节点。可以提取dataGpb数组中的每个对象。
  3. 添加JoltTransformJSON Processor,对每个对象进行变换。配置如下:
json
{
  "spec": {
    "content": {
      "timestamp": "$.timestamp", 
      "name": "$.content.interfaces.interface.name",
      "receive-byte": "$.content.interfaces.interface.mib-statistics.receive-byte",
      "send-byte": "$.content.interfaces.interface.mib-statistics.send-byte",
      "//...": "//..."//省略其他类似配置
    },
    "combine": {
      "nodeIdStr": "=$[0].nodeIdStr",
      "subscriptionIdStr": "=$[0].subscriptionIdStr",
      "sensorPath": "=$[0].sensorPath",
      "collectionId": "=$[0].collectionId",  
      "collectionStartTime": "=$[0].collectionStartTime",
      "msgTimestamp":"=$[0].msgTimestamp" ,
           "//...": "//..."//省略其他类似配置
    }
  } 
}

该配置使用JSON路径表达式提取每个对象中的内容,然后使用combine关键字提取第一个对象中的其他信息,组合到最终的输出结果中。

  1. 添加PutFileProcessor,输出最终的JSON数据结果。

  2. 连接各个Processor,即可完成数据的提取、转换和输出。

通过上述步骤,可以使用JoltTransformJSON对复杂的JSON数据进行分解和重组,输出想要的格式结果。我强烈建议您可以根据上述思路开发一个简单的NiFi流,实现该JSON数据的处理,以便加深理解。只有大量实践,才能熟练掌握JoltTransformJSON的配置和用法。