// 默认连接用户
private static final String USERNAME = "admin";
// 默认连接密码
private static final String PASSWORD = "admin";
// 默认连接URL
private static final String BROKERURL = "tcp://139.224.235.172:61613";
public static void main(String[] args) {
// 连接工厂
ConnectionFactory connectionFactory;
// 连接
Connection connection = null;
// 会话
Session session;
// 消息目的地
Destination destination;
// 消息消费者
MessageConsumer consumer;
// 实例化工厂
connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD,
BROKERURL);
try {
// 创建连接
connection = connectionFactory.createConnection();
// 启动连接
connection.start();
// 创建会话
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建消息队列
destination = session
.createQueue("p171024103004.response.payonline");
// 创建消费者
consumer = session.createConsumer(destination);
while (true) {
// 参数:接收消息的超时时间,为0的话则不超时,receive返回下一个消息,但是超时了或者消费者被关闭,返回null
Message message = (Message) consumer.receive(10*1000);
if (message != null) {
System.out.println("收到的消息:" + ((TextMessage) message).getText());
} else {
System.out.println("没有收到消息");
break;
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
需要 创建生产者 并且 要同一个消息队列 还需要生产者发送
final MessageProducer producer = session.createProducer(destination);
TextMessage textMessage = session.createTextMessage("你快看看我出不出来");
producer.send(textMessage);

有可能是你环境没有搭好
一般都是在客户端关闭seesion conection,代码例子
Java code
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Override
public void convertAndSend(Event message) {
// build ConnectionFactory And Queue is necessary
buildConnectionFactoryAndQueue();
Connection connect = null;
Session session = null;
MessageProducer producer = null;
try {
connect = jmsConnectionFactory.createConnection();
session = connect.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
producer = session.createProducer(queue);
// create a JMS message and send it
ObjectMessage objMsg = session.createObjectMessage(message);
// set message selector
String messageSelector = message.getMessageSelector();
objMsg.setStringProperty("messageReceiver", messageSelector);
producer.send(objMsg);
} catch (JMSException e) {
String errorMessage = "JMSException while queueing HTTP JMS Message";
throw new EventRuntimeException(errorMessage, e);
} finally {
SafeCloseUtil.close(producer); // 这里关闭 producer
SafeCloseUtil.close(session); // 这里关闭 producer
SafeCloseUtil.close(connect);
}
}
服务端只需要实现MessageListener的onMessage,客户端的 session服务端根本不知道,也无法关闭。
Java code
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Override
public void onMessage(Message message) {
printLogMessage("start public function onMessage()..");
try {
if (message instanceof ObjectMessage) {
ObjectMessage objMsg = (ObjectMessage) message;
Event event = (Event) objMsg.getObject();
// if Re deliver message warn the message
if (message.getJMSRedelivered()) {
log.warn("...", event.getClass().getSimpleName());
}
// out Put Event Log
outPutEventLog(event);
// dispatch Event
dispatchEvent(event);
} else {
log.error("This MDB message was not instance of ObjectMessage; ignoring.");
}
printLogMessage("end public function onMessage()..");
} catch (JMSException e) {
String errorMessage = "JMS Exception while Listener message.errorMessage:"
+ e.getMessage();
log.error(errorMessage);
throw new EventRuntimeException(errorMessage, e);
}
}
生产者 是否还在运行?又是否 持久化? 确定后去百度 ActiveMQ 使用?