flume sink到kafla接收不到消息

flume source kafka,sink kafka,sink file
source是读取kafka,sink 到file没有问题,sink到kafka接收不到消息,没报错
以下是配置,求大佬解惑

##########################
gd.sources = r1
gd.sinks = k1 k2 k3 k4
gd.channels = c1 c2

#sink gruop
gd.sinkgroups = g1

#interceptors
gd.sources.r1.interceptors = i1
gd.sources.r1.interceptors.i1.type = com.hcom.loggerInfo$Builder

#kafka source
gd.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
gd.sources.r1.kafka.bootstrap.servers = 192.168.0.1:9200
gd.sources.r1.kafka.topics = test-topic
gd.sources.r1.kafka.consumer.group.id = group-test
gd.sources.r1.kafka.consumer.auto.offset.reset = earliest
gd.sources.r1.migrateZookeeperOffsets = false

#file sink
gd.sinks.k1.type = com.hcom.MySinks
gd.sinks.k1.fileName = /opt/syslog/test.log

#kafka sinks2
gd.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
gd.sinks.k2.topic = iams-ncolog
gd.sinks.k2.brokerList = 192.168.0.2:9092
gd.sinks.k2.requiredAcks = 1
gd.sinks.k2.batchSize = 240

#kafka sinks3
gd.sinks.k3.type = org.apache.flume.sink.kafka.KafkaSink
gd.sinks.k3.topic = iams-ncolog
gd.sinks.k3.brokerList = 192.168.0.3:9092
gd.sinks.k3.requiredAcks = 1
gd.sinks.k3.batchSize = 160

#kafka sinks4
gd.sinks.k4.type = org.apache.flume.sink.kafka.KafkaSink
gd.sinks.k4.topic = iams-ncolog
gd.sinks.k4.brokerList = 192.168.0.4:9092
gd.sinks.k4.requiredAcks = 1
gd.sinks.k4.batchSize = 100

memory channels

gd.channels.c1.type = memory
gd.channels.c1.capacity = 2048
gd.channels.c1.transactionCapacity = 1000

memory channels

gd.channels.c2.type = memory
gd.channels.c2.capacity = 2048
gd.channels.c2.transactionCapacity = 1000

#set sink group
gd.sinkgroups.g1.sinks = k2 k3 k4

#set failover
gd.sinkgroups.g1.processor.type = failover
gd.sinkgroups.g1.processor.priority.k2 = 10
gd.sinkgroups.g1.processor.priority.k3 = 5
gd.sinkgroups.g1.processor.priority.k4 = 3
gd.sinkgroups.g1.processor.maxpenalty = 30000

Bind the source and sink to the channel

gd.sources.r1.channels = c1 c2
gd.sinks.k1.channel = c1
gd.sinks.k2.channel = c2
gd.sinks.k3.channel = c2
gd.sinks.k4.channel = c2

http://wenda.chinahadoop.cn/question/873