为什么程序能正常接收到消息,但通过它发送的消息在 MQTTX 里却收不到?
这是接收的代码
package com.yc.go.config.mqtt;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
/**
 * MQTT consumer side: wires an inbound channel adapter that subscribes to the
 * configured topics and forwards every received message to
 * {@code mqttInputChannel}, where {@link #handler()} prints it.
 */
@Configuration
public class IotMqttSubscriberConfig {

    @Autowired
    private MqttConfig mqttConfig;

    /**
     * Builds the client factory used by the inbound adapter.
     *
     * @return a factory whose connect options come from {@code mqttConfig}
     */
    @Bean(name = "mqttSubscriberClientFactory")
    public MqttPahoClientFactory mqttClientFactory() {
        MqttConnectOptions connectOptions = new MqttConnectOptions();
        connectOptions.setUserName(mqttConfig.getUsername());
        connectOptions.setPassword(mqttConfig.getPassword().toCharArray());
        connectOptions.setCleanSession(false);
        // The host URL setting may list several broker URIs separated by commas.
        connectOptions.setServerURIs(mqttConfig.getHostUrl().split(","));
        // NOTE(review): a keep-alive of 2 is much shorter than the 60 used by the producer - confirm it is intended.
        connectOptions.setKeepAliveInterval(2);

        DefaultMqttPahoClientFactory clientFactory = new DefaultMqttPahoClientFactory();
        clientFactory.setConnectionOptions(connectOptions);
        return clientFactory;
    }

    /**
     * Channel that carries messages from the inbound adapter to the handler.
     *
     * @return a new {@link DirectChannel}
     */
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    /**
     * Configures the subscribing client and the topics it listens on.
     *
     * @return the inbound adapter, subscribed to every comma-separated entry of the default topic setting
     */
    @Bean
    public MessageProducer inbound() {
        String[] subscribedTopics = mqttConfig.getDefaultTopic().split(",");
        String inboundClientId = mqttConfig.getClientId() + "_inbound";

        // Listen on all of the configured topics.
        MqttPahoMessageDrivenChannelAdapter inboundAdapter =
                new MqttPahoMessageDrivenChannelAdapter(inboundClientId, mqttClientFactory(), subscribedTopics);
        inboundAdapter.setOutputChannel(mqttInputChannel());
        inboundAdapter.setConverter(new DefaultPahoMessageConverter());
        inboundAdapter.setQos(1);
        inboundAdapter.setCompletionTimeout(5000);
        return inboundAdapter;
    }

    /**
     * Consumes messages arriving on {@code mqttInputChannel} and prints their
     * payload, message id, QoS header and received topic to standard output.
     *
     * @return the handler bound to {@code mqttInputChannel}
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return message -> {
            System.out.println("----------------------");
            System.out.println("message:" + message.getPayload());
            // getId() is the id stored in the message headers, printed under the "PacketId" label.
            System.out.println("PacketId:" + message.getHeaders().getId());
            System.out.println("Qos:" + message.getHeaders().get(MqttHeaders.QOS));
            String receivedTopic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);
            System.out.println("topic:" + receivedTopic);
        };
    }
}
这是发送的代码
package com.yc.go.config.mqtt;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
/**
 * MQTT producer side: wires an outbound handler that publishes every message
 * sent to {@code mqttOutboundChannel}.
 *
 * @author yc
 */
@Configuration
public class IotMqttProducerConfig {

    @Autowired
    private MqttConfig mqttConfig;

    /**
     * Builds the client factory used by the outbound handler.
     *
     * @return a factory whose connect options come from {@code mqttConfig}
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setUserName(mqttConfig.getUsername());
        mqttConnectOptions.setPassword(mqttConfig.getPassword().toCharArray());
        mqttConnectOptions.setCleanSession(false);
        // The host URL setting may list several broker URIs separated by commas.
        mqttConnectOptions.setServerURIs(mqttConfig.getHostUrl().split(","));
        mqttConnectOptions.setKeepAliveInterval(60);
        mqttConnectOptions.setConnectionTimeout(60);
        factory.setConnectionOptions(mqttConnectOptions);
        return factory;
    }

    /**
     * Channel that the messaging gateway writes to and the outbound handler reads from.
     *
     * @return a new {@link DirectChannel}
     */
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    /**
     * Creates the handler that publishes messages from {@code mqttOutboundChannel}.
     *
     * <p>The default topic setting is read as a comma-separated list by the
     * subscriber configuration, so the raw value cannot be used as a publish
     * topic when it holds more than one entry. The first entry is used as the
     * default publish topic instead.
     *
     * @return the outbound handler
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =
                new MqttPahoMessageHandler(mqttConfig.getClientId() + "outbound", mqttClientFactory());
        // NOTE(review): with async sending enabled, publish failures may not surface to the caller - confirm.
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(firstConfiguredTopic(mqttConfig.getDefaultTopic()));
        return messageHandler;
    }

    /**
     * Returns the first entry of a comma-separated topic list.
     *
     * @param configuredTopics the raw default topic setting, possibly {@code null}
     * @return the first entry; the input itself when it is {@code null} or yields no entries
     */
    private static String firstConfiguredTopic(String configuredTopics) {
        if (configuredTopics == null) {
            return null;
        }
        // split(",") drops trailing empty strings, so an input such as "," yields an empty array.
        String[] topics = configuredTopics.split(",");
        return topics.length > 0 ? topics[0] : configuredTopics;
    }
}
```java
/**
 * Messaging gateway whose calls are sent to the {@code mqttOutboundChannel} channel.
 * Presumably the handler listening on that channel publishes them to the broker - confirm against the producer configuration.
 */
@Service
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
/**
 * Sends a payload without naming a topic.
 * Presumably the outbound handler's default topic is used - confirm.
 *
 * @param payload the message content
 */
void sendToMqtt(String payload);
/**
 * Sends a payload with the topic passed in the {@code MqttHeaders.TOPIC} header.
 *
 * @param topic   the topic
 * @param payload the message content
 */
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
/**
 * Sends a payload with the topic and QoS passed in the
 * {@code MqttHeaders.TOPIC} and {@code MqttHeaders.QOS} headers.
 *
 * @param topic   the topic
 * @param qos     the quality-of-service level
 * @param payload the message content
 */
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}
但是这一段代码却可以发送
/**
 * Publishes a fixed test message to {@code testtopic1} on the public EMQX
 * broker using a short-lived Paho client, then returns {@code "index"}.
 *
 * <p>The client is released in a {@code finally} block so that a failed
 * connect or publish does not leave the connection open. The payload is
 * encoded as UTF-8 rather than with the platform default charset.
 *
 * @return the literal string {@code "index"}
 * @throws RuntimeException wrapping the {@code MqttException} if creating the client, connecting, publishing or disconnecting fails
 */
@PostMapping("/index")
public String index() {
    String broker = "tcp://broker.emqx.io:1883";
    // TLS/SSL alternative:
    // String broker = "ssl://broker.emqx.io:8883";
    String topic = "testtopic1";
    String username = "emqx";
    String password = "public";
    String clientid = "publish_client";
    String content = "Hello MQTT";
    int qos = 0;
    MqttClient client = null;
    try {
        client = new MqttClient(broker, clientid, new MemoryPersistence());
        // Connection parameters: credentials plus keep-alive and connect timeout.
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        options.setKeepAliveInterval(60);
        options.setConnectionTimeout(60);
        client.connect(options);
        // Build the message with an explicit charset and the chosen QoS.
        MqttMessage message = new MqttMessage(content.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        message.setQos(qos);
        client.publish(topic, message);
        System.out.println("Message published");
        System.out.println("topic: " + topic);
        System.out.println("message content: " + content);
        client.disconnect();
    } catch (MqttException e) {
        throw new RuntimeException(e);
    } finally {
        if (client != null) {
            try {
                // Still connected here only when an earlier step failed.
                if (client.isConnected()) {
                    client.disconnect();
                }
                client.close();
            } catch (MqttException cleanupFailure) {
                // Cleanup is best-effort: report it, but never mask the primary failure.
                System.err.println("Failed to release MQTT client: " + cleanupFailure.getMessage());
            }
        }
    }
    return "index";
}
```
与能发送成功的那段代码相比,两者使用的 topic 不一样。请先看看 mqttConfig.getDefaultTopic() 的实际值是什么,并确认 MQTTX 订阅的正是这个主题;或者把默认主题改为 MQTTX 已经订阅的主题(例如 testtopic1)。
首先注入 MQTTClient,与单数据源的唯一区别就是 bean 的名称。默认向 Spring 容器中添加的实现类名称为“${数据源名称}MQTTClient”。以上面的配置文件为例,默认的 bean 名称为 defaultMQTTClient 和 billMQTTClient:
import com.demo.mqttclient.MQTTClient;
// Client for the "default" data source.
// Presumably @Resource resolves it by the field name matching the bean name - confirm.
@Resource
private MQTTClient defaultMQTTClient;
// Client for the "bill" data source, injected the same way.
@Resource
private MQTTClient billMQTTClient;
其他操作同单数据源
根据提供的参考资料,可以看出代码中有些部分是错误的,需要进行修改,以下是需要修改的内容:
接收消息部分的配置类中,inbound()方法的adapter.setOutputChannel()方法应该传入mqttInputChannel()方法的返回值,而不是传入一个字符串。
接收消息部分的配置类中,inbound()方法中传入的topic名称应该与实际使用的topic名称相同。
发送消息部分的MyGateway接口中定义了sendToMqtt()方法,然而在实际使用时,这个方法传入的参数名称是不正确的(应该是payload而不是data),需要进行修改。
发送消息部分的配置类中,MessageHandler实例创建时的topic名称也应该与实际使用的topic名称相同。
综上所述,需要修改的代码如下:
/**
 * Configures the inbound adapter that subscribes to {@code testTopic} and
 * forwards received messages to the injected channel.
 *
 * @param mqttInputChannel  the channel that receives the inbound messages
 * @param mqttClientFactory the factory used to create the MQTT client
 * @return the configured inbound adapter
 */
@Bean
public MessageProducer inbound(MessageChannel mqttInputChannel, MqttPahoClientFactory mqttClientFactory) {
    MqttPahoMessageDrivenChannelAdapter adapter =
            new MqttPahoMessageDrivenChannelAdapter("springclient", mqttClientFactory,
                    "testTopic"); // change to the topic name actually in use
    adapter.setCompletionTimeout(5000);
    adapter.setConverter(new DefaultPahoMessageConverter());
    adapter.setQos(1);
    // Use the injected channel parameter rather than calling a sibling bean method.
    adapter.setOutputChannel(mqttInputChannel);
    return adapter;
}
/**
 * Messaging gateway whose calls are sent to the {@code mqttOutboundChannel} channel.
 */
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MyGateway {
void sendToMqtt(String payload); // parameter name changed to payload
}
/**
 * Creates the handler that publishes messages from {@code mqttOutboundChannel},
 * using {@code testTopic} as the default topic.
 *
 * @return the outbound handler
 */
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
    MqttPahoMessageHandler outboundHandler = new MqttPahoMessageHandler("testClient", mqttClientFactory());
    // Change to the topic name actually in use.
    outboundHandler.setDefaultTopic("testTopic");
    outboundHandler.setAsync(true);
    return outboundHandler;
}
按以上方式修改后,代码应该可以正常接收和发送 MQTT 消息了。