java接入mqtt 遇到的一些问题

想请问下各位 java接入mqtt ,我在模拟TPS 100 左右的数据时 ,处理收到的消息只处理数据不会出现问题,但是在处理消息中对消息回复,或者在处理数据的时候给别人发数据,就会造成服务掉线。这是什么原因

【相关推荐】



  • 请看👉 :mqtt基于paho的消息订阅接收的JAVA代码
  • 除此之外, 这篇博客: java 实现mqtt发送和接收消息 客户端代码中的 2接收消息的客户端: 部分也许能够解决你的问题, 你可以仔细阅读以下内容或跳转源博客中阅读:
    package com.wanwei.mqtt;
    
    import org.eclipse.paho.client.mqttv3.MqttClient;
    import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    import org.eclipse.paho.client.mqttv3.MqttException;
    import org.eclipse.paho.client.mqttv3.MqttSecurityException;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    
    import com.wanwei.ilogger.ilogger;
    
    public class MyMqttRecieveMessage {
    
    	private static int QoS = 1;
    	private static String Host = "tcp://127.0.0.1:1883";
    	private static MemoryPersistence memoryPersistence = null;
    	private static MqttConnectOptions mqttConnectOptions = null;
    	private static MqttClient mqttClient  = null;
    	
    	public static void init(String clientId) {
    		mqttConnectOptions = new MqttConnectOptions();
    		memoryPersistence = new MemoryPersistence();
    		ilogger ilogger = new ilogger("MyMqttRecieveMessage", "init");
    		if(null != memoryPersistence && null != clientId && null != Host) {
    			try {
    				 mqttClient = new MqttClient(Host, clientId, memoryPersistence);
    			} catch (MqttException e) {
    				// TODO Auto-generated catch block
    				e.printStackTrace();
    			}
    		}else {
    			ilogger.logerr("memoryPersistence clientId Host 有空值");
    		}
    		
    		if(null != mqttConnectOptions) {
    			mqttConnectOptions.setCleanSession(true);
    			mqttConnectOptions.setConnectionTimeout(30);
    			mqttConnectOptions.setKeepAliveInterval(45);
    			if(null != mqttClient && !mqttClient.isConnected()) {
    				mqttClient.setCallback(new MqttRecieveCallback());
    				try {
    					mqttClient.connect();
    				} catch (MqttException e) {
    					// TODO Auto-generated catch block
    					e.printStackTrace();
    				} 
    			}else {
    				ilogger.logerr("mqttClient is error");
    			}
    		}else {
    			ilogger.logerr("mqttConnectOptions is null");
    		}
    	}
    	
    	
        public static void recieve(String topic) {
            int[] Qos = {QoS};
            String[] topics = {topic};
            ilogger ilogger = new ilogger("MyMqttRecieveMessage", "subTopic");
            if(null != mqttClient && mqttClient.isConnected()) {
            	if(null!=topics && null!=Qos && topics.length>0 && Qos.length>0) {
            		try {
            			mqttClient.subscribe(topics, Qos);
            		} catch (MqttException e) {
            			// TODO Auto-generated catch block
            			e.printStackTrace();
            		}
            	}else {
            		ilogger.logerr("there is error");
            	}
            }else {
            	init("123444");
            	recieve(topic);
            }
        }
     
    
    }
    	
    
    

    接收消息客户端回调函数:

    package com.wanwei.mqtt;
    
    import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    import org.eclipse.paho.client.mqttv3.MqttCallback;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    
    import com.wanwei.bean.ChatMessage;
    import com.wanwei.dao.DealMessage;
    
    public class MqttRecieveCallback implements MqttCallback{
    
    	@Override
    	public void connectionLost(Throwable cause) {
    		
    	}
    
    	@Override
    	public void messageArrived(String topic, MqttMessage message) throws Exception {
    		 System.out.println("Client 接收消息主题 : " + topic);
    	     System.out.println("Client 接收消息Qos : " + message.getQos());
    	     System.out.println("Client 接收消息内容 : " + new String(message.getPayload()));
    	    
    	    
    	}
    
    	@Override
    	public void deliveryComplete(IMqttDeliveryToken token) {
    		
    	}
    
    }
    

    切记接收消息时,以上两个函数缺一不可!!!

    下一篇讲一下mqtt客户端具体用法,附带测试方法


如果你已经解决了该问题, 非常希望你能够分享一下解决方案, 写成博客, 将相关链接放在评论区, 以帮助更多的人 ^-^