【背景】
在基于spring cloud的微服务架构中,我使用spring stream集成rabbitmq,完成消息驱动场景下的业务。
其中,有2个生产者服务:p1、p2,将他们的业务处理后的消息发送到各自的Topic类型的交换机中,以供下游有需要的消费者去消费;
有2个消费者服务:c(从c队列中消费和c相关的消息内容),log(从log队列中消费和日志相关的消息内容)。
log服务按照:"log.#"规则的bindingkey来消费其绑定交换机中的所有日志类型的消息;
c服务按照:"*.c.#"规则的bindingkey来消费其绑定交换机中的所有C类型的消息;
故p1、p2采用动态routingkey的形式,按照形如:“log.c.xx”规则的routingkey发送的消息将被log和c同时消费;
整个消息传递形如:
其中生产者P1的spring stream定义:
spring:
cloud:
stream:
bindings:
# 本服务发送消息binding,对应rabbit的channel
output:
# 制定发送对应的topic(kafaka)或Exchange(RabbitMQ)
destination: p1
binder: local_rabbit
生产者P2的spring stream定义:
spring:
cloud:
stream:
bindings:
# 本服务发送消息binding,对应rabbit的channel
output:
# 制定发送对应的topic(kafaka)或Exchange(RabbitMQ)
destination: p2
binder: local_rabbit
消费者c的spring stream定义:
spring:
cloud:
stream:
bindings:
# 消费p1发送的消息
input_p1:
# 消费的exchange
destination: p1
binder: local_rabbit
# 消费的queue
group: c
# 消费p2发送的消息
input_p2:
# 消费的exchange
destination: p2
binder: local_rabbit
# 消费的queue
group: c
.......
rabbit:
bindings:
# 配置channel:“input_p1”
input_p1:
consumer:
# 绑定路由配置:将该渠道的queue绑定到exchange时使用该binding key
bindingRoutingKey: '*.c.#'
# 手动确认
acknowledgeMode: MANUAL
# 配置channel:“input_p2”
input_p2:
consumer:
# 绑定路由配置:将该渠道的queue绑定到exchange时使用该binding key
bindingRoutingKey: '*.c.#'
# 手动确认
acknowledgeMode: MANUAL
消费者log的定义类同c
【遇到的问题】
问题出现的c服务配置的两个输入通道和p1、p2之间的消息通信上:p1服务发送到交换机p1的“aa.c.xx”消息c可以消费,但p2服务发送到交换机p2的“bb.c.xx”消息c无法消费,在rabbit中显示为ready但始终在内存中未被消费的状态。
也就是是:对于c服务的c队列,它绑定到p1和p2用的bindingkey都是‘*.c.#’的表达式;而生产者p1、p2发送到各自的交换机的消息,也都是形如'xx.c.yy’的动态routingkey(比如,p1是'aa.c.xx',p2是'bb.c.xx'),为什么p1发的c就能消费到,p2发的c就消费不到呢?
请问问题在哪?应该如何改造?在微服务场景下,使用spring stream的情况下,需要exchange 和 queue的多对多匹配,有没有更好的设计?感谢!
“Devil组”引证GPT后的撰写:
以下答案由GPT-3.5大模型与博主波罗歌共同编写:
首先,对于问题的原因,可以根据你提供的路由规则进行推测。你的路由规则是"*.c.#"
,这意味着只要是以.c.
为结尾的routing key都会被该队列消费。例如,p1
发送log.c.xx
的routing key可以被consumer队列消费,是因为该队列的binding key也是"*.c.#"
,正好匹配了该消息的routing key。但是,p2
发送的log.c.xx
的routing key无法被consumer队列消费,因为consumer队列的binding key只匹配了.c.
为结尾的routing key,而p2
发送的routing key是log.c.xx
,前缀log.
不匹配该队列的binding key中的任何前缀。因此,该消息一直处于ready状态但无法被消费。
解决这个问题,可以考虑对队列的binding key进行调整,将它的规则改为"*.c.#"和"log.#"
:
spring:
cloud:
stream:
bindings:
# 消费p1发送的消息
input_p1:
# 消费的exchange
destination: p1
binder: local_rabbit
# 消费的queue
group: c
# binding key 改为 "*.c.#" 和 "log.#"
consumer:
bindingRoutingKey: "*.c.#,log.#"
acknowledgeMode: MANUAL
# 消费p2发送的消息
input_p2:
# 消费的exchange
destination: p2
binder: local_rabbit
# 消费的queue
group: c
# binding key 改为 "*.c.#" 和 "log.#"
consumer:
bindingRoutingKey: "*.c.#,log.#"
acknowledgeMode: MANUAL
改变后的规则可以同时匹配以.c.
为结尾和以.c.
和以.log.
为前缀的routing key,能够正确地消费两个生产者发送的消息。
对于你的第二个问题,关于微服务场景下的exchange和queue的多对多匹配,Spring Cloud Stream提供了很好的支持。在这种情况下,每个生产者可以发送消息到它自己的Exchange,而消费者可以订阅它们感兴趣的Exchange或者队列。这种拓扑结构和路由规则的灵活性是由Spring Cloud Stream的Binder实现提供的,你可以通过修改Binder的配置来控制消息传递的路由规则和绑定。例如,RabbitMQ实现的Binder默认使用Direct Exchange作为消息的路由方式,可以通过配置RabbitMQ Exchange的类型和binding参数来控制消息的路由规则。针对你的使用场景,建议参考Spring Cloud Stream和Binder的官方文档,灵活配置你的消息传递拓扑结构和路由规则。
如果我的回答解决了您的问题,请采纳!
参考GPT和自己的思路:根据您的描述,我认为问题可能在于您的Exchange和Binding的配置中存在一些问题。
您的两个生产者服务(p1和p2)都发送消息到各自的Topic类型的交换机中。但是您的消费者服务(c和log)都绑定到同一个队列(c队列)上,这样就会存在以下问题:
c队列只能同时消费p1和p2发送到p1和p2交换机的routingKey符合“*.c.#”规则的消息,而log服务无法消费这些消息,因为它们不符合其绑定的规则“log.#”。
如果p1和p2都发送了routingKey符合“*.c.#”规则的消息,这些消息将会被同时投递到c队列上,但是您并不知道这些消息分别来自p1还是p2。
为了解决这个问题,我建议您将c队列拆分为两个队列,每个队列绑定一个交换机,这样log服务就可以消费自己感兴趣的消息,而c服务也可以根据自己的规则消费对应的消息了。
具体来说,您可以在RabbitMQ中创建两个Exchange,分别对应p1和p2生产者服务发送消息的交换机。然后分别为这两个Exchange创建两个Queue,分别对应c和log消费者服务要消费的队列。最后,您需要为每个Queue创建相应的Binding,将其绑定到对应的Exchange上,并指定相应的routingKey。
例如,为了消费p1服务发送到p1交换机的符合“*.c.#”规则的消息,您可以这样配置c服务的队列和绑定:
# 消费者c的spring stream定义
spring:
cloud:
stream:
bindings:
# 消费p1发送的消息
input_p1:
# 消费的exchange
destination: p1
binder: local_rabbit
# 消费的queue
group: c
# 指定消费的routingKey
consumer:
bindingRoutingKey: '*.c.#'
# 手动确认
acknowledgeMode: MANUAL
rabbit:
bindings:
# 配置channel:“input_p1”
input_p1:
consumer:
# 绑定到p1交换机上,使用指定的routingKey
bindingDestination: p1
bindingRoutingKey: '*.c.#'
# 手动确认
acknowledgeMode: MANUAL
queues:
# 配置c服务要消费的队列
c-p1:
# 绑定到p1交换机上
bindings: input_p1
然后,您需要为p2服务发送到p2交换机的消息配置相应的队列和绑定,具体操作类似于上面的示例。
希望这些信息可以帮助您解决问题。
不知道你这个问题是否已经解决, 如果还没有解决的话: