kafka1.0.0的client,生产者生产数据失败

配置:

 Properties props = new Properties();

        //broker地址
        props.put("bootstrap.servers", "39.108.61.252:9092,39.108.61.252:9093,39.108.61.252:9094");

        //请求时候需要验证
        props.put("acks", "0");

        //请求失败时候需要重试
        props.put("retries", 1);

        //生产者就会尝试将记录组合成一个batch的请求。 这有助于客户端和服务器的性能。不能大于此默认值,否则浪费内存,反而降低吞吐量
        //props.put("batch.size", 16384);

        //汇聚一定时间内的记录一起发出
//        props.put("linger.ms", 50);

        //内存缓存区大小
        props.put("buffer.memory", 33554432);

        //指定消息key序列化方式
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        //指定消息本身的序列化方式
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");


        producer = new KafkaProducer<>(props);

生产数据

没有Thread.sleep()就不能成功发送数据!,有了就可已在消费者端接受到数据。若把Thread.sleep()删除,在生产末尾加上close()方法也能成功生产

 for (int i = 0; i < 10; i++) {
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    kafkaProducer.send(new ProducerRecord<>("topic_user_general_info_update", "simpleKey", "value-"+i));

                }

困扰很久,不知道配置还是哪里问题。

close() 方法存在的意义就是为了在使用完队列后释放资源,如果不释放资源 所属的线程就会直接占用一整个 核 的资源。你在末尾添加 close() 方法后能运行就证明是没问题的。

这个props.put("linger.ms", 50);改成props.put("linger.ms", 0);试一下

props.put("linger.ms", 50);这个配置代表消息会延迟50ms才会发送。
其实kafka生产者发送消息的时候并不是立即发送到kafka服务端,而是会保存在本地一段时间,再由另一个守护线程Sender线程发送。
现在是你发送了一条,如果没有sleep50ms,主线程结束了,守护线程同样会结束,这样这条消息就丢失了,所以要显性执行close才可以
close方法会阻塞当前线程,并立即唤醒Sender线程,直到把保存在本地的请求发送完成,这样数据就不会丢失了

消费不到数据不一定就是没有生产成功,有很多种情况。当然大概率是生产失败。具体情况得具体分析,按照你提到的配置。要么是消息总量达到batch.size", 16384
,要么等待时间达到linger.ms", 50,你的消息是发送不出去了。这点是可以确定。