FlinkSQL 抽取 文件系统 csv格式文件,Format配置csv.ignore-parse-errors为false时任务并没有异常终止。
csv格式数据
结果
参考GPT:根据您提供的Flink SQL代码,似乎在创建名为“transactions”的表时,您正确地将'csv.ignore-parse-errors'参数设置为false,以指示在解析CSV文件时如果遇到解析错误,则应该引发异常。
然而,您说任务并没有异常终止,这可能有几个原因:
1 您的CSV文件中可能没有解析错误,因此即使将'csv.ignore-parse-errors'设置为false,任务也不会抛出异常。
2 如果您的CSV文件中确实存在解析错误,但错误是可恢复的,即解析器可以跳过错误行并继续解析剩余的数据,则任务可能不会异常终止,而是仅仅跳过错误行并继续执行。
3 您的代码中可能存在其他问题导致任务没有异常终止,比如可能在其他地方捕获了异常而没有将其向上抛出。
为了确保'csv.ignore-parse-errors'参数能够正常工作,您可以在CSV文件中故意引入一些解析错误并观察任务的行为。如果任务跳过了错误行并继续执行,则表示解析错误是可恢复的;如果任务异常终止,则表示'csv.ignore-parse-errors'参数起作用。
FLink SQL 在设定各类数据源和数据目标端的时候非常方便,可以说写个建表语句就能获取各种各样的数据。但是在使用FileSystem获取某目录下的数据时需要小心使用,比如:
CREATE TABLE `cfg_city`(
`provincecode` int, `city_id` int, `city_name` string
)
WITH (
'connector'='filesystem',
'path'='hdfs://pathroot/cfg_city',
'format' = 'csv',
'csv.field-delimiter' = ',',
'csv.ignore-parse-errors' = 'true'
)
;
在程序正常运行时不会报错,但是如果程序fail and retry时,遇到hdfs://pathroot/cfg_city路径下的文件发生改变则可能报错。原因是因为Flink SQL FileSystem 是Bounded Source,在第一次运行时就记录了FileList,导致如果有新文件出现不会被读取/旧文件改变文件名会报错。
在社区汇报了该issue后,发现也有类似的需求将FileSystem Bounded Change to Stream,Link。最下面有不使用SQL的办法,但使用SQL的各位还没有类似的配置。
该回答引用于gpt与OKX安生共同编写:
Flink SQL 中的 csv.ignore-parse-errors
参数用于指定是否忽略在 CSV 文件解析过程中出现的错误。当该参数设置为 true
时,Flink 会尝试跳过格式错误的行并继续处理其他数据;而当该参数设置为 false
时,Flink 会在遇到格式错误时抛出异常,并停止当前任务。
从您提供的截图中看到,您已经将 csv.ignore-parse-errors
设置为了 false
,但任务并没有异常终止。这可能是由于在 CSV 文件中出现格式错误的行被忽略或者未被正确识别的原因导致的。
为了进一步确认问题的原因,建议您检查以下几点:
将csv.ignore-parse-errors参数设置为false,并在Flink任务的启动脚本或配置文件中启用严格模式(例如--strict命令行选项或strict-mode: true配置选项)。这将强制Flink在遇到任何异常情况时终止任务的执行。
启动严格模式方式,如下:
打开Flink的启动脚本文件(例如start-cluster.sh或start-standalone-job.sh)。
在启动命令中添加--strict选项,例如:
bin/start-cluster.sh --strict
或者在flink-conf.yaml配置文件中添加以下选项:
sql-client.strict-mode: true
这将启用严格模式,以便在遇到任何异常情况时都会终止任务的执行。
保存更改后,重新启动Flink。
在严格模式下,Flink将在任何异常情况下终止任务的执行,以避免数据质量问题。请注意,在严格模式下,某些查询可能会失败,因为它们可能会遇到不合法或不完整的数据。因此,在使用严格模式时,您应该确保您的数据源质量良好,并且数据能够被正确地解析和处理。
以下答案由GPT-3.5大模型与博主波罗歌共同编写:
在Flink SQL中,csv.ignore-parse-errors
参数仅仅会忽略解析CSV文件时的语法错误,但是语义错误仍然会导致任务抛出异常而停止执行。例如,如果你的数据有一个无效的日期格式,那么即使将csv.ignore-parse-errors
设置为false,任务仍然会失败。
以下是使用Flink SQL抽取CSV文件的示例代码,其中包括了csv.ignore-parse-errors
的配置:
CREATE TABLE MyTable (
id INT,
name STRING,
time TIMESTAMP
) WITH (
'connector' = 'filesystem',
'path' = '/path/to/csv/file',
'format' = 'csv',
'csv.ignore-parse-errors' = 'false'
);
在这个例子中,我们创建了一个名为MyTable
的表,从指定的CSV文件中读取数据。csv.ignore-parse-errors
参数设置为false,表示当CSV文件包含语法错误时,任务应该立即停止执行。因此,如果CSV文件中包含无效的日期格式等语义错误,任务会失败。
如果您仍然遇到了任务并没有异常终止的情况,可能是因为您的CSV文件中没有包含任何语法错误。你可以尝试在CSV文件中使用错误的日期格式等故意引发语义错误,以验证csv.ignore-parse-errors
参数是否被正确地应用。
如果我的回答解决了您的问题,请采纳!