业务是 A服务收到web请求,发送一个mq消息,由B服务接收mq 并处理数据后 发送另一个topic给A服务。
A服务收到消息返回成功或者失败给http请求。
问题:
do {
// 循环查看缓存是否接收到mq消息
for (int i = 1000; i > 0; i--) {
if (callbackClass.callback(keys) == ParamCode.ERROR.getCode()) {
return false;
// -----------②-----问题出在这里------------
} else if (callbackClass.callback(keys) == ParamCode.SUCCESS.getCode()){
return true;
}
}
// 判断是否超时
if (System.currentTimeMillis()/1000 - startTime > 10){
return false;
}
} while (true);
以下为示例代码
Enum
public enum ParamCode {
NONE("未收到MQ消息",0),
SUCCESS("mq返回成功" ,1),
ERROR("mq返回失败",2);
private String name;
private Integer code;
ParamCode(String name, Integer code) {
this.name = name;
this.code = code;
}
public String getName() {
return name;
}
public Integer getCode() {
return code;
}
}
接口
业务类接口
public interface CallbackClass {
/**
* 回调查看数据是否接收到mq消息
* @param keys
* @return
*/
Integer callback(List<String> keys);
/**
* 缓存数据或修改缓存的值
* @param keys
* @param value
*/
void cache(List<String> keys, Integer value);
}
缓存类接口
public interface CallbackClass {
/**
* 回调查看数据是否接收到mq消息
* @param keys
* @return
*/
Integer operationCallbackMqtt(List<String> keys);
/**
* 缓存数据或修改缓存的值
* @param keys
* @param value
*/
void cacheAlarmInfo(List<String> keys, Integer value);
}
实现类
业务类
@Service
public class ServiceImpl implements Service {
@Autowired
private CallbackClass callbackClass;
/**
* 发送mq 调用回调幻术
* @param keys
* @return
*/
public boolean serviceMethod(List<String> keys){
/*
.......这里发送了一个mq消息
*/
//将数据放入缓存 状态为未收到mq消息
callbackClass.cache(keys,ParamCode.NONE.getCode());
long startTime = System.currentTimeMillis()/1000;
do {
// 循环查看缓存是否接收到mq消息
for (int i = 1000; i > 0; i--) {
// -----------②-----问题出在这里------------
if (callbackClass.callback(keys) == ParamCode.ERROR.getCode()) {
return false;
} else if (callbackClass.callback(keys) == ParamCode.SUCCESS.getCode()){
return true;
}
}
// 判断是否超时
if (System.currentTimeMillis()/1000 - startTime > 10){
return false;
}
} while (true);
}
}
缓存类
@Service
public class CallbackClassImpl implements CallbackClass {
private static ConcurrentHashMap<String, Integer> mqCallbackMap = new ConcurrentHashMap<>();
// 添加缓存,或者修改value值
public void cache(List<String> keys, Integer value) {
if (keys != null) {
for (String key: keys) {
mqCallbackMap.put(key,value);
}
}
}
public Integer callback(List<String> keys) {
int size = keys.size();
// --------①------------
System.out.println(Thread.currentThread().getName() + "------------------------" + size);
System.out.println(mqCallbackMap);
do {
String key = keys.get(--size);
int code = 0;
if (StringUtil.isNullOrEmpty(key)) {
code = mqCallbackMap.get(key);
}
if (code != ParamCode.SUCCESS.getCode()) {
return code;
}
} while (size != 0);
// list 中的所有key 都成功时删除缓存并返回成功
// 根据key 删除Map 中的数据
do {
String key = keys.get(size++);
if (StringUtil.isNullOrEmpty(key)) {
mqCallbackMap.remove(key);
}
} while (size != keys.size());
return ParamCode.SUCCESS.getCode();
}
}
MQ这样用你是真的牛批