看不懂源码,但是打断点发现好像是ack方法执行了两次,不知道为什么会执行两次ack方法
这是配置代码
#rabbit配置
spring:
rabbitmq:
# rabbit地址
host: 127.0.0.1
# rabbit端口号
port: 5672
# 用户账号和密码
username: guest
password: guest
#rabbit项目名,每个virtualHost的队列是隔离的,相当于数据库
virtual-host: /rabbit
#开启Publisher Confirms 模式,消息发送到交换器后触发回调。
#publisher-confirms: true
#开启PublisherReturn 模式,交换机将消息发送到对应队列失败时触发
#publisher-returns: true
listener:
direct:
#设置监听为手动答应模式
acknowledge-mode: manual
这是绑定交换机代码
@Configuration
public class RabbitConfig {
/**
* 将队列绑定到交换机中,并绑定路由
* @return
*/
@Bean
public Binding bindingExchange() {
return BindingBuilder.bind(queue()).to(directExchange()).with(RabbitEnum.RABBIT.getRoutingKey());
}
/**
* 创建队列
* @return
*/
@Bean
public Queue queue() {
return new Queue(RabbitEnum.RABBIT.getQueue(),true);
}
/**
* 创建交换机
* @return
*/
@Bean
public DirectExchange directExchange() {
return new DirectExchange(RabbitEnum.RABBIT.getDirectExchange(), true, false);
}
}
这是生产者发布消息和消费者消费消息的代码
@Service
public class PushService {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 推送方法
*/
public void push(){
System.out.println("开始推送");
//将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange
rabbitTemplate.convertAndSend(RabbitEnum.RABBIT.getDirectExchange(), RabbitEnum.RABBIT.getRoutingKey(),"helloWord", new CorrelationData("1"));
System.out.println("结束推送");
}
/**
* 推送队列的监听
* 注解中为监听的队列
* @param
* @param channel
* @param tag
* @throws IOException
*/
@RabbitListener(queues = "rabbit.direct.queue.push")
public void processDirect(String str, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
System.out.println("收到消息");
System.out.println(str);
channel.basicAck(tag,true);
}
}
这是消费完成后报的异常
ERROR 12656 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
在使用 Spring Boot 和 RabbitMQ 进行队列测试时,当将消息确认模式配置为手动 ACK(acknowledgment)时,消息被 ACK 两次的可能原因如下:
消息重复消费:当消息处理过程中发生异常或处理时间过长时,消费者可能会重新消费相同的消息。这可能导致消息在处理完成后再次确认,从而导致消息被 ACK 两次。在确认之前,应确保消息处理的幂等性,以避免重复处理相同的消息。
消费者重启:如果消费者应用程序在消息处理期间意外终止并重新启动,尚未确认的消息可能会被重新分发给新的消费者实例。新的消费者实例会对之前未确认的消息进行确认,导致消息被 ACK 两次。
多个消费者实例:如果存在多个消费者实例监听同一个队列,并且配置了手动 ACK,那么当消息被投递到队列时,每个消费者实例都有机会接收并处理该消息。因此,如果多个消费者实例都对消息进行了确认,那么消息就会被 ACK 多次。
解决此问题的方法包括:
确保消息处理的幂等性:在消费者应用程序中实现幂等性,即无论收到相同的消息多少次,最终的结果应该是一致的。这样即使消息被重复处理或重新消费,也不会产生错误的影响。
避免消费者重启时的重复处理:可以在消费者应用程序中记录已经处理的消息的唯一标识,例如消息的 ID 或业务相关的唯一标识。在重新启动时,检查已经处理过的消息,并跳过重复的消息。
确保只有一个消费者实例消费消息:在 RabbitMQ 配置中,可以使用队列的 Exclusive 模式确保只有一个消费者实例监听队列。这样可以避免多个消费者实例同时处理同一消息。