dolphinscheduler调度flink,使用on yarn application模式部署flink jar到yarn,任务被提交后工作流节点状态就会改为成功,但是yarn上的任务实际执行失败

dolphinscheduler调度flink,使用on yarn application模式部署flink jar到yarn,任务被提交后工作流节点状态就会改为成功,但是yarn上的任务实际执行失败
如何让dolphinscheduler获取yarn上任务实际执行结果后再变更节点状态,执行下一节点,原dolphinscheduler是否就支撑,导致这种问题的原因是否是我少增加了配置 或者flink的参数问题,或者yarn集群配置的问题

img

详解Flink中yarn部署模式以及测试
可以参考下

你这个截图看起来没有具体的错误信息,都是一些正常的启动信息。如果DolphinScheduler支持Yarn应用程序的状态检查,可以看下是否在Flink作业的配置中启用该功能。还有就是能否启用Yarn应用程序状态回调函数。

检查一下Yarn应用程序状态回调函数是否能启动

一下输出来源于ChatGPT4
如果您的DolphinScheduler任务被提交后工作流节点状态就会改为成功,但是Yarn上的任务实际执行失败,那么可能是Yarn上的任务没有成功启动或者因为一些原因导致了失败。为了获取Yarn上任务的实际执行结果,可以通过以下步骤来实现:

首先,在提交任务到Yarn时,可以指定保存日志的目录,在提交完任务后可以通过查看该目录下的日志文件来查看任务的运行状态和异常信息。

可以使用Yarn提供的Web UI来查看任务的实际执行结果和状态。可以通过访问Yarn ResourceManager的Web UI来查看正在运行的任务列表、已完成的任务列表、已失败的任务列表等。

在需要等待Yarn任务完成的节点之后,可以在DolphinScheduler中插入一个脚本节点,在脚本节点中通过调用Yarn REST API来获取任务的状态。根据任务状态和异常信息来判断任务的执行结果,并将结果反馈给DolphinScheduler。至此,DolphinScheduler就能获取到Yarn上任务的实际执行结果,并据此更改工作流节点状态,执行下一节点。

需要注意的是,在思考解决方案之前,需要先确定Yarn上任务失败的具体原因,并对异常进行排查和解决

可以通过以下步骤实现让DolphinScheduler获取YARN上任务的执行结果,然后再变更节点状态,执行下一节点。

  1. 在DolphinScheduler中创建YARN任务,并配置好相关参数,包括YARN队列、作业类型、主类名、Jar包路径、参数等。
  2. 启动任务并提交到YARN集群,在YARN中运行该任务。此时,YARN会为该任务分配一个Application ID(也称为Application Attempt ID)。
  3. 在DolphinScheduler中,使用YarnJobType类来监控YARN任务的执行情况,包括任务开始时间、结束时间、状态等。
  4. 当YARN任务执行完成后,从YARN中获取该任务的执行结果,并将结果保存到DolphinScheduler中。可以使用YarnClient类的getApplicationReport()方法来获取应用程序报告,包括任务的状态和最终状态,然后将任务的执行结果更新到DolphinScheduler中。
  5. 根据任务的执行结果,修改节点的状态,执行下一节点。可以使用DolphinScheduler提供的Java API实现这一功能。

以下是一个示例代码片段,展示如何在DolphinScheduler中获取YARN任务的执行结果并更新节点状态:

// 获取DolphinScheduler任务信息
TaskNode taskNode = taskNodeMapper.queryByCode(taskCode);

// 获取任务关联的应用程序ID
String applicationId = dagXxlJobInfo.getApplicationId();

// 从YARN获取任务的执行报告
YarnConfiguration yarnConf = new YarnConfiguration();
YarnClient yarnClient = YarnClient.createYarnClient();
yarnClient.init(yarnConf);
yarnClient.start();

ApplicationReport appReport = yarnClient.getApplicationReport(
        ConverterUtils.toApplicationId(applicationId));
YarnApplicationState appState = appReport.getYarnApplicationState();

// 根据任务执行结果修改节点状态
if (appState.equals(YarnApplicationState.FINISHED)) {
    // 任务执行成功,将任务的执行结果更新到DolphinScheduler中
    String taskResult = getTaskResultFromYARN(applicationId);
    taskNode.setRes(taskResult);
    taskNodeMapper.updateById(taskNode);

    // 修改节点状态,执行下一节点
    processDao.changeTaskStatusAndPostEvent(taskNode, Constants.TASK_EXEC_SUCCESS, null);
} else if (appState.equals(YarnApplicationState.KILLED)
        || appState.equals(YarnApplicationState.FAILED)) {
    // 任务执行失败,将任务的执行结果更新到DolphinScheduler中
    String taskResult = getTaskResultFromYARN(applicationId);
    taskNode.setRes(taskResult);
    taskNodeMapper.updateById(taskNode);

    // 修改节点状态,停止后续节点的执行
    processDao.changeTaskStatusAndPostEvent(taskNode, Constants.TASK_KILLED, null);
}

在这个示例中,我们使用YarnClient类从YARN获取应用程序报告,并根据任务的状态来更新DolphinScheduler中节点的状态。如果任务执行成功,则将任务的执行结果保存到DolphinScheduler中,并执行下一节点;如果任务执行失败,则将任务的执行结果保存到DolphinScheduler中,并停止后续节点的执行。

检查你的dolphinscheduler和flink的参数配置

  1. DolphinScheduler配置的YARN Application ID错误或者缺失
  2. DolphinScheduler配置的Flink REST Endpoint错误或者缺失
  3. DolphinScheduler由于以上错误,无法正确监控Flink任务在YARN上的实际运行状态