BulkIndexer in the olivere package for Go, as a replacement for Elastigo

I noticed that I can use a BulkIndexer if I want to send data to Elasticsearch in bulk. As stated in the Elastigo documentation:

A bulk indexer creates goroutines, and channels for connecting and sending data to elasticsearch in bulk, using buffers.

Code in elastigo to insert in bulk

// Package-level Elastigo handles used by insertInBulkElastic.
var (
    // c_es is the Elastigo connection returned by elastigo.NewConn().
    c_es = elastigo.NewConn()
    // indexer is the bulk indexer created from c_es via NewBulkIndexer(50).
    indexer = c_es.NewBulkIndexer(50)
)

// insertInBulkElastic installs a custom Sender on the package-level bulk
// indexer. The Sender posts each buffer of pending documents to the /_bulk
// endpoint through c_es, prints the outcome, and returns the error (if any)
// reported by DoCommand.
func insertInBulkElastic() {
    indexer.Sender = func(buf *bytes.Buffer) error {
        // buf is the buffer of docs about to be written.
        body, sendErr := c_es.DoCommand("POST", "/_bulk", nil, buf)
        if sendErr == nil {
            fmt.Println("The data was inserted successfullly to elastic search")
            return nil
        }

        // Handle it better than this: dump the raw response, then the error.
        fmt.Println("Error", string(body))
        fmt.Println("Error", sendErr)
        return sendErr
    }
}

Does anyone know how to send a bulk request using olivere for Go?

Thanks

Here is a working example using olivere in Go. You can read more about the BulkProcessor on the BulkProcessor page of the olivere/elastic wiki.

Hope this helps :)

package main

import (
    "context"
    "log"
    "time"

    elastic "gopkg.in/olivere/elastic.v5"
)

// main connects to a local Elasticsearch node, makes sure "my_index" exists
// with a keyword mapping for the "name" field, and indexes one document
// through a BulkProcessor. Any setup or flush error aborts the program via
// panic.
func main() {
    const (
        indexName = "my_index"
        typeName  = "type"
    )
    ctx := context.Background()

    options := []elastic.ClientOptionFunc{
        elastic.SetHealthcheck(true),
        elastic.SetHealthcheckTimeout(20 * time.Second),
        elastic.SetSniff(false),
        elastic.SetHealthcheckInterval(30 * time.Second),
        elastic.SetURL("http://127.0.0.1:9200"),
        elastic.SetRetrier(elastic.NewBackoffRetrier(elastic.NewConstantBackoff(5 * time.Second))),
    }
    client, err := elastic.NewClient(options...)
    if err != nil {
        panic(err)
    }

    // ensure the index exists
    exists, err := client.IndexExists(indexName).Do(ctx)
    if err != nil {
        panic(err)
    }
    if !exists {
        if _, err := client.CreateIndex(indexName).Do(ctx); err != nil {
            panic(err)
        }
    }

    // Each property must map to an object describing the field, and the
    // mapping is registered for the same type the documents are indexed
    // under. The error is checked so a rejected mapping is not silently lost.
    mapping := map[string]interface{}{
        "properties": map[string]interface{}{
            "name": map[string]interface{}{
                "type": "keyword",
            },
        },
    }
    if _, err := client.PutMapping().Index(indexName).Type(typeName).BodyJson(mapping).Do(ctx); err != nil {
        panic(err)
    }

    // create a new bulk processor from the client
    bulkProcessor, err := elastic.NewBulkProcessorService(client).
        Workers(5).
        BulkActions(1000).
        FlushInterval(1 * time.Second).
        After(after).
        Do(ctx)
    if err != nil {
        // Without this check a failed start would leave bulkProcessor nil.
        panic(err)
    }

    // now the bulk processor can be reused for the entire app
    // The json tag makes the indexed field name match the "name" mapping.
    myDoc := struct {
        Name string `json:"name"`
    }{
        Name: "jack",
    }
    req := elastic.NewBulkIndexRequest()
    req.Index(indexName).Type(typeName).Id("my_doc_id").Doc(myDoc)

    // Use the Add method to queue the request in the processor
    bulkProcessor.Add(req)

    // Commit outstanding requests explicitly instead of sleeping and hoping
    // the flush interval has fired, then shut the workers down.
    if err := bulkProcessor.Flush(); err != nil {
        panic(err)
    }
    if err := bulkProcessor.Close(); err != nil {
        panic(err)
    }
}

func after(executionID int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) {
    if err != nil {
        log.Printf("bulk commit failed, err: %v
", err)
    }
    // do what ever you want in case bulk commit success
    log.Printf("commit successfully, len(requests)=%d
", len(requests))
}