rabbitmq消息生成id为空

rabbitmq获取id序号时:
message.getMessageProperties().getHeaders().get("spring_returned_message_correlation"); 消息序号id取值为null

 //发送消息
        rabbitTemplate.convertAndSend()
...
自己Debug..
然后走到RabbitTemplate类中发现
long nextPublishSeqNo = channel.getNextPublishSeqNo();  值为0
但是不知道为啥,
求解答..
  • 这有个类似的问题, 你可以参考下: https://ask.csdn.net/questions/7504757
  • 这篇博客也不错, 你可以看下RabbitMQ学习系列:一、RabbitMQ 的安装
  • 除此之外, 这篇博客: rabbitMq顺序消费方案中的 发送的时,如何获取组内的上一个消息id 部分也许能够解决你的问题, 你可以仔细阅读以下内容或跳转源博客中阅读:
  • 在《rabbitMq事务消息方案》中增加了发送任务表rabbit_producer_record,如果发送成功后,不清理发送记录,是可以用该表来实现“获取组内的上一个消息id”的目的。但是出于数据量的考虑,发送成功后,还是删除更好。
    额外再增加一张表producer_sequence_record来保存顺序消息记录:

    CREATE TABLE `producer_sequence_record` (
      `id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '消息id',
      `message_id` VARCHAR(50) NOT NULL COMMENT '消息id',
      `application` VARCHAR(50) NOT NULL COMMENT '应用名称',
      `group_name` VARCHAR(50) NOT NULL COMMENT '消息分组,同分组内,消息序号按发送顺序递增',
      `create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
      `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
      PRIMARY KEY (`id`),
      UNIQUE KEY `idx_message_app` (`message_id`, `application`),
      KEY `idx_app_group` (`application`, `group_name`),
      KEY `idx_create_time` (`create_time`)
    ) ENGINE = InnoDB DEFAULT CHARACTER SET = utf8mb4 COMMENT = '顺序mq的记录表';
    

    当查询“组内的上一个消息id”时,可以用这样的语句select max(id) where application=? and group_name=? and id < ?
    发送消息的时候,会判断消息类型,如果消息类型是顺序消息,就会执行上述sql,得到id,再根据id查producer_sequence_record的整行记录,得到message_id。
    具体代码可以看com.zidongxiangxi.reliablemq.producer.service.DefaultRabbitProducerServiceImpl

    public class DefaultRabbitProducerServiceImpl implements RabbitProducerService {
    	......
    	
    	@Async
        @Override
        public void send(RabbitProducer producer) {
            try {
                Message message;
                if (Objects.nonNull(sequenceRecordManager)
                    && Objects.equals(producer.getType(), MessageTypeEnum.SEQUENCE.getValue())) {
                    // 从sequenceRecordManager获取上个消息id
                    String previousMessageId = sequenceRecordManager.getPreviousMessageId(producer.getMessageId(),
                        producer.getApplication());
                    message = RabbitUtils.generateMessage(producer, previousMessageId);
                } else {
                    message = RabbitUtils.generateMessage(producer);
                }
                CorrelationData correlationData = RabbitUtils.generateCorrelationData(producer);
                rabbitTemplate.send(producer.getExchange(), producer.getRoutingKey(), message, correlationData);
            } catch (Throwable throwable) {
                log.error("send message failed producer={}", producer.toString(), throwable);
                if (Objects.nonNull(alarm)) {
                    alarm.failWhenProduce(JSON.toJSONString(producer), throwable);
                }
            }
        }
    }
    

    传了previousMessageId 参数,RabbitUtils.generateMessage方法就会在构造Message对象的时候,往消息头设置previousMessageId。

  • 您还可以看一下 Array老师老师的RabbitMQ3.8.x单机+集群搭建教程课程中的 RabbitMQ的三种模式:单机,默认,镜像小节, 巩固相关知识点