下面是代码
//下面两段代码在同一个类里
protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
while (true) {
//启动
initDedz(khjl, glz);
//设置当前状态为启动
is_star = true;
}
}
void initDedz(BigDecimal khjl, BigDecimal glz){
// Kafka 获取的流水数据
List listStr = KafkaApi.getDEYDJsonList();
// 处理报文
List< HashMap> listMap = getJyMap(listStr);
yyyy=DateUtil.format(new Date(),"yyyy年");
for(Map map :listMap){
// 具体的业务处理逻辑,这里就不放了,主要是getDEYDJsonList()这个方法
}
}
// 这段代码和上面两段代码不在一个Java类里,这段代码在一个叫KafkaApi的类里,kafkaConsumer在这个类里有定义,这里没有贴代码,上面是调用
// 以下就是消费Kafka的逻辑
public static List getDEYDJsonList() {
List reList = new ArrayList<>();
try {
if (kafkaConsumer == null) {
kafkaConsumer = getKafkaConsumer();
kafkaConsumer.subscribe(Arrays.asList(HX_KAFKA_TOPCI));
kafkaConsumer.poll(Duration.ofSeconds(2));
kafkaConsumer.seekToEnd(kafkaConsumer.assignment());
}
ConsumerRecords records = kafkaConsumer.poll(Duration.ofMillis(0));
for (ConsumerRecord record : records) {
reList.add(record.value());
}
} catch (Exception e) {
e.printStackTrace();
}
return reList;
}
我的理解是while循环一直在等待消息,如果不用while消息,消息没有这么快来,就执行完了,后面数据来了,就接不到数据了。
客户是不是理解为TRUE就会一直执行while循环了看代码
while (true) {
//启动
initDedz(khjl, glz);
//设置当前状态为启动
is_star = true;
}
is_star启动状态一直是TRUE,while循环就会一直执行,要不要设置当执行结束设置FALSE,然后判断如果是FALSE时再执行,已经是TRUE时相当于逻辑已经处理过了,就不执行
Kafka的poll()方法是一个阻塞的方法,它会一直等待新的消息到达。如果使用if(true)来替代while(true)来调用poll()方法,这个方法只会被执行一次,然后程序就会终止,而不会等待新的消息到达。因此,为了保证消费数据的连续性,通常使用while(true)来调用poll()方法。
可以考虑其他方式来实现数据消费,如使用KafkaConsumer的assign()和seek()方法来控制消费数据的位置。
Kafka的poll()方法是一个阻塞方法,它会等待Kafka集群返回消息。如果没有消息可用,它将一直阻塞直到有消息可用为止。
使用while(true)可以让程序一直循环,直到有消息可用。如果你使用if(true)只会执行一次,如果这时Kafka没有数据,那么poll()方法就会立即返回,程序就会终止,不会再等待消息可用,所以就没法消费数据
通常,Kafka消费者应用程序会在后台运行并不断轮询Kafka集群以检索新消息。因此,使用while(true)循环来不断轮询Kafka集群是一种常见做法。
如果你想在程序中增加一些逻辑来控制 poll() 方法的执行次数,比如在某些特定条件下终止程序,那么可以在while(true)的条件中增加一些逻辑
可以使用Kafka的消费者API来替代while循环,使用Kafka的消费者API可以更好地控制消费者的消费速度,从而避免死循环的问题。Kafka消费者API的使用方法如下:
consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
consumer_timeout_ms=1000)
consumer.subscribe(['topic_name'])
for message in consumer:
# do something with message
consumer.close()
Kafka的poll()方法用于从Kafka集群中获取消息。在使用poll()方法时,如果使用if(true)而不是while(true),将只会获取一次消息,而不是持续获取消息。因此,使用while(true)可以确保消费者持续地从Kafka集群中获取消息,而不是只获取一次。
if是判断语句只会执行一次,while是循环语句他会循环去执行。
kafka的消费因为这一个消费者一直在运行所以需要while循环,要不然消费了一次就结束了。
另外kafka的poll方法是阻塞的,他会一直等到有消息来,然后再往下执行。
所以这里这个循环是有必要的,客户作审查的人是不是不懂代码?还是他们用的固定的审查代码软件,如果是软件,你把true换成一个变量,并且设置一些不会触发的情况,将这个变量设置为false,把审查对付过去就行了
使用 while(true) 的原因是,poll() 方法需要一直运行,以持续消费 Kafka 中的数据。如果改成 if(true) 的话,只会执行一次,就不能持续消费数据了。
但是,这个 while(true) 的写法显然是错误的,因为它会导致程序一直处于死循环状态,会导致 CPU 占用率过高。
正确的写法应该是在 while 循环中加入一些条件,使得循环在某种情况下结束,如果没有这些限制条件,应该考虑其他方法来控制循环。