How do I stop a Flink job from re-consuming Kafka data after it is killed by an unpredictable exception and automatically resubmitted?

As the code below shows, the same Flink job contains several flatMaps that process different data streams, each reading from its own Kafka topic. When stream1 hits an unpredictable exception that is not caught, or the whole job is cancelled because of an environment problem and then restarted automatically, a few Kafka records occasionally end up being processed twice after the restart. Does anyone know how to solve this?

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);

// Note: setStartFromLatest() only applies when the job starts without checkpoint state;
// when restoring from a checkpoint, the offsets stored in that checkpoint are used instead.
FlinkKafkaConsumer011<String> myConsumer1 = new FlinkKafkaConsumer011<>(topic1, new SimpleStringSchema(Charset.forName("ISO8859-1")), prop);
myConsumer1.setStartFromLatest();
DataStream<String> stream1 = env.addSource(myConsumer1).name(topic1);
stream1.flatMap(new Handle1()).setParallelism(1).setMaxParallelism(1).addSink(new Sink1());

FlinkKafkaConsumer011<String> myConsumer2 = new FlinkKafkaConsumer011<>(topic2, new SimpleStringSchema(Charset.forName("ISO8859-1")), prop);
myConsumer2.setStartFromLatest();
DataStream<String> stream2 = env.addSource(myConsumer2).name(topic2);
stream2.flatMap(new Handle2()).setParallelism(1).setMaxParallelism(1).addSink(new Sink2());

Flink supports recovering from the point of failure: with checkpointing enabled, the job is restored from the last completed checkpoint, including the Kafka offsets stored in it, and everything processed between that checkpoint and the failure is replayed — which is exactly why a handful of duplicates show up.
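Checkpointing therefore gives exactly-once guarantees for Flink's own state, but end to end it is only at-least-once unless the sink is idempotent or transactional. Below is a minimal sketch of the relevant settings, assuming the job should also be restorable after a cancel, and assuming, for illustration only, that the output goes back to Kafka; Sink1/Sink2 in the question are custom sinks, and outputTopic and producerProps are made-up names, not part of the original job:

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

// Exactly-once checkpoints, and keep the last checkpoint when the job is cancelled so the
// next submission can be started from it (flink run -s <checkpoint-path> ...). A fresh
// start without a checkpoint to restore falls back to setStartFromLatest().
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// Illustration only: if the results are written back to Kafka, a transactional producer
// (EXACTLY_ONCE semantic) keeps replayed records from becoming duplicates downstream.
// "outputTopic" and producerProps are placeholders, not from the original job.
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "localhost:9092");
FlinkKafkaProducer011<String> kafkaSink = new FlinkKafkaProducer011<>(
        "outputTopic",
        new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
        producerProps,
        FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);

If Sink1/Sink2 write to an external store instead of Kafka, the equivalent options are idempotent writes (for example an upsert keyed by a record id) or a two-phase-commit sink (TwoPhaseCommitSinkFunction), so that replayed records do not show up twice downstream.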