kafka的模拟数据
{"from":{"province":"内蒙古自治区","city":"锡林郭勒盟"},"to":{"province":"西藏自治区","city":"日喀则"},"time":"2021-07-16 17:07:40.121","value":3291}
flink消费kafka数据
//3.对数据源的数据进行Transformation计算
val result = dataSource.map(line => {
val From=JSON.parseObject(line).getString("from")
val To=JSON.parseObject(line).getString("to")
val Value=JSON.parseObject(line).getString("value")
val Time=JSON.parseObject(line).getString("time")
var successMsg = From + "|" + To //消息汇总
successMsg
})
.map((_, 1))
.keyBy(_._1)
.sum(1)//组合通过做keyby
遇到的问题,项目要求根据城市算出省的总数,但是我只会算省的总数,我算的是辽宁省沈阳市,辽宁省葫芦岛市 ,要求是根据城市 葫芦岛,沈阳 得出辽宁省有2个from
头要秃了 .thank thank thank
json封装的时候,把城市封装在省份的下一级,通过遍历城市就可以计算。