在配置flume时出现了报错:It's essential to set compress codec when fileType is: CompressedStream

在配置flume时出现了如下报错

 (conf-file-poller-0) [ERROR - org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:469)] Sink k1 has been removed due to an error during configuration
java.lang.NullPointerException: It's essential to set compress codec when fileType is: CompressedStream
        at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:895)
        at org.apache.flume.sink.hdfs.HDFSEventSink.configure(HDFSEventSink.java:262)
        at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
        at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:453)
        at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:106)
        at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:145)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)


flume的conf如下

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
# source中的avro组件是一个接收者服务
a1.sources.r1.type = avro
a1.sources.r1.bind = node03
a1.sources.r1.port = 4141
a1.sources.r1.channels = c1

# 配置channel
a1.channels.c1.type = file
#检查点文件目录
a1.channels.c1.checkpointDir = /zkr/bigdata/flume_checkpoint
#缓存数据文件夹
a1.channels.c1.dataDirs = /zkr/bigdata/flume_data

#配置sink
a1.sinks.k1.channel = c1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://node01:8020/origin_data/game/log/%{topic}/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logevent-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = second
#不要产生大量小文件
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 1000
#a1.sinks.k1.hdfs.batchSize = 1
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.minBlockReplicas = 1
#生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本 CompressedStream
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = lzop
#a1.sinks.k1.hdfs.fileSuffix=.lzo

这个是什么问题导致的,请求帮忙解惑。

这个错误来自于 Flume(一个用于在分布式系统中收集、聚合和移动大量日志数据的可扩展工具)。从错误信息中可以看出,它是关于在配置HDFSEventSink时遇到的问题。具体地说,错误消息指出: "在fileType为CompressedStream时,设置压缩编解码器(compress codec)是基本的"。因此,您需要检查您的 Flume 配置文件的HDFSEventSink部分,并确保已正确设置压缩编解码器。

该错误表示在配置Flume的HDFS Sink时缺少必要参数。具体来说,错误消息指出必须设置压缩编解码器(compress codec),但是在文件类型(fileType)为CompressedStream的情况下未设置。

要解决此错误,可以编辑Flume配置文件以确保已设置所有必需参数。根据错误消息,需要在HDFSEventSink中设置压缩编解码器。以下是HDFS Sink的示例配置代码,其中设置了“gzip”压缩编解码器:

agent.sinks.k1.type = hdfs
agent.sinks.k1.hdfs.path = /user/flume/events/
agent.sinks.k1.hdfs.fileType = CompressedStream
agent.sinks.k1.hdfs.codeC = gzip

在此示例配置中,设置了“gzip”作为压缩编解码器,并将文件类型设置为CompressedStream,以便Flume能够正确处理压缩的文件流。确保在编辑配置文件时使用正确的参数和值,并重新启动Flume代理以使更改生效。

a1.sinks.k1.hdfs.compressCodec = <compress-codec-class-name>

compress-codec-class-name是可以使用的压缩解压缩编解码器的全限定类名,根据你自己使用的压缩解压编解码器调整。
例如:使用Gzip压缩解压缩编解码器时,你应该配置以下参数:

a1.sinks.k1.hdfs.compressCodec = org.apache.hadoop.io.compress.GzipCodec

a1.sinks.k1.hdfs.compressCodec 设置了gzip和snappy 也是无效的,
另外 a1.sinks.k1.hdfs.compressCodec = org.apache.hadoop.io.compress.GzipCodec
也是报同样的错

以下内容引用CHATGPT、有用望采纳:

根据报错信息,可以看出是在配置hdfs的sink时出现了问题。具体来说,是设置了文件类型为CompressedStream,但是没有设置压缩编码codec。因此,需要在配置文件中添加如下代码:

a1.sinks.k1.hdfs.codeC = lzop

其中,lzop是一种压缩编码方式,可以根据实际情况选择其他的编码方式。

修改后的完整配置如下:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
# source中的avro组件是一个接收者服务
a1.sources.r1.type = avro
a1.sources.r1.bind = node03
a1.sources.r1.port = 4141
a1.sources.r1.channels = c1

# 配置channel
a1.channels.c1.type = file
#检查点文件目录
a1.channels.c1.checkpointDir = /zkr/bigdata/flume_checkpoint
#缓存数据文件夹
a1.channels.c1.dataDirs = /zkr/bigdata/flume_data

#配置sink
a1.sinks.k1.channel = c1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://node01:8020/origin_data/game/log/%{topic}/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logevent-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = second
#不要产生大量小文件
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 1000
#a1.sinks.k1.hdfs.batchSize = 1
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.minBlockReplicas = 1
#生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本 CompressedStream
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = lzop
#a1.sinks.k1.hdfs.fileSuffix=.lzo
该回答引用ChatGPT
根据报错信息和配置文件,可以看出问题是在HDFSEventSink组件的configure方法中,抛出了NullPointerException异常,原因是当文件类型为CompressedStream时,必须设置压缩格式。在配置中,虽然设置了文件类型为CompressedStream,但明显地缺少了设置压缩格式的配置项,需要在<a1.sinks.k1.hdfs.codeC>配置项中设置对应的压缩格式,例如gzip、snappy、lzo等。可以修改配置文件中的这一行代码:


<a1.sinks.k1.hdfs.codeC> = lzop


将"lzop"改为所需的压缩格式即可。