为什么能接收到消息,但是发送的消息,在mqttx里面却收不到

为什么能接收到消息,但是发送的消息,在mqttx里面却收不到

这是接收的代码

package com.yc.go.config.mqtt;


import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;


/**
 * MQTT消费端
 */
@Configuration
public class IotMqttSubscriberConfig {

    @Autowired
    private MqttConfig mqttConfig;

    @Bean(name = "mqttSubscriberClientFactory")
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setUserName(mqttConfig.getUsername());
        mqttConnectOptions.setPassword(mqttConfig.getPassword().toCharArray());
        mqttConnectOptions.setCleanSession(false);
        mqttConnectOptions.setServerURIs(mqttConfig.getHostUrl().split(","));
        mqttConnectOptions.setKeepAliveInterval(2);
        factory.setConnectionOptions(mqttConnectOptions);
        return factory;
    }

    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    //配置client,监听的topic
    @Bean
    public MessageProducer inbound() {
        String[] inboundTopics = mqttConfig.getDefaultTopic().split(",");
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
                mqttConfig.getClientId() + "_inbound", mqttClientFactory(), inboundTopics);  //对inboundTopics主题进行监听
        adapter.setCompletionTimeout(5000);
        adapter.setQos(1);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }


    //通过通道获取数据
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")  //异步处理
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println("----------------------");
                System.out.println("message:" + message.getPayload());
                System.out.println("PacketId:" + message.getHeaders().getId());
                System.out.println("Qos:" + message.getHeaders().get(MqttHeaders.QOS));

                String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);
                System.out.println("topic:" + topic);
            }
        };
    }
}


这是发送的代码

package com.yc.go.config.mqtt;

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

/**
 * @author yc
 */
@Configuration
public class IotMqttProducerConfig {

    @Autowired
    private MqttConfig mqttConfig;

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setUserName(mqttConfig.getUsername());
        mqttConnectOptions.setPassword(mqttConfig.getPassword().toCharArray());
        mqttConnectOptions.setCleanSession(false);
        mqttConnectOptions.setServerURIs(mqttConfig.getHostUrl().split(","));
        mqttConnectOptions.setKeepAliveInterval(60);
        mqttConnectOptions.setConnectionTimeout(60);
        factory.setConnectionOptions(mqttConnectOptions);
        return factory;
    }

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttConfig.getClientId() + "outbound", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(mqttConfig.getDefaultTopic());
        return messageHandler;
    }
}



```java
@Service
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {


    /**
     * 发送到mqtt
     *
     * @param payload 有效载荷
     */
    void sendToMqtt(String payload);

    /**
     * 发送到mqtt
     *
     * @param topic   主题
     * @param payload 消息内容
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);

    /**
     * 发送到mqtt
     *
     * @param topic   主题
     * @param qos     qos
     * @param payload 消息内容
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}

但是这一段代码却可以发送
@PostMapping("/index")
    public String index() {
        String broker = "tcp://broker.emqx.io:1883";
        // TLS/SSL
        // String broker = "ssl://broker.emqx.io:8883";
        String topic = "testtopic1";
        String username = "emqx";
        String password = "public";
        String clientid = "publish_client";
        String content = "Hello MQTT";
        int qos = 0;

        try {
            MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence());
            // 连接参数
            MqttConnectOptions options = new MqttConnectOptions();
            // 设置用户名和密码
            options.setUserName(username);
            options.setPassword(password.toCharArray());
            options.setKeepAliveInterval(60);
            options.setConnectionTimeout(60);
            // 连接
            client.connect(options);
            // 创建消息并设置 QoS
            MqttMessage message = new MqttMessage(content.getBytes());
            message.setQos(qos);
            // 发布消息
            client.publish(topic, message);
            System.out.println("Message published");
            System.out.println("topic: " + topic);
            System.out.println("message content: " + content);
            // 断开连接
            client.disconnect();
            // 关闭客户端
            client.close();
        } catch (MqttException e) {
            throw new RuntimeException(e);
        }
        return "index";
    }

```

与发送成功的相比,topic不一样。看看mqttConfig.getDefaultTopic()值是什么,对应的队列是否存在?或者改为已有的。

  • 帮你找了个相似的问题, 你可以看下: https://ask.csdn.net/questions/7495266
  • 我还给你找了一篇非常好的博客,你可以看看是否有帮助,链接:Mqtt消息发送、主题订阅
  • 除此之外, 这篇博客: 物联网消息队列客户端-MQTT-基本功能实现中的 4.7.1 发布消息 部分也许能够解决你的问题, 你可以仔细阅读以下内容或跳转源博客中阅读:
  • 首先注入MQTTClient,与单数据源的唯一区别就是bean的名称。默认向Spring容器中添加的实现类名称为“${数据源名称}MQTTClient”
    以上面的配置文件为例,默认的bean名称为 defaultMQTTClientbillMQTTClient

    import com.demo.mqttclient.MQTTClient;
    
    @Resource
    private MQTTClient defaultMQTTClient;
    
    @Resource
    private MQTTClient billMQTTClient;
    

    其他操作同单数据源

  • 您还可以看一下 可用行师老师的物联网即时通信架构实现与实战课程中的 MQTT控制报文剩余长度编码及解码算法解释2小节, 巩固相关知识点
  • 以下回答由chatgpt基于相关博客总结生成:

    根据提供的参考资料,可以看出代码中有些部分是错误的,需要进行修改,以下是需要修改的内容:

    1. 接收消息部分的配置类中,inbound()方法的adapter.setOutputChannel()方法应该传入mqttInputChannel()方法的返回值,而不是传入一个字符串。

    2. 接收消息部分的配置类中,inbound()方法中传入的topic名称应该与实际使用的topic名称相同。

    3. 发送消息部分的MyGateway接口中定义了sendToMqtt()方法,然而在实际使用时,这个方法传入的参数名称是不正确的(应该是payload而不是data),需要进行修改。

    4. 发送消息部分的配置类中,MessageHandler实例创建时的topic名称也应该与实际使用的topic名称相同。

    综上所述,需要修改的代码如下:

    1. MqttConfig配置类中的inbound()方法:
    @Bean
    public MessageProducer inbound(MessageChannel mqttInputChannel,MqttPahoClientFactory mqttClientFactory) {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter("springclient",mqttClientFactory,
                        "testTopic"); //修改为实际使用的topic名称
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }
    
    1. MyGateway接口中的sendToMqtt()方法:
    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface MyGateway {
        void sendToMqtt(String payload); //修改为payload
    }
    
    1. MqttConfig配置类中的mqttOutbound()方法:
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =
                new MqttPahoMessageHandler("testClient", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic("testTopic"); //修改为实际使用的topic名称
        return messageHandler;
    }
    

    修改后的代码应该可以正常接收发送MQTT消息了。