Polling Kafka messages into a slice in Golang

I want to write a Kafka consumer that polls Kafka messages into a slice of events and performs a bulk operation over that slice. It should block the thread until the bulk operation is completed, just to ensure I am not missing any record from Kafka.

I am using the confluent-kafka-go library, which gives us the flexibility to poll the consumer for some interval of time, returning a single Event each time. I want to collect the events polled during that interval into a slice so that I can run batch operations over it.


    // Record is a placeholder for the struct the JSON payload is unmarshalled into.
    var records []Record
    maxBatchSize := 500

    for {
        ev := c.Poll(1000)
        switch e := ev.(type) {
        case *kafka.Message:
            fmt.Println(string(e.Value))
            var r Record
            _ = json.Unmarshal(e.Value, &r) // json unmarshalled to struct
            records = append(records, r)
            if len(records) >= maxBatchSize {
                if err := BulkInsert(records); err == nil {
                    // commit only after the bulk operation succeeded
                    _, _ = c.Commit()
                    records = nil
                }
            }
        }
    }

The above code is just a hack and doesn't really solve my problem. I want to avoid using maxBatchSize. I want functionality like:

    val x = consumer.poll(5000).asScala // returns an iterable of ConsumerRecords<K, V> over which I can perform bulk operations
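
To make the intent concrete, below is a rough Go sketch of the time-window batching I am after with confluent-kafka-go. collectBatch, consumeLoop, BulkInsert, the 5-second window, the broker address, topic and group id are all my own placeholders, not anything the library provides; the only real API calls are NewConsumer, Subscribe, Poll and Commit.

    package main

    import (
        "time"

        "github.com/confluentinc/confluent-kafka-go/kafka"
    )

    // BulkInsert stands in for whatever bulk operation the batch is fed into.
    func BulkInsert(batch []*kafka.Message) error { return nil }

    // collectBatch polls the consumer until the window elapses and returns
    // everything received in that window as a single slice.
    func collectBatch(c *kafka.Consumer, window time.Duration) []*kafka.Message {
        var batch []*kafka.Message
        deadline := time.Now().Add(window)
        for time.Now().Before(deadline) {
            ev := c.Poll(100) // short poll so the deadline is re-checked often
            if msg, ok := ev.(*kafka.Message); ok {
                batch = append(batch, msg)
            }
        }
        return batch
    }

    // consumeLoop blocks on the bulk operation before committing, so no
    // offset is acknowledged until the records have actually been persisted.
    func consumeLoop(c *kafka.Consumer) {
        for {
            batch := collectBatch(c, 5*time.Second)
            if len(batch) == 0 {
                continue
            }
            if err := BulkInsert(batch); err != nil {
                continue // skip the commit; the offsets stay uncommitted
            }
            _, _ = c.Commit()
        }
    }

    func main() {
        c, err := kafka.NewConsumer(&kafka.ConfigMap{
            "bootstrap.servers":  "localhost:9092",
            "group.id":           "bulk-consumer",
            "enable.auto.commit": false,
        })
        if err != nil {
            panic(err)
        }
        _ = c.Subscribe("my-topic", nil)
        consumeLoop(c)
    }

The idea is that Commit only runs after BulkInsert has returned, so the loop naturally blocks on the bulk operation and no record is lost. Is there a cleaner way to get a whole window's worth of messages in one call, the way the JVM client's poll(5000) does?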