关于Kafka的poll()方法的使用时为什么要用while(true),换成if(true)就没法消费数据的问题

项目中有个短信发送的功能,从客户提供的Kafka中消费数据,拿到这数据后按业务需求进行处理,把处理后的内容以短信的方式发送给客户
这个功能是同事写的,最近项目上线,客户做了个代码评审,发现从Kafka中获取数据(poll方法),使用while循环来实现的,觉得这是死循环,要求整改,查了很多资料都是这么写的,客户说什么也要整改,现在同事离职了,问不到他了

下面是代码

//下面两段代码在同一个类里
protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {

        while (true) {
            //启动
            initDedz(khjl, glz);
            //设置当前状态为启动
            is_star = true;
        }
}

void initDedz(BigDecimal khjl, BigDecimal glz){
        // Kafka 获取的流水数据
        List listStr =  KafkaApi.getDEYDJsonList();
        // 处理报文
        List< HashMap> listMap = getJyMap(listStr);
        yyyy=DateUtil.format(new Date(),"yyyy年");
        for(Map map :listMap){
            // 具体的业务处理逻辑,这里就不放了,主要是getDEYDJsonList()这个方法
        }
}

// 这段代码和上面两段代码不在一个Java类里,这段代码在一个叫KafkaApi的类里,kafkaConsumer在这个类里有定义,这里没有贴代码,上面是调用
// 以下就是消费Kafka的逻辑
public static List getDEYDJsonList() {
        List reList = new ArrayList<>();

        try {
            if (kafkaConsumer == null) {
                kafkaConsumer = getKafkaConsumer();
                kafkaConsumer.subscribe(Arrays.asList(HX_KAFKA_TOPCI));
                kafkaConsumer.poll(Duration.ofSeconds(2));
                kafkaConsumer.seekToEnd(kafkaConsumer.assignment());
            }

            ConsumerRecords records = kafkaConsumer.poll(Duration.ofMillis(0));
            for (ConsumerRecord record : records) {
                reList.add(record.value());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return reList;
    }

Kafka之前没怎么接触过,而且上线的计划给的时间很紧,所以想请教一下社区的朋友;我想要的就是:1:为什么要用while循环,通过poll方法来拉取数据,把while改掉,就消费不到数据了;2:有没有其他方式也可以从Kafka中消费数据

我的理解是while循环一直在等待消息,如果不用while消息,消息没有这么快来,就执行完了,后面数据来了,就接不到数据了。

客户是不是理解为TRUE就会一直执行while循环了看代码

while (true) {
            //启动
            initDedz(khjl, glz);
            //设置当前状态为启动
            is_star = true;
        }


is_star启动状态一直是TRUE,while循环就会一直执行,要不要设置当执行结束设置FALSE,然后判断如果是FALSE时再执行,已经是TRUE时相当于逻辑已经处理过了,就不执行

Kafka的poll()方法是一个阻塞的方法,它会一直等待新的消息到达。如果使用if(true)来替代while(true)来调用poll()方法,这个方法只会被执行一次,然后程序就会终止,而不会等待新的消息到达。因此,为了保证消费数据的连续性,通常使用while(true)来调用poll()方法。

可以考虑其他方式来实现数据消费,如使用KafkaConsumer的assign()和seek()方法来控制消费数据的位置。

Kafka的poll()方法是一个阻塞方法,它会等待Kafka集群返回消息。如果没有消息可用,它将一直阻塞直到有消息可用为止。

使用while(true)可以让程序一直循环,直到有消息可用。如果你使用if(true)只会执行一次,如果这时Kafka没有数据,那么poll()方法就会立即返回,程序就会终止,不会再等待消息可用,所以就没法消费数据

通常,Kafka消费者应用程序会在后台运行并不断轮询Kafka集群以检索新消息。因此,使用while(true)循环来不断轮询Kafka集群是一种常见做法。

如果你想在程序中增加一些逻辑来控制 poll() 方法的执行次数,比如在某些特定条件下终止程序,那么可以在while(true)的条件中增加一些逻辑

可以使用Kafka的消费者API来替代while循环,使用Kafka的消费者API可以更好地控制消费者的消费速度,从而避免死循环的问题。Kafka消费者API的使用方法如下:

  1. 创建消费者实例:

consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'],
                         auto_offset_reset='earliest',
                         consumer_timeout_ms=1000)

  1. 订阅主题:
consumer.subscribe(['topic_name'])

  1. 获取消息:
for message in consumer:
    # do something with message

  1. 关闭消费者:
consumer.close()

Kafka的poll()方法用于从Kafka集群中获取消息。在使用poll()方法时,如果使用if(true)而不是while(true),将只会获取一次消息,而不是持续获取消息。因此,使用while(true)可以确保消费者持续地从Kafka集群中获取消息,而不是只获取一次。

if是判断语句只会执行一次,while是循环语句他会循环去执行。
kafka的消费因为这一个消费者一直在运行所以需要while循环,要不然消费了一次就结束了。
另外kafka的poll方法是阻塞的,他会一直等到有消息来,然后再往下执行。
所以这里这个循环是有必要的,客户作审查的人是不是不懂代码?还是他们用的固定的审查代码软件,如果是软件,你把true换成一个变量,并且设置一些不会触发的情况,将这个变量设置为false,把审查对付过去就行了

使用 while(true) 的原因是,poll() 方法需要一直运行,以持续消费 Kafka 中的数据。如果改成 if(true) 的话,只会执行一次,就不能持续消费数据了。

但是,这个 while(true) 的写法显然是错误的,因为它会导致程序一直处于死循环状态,会导致 CPU 占用率过高。

正确的写法应该是在 while 循环中加入一些条件,使得循环在某种情况下结束,如果没有这些限制条件,应该考虑其他方法来控制循环。