Elasticsearch容器返回格式错误的http状态响应

I'm using the official go-elasticsearch 7.x api from golang to query an elasticsearch container:

I keep getting this response when I try to index some documents (after connecting successfully)

Error getting response: net/http: HTTP/1.x transport connection broken: malformed HTTP status code "is"

From what I can tell my request to index the document seems normal and it appears as though the error is coming from within the container? I've tried different TLS configurations, including no tls configuration at all and I still get the same response.

No error gets returns when I call elasticClient.Info() which it's connected to the client successfully

golang:

var err error
    cfg := elasticsearch.Config{
        Addresses: []string{
            "http://elasticsearch:9200",
            "http://elasticsearch:9300",
        },
        Transport: &http.Transport{
            TLSClientConfig: &tls.Config{
                Renegotiation: tls.RenegotiateOnceAsClient,
                Certificates:  []tls.Certificate{cer},
            },
        },
    }
    elasticClient, err = elasticsearch.NewClient(cfg)
    if err != nil {
        return err
    }
    return nil

val, err := json.Marshal(&load)
    if err != nil {
        log.Fatalf("Error marshalling load, %v", err)
    }
    log.Printf("|| %s ||", string(val))
    req := esapi.IndexRequest{
        Index:      "load",
        DocumentID: strconv.Itoa(int(load.Id)),
        Body:       strings.NewReader(string(val)),
        Refresh:    "true",
    }
    info, err := elasticClient.Info()
    if err != nil {
        return err
    }
    log.Println(info)
    // Perform the request with the client.
    res, err := req.Do(context.Background(), elasticClient)
    if err != nil {
        log.Fatalf("Error getting response: %s", err.Error())
    }
    if res.IsError() {
        log.Printf("[%s] Error indexing document ID=%s", res.Status(), strconv.Itoa(int(load.Id)))
    } else {

        // Deserialize the response into a map.
        var r map[string]interface{}
        if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
            log.Printf("Error parsing the response body: %s", err)
        } else {
            // Print the response status and indexed document version.
            log.Printf("[%s] %s; version=%d", res.Status(), r["result"], int(r["_version"].(float64)))
        }
    }
    return nil

typical json to index

string(val) in IndexRequest

{"deliverydate":"2019-08-28T13:00:00-04:00","temperature":"34",
"firstname":"****","loaddate":"2019-08-26T13:00:00-04:00","id":"188244","weight":"41918","totalcust":"2609","ponumber":"******","tsize":"53 ft","phone":"*******","equipment":"***","ext":"****","lastname":"****","pzip":"****",
"pcity":"****","pstate":"****","dzip":"22153","dcity":"*****","dstate":"****",
"locations":{"1":{"lat":44.260059,"long":-72.575387},"2":{"lat":38.748411,"long":-77.234767},"3":{},"4":{"lat":39.192978,"long":-76.7238},"5":{"lat":38.694281,"long":-75.772155},"6":{"lat":39.149275,"long":-76.775249},"7":{"lat":39.606779,"long":-75.833272}},"stopTypes":{"1":"P","2":"D","3":"D","4":"D","5":"D","6":"D","7":"D"}}

docker-compose.yml

version: "3"
services:
  redis_server:
    image: "redis"
    ports:
      - "6379:6379"
  lambda_server:
    build: .
    ports:
      - "8080:50051"
    depends_on:
      - redis_server
      - elasticsearch
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.3.1
    ports:
      - "9200:9200"
      - "9300:9300"      
    environment:
      - xpack.security.enabled=false
      - discovery.type=single-node
      - http.cors.enabled=true
      - http.cors.allow-origin=*

Edit 1:

Just tried wrapping the request in a loop to see if something changes and it appears to produce something more useful:

lambda_server_1 | 2019/08/24 17:25:59 [400 Bad Request] Error indexing document ID=188243 lambda_server_1 | 2019/08/24 17:25:59 [400 Bad Request] Error indexing document ID=189265

It appears it's a poorly formed request? Despite following the example exactly.

edited code:

func cacheLoadsToSearch() {
    data := redisClient.Scan(0, "*", totalNewLoads).Iterator()
    accessedKeys := map[string]bool{}
    for data.Next() {
        id := data.Val()
        if accessedKeys[id] {
            continue //continue to next iteration of loop, Scan cursor may return duplicate results
        } else {
            accessedKeys[id] = true
        }
        loadItem, err := readFromRedis(id)
        if err != nil {
            log.Fatalf("Error reading from redis cache, %v", err)
        }
        wg.Add(1)
        pushToElasticSearch(loadItem)
    }
    wg.Wait()
}

func pushToElasticSearch(load *lambdapb.Load) error {
    val, err := json.Marshal(&load)
    if err != nil {
        log.Fatalf("Error marshalling load, %v", err)
    }
    log.Printf("|| %s ||", string(val))
    req := esapi.IndexRequest{
        Index:      "load",
        DocumentID: strconv.Itoa(int(load.Id)),
        Body:       strings.NewReader(string(val)),
        Refresh:    "true",
    }
    info, err := elasticClient.Info()
    if err != nil {
        return err
    }
    log.Println(info)
    // Perform the request with the client.
    go func() {
        defer wg.Done()
        for {
            res, err := req.Do(context.Background(), elasticClient)
            if err != nil {
                log.Printf("Error getting response fron indexRequest: %s", err.Error())
                time.Sleep(5 * time.Second)
                continue
            }
            if res.IsError() {
                log.Printf("[%s] Error indexing document ID=%s", res.Status(), strconv.Itoa(int(load.Id)))
                time.Sleep(5 * time.Second)
                continue
            } else {

                // Deserialize the response into a map.
                var r map[string]interface{}
                if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
                    log.Printf("Error parsing the response body: %s", err)
                } else {
                    // Print the response status and indexed document version.
                    log.Printf("[%s] %s; version=%d", res.Status(), r["result"], int(r["_version"].(float64)))
                }
                break
            }
        }
    }()
    return nil
}

I also edited the setup function

func setupElasticSearch() error {
    var err error
    cfg := elasticsearch.Config{
        Addresses: []string{
            "http://elasticsearch:9200",
            "http://elasticsearch:9300",
        },
        Transport: &http.Transport{
            TLSClientConfig: &tls.Config{
                Renegotiation: tls.RenegotiateOnceAsClient,
                Certificates:  []tls.Certificate{cer},
            },
        },
    }
    elasticClient, err = elasticsearch.NewClient(cfg)
    if err != nil {
        return err
    }
    done := make(chan int)
    go func(done chan<- int) {
        for {
            res, err := elasticClient.Indices.Create("load")
            if err != nil {
                log.Printf("Malformed response to create index: %s", err)
                time.Sleep(5 * time.Second)
                continue
            }
            if res.IsError() {
                log.Printf("Cannot create index: %s", res)
                time.Sleep(5 * time.Second)
                continue
            }
            log.Print("Successfully created index")
            break
        }
        done <- 1
    }(done)
    <-done
    return nil
}