Java arraylist 自动添加元素的问题

我现在想在java中设立一个ArrayList MessageQueue, 往里面不停添加从MQTT broker上传过来的消息。然后获取整个MessageQueue,比如从MQTT broker上总共传来三个消息,那么我的MessageQueue里就应该有三个消息。我现在的代码如下,我发现当我输入一个信息后, System.out.println("现在的MessageQueue是:" + MessageQueue)输出的是接收到的信息,但是如果我接收到的是第二条信息,那么这个输出的只是第二条信息,并不是两条信息的整体。 而System.out.println("MessageQueue整体是:" + MessageQueue)的输出一直是null, 我想实现,每收到一个信息,那么信息会自动更新至MessageQueue中,即System.out.println("MessageQueue整体是:" + MessageQueue)的输出是所有已收到的信息。

 

请各位帮我看看哪儿出错了呢?

 

其中接收方代码如下

import jade.*;

public class ServerAgent extends Agent {

	private static final long serialVersionUID = 1L;
	protected ArrayList<ACLMessage> MessageQueue = null;
	// protected ArrayList<String> myProcess = new ArrayList<>();

	public void setup() {
		
		SubscribeSample subscribeSample = new SubscribeSample();
		subscribeSample.subscribe("Json");

		//每五秒重复一次下面的动作
		this.addBehaviour(new TickerBehaviour(this, 5000) {

			@Override
			protected void onTick() {
				
				if (subscribeSample.getMsg() != null && MessageQueue != null) {
					MessageQueue.add(subscribeSample.getMsg());
					System.out.println("现在的MessageQueue是:" + MessageQueue);


				} else {
					System.out.println("还未收到任何信息");
				}
				
				System.out.println("MessageQueue整体是:" + MessageQueue);

			}

 

Subscribesample类的核心代码如下,逻辑就是把从MQTT broker上传来的JSON文件 通过GSON 转化为JAVA对象, 在我这代码中,对象就是msg

public class SubscribeSample {

	public static String arrivedMessage;
	public static ArrayList<Object> ReceivedRequests = new ArrayList<>();
	public static ACLMessage msg = null;


	public static void subscribe(String TOPIC) {
		
		String broker = "tcp://192.168.137.100:1883";
		
		int qos = 1;
		String clientid = "mqtt-explorer-3260c410";
		MaintestOutput m = new MaintestOutput();
		
		try {
			
			MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence());
			// MQTT的连接设置
			MqttConnectOptions options = new MqttConnectOptions();
			// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
			options.setCleanSession(true);
			// 设置连接的用户名
			// options.setUserName(userName);
			// 设置连接的密码
			// options.setPassword(passWord.toCharArray());
			// 设置超时时间 单位为秒
			options.setConnectionTimeout(10);
			// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
			options.setKeepAliveInterval(20);
			// 设置回调函数
			client.setCallback(new MqttCallback() {

				public void connectionLost(Throwable cause) {
					System.out.println("connectionLost");
				}

				public void messageArrived(String TOPIC, MqttMessage message)
						throws ClassNotFoundException, IOException {
					System.out.println("======监听到来自[" + TOPIC + "]的消息======");
					System.out.println("message content:" + new String(message.getPayload()));

				System.out.println("message.getPayload的类型" + message.getPayload());

				   Gson gson = new Gson();

					msg = gson.fromJson(Json, ACLMessage.class);


				public void deliveryComplete(IMqttDeliveryToken token) {
					System.out.println("deliveryComplete---------" + token.isComplete());
				}

			
			}

			);

			// 建立连接
			System.out.println("连接到 broker: " + broker);
			client.connect(options);

			System.out.println("连接成功.");
			// 订阅消息
			client.subscribe(TOPIC, qos);
			System.out.println("开始监听" + TOPIC);
		} catch (

		Exception e) {
			e.printStackTrace();
		}
	}

 

import jade.*;
public class ServerAgent extends Agent {
	private static final long serialVersionUID = 1L;
    //创建集合的目的是为了将所有的信息存放一起,然后输出MessageQueue整体消息
	protected ArrayList<ACLMessage> MessageQueue = new ArrayList<>();
	public void setup() {
		SubscribeSample subscribeSample = null;//这个对象需要在每次调用的时候创建
		subscribeSample.subscribe("Json");
		//每五秒重复一次下面的动作
		this.addBehaviour(new TickerBehaviour(this, 5000) {
            subscribeSample =new SubscribeSample();
			@Override
			protected void onTick() {
				//上面需要new好MessageQueue对象,否则这块一直不成立
				if (subscribeSample.getMsg() != null && MessageQueue != null) {
					MessageQueue.add(subscribeSample.getMsg());
                    //现在接收到的消息,直接从subscribeSample.getMsg()输出就好了
					System.out.println("现在的MessageQueue是:"+subscribeSample.getMsg() );
				} else { 
					System.out.println("还未收到任何信息");
				}
				System.out.println("MessageQueue整体是:" + MessageQueue);
			}

修改一下接收方代码

protected ArrayList<ACLMessage> MessageQueue = null;

 这是未初始化的语句

if (subscribeSample.getMsg() != null && MessageQueue != null) {
					MessageQueue.add(subscribeSample.getMsg());
					System.out.println("现在的MessageQueue是:" + MessageQueue);
 
 
				}

 这是要添加到queue的语句

 

显然的,你需要

ArrayList<ACLMessage> = new ArrayList<ACLMessage>();

对吗

ArrayList<Integer> primeNumbers = new ArrayList<>();这样初始化数组

输出当前信息直接用subscribeSample.getMsg()呗

还有ACLMessage应该相当于是string类型吧

希望可以采纳

你这个逻辑写法就有问题  在实际项目中绝对是被pass掉的。

客户端收到mqtt服务器推送的消息后  直接就放进MessageQueue即可,无需你自己写个定时器 定时去拉去*(这种做法就不可取)

你的ArrayList 在声明的时候就是null,然后你每次收到消息后 再去new对象  所以才会造成你每次只有一条信息在MessageQueue中 (因为你每次都new 一个ArrayListdudu对象)

import jade.*;
public class ServerAgent extends Agent {
	private static final long serialVersionUID = 1L;
	protected ArrayList<ACLMessage> MessageQueue = null;
	// protected ArrayList<String> myProcess = new ArrayList<>();
	public void setup() {
		
		SubscribeSample subscribeSample = new SubscribeSample();
		subscribeSample.subscribe("Json");
		//每五秒重复一次下面的动作
		this.addBehaviour(new TickerBehaviour(this, 5000) {
			@Override
			protected void onTick() {
				
				if (subscribeSample.getMsg() != null && MessageQueue != null) {
                    //这里加一个判断,如果是空值就给new一个对象
                    if(MessageQueue==null){
                        MessageQueue=new ArrayList<>();
                    }
					MessageQueue.add(subscribeSample.getMsg());
					System.out.println("现在的MessageQueue是:" + MessageQueue);
 
				} else {
					System.out.println("还未收到任何信息");
				}
				
				System.out.println("MessageQueue整体是:" + MessageQueue);
			}

如上,21行处改动

我的理解:你想要获取到消息后然后处理这些消息,并且消息不丢失。

你现在的这种做法完全不行

第一:5秒执行一次,每次都能获取到没有收到的消息吗?有偏移量吗?

第二:你写的这个消息队列MessagQueue得初始化,并且每次获取到了MessagQueue里的全部消息后得出队,这个跟MQ的思路是一样的(不知道你了解MQ不,抱歉!)。

第三:建议你用MQ实现,或者模拟队列,实时入队,而不是5秒一次。

总之,按我的理解,你这个完全就是要做一个MQ的功能

 

 

你直接放redis里面 然后用redis的方法获取 可获取一个也可以获取多个,网上有好多redis的工具类用起来很方便的希望能帮到你!

每次都new TickerBehaviour,MessageQueue属于TickerBehaviour对象的,就表示TickerBehaviour每次都初始化了一个List,理论上TickerBehaviour永远只会有一条数据,把MessageQueue改为ServerAgent的静态变量,引用改为ServerAgent.ServerAgent就可以了,记得先初始化。