SparkStream通过mapWithState设置timeOut无效

问题描述:SparkStream在使用mapWithState时,无论设置timeout为多久(30s,3min,4min。。。),过了该超时时间后,该批信令数据依然会发送,没有被移除,而是过了9分钟后才会移除,
感觉该时间范围没有起到作用。

业务场景:一批业务实时信令数据,如果在预置时间范围内(即该用户保持沉默)没有上报。则系统会认为该用户一直在当前位置(即会将该用户的信令重复上报),超过该时间范围则将其移除。

业务实现:通过使用mapWithState以及其timeout特性设置,将过期的缓存信令移除。

代码片段:如下

//通过KafkaUtils.createDirectStream(...)获得kafka数据,kafka相关参数由kafkaParams指定   
        JavaInputDStream<ConsumerRecord<String,String>> kafkaStream = KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topicsSet, kafkaParams)
        );
        //使用map先对InputDStream进行取值操作            
        JavaDStream<String> lines=kafkaStream.map(new Function<ConsumerRecord<String, String>, String>() {
                    @Override
                    public String call(ConsumerRecord<String, String> consumerRecord) throws Exception {
                        String line=consumerRecord.value();                            
                        return line;
                    }
        });
        lines.print();          


        // 只更新数值变更的数据
        Function3<String, Optional<String>, State<String>,String> mappingFunc =
                (line, preLine, state) -> {                       
                String returnLine =state.exists() ? state.get() : line;
                return returnLine;
                }

         // DStream made of get cumulative counts that get updated in every batch
        JavaMapWithStateDStream<String, String, String, String> stateDstream =
                pairs.mapWithState(StateSpec.function(mappingFunc).timeout(Durations.minutes(3)));//3分钟后缓存的key过期
        JavaPairDStream<String,String> fullStateDstream=stateDstream.stateSnapshots();//获取所有的未过期的key


        //遍历DStream,并转换成RDD
            stateDstream.foreachRDD(new VoidFunction<JavaPairRDD<String, String>>() {
               ......
                System.out.printlin("=====>"+line);
               ......
            }   

写了一篇文章,关于SparkStreaming使用mapWithState时,设置timeout()无法生效问题,解决方案和原因过程分析:

https://blog.csdn.net/CatchLight/article/details/115621071

希望对你有所帮助