想请问下各位:Java 接入 MQTT,我在模拟 TPS 100 左右的数据时,如果收到消息后只处理数据,不会出现问题;但是如果在处理消息的过程中对消息进行回复,或者在处理数据的时候给别人发数据,就会造成服务掉线。这是什么原因?
【相关推荐】
package com.wanwei.mqtt;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttSecurityException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import com.wanwei.ilogger.ilogger;
/**
 * Static helper that owns one Paho {@link MqttClient} used to receive messages.
 *
 * <p>{@link #init(String)} creates the client and connects it to {@code Host};
 * {@link #recieve(String)} subscribes the connected client to a topic. All state is
 * kept in static fields and no synchronization is performed here.
 */
public class MyMqttRecieveMessage {
    /** QoS value requested for every subscription made by {@link #recieve(String)}. */
    private static int QoS = 1;
    /** Broker URI handed to the {@link MqttClient} constructor. */
    private static String Host = "tcp://127.0.0.1:1883";
    /** Client id used when {@link #recieve(String)} has to establish the connection itself. */
    private static final String DEFAULT_CLIENT_ID = "123444";
    private static MemoryPersistence memoryPersistence = null;
    private static MqttConnectOptions mqttConnectOptions = null;
    private static MqttClient mqttClient = null;

    /**
     * Creates a new client for {@code clientId}, installs a {@link MqttRecieveCallback}
     * on it and connects it using freshly configured connect options.
     *
     * <p>Failures are reported through {@code ilogger} and never thrown; callers can
     * only observe the outcome through the client's connection state.
     *
     * @param clientId MQTT client identifier; when {@code null} no new client is created
     */
    public static void init(String clientId) {
        ilogger log = new ilogger("MyMqttRecieveMessage", "init");

        mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setCleanSession(true);
        mqttConnectOptions.setConnectionTimeout(30);
        mqttConnectOptions.setKeepAliveInterval(45);
        memoryPersistence = new MemoryPersistence();

        if (null != clientId && null != Host) {
            try {
                mqttClient = new MqttClient(Host, clientId, memoryPersistence);
            } catch (MqttException e) {
                log.logerr("create mqttClient failed: " + e);
            }
        } else {
            log.logerr("memoryPersistence clientId Host 有空值");
        }

        if (null != mqttClient && !mqttClient.isConnected()) {
            mqttClient.setCallback(new MqttRecieveCallback());
            try {
                // Pass the options explicitly; the no-argument connect() would ignore
                // everything configured on mqttConnectOptions above.
                mqttClient.connect(mqttConnectOptions);
            } catch (MqttException e) {
                log.logerr("mqttClient connect failed: " + e);
            }
        } else {
            log.logerr("mqttClient is error");
        }
    }

    /**
     * Subscribes the shared client to {@code topic} at the class-wide QoS.
     *
     * <p>If the client is missing or disconnected, a single connection attempt is made
     * with the default client id. Only one attempt is made so that an unreachable broker
     * results in a logged error instead of unbounded recursion.
     *
     * @param topic topic filter to subscribe to; a {@code null} topic is reported and ignored
     */
    public static void recieve(String topic) {
        ilogger log = new ilogger("MyMqttRecieveMessage", "subTopic");

        // A null topic cannot be subscribed to; report it instead of handing it to the client.
        if (null == topic) {
            log.logerr("there is error");
            return;
        }

        if (null == mqttClient || !mqttClient.isConnected()) {
            init(DEFAULT_CLIENT_ID);
        }
        if (null == mqttClient || !mqttClient.isConnected()) {
            log.logerr("mqttClient is not connected, cannot subscribe: " + topic);
            return;
        }

        int[] qosLevels = {QoS};
        String[] topics = {topic};
        try {
            mqttClient.subscribe(topics, qosLevels);
        } catch (MqttException e) {
            log.logerr("subscribe failed for " + topic + ": " + e);
        }
    }
}
接收消息客户端回调函数:
package com.wanwei.mqtt;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import com.wanwei.bean.ChatMessage;
import com.wanwei.dao.DealMessage;
/**
 * {@link MqttCallback} implementation that prints every received message to standard
 * output and reports a lost connection on standard error.
 */
public class MqttRecieveCallback implements MqttCallback{
    /**
     * Reports that the connection to the broker was lost, so the drop is visible
     * instead of being silently ignored. No reconnect is attempted here.
     *
     * @param cause the reason reported by the client for the loss
     */
    @Override
    public void connectionLost(Throwable cause) {
        System.err.println("Client 连接断开 : " + cause);
    }

    /**
     * Prints the topic, QoS and payload of an incoming message.
     *
     * <p>NOTE(review): the Paho {@code MqttCallback} documentation describes this method
     * as being invoked synchronously by the client, with the acknowledgment sent only
     * after it returns. Long-running work or blocking publishes done from here may
     * therefore stall the client; confirm against the Paho docs before replying to
     * messages inside this method.
     *
     * @param topic   topic the message was published to
     * @param message the received message
     * @throws Exception declared by the interface; this implementation throws none explicitly
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        System.out.println("Client 接收消息主题 : " + topic);
        System.out.println("Client 接收消息Qos : " + message.getQos());
        // Decode with an explicit charset so output does not depend on the platform default.
        // NOTE(review): assumes publishers send UTF-8 text payloads; confirm.
        String payloadText = new String(message.getPayload(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("Client 接收消息内容 : " + payloadText);
    }

    /**
     * Called when delivery of an outgoing message completes; no action is taken.
     *
     * @param token delivery token of the completed message
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
    }
}
切记:接收消息时,以上两个类(MyMqttRecieveMessage 和 MqttRecieveCallback)缺一不可!!!
下一篇讲一下mqtt客户端具体用法,附带测试方法