Flink reports a 403 error when using S3

When using S3 as the state backend, the job fails with the error below.
Is something misconfigured somewhere?
Is there a complete configuration reference?

org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
    at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.util.concurrent.CompletionException: org.apache.flink.util.FlinkRuntimeException: Failed to create checkpoint storage at checkpoint coordinator side.
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
    ... 7 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to create checkpoint storage at checkpoint coordinator side.
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:325)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:241)
    at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.enableCheckpointing(DefaultExecutionGraph.java:448)
    at org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:311)
    at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:107)
    at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:342)
    at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:190)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:122)
    at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:132)
    at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:110)
    at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:340)
    at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:317)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:107)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95)
    at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
    ... 7 more
Caused by: java.nio.file.AccessDeniedException: s3://test/cp/d6fd3122283c208164d8cb55e178d283/shared: getFileStatus on s3://test/cp/d6fd3122283c208164d8cb55e178d283/shared: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: null; S3 Extended Request ID: null; Proxy: null), S3 Extended Request ID: null:403 Forbidden
    at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:218)
    at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:145)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2184)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2088)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.innerMkdirs(S3AFileSystem.java:2037)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.mkdirs(S3AFileSystem.java:2007)
    at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:2326)
    at org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.mkdirs(HadoopFileSystem.java:183)
    at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.mkdirs(PluginFileSystemFactory.java:162)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStorageAccess.initializeBaseLocations(FsCheckpointStorageAccess.java:113)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:323)
    ... 22 more
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: null; S3 Extended Request ID: null; Proxy: null), S3 Extended Request ID: null
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1811)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1395)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1371)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5062)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5008)
    at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1338)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$getObjectMetadata$4(S3AFileSystem.java:1235)
    at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
    at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:280)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:1232)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2169)
    ... 31 more

This error is a permission problem when accessing S3 while creating the checkpoint storage: the getFileStatus call on the checkpoint path is rejected by S3 with 403 Forbidden. Incorrect S3 credentials or authentication configuration is one of the most common causes of this kind of error.
To use S3 as the state backend, make sure the following is configured correctly:
Add the S3 filesystem settings to the Flink configuration file (flink-conf.yaml), for example:
s3.access-key: YOUR_ACCESS_KEY
s3.secret-key: YOUR_SECRET_KEY
s3.endpoint: S3_ENDPOINT
s3.path.style.access: true
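If you prefer to point checkpoints at S3 through the configuration file rather than in code, a minimal sketch might look like this (assuming Flink 1.13 or later; the bucket name is a placeholder):

# hashmap keeps working state on the heap; rocksdb is the other common choice
state.backend: hashmap
state.checkpoints.dir: s3://my-bucket/checkpoints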
In your job code, create a state backend that points at your S3 bucket. For example:
final String checkpointPath = "s3://my-bucket/checkpoints";
final StateBackend stateBackend = new FsStateBackend(checkpointPath);
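Note that FsStateBackend is deprecated since Flink 1.13. A minimal sketch of the equivalent setup with the newer API, assuming Flink 1.13+ (the bucket name and checkpoint interval are placeholders):

import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Keep working state on the JVM heap; write checkpoints to S3.
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage("s3://my-bucket/checkpoints");
env.enableCheckpointing(60_000); // checkpoint every 60 seconds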
If you access S3 through an IAM role instead of access keys, make sure that role has permission to access the bucket; you can manage the role's permissions in the AWS IAM console.
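For reference, a sketch of a minimal IAM policy that covers what checkpointing on S3 typically needs (the bucket name is a placeholder; adjust the actions to your environment):

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["s3:GetObject", "s3:PutObject", "s3:DeleteObject"],
      "Resource": "arn:aws:s3:::my-bucket/*"
    },
    {
      "Effect": "Allow",
      "Action": ["s3:ListBucket", "s3:GetBucketLocation"],
      "Resource": "arn:aws:s3:::my-bucket"
    }
  ]
}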
If you are sure the authentication and credentials are configured correctly but still hit this error, the bucket itself may have restrictive access settings. Make sure the bucket (for example, through its bucket policy) allows the principal your job uses to read, write, list, and delete objects under the checkpoint path.
Also make sure your Flink version is compatible with the Hadoop version you use, since Flink accesses the bucket through Hadoop's S3A filesystem (the flink-s3-fs-hadoop plugin). The Flink documentation has more information on version compatibility.
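As a reminder (your stack trace already shows the plugin classloader, so this part is likely in place), the S3 filesystem jar has to be installed as a plugin rather than placed on the classpath; roughly:

mkdir -p $FLINK_HOME/plugins/s3-fs-hadoop
cp $FLINK_HOME/opt/flink-s3-fs-hadoop-*.jar $FLINK_HOME/plugins/s3-fs-hadoop/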