FlinkSQL csv.ignore-parse-errors不起作用

FlinkSQL 抽取 文件系统 csv格式文件,Format配置csv.ignore-parse-errors为false时任务并没有异常终止。

img

csv格式数据

img

结果

img

参考GPT:根据您提供的Flink SQL代码,似乎在创建名为“transactions”的表时,您正确地将'csv.ignore-parse-errors'参数设置为false,以指示在解析CSV文件时如果遇到解析错误,则应该引发异常。

然而,您说任务并没有异常终止,这可能有几个原因:

1 您的CSV文件中可能没有解析错误,因此即使将'csv.ignore-parse-errors'设置为false,任务也不会抛出异常。

2 如果您的CSV文件中确实存在解析错误,但错误是可恢复的,即解析器可以跳过错误行并继续解析剩余的数据,则任务可能不会异常终止,而是仅仅跳过错误行并继续执行。
3 您的代码中可能存在其他问题导致任务没有异常终止,比如可能在其他地方捕获了异常而没有将其向上抛出。
为了确保'csv.ignore-parse-errors'参数能够正常工作,您可以在CSV文件中故意引入一些解析错误并观察任务的行为。如果任务跳过了错误行并继续执行,则表示解析错误是可恢复的;如果任务异常终止,则表示'csv.ignore-parse-errors'参数起作用。

FLink SQL 在设定各类数据源和数据目标端的时候非常方便,可以说写个建表语句就能获取各种各样的数据。但是在使用FileSystem获取某目录下的数据时需要小心使用,比如:

CREATE TABLE `cfg_city`(
    `provincecode` int, `city_id` int, `city_name` string
)
WITH (
    'connector'='filesystem',
    'path'='hdfs://pathroot/cfg_city',
    'format' = 'csv',
    'csv.field-delimiter' = ',',
    'csv.ignore-parse-errors' = 'true'
)
;

在程序正常运行时不会报错,但是如果程序fail and retry时,遇到hdfs://pathroot/cfg_city路径下的文件发生改变则可能报错。原因是因为Flink SQL FileSystem 是Bounded Source,在第一次运行时就记录了FileList,导致如果有新文件出现不会被读取/旧文件改变文件名会报错。

在社区汇报了该issue后,发现也有类似的需求将FileSystem Bounded Change to Stream,Link。最下面有不使用SQL的办法,但使用SQL的各位还没有类似的配置。

该回答引用于gpt与OKX安生共同编写:
  • 该回答引用于gpt与OKX安生共同编写:

Flink SQL 中的 csv.ignore-parse-errors 参数用于指定是否忽略在 CSV 文件解析过程中出现的错误。当该参数设置为 true 时,Flink 会尝试跳过格式错误的行并继续处理其他数据;而当该参数设置为 false 时,Flink 会在遇到格式错误时抛出异常,并停止当前任务。

从您提供的截图中看到,您已经将 csv.ignore-parse-errors 设置为了 false,但任务并没有异常终止。这可能是由于在 CSV 文件中出现格式错误的行被忽略或者未被正确识别的原因导致的。

为了进一步确认问题的原因,建议您检查以下几点:

  1. 检查 CSV 文件是否真的包含格式错误的行,可以通过手动检查文件或使用其他工具进行验证。
  2. 确认 Flink 是否正确加载了目标 CSV 文件。可以通过在程序中添加日志语句或调试器进行检查。
  3. 确认 Flink 配置是否正确生效。可以通过在程序中添加日志语句或调试器进行检查。
  • 如有用的话,还望采纳哦~

将csv.ignore-parse-errors参数设置为false,并在Flink任务的启动脚本或配置文件中启用严格模式(例如--strict命令行选项或strict-mode: true配置选项)。这将强制Flink在遇到任何异常情况时终止任务的执行。

启动严格模式方式,如下:
打开Flink的启动脚本文件(例如start-cluster.sh或start-standalone-job.sh)。

在启动命令中添加--strict选项,例如:

bin/start-cluster.sh --strict

或者在flink-conf.yaml配置文件中添加以下选项:

sql-client.strict-mode: true

这将启用严格模式,以便在遇到任何异常情况时都会终止任务的执行。

保存更改后,重新启动Flink。

在严格模式下,Flink将在任何异常情况下终止任务的执行,以避免数据质量问题。请注意,在严格模式下,某些查询可能会失败,因为它们可能会遇到不合法或不完整的数据。因此,在使用严格模式时,您应该确保您的数据源质量良好,并且数据能够被正确地解析和处理。

以下答案由GPT-3.5大模型与博主波罗歌共同编写:
在Flink SQL中,csv.ignore-parse-errors参数仅仅会忽略解析CSV文件时的语法错误,但是语义错误仍然会导致任务抛出异常而停止执行。例如,如果你的数据有一个无效的日期格式,那么即使将csv.ignore-parse-errors设置为false,任务仍然会失败。

以下是使用Flink SQL抽取CSV文件的示例代码,其中包括了csv.ignore-parse-errors的配置:

CREATE TABLE MyTable (
  id INT,
  name STRING,
  time TIMESTAMP
) WITH (
  'connector' = 'filesystem',
  'path' = '/path/to/csv/file',
  'format' = 'csv',
  'csv.ignore-parse-errors' = 'false'
);

在这个例子中,我们创建了一个名为MyTable的表,从指定的CSV文件中读取数据。csv.ignore-parse-errors参数设置为false,表示当CSV文件包含语法错误时,任务应该立即停止执行。因此,如果CSV文件中包含无效的日期格式等语义错误,任务会失败。

如果您仍然遇到了任务并没有异常终止的情况,可能是因为您的CSV文件中没有包含任何语法错误。你可以尝试在CSV文件中使用错误的日期格式等故意引发语义错误,以验证csv.ignore-parse-errors参数是否被正确地应用。
如果我的回答解决了您的问题,请采纳!

问题分析:
根据问题描述,FlinkSQL 抽取 文件系统 csv格式文件,Format配置csv.ignore-parse-errors为false时任务并没有异常终止。这意味着,即使在解析CSV文件时出现错误,任务也不会停止,而是继续执行下去。
在Flink中,CSV格式的数据可以使用Flink的CSV格式化程序进行解析。CSV格式化程序提供了许多选项,可以控制如何解析CSV文件。其中一个选项是csv.ignore-parse-errors,它控制是否忽略解析错误。如果设置为true,则解析错误将被忽略,并且任务将继续执行。如果设置为false,则解析错误将导致任务失败。
因此,问题可能出在CSV格式化程序的配置上。可能是csv.ignore-parse-errors选项没有正确设置,或者可能是其他选项导致了这种行为。
解决方案:
为了解决这个问题,可以尝试以下几个步骤:
1. 确认csv.ignore-parse-errors选项是否正确设置为false。可以检查FlinkSQL语句中的配置,或者在代码中设置选项。
2. 确认是否有其他选项导致了这种行为。可以检查其他CSV格式化程序选项的设置,例如csv.allow-comments、csv.field-delimiter等。
3. 尝试使用其他CSV格式化程序。Flink支持多种CSV格式化程序,例如OpenCSV、Apache Commons CSV等。可以尝试使用其他格式化程序,看看是否有不同的行为。
4. 检查CSV文件是否符合规范。如果CSV文件本身存在问题,例如缺少字段或格式错误,那么即使设置了csv.ignore-parse-errors为false,任务也可能不会失败。因此,需要确保CSV文件符合规范。
总结:
在使用FlinkSQL抽取CSV格式的数据时,需要注意CSV格式化程序的配置。特别是csv.ignore-parse-errors选项,它控制是否忽略解析错误。如果设置为false,则解析错误将导致任务失败。如果任务没有异常终止,那么需要检查CSV格式化程序的配置和CSV文件本身是否符合规范。