一个自己用的 Java 模板素材




```java
package com.cmb.cms.dataflow;


import com.cmb.bdp1.utils.CaseConf;
import com.cmb.cms.dataflow.saretail.api.MainProcess;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.Iterator;
import scala.collection.immutable.Map;

import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Properties;


/**
 * Command-line entry point of the streaming job.
 *
 * <p>Reads the job configuration from the path passed as the first argument, copies it into a
 * Flink {@link ParameterTool}, instantiates the {@link MainProcess} implementation named by the
 * {@code framework.streaming.busImpClassName} setting and runs it.
 */
public class MainEntrance {
    private static final Logger logger = LoggerFactory.getLogger(MainEntrance.class);

    /** Configuration key holding the fully qualified name of the {@link MainProcess} implementation. */
    private static final String PROCESS_CLASS_KEY = "framework.streaming.busImpClassName";

    /**
     * Boots the job.
     *
     * <p>The process exits with status 1 when no configuration path is given or when the job
     * fails to start or run.
     *
     * @param args {@code args[0]} is the configuration file path
     * @throws Exception if the configuration cannot be read
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            logger.error("请传入配置文件路径");
            System.exit(1);
        }
        String confPath = args[0];

        // Per the original author's note, the RDM-provided configuration class does not implement
        // the serialization interface, so every entry is copied into Flink's ParameterTool.
        CaseConf userConfig = new CaseConf(confPath);
        Map<String, String> allFrameConfMap = userConfig.getAll();

        java.util.Map<String, String> myConfMap = new java.util.HashMap<>();
        Iterator<String> keys = allFrameConfMap.keysIterator();
        while (keys.hasNext()) {
            String key = keys.next();
            myConfMap.put(key, userConfig.get(key, ""));
        }
        ParameterTool parameters = ParameterTool.fromMap(myConfMap);

        // NOTE(review): this dumps every setting to stdout, which presumably includes the Kafka
        // user names and passwords read by the job -- consider masking secrets here.
        parameters.getConfiguration().toMap()
                .forEach((key, value) -> System.out.println("<" + key + "," + value + ">"));

        try {
            // Class#newInstance() is deprecated and rethrows constructor exceptions unchecked;
            // going through the no-arg constructor wraps them in InvocationTargetException instead.
            MainProcess cusProcess = (MainProcess) Class.forName(parameters.get(PROCESS_CLASS_KEY))
                    .getDeclaredConstructor()
                    .newInstance();

            // Whether the job runs in a local environment (with web UI) or on the cluster.
            boolean isLocal = parameters.getBoolean("app.run.local", false);
            StreamExecutionEnvironment env;
            if (isLocal) {
                System.out.println("env local");
                env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new org.apache.flink.configuration.Configuration());
            } else {
                System.out.println("env yarn");
                env = StreamExecutionEnvironment.getExecutionEnvironment();
            }

            // Checkpoint setup first, then restart strategy, then the user pipeline.
            System.out.println("start initCheckpoint");
            cusProcess.initCheckpoint(env, parameters);
            System.out.println("start process");
            cusProcess.setRestartStrategy(env, parameters);
            cusProcess.process(env, parameters);
        } catch (Exception e) {
            logger.error("作业任务执行失败", e);
            System.exit(1);
        }
    }

    /**
     * Loads a {@code .properties} file from HDFS (paths starting with {@code hdfs}) or from the
     * local file system. Not called by {@link #main(String[])}, which reads its configuration
     * through {@code CaseConf}.
     *
     * <p>Any load failure is logged together with its cause and terminates the process with
     * status 1.
     *
     * @param confPath path of the properties file
     * @return the loaded properties
     */
    public static Properties loadConfig(String confPath) {
        Properties userConfig = new Properties();
        try {
            if (confPath.startsWith("hdfs")) {
                Configuration hdfsConfig = new Configuration();
                Path path = new Path(confPath);
                FileSystem fileSystem = path.getFileSystem(hdfsConfig);
                try (FSDataInputStream fsInput = fileSystem.open(path)) {
                    userConfig.load(fsInput);
                    System.out.println("加载HDFS配置文件成功!");
                }
            } else {
                try (InputStream input = new FileInputStream(new File(confPath))) {
                    userConfig.load(input);
                    System.out.println("加载本地配置文件成功!");
                } catch (Exception e) {
                    // Previously the exception was dropped, leaving no hint of what went wrong.
                    logger.error("加载配置文件出错", e);
                    System.exit(1);
                }
            }
        } catch (Exception e) {
            logger.error("配置文件加载错误", e);
            System.exit(1);
        }
        return userConfig;
    }

}
package com.cmb.cms.dataflow.saretail.api;

import com.cmb.cms.dataflow.saretail.constant.Constants;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.time.Time;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

import static com.cmb.cms.dataflow.saretail.constant.Constants.FAILURE_RATE;
import static com.cmb.cms.dataflow.saretail.constant.Constants.FIXED_DELAY;

/**
 * Contract of a streaming job: default checkpoint and restart-strategy setup plus the
 * job-specific pipeline in {@link #process(StreamExecutionEnvironment, ParameterTool)}.
 * Created By Ho258399 on  2021/4/30
 */
public interface MainProcess {

    /**
     * Configures state backend and checkpointing when {@code checkpoint.switch} equals
     * {@code Constants.Y}; otherwise does nothing.
     *
     * <p>State backend selection:
     * <ul>
     *   <li>{@code statebackend.save.dir} blank: in-memory backend;</li>
     *   <li>{@code statebackend.type} equals {@code Constants.CHECKPOINT_FS}: file-system backend;</li>
     *   <li>anything else: RocksDB backend.</li>
     * </ul>
     *
     * @param env        execution environment to configure
     * @param userConfig job configuration
     * @throws IOException declared for the state backend constructors
     */
    default void initCheckpoint(StreamExecutionEnvironment env, ParameterTool userConfig) throws IOException {
        String checkpointSwitch = userConfig.get("checkpoint.switch");
        if (!Constants.Y.equals(checkpointSwitch)) {
            return;
        }
        String checkpointPath = userConfig.get("statebackend.save.dir");
        if (StringUtils.isBlank(checkpointPath)) {
            env.setStateBackend(new MemoryStateBackend());
        } else if (Constants.CHECKPOINT_FS.equals(userConfig.get("statebackend.type"))) {
            env.setStateBackend(new FsStateBackend(checkpointPath));
        } else {
            env.setStateBackend(new RocksDBStateBackend(checkpointPath));
        }
        // Checkpoint interval in milliseconds; defaults to 60000 ms (one minute).
        env.enableCheckpointing(userConfig.getLong("checkpoint.interval.ms", 60000L));
        // Minimum pause between two checkpoints so checkpointing does not put too much
        // pressure on the running application; defaults to 5000 ms.
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(userConfig.getLong("checkpoint.min.pause.ms", 5000L));
        // Exactly-once checkpointing mode.
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Keep externalized checkpoints when the job is cancelled.
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    }


    /**
     * Sets the restart strategy selected by {@code Constants.RESTART_STRATEGY}.
     *
     * <ol>
     *   <li>Fixed delay: at most {@code FIXED_DELAY_ATTEMPTS} restarts (default 3), waiting
     *       {@code FIXED_DELAY_DELAY} seconds (default 30) between attempts.</li>
     *   <li>Failure rate: at most {@code FAILURE_RATE_MAX_FAILURES} failures (default 3) within
     *       {@code FAILURE_RATE_INTERVAL} minutes (default 30), waiting
     *       {@code FAILURE_RATE_DELAY} seconds (default 30) between attempts.</li>
     * </ol>
     *
     * <p>When the setting is missing or names neither strategy, nothing is configured and the
     * platform default applies. Per the original author's note, once the allowed number of
     * failures is exceeded the job is not restarted by Flink and the platform relaunches it.
     *
     * @param env        execution environment to configure
     * @param userConfig job configuration
     */
    default void setRestartStrategy(StreamExecutionEnvironment env, ParameterTool userConfig) {
        // Constant-first comparison: a missing setting must fall through to the platform
        // default instead of throwing a NullPointerException.
        String strategy = userConfig.get(Constants.RESTART_STRATEGY);
        if (FIXED_DELAY.equalsIgnoreCase(strategy)) {
            int attempts = userConfig.getInt(Constants.FIXED_DELAY_ATTEMPTS, 3);
            long delayTime = userConfig.getLong(Constants.FIXED_DELAY_DELAY, 30); // seconds
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(attempts, Time.of(delayTime, TimeUnit.SECONDS)));
        } else if (FAILURE_RATE.equalsIgnoreCase(strategy)) {
            int maxFailuresPerInterval = userConfig.getInt(Constants.FAILURE_RATE_MAX_FAILURES, 3);
            long intervalTime = userConfig.getLong(Constants.FAILURE_RATE_INTERVAL, 30); // minutes
            long delayTime = userConfig.getLong(Constants.FAILURE_RATE_DELAY, 30); // seconds
            env.setRestartStrategy(RestartStrategies.failureRateRestart(
                    maxFailuresPerInterval, Time.of(intervalTime, TimeUnit.MINUTES), Time.of(delayTime, TimeUnit.SECONDS))
            );
        } else {
            System.out.println("使用平台默认的重启策略");
        }
    }


    /**
     * Builds and runs the job-specific pipeline.
     *
     * @param env        execution environment, already configured by the default methods
     * @param userConfig job configuration
     * @throws Exception if building or executing the pipeline fails
     */
    void process(StreamExecutionEnvironment env, ParameterTool userConfig) throws Exception;
}


package com.cmb.cms.dataflow.saretail.process;

import com.cmb.cms.dataflow.saretail.api.MainProcess;
import com.cmb.cms.dataflow.saretail.beans.*;
import com.cmb.cms.dataflow.saretail.constant.Constants;
import com.cmb.cms.dataflow.saretail.func.*;
import com.cmb.cms.dataflow.saretail.source.schema.KafkaSinkSchema;
import com.cmb.cms.dataflow.saretail.source.KafkaConnector;
import com.cmb.cms.dataflow.saretail.source.schema.ZhaoHuMsgSchema;
import com.esotericsoftware.kryo.util.ObjectMap;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.utils.ParameterTool;

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.codehaus.jackson.map.ObjectMapper;
import org.springframework.util.StringUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;

/**
 * Business logic of the job: reads agent messages from Kafka, splits them into a "busy" side
 * stream and a "follow-up" side stream, and publishes the pop-up messages produced for each
 * stream to the target Kafka topic.
 * Created By Ho258399 on  2021/4/30
 */
public class Processor implements MainProcess {
    private static final Logger logger = LoggerFactory.getLogger(Processor.class);

    /**
     * Builds the pipeline and executes it under the job name {@code Busy_FollowUp_POP}.
     *
     * @param env        execution environment
     * @param userConfig job configuration (Kafka endpoints, thresholds, message templates)
     * @throws Exception if building or executing the pipeline fails
     */
    @Override
    public void process(StreamExecutionEnvironment env, ParameterTool userConfig) throws Exception {
        System.out.println("into process");

        // Switch meant to control part of the output.
        // NOTE(review): the value is read but not used anywhere in this method -- confirm intent.
        boolean isLogOut = userConfig.getBoolean(Constants.LOG_OUT_SWITCH, false);

        int popBusyMin = userConfig.getInt(Constants.POP_BUSY_MIN, 2);
        String contentBusyFormWork = userConfig.get(Constants.CONTENT_BUSY_FORMWORK);
        int popFollowNum = userConfig.getInt(Constants.POP_FOLLOWUP_NUM, 10);
        String contentFollowUpFormWork = userConfig.get(Constants.CONTENT_FOLLOWUP_FORMWORK);
        int idlenessSec = userConfig.getInt(Constants.IDLENESS_SEC, 10);

        // Kafka consumer settings.
        String kafkaConsumerServers = userConfig.get(Constants.CONSUMER_BOOTSTRAP_SERVERS);
        List<String> sourceTopicList = Arrays.asList(userConfig.get(Constants.SOURCE_TOPICS).split(","));
        String autoResetOffset = userConfig.get(Constants.AUTO_RESET_OFFSET, "latest");
        String consumerUserName = userConfig.get(Constants.CONSUMER_USER_NAME);
        String consumerPassword = userConfig.get(Constants.CONSUMER_PASSWORD);
        String groupID = userConfig.get(Constants.GROUP_ID);

        // Kafka producer settings.
        String sinkTopic = userConfig.get(Constants.TARGET_TOPIC);
        String kafkaProducerServers = userConfig.get(Constants.PRODUCER_BOOTSTRAP_SERVERS);
        String producerUserName = userConfig.get(Constants.PRODUCER_USER_NAME);
        String producerPassword = userConfig.get(Constants.PRODUCER_PASSWORD);

        // Source: agent messages.
        FlinkKafkaConsumer<ZhaoHuInfo> flinkKafkaConsumer = new KafkaConnector<ZhaoHuInfo>()
                .buildConsumerSource(kafkaConsumerServers, sourceTopicList, autoResetOffset,
                        consumerUserName, consumerPassword, groupID, null,
                        new ZhaoHuMsgSchema());
        DataStream<ZhaoHuInfo> zhaohuDataStream = env.addSource(flinkKafkaConsumer);

        // Side-output tags used to split the stream.
        OutputTag<ZhaoHuInfo> busyStreamTag = new OutputTag<ZhaoHuInfo>("busyTime"){};
        OutputTag<ZhaoHuInfo> followUpStreamTag = new OutputTag<ZhaoHuInfo>("followUp"){};

        // Records without a time or an agent id are dropped. The main output stays empty:
        // matching records are only emitted to the side outputs.
        SingleOutputStreamOperator<ZhaoHuInfo> outputStream = zhaohuDataStream
                .filter(x -> !StringUtils.isEmpty(x.getTime()))
                .filter(x -> !StringUtils.isEmpty(x.getAgentId()))
                .process(new ProcessFunction<ZhaoHuInfo, ZhaoHuInfo>() {
                    @Override
                    public void processElement(ZhaoHuInfo zh, Context ctx, Collector<ZhaoHuInfo> collector) throws Exception {
                        // Agent status stream (login / busy / idle).
                        if ("MCCP_agentStatus".equalsIgnoreCase(zh.getMsgType())
                                && Integer.valueOf(0).equals(zh.getLocation())) {
                            ctx.output(busyStreamTag, zh);
                        }
                        // Sessions pending follow-up.
                        if ("MCCP_createSession".equalsIgnoreCase(zh.getMsgType())
                                && "KB".equalsIgnoreCase(zh.getChannel())
                                && "N".equalsIgnoreCase(zh.getSessionUpdate()) && "true".equalsIgnoreCase(zh.getSettingFollowUp())
                                && "false".equalsIgnoreCase(zh.getForwardSession())) {
                            ctx.output(followUpStreamTag, zh);
                        }
                    }
                });

        // Busy branch.
        DataStream<ZhaoHuInfo> busyStream = outputStream.getSideOutput(busyStreamTag)
                .assignTimestampsAndWatermarks(eventTimeWatermarks(idlenessSec));
        busyStream.print("busyStream");
        SingleOutputStreamOperator<PopMsg> processBusyStream = busyStream
                .keyBy(new AgentIdKeySelector())
                .process(new PopBusyFunc(popBusyMin, contentBusyFormWork));
        processBusyStream.addSink(new KafkaConnector<PopMsg>().buildProducerSink12(sinkTopic, kafkaProducerServers, producerUserName, producerPassword, "N",
                new KafkaSinkSchema(sinkTopic)));
        processBusyStream.print("busySINK");

        // Follow-up branch.
        DataStream<ZhaoHuInfo> followUpStream = outputStream.getSideOutput(followUpStreamTag)
                .assignTimestampsAndWatermarks(eventTimeWatermarks(idlenessSec));
        followUpStream.print("followUpStream");
        SingleOutputStreamOperator<PopMsg> processFollowUpStream = followUpStream
                .keyBy(new AgentIdKeySelector())
                .process(new PopFollowUpFunc(popFollowNum, contentFollowUpFormWork));
        processFollowUpStream.addSink(new KafkaConnector<PopMsg>().buildProducerSink12(sinkTopic, kafkaProducerServers, producerUserName, producerPassword, "N",
                new KafkaSinkSchema(sinkTopic)));
        // Fixed copy-paste slip: this debug print used to be attached to processBusyStream,
        // so the follow-up results were never printed and the busy results were printed twice.
        processFollowUpStream.print("followUpSINK");

        env.execute("Busy_FollowUp_POP");
    }

    /**
     * Watermark strategy shared by both branches: timestamps come from
     * {@code ZhaoHuInfo#getEventTime()}, out-of-orderness is bounded to one second, and a source
     * is marked idle after {@code idlenessSec} seconds.
     */
    private static WatermarkStrategy<ZhaoHuInfo> eventTimeWatermarks(int idlenessSec) {
        return WatermarkStrategy.<ZhaoHuInfo>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                .withTimestampAssigner((line, recordTimes) -> line.getEventTime())
                .withIdleness(Duration.ofSeconds(idlenessSec));
    }

    /** Keys a record by its agent id. */
    private static final class AgentIdKeySelector implements KeySelector<ZhaoHuInfo, String> {
        @Override
        public String getKey(ZhaoHuInfo zhaoHuInfo) throws Exception {
            return zhaoHuInfo.getAgentId();
        }
    }
}

package com.cmb.cms.dataflow.saretail.func;

import com.cmb.cms.dataflow.saretail.beans.PopMsg;
import com.cmb.cms.dataflow.saretail.beans.ZhaoHuInfo;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.util.Iterator;


/**
 * Emits a pop-up message for an agent whose status has been "busy" (status code 2) for
 * {@code popMin} minutes of event time without any other status arriving in between.
 * Created By Ho258399 on  2022/5/7
 */
public class PopBusyFunc extends KeyedProcessFunction<String, ZhaoHuInfo, PopMsg> {

    /** Milliseconds per minute, kept as a long so timer arithmetic cannot overflow int. */
    private static final long MILLIS_PER_MINUTE = 60_000L;

    /** Status code treated as "busy" by this function. */
    private static final int STATUS_BUSY = 2;

    // Threshold in minutes and the message template ("~" is replaced by the threshold).
    private final int popMin;
    private final String formWork;

    public PopBusyFunc(int popMin, String formWork) {
        this.popMin = popMin;
        this.formWork = formWork;
    }

    // Pending pop-up message per agent id; its event time identifies the registered timer.
    transient MapState<String, PopMsg> stateMap;

    // Last status seen per agent id (-1 once the pop-up has fired).
    transient MapState<String, Integer> lastSatusMap;


    /**
     * Fires when a busy timer expires: emits every stored message whose timer time equals
     * {@code timestamp} and resets the agent's last status to -1, so that a later busy record
     * registers a new timer.
     *
     * @param timestamp timer time in milliseconds
     * @param ctx       timer context
     * @param out       collector for the pop-up messages
     * @throws Exception on state access failure
     */
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<PopMsg> out) throws Exception {
        System.out.println("定时器触发!");
        for (PopMsg pending : stateMap.values()) {
            if (pending.getEventTime() + popMin * MILLIS_PER_MINUTE == timestamp) {
                System.out.println("定时器触发,并且移除上一次的状态");
                lastSatusMap.put(pending.getAgentId(), -1);
                System.out.println(pending.toString());
                out.collect(pending);
            }
        }
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        stateMap = getRuntimeContext().getMapState(new MapStateDescriptor<String, PopMsg>("busy-state", String.class, PopMsg.class));
        lastSatusMap = getRuntimeContext().getMapState(new MapStateDescriptor<String, Integer>("agent-status", String.class, Integer.class));
        super.open(parameters);
    }

    /**
     * Handles one status record.
     *
     * <ul>
     *   <li>Busy after a non-busy status: stores the pop-up message and registers an event-time
     *       timer {@code popMin} minutes after the record's event time.</li>
     *   <li>Busy directly after busy: ignored, the existing timer keeps running.</li>
     *   <li>Any other status: deletes the timer of the stored message, if there is one.</li>
     *   <li>Missing status: the record is skipped without touching state.</li>
     * </ul>
     *
     * @param value incoming record
     * @param ctx   context giving access to the timer service
     * @param out   collector (not used here; messages are emitted from {@code onTimer})
     * @throws Exception on state access failure
     */
    @Override
    public void processElement(ZhaoHuInfo value, Context ctx, Collector<PopMsg> out) throws Exception {
        String agentId = value.getAgentId();
        Integer status = value.getStatus();
        if (status == null) {
            // A record without a status used to raise a NullPointerException after the state
            // had already been overwritten; skip it instead of failing the job.
            System.out.println("状态为空,忽略该条记录");
            return;
        }

        Integer lastStatus = lastSatusMap.get(agentId);
        System.out.println("上一次的状态: "+lastStatus);
        lastSatusMap.put(agentId, status);

        boolean busy = status == STATUS_BUSY;
        // Consecutive busy records must not register another timer.
        if (busy && Integer.valueOf(STATUS_BUSY).equals(lastStatus)) {
            return;
        }

        if (busy) {
            stateMap.put(agentId, getPopMsg(value));
            long fireAt = value.getEventTime() + popMin * MILLIS_PER_MINUTE;
            System.out.println("注册定时器!"+fireAt);
            ctx.timerService().registerEventTimeTimer(fireAt);
        } else {
            // Left the busy status: cancel the timer derived from the stored message.
            PopMsg pending = stateMap.get(agentId);
            if (pending != null) {
                long lastTimer = pending.getEventTime() + popMin * MILLIS_PER_MINUTE;
                System.out.println("删除定时器!");
                ctx.timerService().deleteEventTimeTimer(lastTimer);
            }
        }
    }


    /**
     * Builds the pop-up message for a busy record; the content is the template with every
     * "~" replaced by the threshold in minutes.
     *
     * @param value record that started the busy period
     * @return the pop-up message
     */
    private PopMsg getPopMsg(ZhaoHuInfo value) {
        PopMsg popMsg = new PopMsg();
        popMsg.setAgentId(value.getAgentId());
        popMsg.setChannel(0);
        popMsg.setStartTime(value.getDateTime());
        popMsg.setEventTime(value.getEventTime());
        popMsg.setContent(formWork.replace("~", String.valueOf(popMin)));
        return popMsg;
    }
}
package com.cmb.cms.dataflow.saretail.func;

import com.cmb.cms.dataflow.saretail.beans.FollowUpAlarmInfo;
import com.cmb.cms.dataflow.saretail.beans.PopMsg;
import com.cmb.cms.dataflow.saretail.beans.ZhaoHuInfo;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;


/**
 * Counts follow-up sessions per agent and day and emits one pop-up message per day once the
 * count reaches {@code popNum}.
 * Created By Ho258399 on  2022/5/7
 */
public class PopFollowUpFunc extends KeyedProcessFunction<String, ZhaoHuInfo, PopMsg> {

    /** Day part of the state key; built once instead of once per record. */
    private static final DateTimeFormatter DAY_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd");

    // Threshold (number of follow-up sessions) and the message template
    // ("~" is replaced by the threshold).
    private final int popNum;
    private final String formWork;

    public PopFollowUpFunc(int popNum, String formWork) {
        this.popNum = popNum;
        this.formWork = formWork;
    }

    // Counter and "already alarmed" flag, keyed by "<yyyy-MM-dd>_<agentId>".
    transient MapState<String, FollowUpAlarmInfo> numMap;


    /** No timers are registered by this function, so there is nothing to do here. */
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<PopMsg> out) throws Exception {

    }

    @Override
    public void open(Configuration parameters) throws Exception {
        numMap = getRuntimeContext().getMapState(new MapStateDescriptor<String, FollowUpAlarmInfo>("followUp-info", String.class, FollowUpAlarmInfo.class));
        super.open(parameters);
    }

    /**
     * Counts one follow-up session for the agent and emits a pop-up message the first time
     * today's count reaches {@code popNum}.
     *
     * <p>The day is taken from the wall clock ({@code LocalDate.now()}), not from the record.
     * NOTE(review): this makes the daily boundary depend on processing time and the JVM's
     * default time zone -- confirm that this is intended.
     *
     * @param value incoming follow-up record
     * @param ctx   processing context (unused)
     * @param out   collector for the pop-up message
     * @throws Exception on state access failure
     */
    @Override
    public void processElement(ZhaoHuInfo value, Context ctx, Collector<PopMsg> out) throws Exception {
        String agentId = value.getAgentId();
        String todayKey = LocalDate.now().format(DAY_FORMAT)+"_"+agentId;
        System.out.println(todayKey);

        FollowUpAlarmInfo followUpAlarmInfo = numMap.get(todayKey);
        if (followUpAlarmInfo == null) {
            // First record of the day for this agent. Entries left in the map belong to other
            // days and are not read again, so drop them to keep the state from growing by one
            // entry per day.
            numMap.clear();
            followUpAlarmInfo = new FollowUpAlarmInfo(1, false);
        } else {
            followUpAlarmInfo.setCount(followUpAlarmInfo.getCount()+1);
        }

        // Not alarmed yet today and the threshold is reached: alarm exactly once. The check now
        // also runs for the first record of the day, so a threshold of 1 is honoured.
        if (Boolean.FALSE.equals(followUpAlarmInfo.isAlreadyPop()) && (followUpAlarmInfo.getCount() >= popNum)) {
            System.out.println("触发告警: "+followUpAlarmInfo);
            followUpAlarmInfo.setAlreadyPop(true);
            out.collect(getPopMsg(value));
        } else {
            System.out.println("未触发告警: "+followUpAlarmInfo);
        }
        numMap.put(todayKey, followUpAlarmInfo);
    }


    /**
     * Builds the pop-up message for the record that reached the threshold; the content is the
     * template with every "~" replaced by the threshold.
     *
     * @param value record that triggered the alarm
     * @return the pop-up message
     */
    private PopMsg getPopMsg(ZhaoHuInfo value) {
        PopMsg popMsg = new PopMsg();
        popMsg.setAgentId(value.getAgentId());
        popMsg.setChannel(0);
        popMsg.setStartTime(value.getDateTime());
        popMsg.setEventTime(value.getEventTime());
        popMsg.setContent(formWork.replace("~", String.valueOf(popNum)));
        return popMsg;
    }
}