flink的streamAPI 中 intervalJoin,对未关联上的数据如何旁路输出?
Apache Flink的Stream API提供了一个intervalJoin操作,可以在两个数据流之间进行时间间隔关联。在intervalJoin操作中,如果某条数据在指定的时间间隔内没有匹配的数据,则该数据被视为未关联上的数据。
为了处理未关联上的数据,Flink提供了一个withRichJoinFunction API,您可以使用它来实现自定义的关联处理逻辑,并在必要时将未关联的数据作为侧边流进行输出。
代码示例:
DataStream<Tuple2<Integer, Integer>> left = ...;
DataStream<Tuple2<Integer, Integer>> right = ...;
left.keyBy(0)
.intervalJoin(right.keyBy(0))
.between(Time.seconds(-5), Time.seconds(5))
.withRichJoinFunction((first, second, out) -> {
if (second == null) {
out.collect(new Tuple2<>(first.f0, first.f1));
} else {
out.collect(new Tuple2<>(first.f0, first.f1 + second.f1));
}
})
.print();
在上面的代码中,如果second为null,说明在指定的时间间隔内没有匹配的数据,因此该数据作为未关联的数据被输出。