Flume自定义sink报异常错误!!!

求求各路大神指点指点孩子吧,心态崩了,555555555

Flume自定义sink报如下图异常错误
图片说明
但是查看了自己的代码,配置文件,还有启动命令感觉一点问题都没。。。

代码:
package cn.hrb.sink;

import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySink1 extends AbstractSink implements Configurable {

// 得到logger对象
private Logger logger = LoggerFactory.getLogger(MySink1.class); //!!!

private String prefix = null;
private String subfix = null;

// 定义配置信息
public void configure(Context context) {

    // 读取配置文件内容
    prefix = context.getString("prefix", "zsk-->");
    subfix = context.getString("subfix");
}

// 编写核心代码逻辑(把event写出去)
public Status process() throws EventDeliveryException {

    Status status = null;

    // 得到channel对象
    Channel channel = getChannel();

    // 得到事务对象
    Transaction transaction = channel.getTransaction();

    // 开启事务
    transaction.begin();

        try {

            // take() called when transaction is NEW!
            // 从channel对象中拿事件
            Event takeEvent = channel.take();

            // 判断channel中是否有事件
            if(takeEvent != null) {

                // 解析事件并写出
                String data = new String(takeEvent.getBody());

                logger.info(prefix + data + subfix);
            }

            // 提交事务
            transaction.commit();
            status = Status.READY;

        } catch (Exception e) {

            // 有异常回滚
            transaction.rollback();
            status = Status.BACKOFF;

            e.printStackTrace();

        } finally {

            // 关闭事务
            transaction.close();
        }


    return status;
}

}

配置文件(/opt/module/flume/case3/sink/flume1.conf)

Name the components on this agent

a1.sources = r1
a1.sinks = k1
a1.channels = c1

Describe/configure the source

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

Describe the sink

a1.sinks.k1.type = cn.hrb.sink.MySink1
a1.sinks.k1.prefix = begin-
a1.sinks.k1.subfix = -end

Use a channel which buffers events in memory

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

Bind the source and sink to the channel

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动命令
bin/flume-ng agent -c conf/ -f case3/sink/flume1.conf -n a1 -Dflume.root.logger=INFO,console