I ran into the following error while configuring Flume:
(conf-file-poller-0) [ERROR - org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:469)] Sink k1 has been removed due to an error during configuration
java.lang.NullPointerException: It's essential to set compress codec when fileType is: CompressedStream
at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:895)
at org.apache.flume.sink.hdfs.HDFSEventSink.configure(HDFSEventSink.java:262)
at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:453)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:106)
at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:145)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
My Flume conf file is as follows:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
# The avro source acts as a receiving server
a1.sources.r1.type = avro
a1.sources.r1.bind = node03
a1.sources.r1.port = 4141
a1.sources.r1.channels = c1
# Configure the channel
a1.channels.c1.type = file
# Checkpoint file directory
a1.channels.c1.checkpointDir = /zkr/bigdata/flume_checkpoint
# Directory for buffered data files
a1.channels.c1.dataDirs = /zkr/bigdata/flume_data
# Configure the sink
a1.sinks.k1.channel = c1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://node01:8020/origin_data/game/log/%{topic}/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logevent-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = second
# Avoid producing lots of small files
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 1000
#a1.sinks.k1.hdfs.batchSize = 1
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.minBlockReplicas = 1
# File type of the generated files: default is SequenceFile; DataStream writes plain text; CompressedStream writes compressed output
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = lzop
#a1.sinks.k1.hdfs.fileSuffix=.lzo
What is causing this problem? Any help would be appreciated.
This error comes from Flume (a scalable tool for collecting, aggregating, and moving large volumes of log data in distributed systems). It is thrown while the HDFSEventSink is being configured, and the message says it all: "It's essential to set compress codec when fileType is: CompressedStream". In other words, check the HDFS sink section of your Flume configuration file and make sure a compression codec is actually being set.
The error means a required parameter is missing from the Flume HDFS sink configuration: when the file type (fileType) is CompressedStream, a compression codec must be set, and here none was picked up.
To resolve it, edit the Flume configuration file so that all required parameters are present; per the error message, the HDFSEventSink needs a compression codec. Here is a sample HDFS sink configuration that sets the gzip codec:
agent.sinks.k1.type = hdfs
agent.sinks.k1.hdfs.path = /user/flume/events/
agent.sinks.k1.hdfs.fileType = CompressedStream
agent.sinks.k1.hdfs.codeC = gzip
In this sample, gzip is set as the compression codec and the file type is CompressedStream, so Flume writes a compressed output stream. Double-check the parameter names and values when you edit the configuration file, and restart the Flume agent for the change to take effect.
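For reference, restarting the agent typically means relaunching it with flume-ng so the edited file is re-read. A minimal sketch, where the config path conf/a1.conf is only a placeholder for your own file, and --name must match the agent prefix used inside it (a1 in your case):
bin/flume-ng agent --conf conf --conf-file conf/a1.conf --name a1 -Dflume.root.logger=INFO,console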
a1.sinks.k1.hdfs.compressCodec = <compress-codec-class-name>
Here compress-codec-class-name is the fully qualified class name of the compression codec to use; adjust it to whichever codec you are working with.
For example, to use the Gzip codec you would set:
a1.sinks.k1.hdfs.compressCodec = org.apache.hadoop.io.compress.GzipCodec
Setting a1.sinks.k1.hdfs.compressCodec to gzip or snappy had no effect either, and a1.sinks.k1.hdfs.compressCodec = org.apache.hadoop.io.compress.GzipCodec still produced the same error.
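Worth noting: hdfs.compressCodec is not a property the Flume HDFS sink recognizes; the documented key is hdfs.codeC, which is why those settings changed nothing. And since this NullPointerException is thrown precisely when the sink reads no codec value at all, it may be worth ruling out an invisible character (a full-width space, a tab, a stray BOM) on the hdfs.codeC line of the file actually being loaded. A quick shell check, with conf/a1.conf again standing in for your real file:
# Print non-printing characters; stray ^I or M-BM- sequences on that line would break property parsing
cat -A conf/a1.conf | grep -n codeC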
The following is quoted from ChatGPT; if it helps, please accept the answer:
From the error message, the problem lies in the HDFS sink configuration: the file type is set to CompressedStream, but no compression codec is set. You therefore need the following line in the configuration file:
a1.sinks.k1.hdfs.codeC = lzop
Here lzop is one compression codec; you can choose another as appropriate.
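For reference, the Flume user guide lists the accepted values for hdfs.codeC as gzip, bzip2, lzo, lzop, and snappy, so switching codecs is a one-line change, for example:
# one of: gzip, bzip2, lzo, lzop, snappy (the codec class must be available on the classpath)
a1.sinks.k1.hdfs.codeC = gzip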
With that line in place, the complete configuration is the same as the one posted in the question, with a1.sinks.k1.hdfs.codeC = lzop set in the sink section.
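One caveat if you stay with lzop: the lzop codec is not bundled with stock Hadoop. It comes from the hadoop-lzo project, so the hadoop-lzo jar must be on the classpath and the codec registered in core-site.xml, roughly as sketched below (your existing io.compression.codecs list may differ):
<property>
  <name>io.compression.codecs</name>
  <value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,com.hadoop.compression.lzo.LzoCodec,com.hadoop.compression.lzo.LzopCodec</value>
</property>
<property>
  <name>io.compression.codec.lzo.class</name>
  <value>com.hadoop.compression.lzo.LzoCodec</value>
</property>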
This answer quotes ChatGPT:
a1.sinks.k1.hdfs.codeC = lzop