RabbitMQ一个队列绑定多个交换机的问题

【背景】
在基于spring cloud的微服务架构中,我使用spring stream集成rabbitmq,完成消息驱动场景下的业务。
其中,有2个生产者服务:p1、p2,将他们的业务处理后的消息发送到各自的Topic类型的交换机中,以供下游有需要的消费者去消费;
有2个消费者服务:c(从c队列中消费和c相关的消息内容),log(从log队列中消费和日志相关的消息内容)。
log服务按照:"log.#"规则的bindingkey来消费其绑定交换机中的所有日志类型的消息;
c服务按照:"*.c.#"规则的bindingkey来消费其绑定交换机中的所有C类型的消息;
故p1、p2采用动态routingkey的形式,按照形如:“log.c.xx”规则的routingkey发送的消息将被log和c同时消费;
整个消息传递形如:

img

其中生产者P1的spring stream定义:

spring:
  cloud:
    stream:
      bindings:
        # 本服务发送消息binding,对应rabbit的channel
        output:
          # 制定发送对应的topic(kafaka)或Exchange(RabbitMQ)
          destination: p1
          binder: local_rabbit

生产者P2的spring stream定义:

spring:
  cloud:
    stream:
      bindings:
        # 本服务发送消息binding,对应rabbit的channel
        output:
          # 制定发送对应的topic(kafaka)或Exchange(RabbitMQ)
          destination: p2
          binder: local_rabbit

消费者c的spring stream定义:

spring:
  cloud:
    stream:
      bindings:
        # 消费p1发送的消息
        input_p1:
          # 消费的exchange
          destination: p1
          binder: local_rabbit
          # 消费的queue
          group: c

        # 消费p2发送的消息
        input_p2:
          # 消费的exchange
          destination: p2
          binder: local_rabbit
          # 消费的queue
          group: c

.......

      rabbit:
        bindings:
          # 配置channel:“input_p1”
          input_p1:
            consumer:
              # 绑定路由配置:将该渠道的queue绑定到exchange时使用该binding key
              bindingRoutingKey: '*.c.#'
              # 手动确认
              acknowledgeMode: MANUAL

          # 配置channel:“input_p2”
          input_p2:
            consumer:
              # 绑定路由配置:将该渠道的queue绑定到exchange时使用该binding key
              bindingRoutingKey: '*.c.#'
              # 手动确认
              acknowledgeMode: MANUAL

消费者log的定义类同c

【遇到的问题】
问题出现的c服务配置的两个输入通道和p1、p2之间的消息通信上:p1服务发送到交换机p1的“aa.c.xx”消息c可以消费,但p2服务发送到交换机p2的“bb.c.xx”消息c无法消费,在rabbit中显示为ready但始终在内存中未被消费的状态。

img

也就是是:对于c服务的c队列,它绑定到p1和p2用的bindingkey都是‘*.c.#’的表达式;而生产者p1、p2发送到各自的交换机的消息,也都是形如'xx.c.yy’的动态routingkey(比如,p1是'aa.c.xx',p2是'bb.c.xx'),为什么p1发的c就能消费到,p2发的c就消费不到呢?
请问问题在哪?应该如何改造?在微服务场景下,使用spring stream的情况下,需要exchange 和 queue的多对多匹配,有没有更好的设计?感谢!

“Devil组”引证GPT后的撰写:

  • 问题可能出现在p2服务发送到交换机p2的“log.c.xx”消息时。这个消息的路由键应该是“log.c.xx”,但是在消费者c的定义中,绑定的路由键是“*.c.#”,因此消费者c无法消费这个消息。
  • 解决这个问题的一种方法是,在消费者c的定义中,为p2的消息添加一个新的输入通道(例如input_p2_log),并使用路由键“log.#”将其绑定到p2交换机。然后在c的定义中,添加一个新的绑定,将这个输入通道(input_p2_log)绑定到c队列。这样消费者c就能够接收来自p2服务的日志消息。

以下答案由GPT-3.5大模型与博主波罗歌共同编写:
首先,对于问题的原因,可以根据你提供的路由规则进行推测。你的路由规则是"*.c.#",这意味着只要是以.c.为结尾的routing key都会被该队列消费。例如,p1发送log.c.xx的routing key可以被consumer队列消费,是因为该队列的binding key也是"*.c.#",正好匹配了该消息的routing key。但是,p2发送的log.c.xx的routing key无法被consumer队列消费,因为consumer队列的binding key只匹配了.c.为结尾的routing key,而p2发送的routing key是log.c.xx,前缀log.不匹配该队列的binding key中的任何前缀。因此,该消息一直处于ready状态但无法被消费。

解决这个问题,可以考虑对队列的binding key进行调整,将它的规则改为"*.c.#"和"log.#"

spring:
  cloud:
    stream:
      bindings:
        # 消费p1发送的消息
        input_p1:
          # 消费的exchange
          destination: p1
          binder: local_rabbit
          # 消费的queue
          group: c
          # binding key 改为 "*.c.#""log.#" 
          consumer:
            bindingRoutingKey: "*.c.#,log.#"
            acknowledgeMode: MANUAL

        # 消费p2发送的消息
        input_p2:
          # 消费的exchange
          destination: p2
          binder: local_rabbit
          # 消费的queue
          group: c
          # binding key 改为 "*.c.#""log.#" 
          consumer:
            bindingRoutingKey: "*.c.#,log.#"
            acknowledgeMode: MANUAL

改变后的规则可以同时匹配以.c.为结尾和以.c.和以.log.为前缀的routing key,能够正确地消费两个生产者发送的消息。

对于你的第二个问题,关于微服务场景下的exchange和queue的多对多匹配,Spring Cloud Stream提供了很好的支持。在这种情况下,每个生产者可以发送消息到它自己的Exchange,而消费者可以订阅它们感兴趣的Exchange或者队列。这种拓扑结构和路由规则的灵活性是由Spring Cloud Stream的Binder实现提供的,你可以通过修改Binder的配置来控制消息传递的路由规则和绑定。例如,RabbitMQ实现的Binder默认使用Direct Exchange作为消息的路由方式,可以通过配置RabbitMQ Exchange的类型和binding参数来控制消息的路由规则。针对你的使用场景,建议参考Spring Cloud Stream和Binder的官方文档,灵活配置你的消息传递拓扑结构和路由规则。
如果我的回答解决了您的问题,请采纳!

参考GPT和自己的思路:根据您的描述,我认为问题可能在于您的Exchange和Binding的配置中存在一些问题。

您的两个生产者服务(p1和p2)都发送消息到各自的Topic类型的交换机中。但是您的消费者服务(c和log)都绑定到同一个队列(c队列)上,这样就会存在以下问题:

c队列只能同时消费p1和p2发送到p1和p2交换机的routingKey符合“*.c.#”规则的消息,而log服务无法消费这些消息,因为它们不符合其绑定的规则“log.#”。
如果p1和p2都发送了routingKey符合“*.c.#”规则的消息,这些消息将会被同时投递到c队列上,但是您并不知道这些消息分别来自p1还是p2。
为了解决这个问题,我建议您将c队列拆分为两个队列,每个队列绑定一个交换机,这样log服务就可以消费自己感兴趣的消息,而c服务也可以根据自己的规则消费对应的消息了。

具体来说,您可以在RabbitMQ中创建两个Exchange,分别对应p1和p2生产者服务发送消息的交换机。然后分别为这两个Exchange创建两个Queue,分别对应c和log消费者服务要消费的队列。最后,您需要为每个Queue创建相应的Binding,将其绑定到对应的Exchange上,并指定相应的routingKey。

例如,为了消费p1服务发送到p1交换机的符合“*.c.#”规则的消息,您可以这样配置c服务的队列和绑定:

# 消费者c的spring stream定义
spring:
  cloud:
    stream:
      bindings:
        # 消费p1发送的消息
        input_p1:
          # 消费的exchange
          destination: p1
          binder: local_rabbit
          # 消费的queue
          group: c
          # 指定消费的routingKey
          consumer:
            bindingRoutingKey: '*.c.#'
            # 手动确认
            acknowledgeMode: MANUAL

      rabbit:
        bindings:
          # 配置channel:“input_p1”
          input_p1:
            consumer:
              # 绑定到p1交换机上,使用指定的routingKey
              bindingDestination: p1
              bindingRoutingKey: '*.c.#'
              # 手动确认
              acknowledgeMode: MANUAL

        queues:
          # 配置c服务要消费的队列
          c-p1:
            # 绑定到p1交换机上
            bindings: input_p1

然后,您需要为p2服务发送到p2交换机的消息配置相应的队列和绑定,具体操作类似于上面的示例。

希望这些信息可以帮助您解决问题。

不知道你这个问题是否已经解决, 如果还没有解决的话:

如果你已经解决了该问题, 非常希望你能够分享一下解决方案, 写成博客, 将相关链接放在评论区, 以帮助更多的人 ^-^