env.socketTextStream("10.3.87.23", 6666).assignAscendingTimestamps(_.times) //事务时间
.keyBy(k => (k.db, k.tbl))
.window(SlidingEventTimeWindows.of(Time.days(3), Time.days(1)))
.reduce((x, y) => {
TableCall(x.db, x.tbl, x.conuts + 1, y.times)})
.print()
{"timestamp":"2021-08-01 15:31:56,895","msg":" INFO org.apache.hadoop.hive.metastore.HiveMetaStore: [pool-9-thread-97]: 88: get_table : db=hivetest tbl=chinese_part"}
{"timestamp":"2021-08-02 15:31:56,895","msg":" INFO org.apache.hadoop.hive.metastore.HiveMetaStore: [pool-9-thread-97]: 88: get_table : db=hivetest tbl=chinese_part"}
{"timestamp":"2021-08-03 15:31:56,895","msg":" INFO org.apache.hadoop.hive.metastore.HiveMetaStore: [pool-9-thread-97]: 88: get_table : db=hivetest tbl=chinese_part"}
我已经输入了三天的数据了。为什么没有输出。写jdbc也不行啊,就算输入了第四天的数据还是没有输出,等我结束了nc就有输出了。换kafka source有输出了。但是都输入好好多天的数据才有输出。也不知道应该输入多少天的数据才会输出