我现在想在java中设立一个ArrayList MessageQueue, 往里面不停添加从MQTT broker上传过来的消息。然后获取整个MessageQueue,比如从MQTT broker上总共传来三个消息,那么我的MessageQueue里就应该有三个消息。我现在的代码如下,我发现当我输入一个信息后, System.out.println("现在的MessageQueue是:" + MessageQueue)输出的是接收到的信息,但是如果我接收到的是第二条信息,那么这个输出的只是第二条信息,并不是两条信息的整体。 而System.out.println("MessageQueue整体是:" + MessageQueue)的输出一直是null, 我想实现,每收到一个信息,那么信息会自动更新至MessageQueue中,即System.out.println("MessageQueue整体是:" + MessageQueue)的输出是所有已收到的信息。
请各位帮我看看哪儿出错了呢?
其中接收方代码如下
import jade.*;
public class ServerAgent extends Agent {
private static final long serialVersionUID = 1L;
protected ArrayList<ACLMessage> MessageQueue = null;
// protected ArrayList<String> myProcess = new ArrayList<>();
public void setup() {
SubscribeSample subscribeSample = new SubscribeSample();
subscribeSample.subscribe("Json");
//每五秒重复一次下面的动作
this.addBehaviour(new TickerBehaviour(this, 5000) {
@Override
protected void onTick() {
if (subscribeSample.getMsg() != null && MessageQueue != null) {
MessageQueue.add(subscribeSample.getMsg());
System.out.println("现在的MessageQueue是:" + MessageQueue);
} else {
System.out.println("还未收到任何信息");
}
System.out.println("MessageQueue整体是:" + MessageQueue);
}
Subscribesample类的核心代码如下,逻辑就是把从MQTT broker上传来的JSON文件 通过GSON 转化为JAVA对象, 在我这代码中,对象就是msg
public class SubscribeSample {
public static String arrivedMessage;
public static ArrayList<Object> ReceivedRequests = new ArrayList<>();
public static ACLMessage msg = null;
public static void subscribe(String TOPIC) {
String broker = "tcp://192.168.137.100:1883";
int qos = 1;
String clientid = "mqtt-explorer-3260c410";
MaintestOutput m = new MaintestOutput();
try {
MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence());
// MQTT的连接设置
MqttConnectOptions options = new MqttConnectOptions();
// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
options.setCleanSession(true);
// 设置连接的用户名
// options.setUserName(userName);
// 设置连接的密码
// options.setPassword(passWord.toCharArray());
// 设置超时时间 单位为秒
options.setConnectionTimeout(10);
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
options.setKeepAliveInterval(20);
// 设置回调函数
client.setCallback(new MqttCallback() {
public void connectionLost(Throwable cause) {
System.out.println("connectionLost");
}
public void messageArrived(String TOPIC, MqttMessage message)
throws ClassNotFoundException, IOException {
System.out.println("======监听到来自[" + TOPIC + "]的消息======");
System.out.println("message content:" + new String(message.getPayload()));
System.out.println("message.getPayload的类型" + message.getPayload());
Gson gson = new Gson();
msg = gson.fromJson(Json, ACLMessage.class);
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
}
}
);
// 建立连接
System.out.println("连接到 broker: " + broker);
client.connect(options);
System.out.println("连接成功.");
// 订阅消息
client.subscribe(TOPIC, qos);
System.out.println("开始监听" + TOPIC);
} catch (
Exception e) {
e.printStackTrace();
}
}
import jade.*;
public class ServerAgent extends Agent {
private static final long serialVersionUID = 1L;
//创建集合的目的是为了将所有的信息存放一起,然后输出MessageQueue整体消息
protected ArrayList<ACLMessage> MessageQueue = new ArrayList<>();
public void setup() {
SubscribeSample subscribeSample = null;//这个对象需要在每次调用的时候创建
subscribeSample.subscribe("Json");
//每五秒重复一次下面的动作
this.addBehaviour(new TickerBehaviour(this, 5000) {
subscribeSample =new SubscribeSample();
@Override
protected void onTick() {
//上面需要new好MessageQueue对象,否则这块一直不成立
if (subscribeSample.getMsg() != null && MessageQueue != null) {
MessageQueue.add(subscribeSample.getMsg());
//现在接收到的消息,直接从subscribeSample.getMsg()输出就好了
System.out.println("现在的MessageQueue是:"+subscribeSample.getMsg() );
} else {
System.out.println("还未收到任何信息");
}
System.out.println("MessageQueue整体是:" + MessageQueue);
}
修改一下接收方代码
protected ArrayList<ACLMessage> MessageQueue = null;
这是未初始化的语句
if (subscribeSample.getMsg() != null && MessageQueue != null) {
MessageQueue.add(subscribeSample.getMsg());
System.out.println("现在的MessageQueue是:" + MessageQueue);
}
这是要添加到queue的语句
显然的,你需要
ArrayList<ACLMessage> = new ArrayList<ACLMessage>();
对吗
ArrayList<Integer> primeNumbers = new ArrayList<>();这样初始化数组
输出当前信息直接用subscribeSample.getMsg()呗
还有ACLMessage应该相当于是string类型吧
希望可以采纳
你这个逻辑写法就有问题 在实际项目中绝对是被pass掉的。
客户端收到mqtt服务器推送的消息后 直接就放进MessageQueue即可,无需你自己写个定时器 定时去拉去*(这种做法就不可取)
你的ArrayList 在声明的时候就是null,然后你每次收到消息后 再去new对象 所以才会造成你每次只有一条信息在MessageQueue中 (因为你每次都new 一个ArrayListdudu对象)
import jade.*;
public class ServerAgent extends Agent {
private static final long serialVersionUID = 1L;
protected ArrayList<ACLMessage> MessageQueue = null;
// protected ArrayList<String> myProcess = new ArrayList<>();
public void setup() {
SubscribeSample subscribeSample = new SubscribeSample();
subscribeSample.subscribe("Json");
//每五秒重复一次下面的动作
this.addBehaviour(new TickerBehaviour(this, 5000) {
@Override
protected void onTick() {
if (subscribeSample.getMsg() != null && MessageQueue != null) {
//这里加一个判断,如果是空值就给new一个对象
if(MessageQueue==null){
MessageQueue=new ArrayList<>();
}
MessageQueue.add(subscribeSample.getMsg());
System.out.println("现在的MessageQueue是:" + MessageQueue);
} else {
System.out.println("还未收到任何信息");
}
System.out.println("MessageQueue整体是:" + MessageQueue);
}
如上,21行处改动
我的理解:你想要获取到消息后然后处理这些消息,并且消息不丢失。
你现在的这种做法完全不行
第一:5秒执行一次,每次都能获取到没有收到的消息吗?有偏移量吗?
第二:你写的这个消息队列MessagQueue得初始化,并且每次获取到了MessagQueue里的全部消息后得出队,这个跟MQ的思路是一样的(不知道你了解MQ不,抱歉!)。
第三:建议你用MQ实现,或者模拟队列,实时入队,而不是5秒一次。
总之,按我的理解,你这个完全就是要做一个MQ的功能
你直接放redis里面 然后用redis的方法获取 可获取一个也可以获取多个,网上有好多redis的工具类用起来很方便的希望能帮到你!
每次都new TickerBehaviour,MessageQueue属于TickerBehaviour对象的,就表示TickerBehaviour每次都初始化了一个List,理论上TickerBehaviour永远只会有一条数据,把MessageQueue改为ServerAgent的静态变量,引用改为ServerAgent.ServerAgent就可以了,记得先初始化。