学了Kafka我们都知道,生产者如果异步发送消息到kafka,那生产者将不等待kafka的回应,继续发送下一条消息,那生产者异步发送消息的这个过程的底层是依赖了多线程吗?还是怎么样的?和Java的多线程有啥区别?
生产者异步发送消息到Kafka的过程中,确实会使用到多线程技术。
生产者会使用一个专门的线程池来处理异步发送消息的任务。当生产者调用send()
方法发送消息时,实际上是将消息添加到了线程池的任务队列中,然后立即返回,不会等待Kafka的响应。
线程池中的线程会不断地从任务队列中取出任务,并将消息发送到Kafka集群中。
与Java的多线程相比,Kafka生产者异步发送消息的多线程模型有以下几个特点:
线程池的大小是可配置的,可以根据实际情况进行调整,以达到最优的性能表现。
线程池中的线程是长期存在的,可以重复利用,避免了线程的频繁创建和销毁,提高了性能。
线程池中的线程是异步执行的,不会阻塞主线程,提高了程序的并发性能。
你在说java多线程的时候,真的知道自己在说什么吗
java多线程有多少种方式你知道吗
线程,线程池,异步方法,这些听说过吗
正式进入生产者代码实践之前,首先列举出生产者方大致的参数列表如下:
参数 | 解释说明 |
---|---|
bootstrap.servers | 生产者连接集群所需的 broker 地 址 清 单 。 例 如hadoop102:9092,hadoop103:9092,hadoop104:9092,可以设置 1 个或者多个,中间用逗号隔开。注意这里并非需要所有的 broker 地址,因为生产者从给定的 broker里查找到其他 broker 信息。 |
key.serializer 和 value.serializer | 指定发送消息的 key 和 value 的序列化类型。一定要写全类名。 |
buffer.memory | RecordAccumulator 缓冲区总大小,默认 32m。 |
batch.size | 缓冲区一批数据最大值,默认 16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。 |
linger.ms | 如果数据迟迟未达到 batch.size,sender 等待 linger.time之后就会发送数据。单位 ms,默认值是 0ms,表示没有延迟。生产环境建议该值大小为 5-100ms 之间。 |
acks | 0:生产者发送过来的数据,不需要等数据落盘应答。1:生产者发送过来的数据,Leader 收到数据后应答。-1(all):生产者发送过来的数据,Leader+和 isr 队列里面的所有节点收齐数据后应答。默认值是-1,-1 和all 是等价的。 |
max.in.flight.requests.per.connection | 允许最多没有返回 ack 的次数,默认为 5,开启幂等性要保证该值是 1-5 的数字。 |
retries | 当消息发送出现错误的时候,系统会重发消息。retries表示重试次数。默认是 int 最大值,2147483647。如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了。 |
retry.backoff.ms | 两次重试之间的时间间隔,默认是 100ms。 |
enable.idempotence | 是否开启幂等性,默认 true,开启幂等性。 |
compression.type | 生产者发送的所有数据的压缩方式。默认是 none,也就是不压缩。支持压缩类型:none、gzip、snappy、lz4 和 zstd。 |
根据参考资料中的内容可以得出,Kafka生产者在进行异步发送消息时,底层依赖于Sender线程来处理。具体来说,主线程将消息缓存到消息累加器中,Sender线程从消息累加器中获取消息并发送到Kafka中。
与Java多线程的区别在于,Kafka生产者中只有一个Sender线程负责发送消息,而Java多线程可以有多个线程同时运行。同时,Kafka生产者中的Sender线程是专门用来发送消息的,而Java多线程中的线程可以有不同的任务。
需要注意的是,由于Kafka在特定情况下可能会出现消息丢失的问题,因此在使用Kafka进行消息传输时需要注意数据保留策略和解决方案,例如设置ack=all / -1、配置min.insync.replicas > 1等。同时,在消费消息时也需要先commit再处理消息,避免因异常而导致消息丢失。如果出现不可恢复的异常,可以捕获异常并记录到数据库或缓存中进行单独处理。
Kafka 中生产者异步发送是指生产者将消息发送到缓冲区,然后由另一个线程负责将缓冲区中的消息批量发送到Kafka集群。这样可以提高生产者的吞吐量和性能。
Kafka 能持久化存储,多线程能吗;Kafka 能保证顺序性,多线程能吗?
可以看看 Kafka 官网介绍的特点,就知道它哪里比多线程强了。