情况描述:现需要将flink 窗口处理的结果,存储到map中,使用java 定时器,定时写入redis中。但是上传到flink中执行,map中数据为空。本地测试,均正常
相关代码:
//Map
public class MapUtils {
// 指向自己实例的私有静态引用,主动创建
private static ConcurrentHashMap<String, Person> map = new ConcurrentHashMap();
// 以自己实例为返回值的静态的公有方法,静态工厂方法
public static ConcurrentHashMap<String, Person> getMap() {
return map;
}
}
//java 测试类
class classTimerTest {
static String str="HH:mm:ss";
static SimpleDateFormat dateFormat = new SimpleDateFormat(str);
public static void main(String[] args) {
Timer timer = new Timer();
String now1 = dateFormat.format(System.currentTimeMillis());
MapUtils.getMap().put("1", new Person());
System.out.println(now1);
//延迟3秒后执行任务
timer.schedule(new MyTask(),3000,3000);//单位是毫秒
}
}
class MyTask extends TimerTask {
@Override
public void run() {
System.out.println(MapUtils.getMap().isEmpty());
}
}
//flink 主方法
public class FlinkApplication {
public static void main(String[] args) throws Exception {
//忽略,窗口数据写入到Map单例中
Timer timer = new Timer();
//String now1 = dateFormat.format(System.currentTimeMillis());
System.out.println(new Date());
//延迟3秒后执行任务
timer.schedule(new MyTask(),10000,10000);//单位是毫秒
FlinkUtils.getEnv().execute("init job");
}
}
class MyTask extends TimerTask {
@Override
public void run() {
System.out.println(MapUtils.getMap().isEmpty());
}
}
测试结果:
idea java 测试正常,map有数据
idea flink 测试正常,map有数据
flink 运行jar包, map为空。
麻烦大家帮忙看一下
flink不是有窗口函数吗