Flink Job 通过Java代码提交报错,java.io.FileNotFoundException: /tmp/executionGraphStore-94c8652f-8743-4ba5-b138-21c437af0250/997044085d23299e9580d424b5b72559 (No such file or directory)
```java
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import java.io.File;
import java.util.concurrent.CompletableFuture;
public class FlinkJobSumbit3 {
public static void main(String[] args) {
String jarFilePath = "D:\\BaiduNetdiskDownload\\CDC-1.0-SNAPSHOT-jar-with-dependencies.jar";
RestClusterClient client = null;
try {
// 集群信息
Configuration configuration = new Configuration();
configuration.setString(JobManagerOptions.ADDRESS, "172.16.2.131");
configuration.setInteger(JobManagerOptions.PORT, 6123);
configuration.setInteger(RestOptions.PORT, 8081);
client = new RestClusterClient(configuration, StandaloneClusterId.getInstance());
int parallelism = 1;
File jarFile=new File(jarFilePath);
SavepointRestoreSettings savepointRestoreSettings=SavepointRestoreSettings.none();
PackagedProgram program = PackagedProgram.newBuilder()
.setConfiguration(configuration)
.setEntryPointClassName("FlinkCDCToMySQL")
.setJarFile(jarFile)
.setArguments("[{\"TableId\":1292101841059840,\"TableName\":\"sys_job_collect_copy_copy1\",\"StepType\":\"read\",\"Model\":\"Full\",\"DataSourceId\":1305401857409024},{\"TableId\":1305401903546382,\"TableName\":\"sys_job_collect_copy_copy1_copy1\",\"StepType\":\"writer\",\"DataSourceId\":1305401857409024}]").build();
//.setSavepointRestoreSettings(savepointRestoreSettings).build();
JobGraph jobGraph= PackagedProgramUtils.createJobGraph(program,configuration,parallelism,false);
CompletableFuture result = client.submitJob(jobGraph);
JobID jobId= result.get();
System.out.println("提交完成");
System.out.println("jobId:"+ jobId.toString());
} catch (Exception e) {
e.printStackTrace();
}
}
}
报错信息:
2023-03-06 10:45:15,324 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Received JobGraph submission 997044085d23299e9580d424b5b72559 (FlinkCDCWithKaPro).
2023-03-06 10:45:15,324 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Submitting job 997044085d23299e9580d424b5b72559 (FlinkCDCWithKaPro).
2023-03-06 10:45:15,326 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/rpc/jobmanager_36 .
2023-03-06 10:45:15,326 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Initializing job FlinkCDCWithKaPro (997044085d23299e9580d424b5b72559).
2023-03-06 10:45:15,327 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using restart back off time strategy NoRestartBackoffTimeStrategy for FlinkCDCWithKaPro (997044085d23299e9580d424b5b72559).
2023-03-06 10:45:15,328 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job 997044085d23299e9580d424b5b72559 reached globally terminal state FAILED.
2023-03-06 10:45:15,328 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Could not store completed job FlinkCDCWithKaPro(997044085d23299e9580d424b5b72559).
**java.io.FileNotFoundException: /tmp/executionGraphStore-94c8652f-8743-4ba5-b138-21c437af0250/997044085d23299e9580d424b5b72559 (No such file or directory)**
at java.io.FileOutputStream.open0(Native Method) ~[?:1.8.0_333]
at java.io.FileOutputStream.open(FileOutputStream.java:270) ~[?:1.8.0_333]
at java.io.FileOutputStream.(FileOutputStream.java:213) ~[?:1.8.0_333]
at java.io.FileOutputStream.(FileOutputStream.java:162) ~[?:1.8.0_333]
at org.apache.flink.runtime.dispatcher.FileArchivedExecutionGraphStore.storeArchivedExecutionGraph(FileArchivedExecutionGraphStore.java:276) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.runtime.dispatcher.FileArchivedExecutionGraphStore.put(FileArchivedExecutionGraphStore.java:198) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.runtime.dispatcher.Dispatcher.archiveExecutionGraph(Dispatcher.java:855) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.runtime.dispatcher.Dispatcher.jobReachedGloballyTerminalState(Dispatcher.java:848) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.runtime.dispatcher.Dispatcher.handleDispatcherJobResult(Dispatcher.java:446) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$runJob$3(Dispatcher.java:423) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836) ~[?:1.8.0_333]
at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811) ~[?:1.8.0_333]
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ~[?:1.8.0_333]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.12-1.12.2.jar:1.12.2]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.12-1.12.2.jar:1.12.2]
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-dist_2.12-1.12.2.jar:1.12.2]
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-dist_2.12-1.12.2.jar:1.12.2]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.12-1.12.2.jar:1.12.2]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.12-1.12.2.jar:1.12.2]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.12.2.jar:1.12.2]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.12.2.jar:1.12.2]
at akka.actor.Actor.aroundReceive(Actor.scala:517) [flink-dist_2.12-1.12.2.jar:1.12.2]
at akka.actor.Actor.aroundReceive$(Actor.scala:515) [flink-dist_2.12-1.12.2.jar:1.12.2]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.12-1.12.2.jar:1.12.2]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.12-1.12.2.jar:1.12.2]
at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.12-1.12.2.jar:1.12.2]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.12-1.12.2.jar:1.12.2]
at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.12-1.12.2.jar:1.12.2]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.12-1.12.2.jar:1.12.2]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.12.2.jar:1.12.2]
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.12.2.jar:1.12.2]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.12.2.jar:1.12.2]
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.12.2.jar:1.12.2]
```
参考:
检查 Flink 的配置,确保 executionGraphStore 相关的配置项正确设置。
确认 Flink 安装包中是否包含 /tmp/executionGraphStore-94c8652f-8743-4ba5-b138-21c437af0250 目录。如果没有,则手动创建该目录即可。
如果以上两个方法都不行,那么尝试重新安装 Flink 或者在官网中寻找解决方法。