DataStream<ItemViewCount> windowAggStream = dataStream
.filter(data -> "pv".equals(data.getBehavior())) // 过滤pv行为
.keyBy("itemId") // 按商品ID分组
.timeWindow(Time.hours(1), Time.minutes(5)) // 开滑窗
.aggregate(new ItemCountAgg(), new WindowItemCountResult());
public static class ItemCountAgg implements AggregateFunction<UserBehavior, Long, Long> {
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(UserBehavior value, Long accumulator) {
return accumulator + 1;
}
@Override
public Long getResult(Long accumulator) {
return accumulator;
}
@Override
public Long merge(Long a, Long b) {
return a + b;
}
}
public static class WindowItemCountResult implements WindowFunction<Long, ItemViewCount, Tuple, TimeWindow> {
@Override
public void apply(Tuple tuple, TimeWindow window, Iterable<Long> input, Collector<ItemViewCount> out) throws Exception {
Long itemId = tuple.getField(0);
Long windowEnd = window.getEnd();
Long count = input.iterator().next();
out.collect(new ItemViewCount(itemId, windowEnd, count));
}
}
上面是截取的代码,运行报错:
Error:(62, 17) java: 对于aggregate(com.zqs.flink.project.hotitemanalysis.HotItems.ItemCountAgg,com.zqs.flink.project.hotitemanalysis.HotItems.WindowItemCountResult), 找不到合适的方法
方法 org.apache.flink.streaming.api.datastream.WindowedStream.<ACC,R>aggregate(org.apache.flink.api.common.functions.AggregateFunction<com.zqs.flink.project.hotitemanalysis.beans.UserBehavior,ACC,R>)不适用
(无法推断类型变量 ACC,R
(实际参数列表和形式参数列表长度不同))
你好,我是有问必答小助手,非常抱歉,本次您提出的有问必答问题,技术专家团超时未为您做出解答
本次提问扣除的有问必答次数,将会以问答VIP体验卡(1次有问必答机会、商城购买实体图书享受95折优惠)的形式为您补发到账户。
因为有问必答VIP体验卡有效期仅有1天,您在需要使用的时候【私信】联系我,我会为您补发。