flink json对象嵌套,算聚合

kafka的模拟数据

{"from":{"province":"内蒙古自治区","city":"锡林郭勒盟"},"to":{"province":"西藏自治区","city":"日喀则"},"time":"2021-07-16 17:07:40.121","value":3291}

flink消费kafka数据

 //3.对数据源的数据进行Transformation计算
    val result = dataSource.map(line => {
      val From=JSON.parseObject(line).getString("from")
      val To=JSON.parseObject(line).getString("to")
      val Value=JSON.parseObject(line).getString("value")
      val Time=JSON.parseObject(line).getString("time")
      var successMsg = From + "|" + To  //消息汇总
      successMsg
    })
      .map((_, 1))
      .keyBy(_._1)
      .sum(1)//组合通过做keyby

遇到的问题,项目要求根据城市算出省的总数,但是我只会算省的总数,我算的是辽宁省沈阳市,辽宁省葫芦岛市 ,要求是根据城市 葫芦岛,沈阳 得出辽宁省有2个from

头要秃了 .thank thank thank

json封装的时候,把城市封装在省份的下一级,通过遍历城市就可以计算。