```package com.cmb.cms.dataflow;
import com.cmb.bdp1.utils.CaseConf;
import com.cmb.cms.dataflow.saretail.api.MainProcess;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.Iterator;
import scala.collection.immutable.Map;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Properties;
/**
 * Entry point of the streaming job.
 *
 * <p>Reads the configuration file given as the first program argument through {@code CaseConf},
 * copies it into a Flink {@link ParameterTool}, instantiates the {@link MainProcess}
 * implementation named by {@code framework.streaming.busImpClassName}, and runs it on a local or
 * cluster environment.
 */
public class MainEntrance {
    private static final Logger logger = LoggerFactory.getLogger(MainEntrance.class);

    /** Text printed instead of the value of configuration keys that look like credentials. */
    private static final String MASKED_VALUE = "******";

    /** Lower-case key fragments that mark a configuration value as a credential. */
    private static final String[] SENSITIVE_KEY_MARKERS = {"password", "passwd", "pwd", "secret"};

    /**
     * Launches the job.
     *
     * @param args {@code args[0]} is the configuration file path passed to {@code CaseConf}
     * @throws Exception declared for the Flink/config APIs; failures inside the job-setup try block
     *     are logged and terminate the JVM with exit code 1
     */
    public static void main(String[] args) throws Exception {
        // Load configuration
        if (args.length < 1) {
            logger.error("请传入配置文件路径");
            System.exit(1);
        }
        String confPath = args[0];
        // The configuration class provided by RDM does not implement Serializable,
        // so its values are copied into Flink's ParameterTool instead.
        CaseConf userConfig = new CaseConf(confPath);
        Map<String, String> allFrameConfMap = userConfig.getAll();
        java.util.Map<String, String> myConfMap = new java.util.HashMap<>();
        Iterator<String> keys = allFrameConfMap.keysIterator();
        while (keys.hasNext()) {
            String key = keys.next();
            myConfMap.put(key, userConfig.get(key, ""));
        }
        ParameterTool parameters = ParameterTool.fromMap(myConfMap);
        // Echo the effective configuration, masking credentials so they do not end up in job logs.
        parameters.getConfiguration().toMap().forEach((key, value) ->
                System.out.println("<" + key + "," + maskIfSensitive(key, value) + ">"));
        try {
            // getRequired fails with a message naming the missing key instead of an NPE in forName
            String implClassName = parameters.getRequired("framework.streaming.busImpClassName");
            // Class#newInstance is deprecated; instantiate through the no-arg constructor instead
            MainProcess cusProcess = (MainProcess) Class.forName(implClassName)
                    .getDeclaredConstructor()
                    .newInstance();
            // Whether to run in a local environment (with web UI)
            boolean isLocal = parameters.getBoolean("app.run.local", false);
            StreamExecutionEnvironment env;
            if (isLocal) {
                System.out.println("env local");
                env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(
                        new org.apache.flink.configuration.Configuration());
            } else {
                System.out.println("env yarn");
                env = StreamExecutionEnvironment.getExecutionEnvironment();
            }
            // Initialise checkpointing
            System.out.println("start initCheckpoint");
            cusProcess.initCheckpoint(env, parameters);
            // User logic
            System.out.println("start process");
            cusProcess.setRestartStrategy(env, parameters);
            cusProcess.process(env, parameters);
        } catch (Exception e) {
            logger.error("作业任务执行失败", e);
            System.exit(1);
        }
    }

    /**
     * Loads a {@link Properties} file from HDFS (paths starting with {@code "hdfs"}) or from the
     * local file system. Not called by {@link #main}, which reads configuration via {@code CaseConf}.
     *
     * @param confPath HDFS URI or local path of the properties file
     * @return the loaded properties; on any failure the error and its cause are logged and the JVM
     *     exits with code 1
     */
    public static Properties loadConfig(String confPath) {
        Properties userConfig = new Properties();
        try {
            if (confPath.startsWith("hdfs")) {
                Path path = new Path(confPath);
                FileSystem fileSystem = path.getFileSystem(new Configuration());
                try (FSDataInputStream fsInput = fileSystem.open(path)) {
                    userConfig.load(fsInput);
                }
                System.out.println("加载HDFS配置文件成功!");
            } else {
                try (InputStream input = new FileInputStream(new File(confPath))) {
                    userConfig.load(input);
                }
                System.out.println("加载本地配置文件成功!");
            }
        } catch (Exception e) {
            // Both branches report the cause, so a bad path or malformed file can be diagnosed
            logger.error("配置文件加载错误", e);
            System.exit(1);
        }
        return userConfig;
    }

    /**
     * Returns {@code value}, or {@link #MASKED_VALUE} when {@code key} (case-insensitively)
     * contains one of {@link #SENSITIVE_KEY_MARKERS}.
     */
    private static String maskIfSensitive(String key, String value) {
        String lowerKey = key.toLowerCase(java.util.Locale.ROOT);
        for (String marker : SENSITIVE_KEY_MARKERS) {
            if (lowerKey.contains(marker)) {
                return MASKED_VALUE;
            }
        }
        return value;
    }
}
package com.cmb.cms.dataflow.saretail.api;
import com.cmb.cms.dataflow.saretail.constant.Constants;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.time.Time;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import static com.cmb.cms.dataflow.saretail.constant.Constants.FAILURE_RATE;
import static com.cmb.cms.dataflow.saretail.constant.Constants.FIXED_DELAY;
/**
 * Contract for a pluggable streaming job: checkpoint setup, restart strategy and job logic.
 *
 * <p>{@link #initCheckpoint} and {@link #setRestartStrategy} have configuration-driven default
 * implementations; implementors must provide {@link #process}.
 *
 * Created By Ho258399 on 2021/4/30
 */
public interface MainProcess {

    /**
     * Configures the state backend and checkpointing when {@code checkpoint.switch} equals
     * {@code Constants.Y}; otherwise does nothing.
     *
     * <p>State backend: {@link MemoryStateBackend} when {@code statebackend.save.dir} is blank,
     * {@link FsStateBackend} when {@code statebackend.type} equals {@code Constants.CHECKPOINT_FS},
     * {@link RocksDBStateBackend} otherwise. Checkpoints are exactly-once, triggered every
     * {@code checkpoint.interval.ms} (default 60000 ms) with at least
     * {@code checkpoint.min.pause.ms} (default 5000 ms) between them, and retained when the job is
     * cancelled.
     *
     * @param env        environment to configure
     * @param userConfig job configuration
     * @throws IOException propagated from the {@link RocksDBStateBackend} constructor
     */
    default void initCheckpoint(StreamExecutionEnvironment env, ParameterTool userConfig) throws IOException {
        if (!Constants.Y.equals(userConfig.get("checkpoint.switch"))) {
            return;
        }
        String checkpointPath = userConfig.get("statebackend.save.dir");
        if (StringUtils.isBlank(checkpointPath)) {
            env.setStateBackend(new MemoryStateBackend());
        } else if (Constants.CHECKPOINT_FS.equals(userConfig.get("statebackend.type"))) {
            env.setStateBackend(new FsStateBackend(checkpointPath));
        } else {
            env.setStateBackend(new RocksDBStateBackend(checkpointPath));
        }
        // Checkpoint interval in milliseconds (default 60 s)
        env.enableCheckpointing(userConfig.getLong("checkpoint.interval.ms", 60000L));
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        // Minimum pause between two checkpoints, so checkpointing does not put too much load on the job
        checkpointConfig.setMinPauseBetweenCheckpoints(userConfig.getLong("checkpoint.min.pause.ms", 5000L));
        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Keep externalized checkpoints when the job is cancelled
        checkpointConfig.enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    }

    /**
     * Sets the restart strategy named by {@code Constants.RESTART_STRATEGY} (case-insensitive).
     * When checkpointing is enabled, the application is restored from the latest checkpoint.
     * <ol>
     *   <li>{@code FIXED_DELAY}: at most {@code FIXED_DELAY_ATTEMPTS} restarts (default 3), waiting
     *       {@code FIXED_DELAY_DELAY} seconds (default 30) between attempts.</li>
     *   <li>{@code FAILURE_RATE}: at most {@code FAILURE_RATE_MAX_FAILURES} failures (default 3)
     *       within {@code FAILURE_RATE_INTERVAL} minutes (default 30), waiting
     *       {@code FAILURE_RATE_DELAY} seconds (default 30) between attempts.</li>
     *   <li>Any other value, including a missing key: the platform's default strategy is kept.</li>
     * </ol>
     * Once the limit is exceeded the job is not restarted by Flink and is left to the platform.
     *
     * @param env        environment to configure
     * @param userConfig job configuration
     */
    default void setRestartStrategy(StreamExecutionEnvironment env, ParameterTool userConfig) {
        // Constant-first comparison: a missing key must fall through to the platform default, not NPE
        String strategy = userConfig.get(Constants.RESTART_STRATEGY);
        if (FIXED_DELAY.equalsIgnoreCase(strategy)) {
            int attempts = userConfig.getInt(Constants.FIXED_DELAY_ATTEMPTS, 3);
            long delaySeconds = userConfig.getLong(Constants.FIXED_DELAY_DELAY, 30L);
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(attempts, Time.of(delaySeconds, TimeUnit.SECONDS)));
        } else if (FAILURE_RATE.equalsIgnoreCase(strategy)) {
            int maxFailuresPerInterval = userConfig.getInt(Constants.FAILURE_RATE_MAX_FAILURES, 3);
            long intervalMinutes = userConfig.getLong(Constants.FAILURE_RATE_INTERVAL, 30L);
            long delaySeconds = userConfig.getLong(Constants.FAILURE_RATE_DELAY, 30L);
            env.setRestartStrategy(RestartStrategies.failureRateRestart(
                    maxFailuresPerInterval,
                    Time.of(intervalMinutes, TimeUnit.MINUTES),
                    Time.of(delaySeconds, TimeUnit.SECONDS)));
        } else {
            System.out.println("使用平台默认的重启策略");
        }
    }

    /**
     * Job-specific logic: builds the dataflow on {@code env} from {@code userConfig}.
     *
     * @param env        execution environment, already configured by the default methods
     * @param userConfig job configuration
     * @throws Exception any failure while building or running the job
     */
    void process(StreamExecutionEnvironment env, ParameterTool userConfig) throws Exception;
}
package com.cmb.cms.dataflow.saretail.process;
import com.cmb.cms.dataflow.saretail.api.MainProcess;
import com.cmb.cms.dataflow.saretail.beans.*;
import com.cmb.cms.dataflow.saretail.constant.Constants;
import com.cmb.cms.dataflow.saretail.func.*;
import com.cmb.cms.dataflow.saretail.source.schema.KafkaSinkSchema;
import com.cmb.cms.dataflow.saretail.source.KafkaConnector;
import com.cmb.cms.dataflow.saretail.source.schema.ZhaoHuMsgSchema;
import com.esotericsoftware.kryo.util.ObjectMap;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.codehaus.jackson.map.ObjectMapper;
import org.springframework.util.StringUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
/**
 * Business logic of the busy / follow-up pop-up job.
 *
 * <p>Reads ZhaoHu messages from Kafka and routes agent-status and follow-up session events to two
 * side outputs. It evaluates them per agent with {@link PopBusyFunc} and {@link PopFollowUpFunc}
 * and writes the resulting {@link PopMsg}s to the target Kafka topic.
 *
 * Created By Ho258399 on 2021/4/30
 */
public class Processor implements MainProcess {
    private static final Logger logger = LoggerFactory.getLogger(Processor.class);

    /** Out-of-orderness tolerated by the watermarks of both side-output streams. */
    private static final Duration MAX_OUT_OF_ORDERNESS = Duration.ofSeconds(1);

    /**
     * Builds and executes the job.
     *
     * @param env        execution environment
     * @param userConfig job configuration
     * @throws Exception if building or executing the job fails
     */
    @Override
    public void process(StreamExecutionEnvironment env, ParameterTool userConfig) throws Exception {
        System.out.println("into process");
        // NOTE(review): intended to control part of the output, but nothing reads it yet. Gating the
        // print() sinks on it would change auto-generated operator IDs (no uid() is set), which can
        // break state restore, so assign explicit uids before wiring it up.
        boolean isLogOut = userConfig.getBoolean(Constants.LOG_OUT_SWITCH, false);
        int popBusyMin = userConfig.getInt(Constants.POP_BUSY_MIN, 2);
        String contentBusyFormWork = userConfig.get(Constants.CONTENT_BUSY_FORMWORK);
        int popFollowNum = userConfig.getInt(Constants.POP_FOLLOWUP_NUM, 10);
        String contentFollowUpFormWork = userConfig.get(Constants.CONTENT_FOLLOWUP_FORMWORK);
        int idlenessSec = userConfig.getInt(Constants.IDLENESS_SEC, 10);
        // Kafka settings
        String kafkaConsumerServers = userConfig.get(Constants.CONSUMER_BOOTSTRAP_SERVERS);
        // getRequired reports the missing key instead of an NPE on split()
        List<String> sourceTopicList = Arrays.asList(userConfig.getRequired(Constants.SOURCE_TOPICS).split(","));
        String autoResetOffset = userConfig.get(Constants.AUTO_RESET_OFFSET, "latest");
        String consumerUserName = userConfig.get(Constants.CONSUMER_USER_NAME);
        String consumerPassword = userConfig.get(Constants.CONSUMER_PASSWORD);
        String groupID = userConfig.get(Constants.GROUP_ID);
        String sinkTopic = userConfig.get(Constants.TARGET_TOPIC);
        String kafkaProducerServers = userConfig.get(Constants.PRODUCER_BOOTSTRAP_SERVERS);
        String producerUserName = userConfig.get(Constants.PRODUCER_USER_NAME);
        String producerPassword = userConfig.get(Constants.PRODUCER_PASSWORD);

        // Source: outbound-call messages
        FlinkKafkaConsumer<ZhaoHuInfo> flinkKafkaConsumer = new KafkaConnector<ZhaoHuInfo>()
                .buildConsumerSource(kafkaConsumerServers, sourceTopicList, autoResetOffset,
                        consumerUserName, consumerPassword, groupID, null, new ZhaoHuMsgSchema());
        DataStream<ZhaoHuInfo> zhaohuDataStream = env.addSource(flinkKafkaConsumer);

        // Split into side outputs
        OutputTag<ZhaoHuInfo> busyStreamTag = new OutputTag<ZhaoHuInfo>("busyTime") {};
        OutputTag<ZhaoHuInfo> followUpStreamTag = new OutputTag<ZhaoHuInfo>("followUp") {};
        SingleOutputStreamOperator<ZhaoHuInfo> outputStream = zhaohuDataStream
                .filter(x -> !StringUtils.isEmpty(x.getTime()))
                .filter(x -> !StringUtils.isEmpty(x.getAgentId()))
                .process(new ProcessFunction<ZhaoHuInfo, ZhaoHuInfo>() {
                    @Override
                    public void processElement(ZhaoHuInfo zh, Context ctx, Collector<ZhaoHuInfo> collector) throws Exception {
                        // Agent status stream (login / busy / idle)
                        if ("MCCP_agentStatus".equalsIgnoreCase(zh.getMsgType())
                                && Integer.valueOf(0).equals(zh.getLocation())) {
                            ctx.output(busyStreamTag, zh);
                        }
                        // Sessions waiting to be followed up
                        if ("MCCP_createSession".equalsIgnoreCase(zh.getMsgType())
                                && "KB".equalsIgnoreCase(zh.getChannel())
                                && "N".equalsIgnoreCase(zh.getSessionUpdate())
                                && "true".equalsIgnoreCase(zh.getSettingFollowUp())
                                && "false".equalsIgnoreCase(zh.getForwardSession())) {
                            ctx.output(followUpStreamTag, zh);
                        }
                    }
                });

        // Busy-duration pop-ups
        DataStream<ZhaoHuInfo> busyStream = outputStream.getSideOutput(busyStreamTag)
                .assignTimestampsAndWatermarks(eventTimeWatermarks(idlenessSec));
        busyStream.print("busyStream");
        SingleOutputStreamOperator<PopMsg> processBusyStream = busyStream
                .keyBy(new AgentIdKeySelector())
                .process(new PopBusyFunc(popBusyMin, contentBusyFormWork));
        processBusyStream.addSink(new KafkaConnector<PopMsg>().buildProducerSink12(sinkTopic, kafkaProducerServers,
                producerUserName, producerPassword, "N", new KafkaSinkSchema(sinkTopic)));
        processBusyStream.print("busySINK");

        // Follow-up pop-ups
        DataStream<ZhaoHuInfo> followUpStream = outputStream.getSideOutput(followUpStreamTag)
                .assignTimestampsAndWatermarks(eventTimeWatermarks(idlenessSec));
        followUpStream.print("followUpStream");
        SingleOutputStreamOperator<PopMsg> processFollowUpStream = followUpStream
                .keyBy(new AgentIdKeySelector())
                .process(new PopFollowUpFunc(popFollowNum, contentFollowUpFormWork));
        processFollowUpStream.addSink(new KafkaConnector<PopMsg>().buildProducerSink12(sinkTopic, kafkaProducerServers,
                producerUserName, producerPassword, "N", new KafkaSinkSchema(sinkTopic)));
        // Print the follow-up output; previously the busy output was printed here a second time.
        // NOTE(review): this moves a chained sink, so auto-generated operator IDs may change
        // and state from older checkpoints may not map back without explicit uid()s.
        processFollowUpStream.print("followUpSINK");

        env.execute("Busy_FollowUp_POP");
    }

    /**
     * Event-time watermarks shared by both side outputs. Timestamps come from
     * {@code ZhaoHuInfo#getEventTime}, with {@link #MAX_OUT_OF_ORDERNESS} bounded lateness.
     * An input is marked idle after {@code idlenessSec} seconds without data.
     */
    private static WatermarkStrategy<ZhaoHuInfo> eventTimeWatermarks(int idlenessSec) {
        return WatermarkStrategy.<ZhaoHuInfo>forBoundedOutOfOrderness(MAX_OUT_OF_ORDERNESS)
                .withTimestampAssigner((info, recordTimestamp) -> info.getEventTime())
                .withIdleness(Duration.ofSeconds(idlenessSec));
    }

    /** Keys both pop-up streams by agent id; static so it holds no reference to {@code Processor}. */
    private static final class AgentIdKeySelector implements KeySelector<ZhaoHuInfo, String> {
        private static final long serialVersionUID = 1L;

        @Override
        public String getKey(ZhaoHuInfo info) {
            return info.getAgentId();
        }
    }
}
package com.cmb.cms.dataflow.saretail.func;
import com.cmb.cms.dataflow.saretail.beans.PopMsg;
import com.cmb.cms.dataflow.saretail.beans.ZhaoHuInfo;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.util.Iterator;
/**
 * Emits a pop-up message when an agent has stayed busy (status {@code 2}) for {@code popMin}
 * minutes of event time.
 *
 * <p>A busy event that follows a non-busy status does two things. It stores a {@link PopMsg} and
 * registers an event-time timer at {@code eventTime + popMin} minutes. A later non-busy event for
 * the agent deletes that timer. Repeated busy events are ignored, so the first busy event's time is
 * kept. When a timer fires, the remembered status is reset to {@code -1}, so the next busy event
 * starts a new countdown. Events without a status are skipped.
 *
 * Created By Ho258399 on 2022/5/7
 */
public class PopBusyFunc extends KeyedProcessFunction<String, ZhaoHuInfo, PopMsg> {
    /** Status value that means "busy". */
    private static final Integer BUSY_STATUS = 2;
    /** Milliseconds per minute, as a long so timer arithmetic is done in 64 bits. */
    private static final long MILLIS_PER_MINUTE = 60_000L;

    // Busy-duration threshold, in minutes
    private int popMin;
    // Pop-up text template; every "~" is replaced by popMin
    private String formWork;

    public PopBusyFunc(int popMin, String formWork) {
        this.popMin = popMin;
        this.formWork = formWork;
    }

    // Pending pop-up message per agent id, written when the agent turns busy
    MapState<String, PopMsg> stateMap;
    // Last status seen per agent id; set to -1 after a pop-up has been emitted
    MapState<String, Integer> lastSatusMap;

    /**
     * Emits the pending message whose deadline equals the fired timer and resets the agent's
     * remembered status to {@code -1}.
     */
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<PopMsg> out) throws Exception {
        System.out.println("定时器触发!");
        // Only the entry whose deadline matches the fired timestamp is due
        for (PopMsg pending : stateMap.values()) {
            if (timerTimestamp(pending.getEventTime()) == timestamp) {
                System.out.println("定时器触发,并且移除上一次的状态");
                lastSatusMap.put(pending.getAgentId(), -1);
                System.out.println(pending.toString());
                out.collect(pending);
            }
        }
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        stateMap = getRuntimeContext().getMapState(new MapStateDescriptor<String, PopMsg>("busy-state", String.class, PopMsg.class));
        lastSatusMap = getRuntimeContext().getMapState(new MapStateDescriptor<String, Integer>("agent-status", String.class, Integer.class));
        super.open(parameters);
    }

    /**
     * A busy event starts a countdown timer, unless the previous status was already busy. A
     * non-busy event deletes any pending timer.
     *
     * @param value status event of one agent
     * @param ctx   context giving access to the timer service
     * @param out   unused here; messages are emitted from {@link #onTimer}
     * @throws Exception on state access failures
     */
    @Override
    public void processElement(ZhaoHuInfo value, Context ctx, Collector<PopMsg> out) throws Exception {
        Integer status = value.getStatus();
        if (status == null) {
            // An event without a status cannot be classified; skip it instead of failing the job with an NPE
            return;
        }
        String agentId = value.getAgentId();
        Integer lastStatus = lastSatusMap.get(agentId);
        System.out.println("上一次的状态: " + lastStatus);
        lastSatusMap.put(agentId, status);

        if (BUSY_STATUS.equals(status)) {
            if (BUSY_STATUS.equals(lastStatus)) {
                // Still busy: keep the timer registered by the first busy event
                return;
            }
            PopMsg popMsg = getPopMsg(value);
            stateMap.put(agentId, popMsg);
            long timer = timerTimestamp(value.getEventTime());
            System.out.println("注册定时器!" + timer);
            ctx.timerService().registerEventTimeTimer(timer);
        } else {
            // Agent left the busy state: cancel the countdown registered for its pending message
            PopMsg pending = stateMap.get(agentId);
            if (pending != null) {
                System.out.println("删除定时器!");
                ctx.timerService().deleteEventTimeTimer(timerTimestamp(pending.getEventTime()));
            }
        }
    }

    /** Event-time deadline of the pop-up for a busy event that happened at {@code eventTime}. */
    private long timerTimestamp(long eventTime) {
        return eventTime + popMin * MILLIS_PER_MINUTE;
    }

    /**
     * Builds the pop-up message for a busy event.
     *
     * @param value the busy event
     * @return message with agent id, channel 0, start/event time from the event and the filled template
     */
    private PopMsg getPopMsg(ZhaoHuInfo value) {
        PopMsg popMsg = new PopMsg();
        popMsg.setAgentId(value.getAgentId());
        popMsg.setChannel(0);
        popMsg.setStartTime(value.getDateTime());
        popMsg.setEventTime(value.getEventTime());
        popMsg.setContent(formWork.replace("~", String.valueOf(popMin)));
        return popMsg;
    }
}
package com.cmb.cms.dataflow.saretail.func;
import com.cmb.cms.dataflow.saretail.beans.FollowUpAlarmInfo;
import com.cmb.cms.dataflow.saretail.beans.PopMsg;
import com.cmb.cms.dataflow.saretail.beans.ZhaoHuInfo;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
/**
 * Counts follow-up session events per agent per calendar day. It emits a single pop-up message
 * the first time that day's count reaches {@code popNum}.
 *
 * <p>The day is taken from {@link LocalDate#now()}, i.e. the task's wall clock in the JVM default
 * time zone, not from the event time. When the first event of a new day arrives, the agent's
 * entries for previous days are cleared so keyed state does not grow without bound.
 *
 * Created By Ho258399 on 2022/5/7
 */
public class PopFollowUpFunc extends KeyedProcessFunction<String, ZhaoHuInfo, PopMsg> {
    /** Date part of the state key; static because DateTimeFormatter is not Serializable. */
    private static final DateTimeFormatter DAY_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd");

    // Daily count threshold that triggers the pop-up
    private int popNum;
    // Pop-up text template; every "~" is replaced by popNum
    private String formWork;

    public PopFollowUpFunc(int popNum, String formWork) {
        this.popNum = popNum;
        this.formWork = formWork;
    }

    // Per-agent state: "yyyy-MM-dd_agentId" -> count and whether the pop-up was already sent that day
    MapState<String, FollowUpAlarmInfo> numMap;

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<PopMsg> out) throws Exception {
        // No timers are registered by this function.
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        numMap = getRuntimeContext().getMapState(new MapStateDescriptor<String, FollowUpAlarmInfo>("followUp-info", String.class, FollowUpAlarmInfo.class));
        super.open(parameters);
    }

    /**
     * Increments today's follow-up count for the agent. Emits one pop-up when the count first
     * reaches {@code popNum}; with {@code popNum <= 1} that is the first event of the day.
     *
     * @param value follow-up session event of one agent
     * @param ctx   processing context (unused)
     * @param out   receives the pop-up message
     * @throws Exception on state access failures
     */
    @Override
    public void processElement(ZhaoHuInfo value, Context ctx, Collector<PopMsg> out) throws Exception {
        String agentId = value.getAgentId();
        String todayKey = LocalDate.now().format(DAY_FORMAT) + "_" + agentId;
        System.out.println(todayKey);
        FollowUpAlarmInfo followUpAlarmInfo = numMap.get(todayKey);
        if (followUpAlarmInfo == null) {
            // No record yet for today: drop earlier days' entries for this agent and start from zero
            numMap.clear();
            followUpAlarmInfo = new FollowUpAlarmInfo(0, false);
        }
        followUpAlarmInfo.setCount(followUpAlarmInfo.getCount() + 1);
        // Not popped yet today and the threshold is reached: pop exactly once
        if (Boolean.FALSE.equals(followUpAlarmInfo.isAlreadyPop()) && followUpAlarmInfo.getCount() >= popNum) {
            System.out.println("触发告警: " + followUpAlarmInfo);
            followUpAlarmInfo.setAlreadyPop(true);
            out.collect(getPopMsg(value));
        } else {
            System.out.println("未触发告警: " + followUpAlarmInfo);
        }
        numMap.put(todayKey, followUpAlarmInfo);
    }

    /**
     * Builds the pop-up message for a follow-up event.
     *
     * @param value the event that reached the threshold
     * @return message with agent id, channel 0, start/event time from the event and the filled template
     */
    private PopMsg getPopMsg(ZhaoHuInfo value) {
        PopMsg popMsg = new PopMsg();
        popMsg.setAgentId(value.getAgentId());
        popMsg.setChannel(0);
        popMsg.setStartTime(value.getDateTime());
        popMsg.setEventTime(value.getEventTime());
        popMsg.setContent(formWork.replace("~", String.valueOf(popNum)));
        return popMsg;
    }
}