rocketmq初始化生产者长连接疑问

img

疑问 :在@bean里面初始一次生产者长连接后,如果在使用过程中,bean里面的长连接超时或空闲自动断了,那在再次调用的时候怎么办呢?

  • 你可以参考下这个问题的回答, 看看是否对你有帮助, 链接: https://ask.csdn.net/questions/774388
  • 这篇博客你也可以参考下:精华推荐 | 【深入浅出RocketMQ原理及实战】「性能原理挖掘系列」透彻剖析贯穿RocketMQ的事务性消息的底层原理并在分析其实际开发场景
  • 除此之外, 这篇博客: RocketMQ(十):rocketMQ的幂等性是什么?解决了什么问题?幂等性的两种解决方案的优势和缺点+rocketmq的死信队列?中的 3.死信队列(就是咋咋都没处理的消息)【还是老规矩,上个图先】 部分也许能够解决你的问题, 你可以仔细阅读以下内容或跳转源博客中阅读:
  • rocketmq之死信队列
    1.如果优惠券系统的数据库宕机,会咋样?

    《爱咋咋没工夫伺候了还!》

    2.数据库宕机的时候,是否可以返回CONSUME_SUCCESS?

    consumer.registerMessageListener(new MessageListenerConcurrently(){
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(
    List<MessageExt> msgs,
    ConsumeConcurrentlyContext context
    ){
    //这里对获取到的msg订单消息进行处理
    //例如增加几份,发送优惠卷,通知发货等等
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS
    }
    })
    

    一旦返回成功,下一次就会处理下一批消息,但是这批消息实际上是没有处理成功的,所以消息就直接丢失了;

    3.如果对消息的处理有异常,可以返回reconsume_later状态

    consumer.registerMessageListener(new MessageListenerConcurrently(){
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(
    List<MessageExt> msgs,
    ConsumeConcurrentlyContext context
    ){
    
    try{
     
    //这里对获取到的msg订单消息进行处理
    //例如增加几份,发送优惠卷,通知发货等等
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS}
    catch(Exception e){
    //如果因为数据库宕机,对消息处理失败了
    //返回一个稍后重试消费状态later
    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
    }
    })
    

    如果消息处理失败了,就返回RESONSUME_LATER状态,让rocketMQ稍后再重新将这批消息给我,稍后重试对这批消息进行处理;

    4.rocketmq是如何让你进行消费重试的?

    简单来说rocketmq会有一个针对consumerGroup的重试队列,如果返回了RECONSUME_LATER状态,他就会将这批消息放到这个消费组的重试队列中,假设消费组的名称“VoucherConsumerGroup”,那么它会有一个“%RERAY%VoucherConsumerGroup”这个名字的重试队列,过一段时间之后,重试队列中的消息会再次给消费者进行消费,如果再次处理失败,又返回了reconsume_later,那么再过一段时间又会重新进入重试,默认最多是可以重试16次,每次重试的时间不一样,间隔时间可以如下配置:messageDelayLevel = 1s 5s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

    5.如果连续重试16次还是无法处理消息?怎么办?

    如果连续重试16次还是无法处理成功,这个时候就要进入另外一个队列了,叫做死信队列,所谓的死信队列,顾名思义就是死掉的消息放入这个队列中,死信队列名**“%DLQ%VoucherConsumerGroup”**,在rocketMQ的管理后台上是可以看到的,

    6.对死信队列中的消息,如何处理?

    具体看使用场景,例如专门开一个后台的线程,就是定于“%DLQ%VoucherConsumerGroup”这个死信队列,对死信队列中的消息还是一直不停的进行重试机制。

    7.消息处理失败场景下的方案总结

    消费者底层一些依赖可能有故障了,比如数据库宕机,缓存宕机之类的,此时就没有办法完成消息的处理了,那么就可以通过一些返回状态去让消息进入rocketMQ自带的重试队列,同时如果反复重试还是不行,就可以让消息进入rocketMQ自带的死信队列,后续针对死信队列中的消息进行单独处理就可以了。

    这个章节分享的差不多了,谢谢大家的耐心观看,表达错误的地方,还望评论交流;

  • 您还可以看一下 林祥纤老师的分布式事务解决方案「手写代码」课程中的 使用RocketMQ实战可靠消息最终一致性分布式事务:RocketMQ安装小节, 巩固相关知识点