rabbitmq获取id序号时:
message.getMessageProperties().getHeaders().get("spring_returned_message_correlation"); 消息序号id取值为null
//发送消息
rabbitTemplate.convertAndSend()
...
自己Debug..
然后走到RabbitTemplate类中发现
long nextPublishSeqNo = channel.getNextPublishSeqNo(); 值为0
但是不知道为啥,
求解答..
在《rabbitMq事务消息方案》中增加了发送任务表rabbit_producer_record,如果发送成功后,不清理发送记录,是可以用该表来实现“获取组内的上一个消息id”的目的。但是出于数据量的考虑,发送成功后,还是删除更好。
额外再增加一张表producer_sequence_record来保存顺序消息记录:
CREATE TABLE `producer_sequence_record` (
`id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '消息id',
`message_id` VARCHAR(50) NOT NULL COMMENT '消息id',
`application` VARCHAR(50) NOT NULL COMMENT '应用名称',
`group_name` VARCHAR(50) NOT NULL COMMENT '消息分组,同分组内,消息序号按发送顺序递增',
`create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
PRIMARY KEY (`id`),
UNIQUE KEY `idx_message_app` (`message_id`, `application`),
KEY `idx_app_group` (`application`, `group_name`),
KEY `idx_create_time` (`create_time`)
) ENGINE = InnoDB DEFAULT CHARACTER SET = utf8mb4 COMMENT = '顺序mq的记录表';
当查询“组内的上一个消息id”时,可以用这样的语句select max(id) where application=? and group_name=? and id < ?
发送消息的时候,会判断消息类型,如果消息类型是顺序消息,就会执行上述sql,得到id,再根据id查producer_sequence_record的整行记录,得到message_id。
具体代码可以看com.zidongxiangxi.reliablemq.producer.service.DefaultRabbitProducerServiceImpl
public class DefaultRabbitProducerServiceImpl implements RabbitProducerService {
......
@Async
@Override
public void send(RabbitProducer producer) {
try {
Message message;
if (Objects.nonNull(sequenceRecordManager)
&& Objects.equals(producer.getType(), MessageTypeEnum.SEQUENCE.getValue())) {
// 从sequenceRecordManager获取上个消息id
String previousMessageId = sequenceRecordManager.getPreviousMessageId(producer.getMessageId(),
producer.getApplication());
message = RabbitUtils.generateMessage(producer, previousMessageId);
} else {
message = RabbitUtils.generateMessage(producer);
}
CorrelationData correlationData = RabbitUtils.generateCorrelationData(producer);
rabbitTemplate.send(producer.getExchange(), producer.getRoutingKey(), message, correlationData);
} catch (Throwable throwable) {
log.error("send message failed producer={}", producer.toString(), throwable);
if (Objects.nonNull(alarm)) {
alarm.failWhenProduce(JSON.toJSONString(producer), throwable);
}
}
}
}
传了previousMessageId 参数,RabbitUtils.generateMessage方法就会在构造Message对象的时候,往消息头设置previousMessageId。