Hi, the following is the code for my consumer:
$conf = new \RdKafka\Conf();
$conf->set('group.id', 'commonqueue2');
$conf->set('offset.store.method', 'broker');
$rk = new \RdKafka\Consumer($conf);
$rk->addBrokers($kafka_servers);
$topicConf = new \RdKafka\TopicConf();
$topicConf->set('auto.commit.interval.ms', '1000');
$topicConf->set('offset.store.method', 'broker');
$topicConf->set('auto.offset.reset', 'smallest');
$topic = $rk->newTopic("registration17", $topicConf);
// start consuming partition 0 from the last stored offset
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
while (true) {
    $message = $topic->consume(0, 1000);
    // consume() returns null on timeout; skip that and any error events
    if ($message === null || $message->err !== RD_KAFKA_RESP_ERR_NO_ERROR) {
        continue;
    }
    // store offset to broker
    $topic->offsetStore($message->partition, $message->offset);
    $message_offset = $message->offset;
    echo $message_data = $message->payload;
} // end of while loop
In the above program, how can I find the last stored offset before executing the following code?
$topic->consumeStart(0,RD_KAFKA_OFFSET_STORED);
There are two ways to get the latest offsets:
1. From the ZooKeeper shell, using the command: get /consumers/<group_id>/offsets/<topic>/<partition_id>
For help, see this link: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper
2. With the ConsumerOffsetChecker tool, using the command: bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group <group_id> --zookeeper <zk_host>:<zk_port> --topic <topic>
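Since the consumer in the question sets offset.store.method to broker, it may also be possible to read the stored offset from PHP itself before calling consumeStart(). The following is a minimal sketch, not a tested solution. It assumes a php-rdkafka version that provides KafkaConsumer::getCommittedOffsets() and that the offsets were committed under the same group.id. The helper name fetchCommittedOffset() and its parameters are made up for illustration.

```php
<?php
// Sketch only: needs the php-rdkafka extension and a reachable broker.
// fetchCommittedOffset() is a hypothetical helper, not part of php-rdkafka.
function fetchCommittedOffset(
    string $brokers,
    string $group,
    string $topic,
    int $partition,
    int $timeoutMs = 10000
): ?int {
    $conf = new \RdKafka\Conf();
    $conf->set('group.id', $group);                 // same group the consumer commits under
    $conf->set('metadata.broker.list', $brokers);   // comma-separated host:port list

    // The high-level consumer is used only to query the group's committed
    // offsets; it never subscribes, so it does not join the group.
    $consumer = new \RdKafka\KafkaConsumer($conf);
    $committed = $consumer->getCommittedOffsets(
        [new \RdKafka\TopicPartition($topic, $partition)],
        $timeoutMs
    );

    foreach ($committed as $topicPartition) {
        $offset = $topicPartition->getOffset();
        // A negative value means no offset has been committed yet.
        return $offset >= 0 ? $offset : null;
    }
    return null;
}
```

With the values from the question, the call would look like fetchCommittedOffset($kafka_servers, 'commonqueue2', 'registration17', 0). Note that the committed value is normally the position of the next message to read, so it is likely one higher than the offset of the last message passed to offsetStore().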