求求各路大神指点指点孩子吧,心态崩了,555555555
Flume自定义sink报如下图异常错误
但是查看了自己的代码,配置文件,还有启动命令感觉一点问题都没。。。
代码:
package cn.hrb.sink;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MySink1 extends AbstractSink implements Configurable {
// 得到logger对象
private Logger logger = LoggerFactory.getLogger(MySink1.class); //!!!
private String prefix = null;
private String subfix = null;
// 定义配置信息
public void configure(Context context) {
// 读取配置文件内容
prefix = context.getString("prefix", "zsk-->");
subfix = context.getString("subfix");
}
// 编写核心代码逻辑(把event写出去)
public Status process() throws EventDeliveryException {
Status status = null;
// 得到channel对象
Channel channel = getChannel();
// 得到事务对象
Transaction transaction = channel.getTransaction();
// 开启事务
transaction.begin();
try {
// take() called when transaction is NEW!
// 从channel对象中拿事件
Event takeEvent = channel.take();
// 判断channel中是否有事件
if(takeEvent != null) {
// 解析事件并写出
String data = new String(takeEvent.getBody());
logger.info(prefix + data + subfix);
}
// 提交事务
transaction.commit();
status = Status.READY;
} catch (Exception e) {
// 有异常回滚
transaction.rollback();
status = Status.BACKOFF;
e.printStackTrace();
} finally {
// 关闭事务
transaction.close();
}
return status;
}
}
配置文件(/opt/module/flume/case3/sink/flume1.conf)
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sinks.k1.type = cn.hrb.sink.MySink1
a1.sinks.k1.prefix = begin-
a1.sinks.k1.subfix = -end
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
启动命令
bin/flume-ng agent -c conf/ -f case3/sink/flume1.conf -n a1 -Dflume.root.logger=INFO,console