配置:
Properties props = new Properties();
//broker地址
props.put("bootstrap.servers", "39.108.61.252:9092,39.108.61.252:9093,39.108.61.252:9094");
//请求时候需要验证
props.put("acks", "0");
//请求失败时候需要重试
props.put("retries", 1);
//生产者就会尝试将记录组合成一个batch的请求。 这有助于客户端和服务器的性能。不能大于此默认值,否则浪费内存,反而降低吞吐量
//props.put("batch.size", 16384);
//汇聚一定时间内的记录一起发出
// props.put("linger.ms", 50);
//内存缓存区大小
props.put("buffer.memory", 33554432);
//指定消息key序列化方式
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
//指定消息本身的序列化方式
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<>(props);
生产数据
没有Thread.sleep()就不能成功发送数据!,有了就可已在消费者端接受到数据。若把Thread.sleep()删除,在生产末尾加上close()方法也能成功生产
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
kafkaProducer.send(new ProducerRecord<>("topic_user_general_info_update", "simpleKey", "value-"+i));
}
困扰很久,不知道配置还是哪里问题。
close() 方法存在的意义就是为了在使用完队列后释放资源,如果不释放资源 所属的线程就会直接占用一整个 核 的资源。你在末尾添加 close() 方法后能运行就证明是没问题的。
这个props.put("linger.ms", 50);改成props.put("linger.ms", 0);试一下
props.put("linger.ms", 50);这个配置代表消息会延迟50ms才会发送。
其实kafka生产者发送消息的时候并不是立即发送到kafka服务端,而是会保存在本地一段时间,再由另一个守护线程Sender线程发送。
现在是你发送了一条,如果没有sleep50ms,主线程结束了,守护线程同样会结束,这样这条消息就丢失了,所以要显性执行close才可以
close方法会阻塞当前线程,并立即唤醒Sender线程,直到把保存在本地的请求发送完成,这样数据就不会丢失了
消费不到数据不一定就是没有生产成功,有很多种情况。当然大概率是生产失败。具体情况得具体分析,按照你提到的配置。要么是消息总量达到batch.size", 16384
,要么等待时间达到linger.ms", 50,你的消息是发送不出去了。这点是可以确定。