Concurrent POST requests with multiple readers do not return a response

I have a proof-of-concept HTTP server using echo that takes a POST request with a JSON body. I am trying to stream the request body to multiple POST requests using pipes and io.MultiWriter, but it is not working correctly.

In the example below, I can see that the data is sent to the two POST endpoints, and I can see a log line from each of those requests, but I never get a response back. The code seems to hang waiting for the http.Post(...) calls to complete.

If I call these two endpoints directly, they work fine and return a valid JSON response, so I believe the problem is in the code below, which is my handler for the route.

func ImportAggregate(c echo.Context) error {
    oneR, oneW := io.Pipe()
    twoR, twoW := io.Pipe()

    done := make(chan bool, 2)

    go func() {
        fmt.Println("Product Starting")
        response, err := http.Post("http://localhost:1323/products/import", "application/json", oneR)
        if err != nil {
            fmt.Println(err)
        } else {
            fmt.Println(response.Body)
        }
        done <- true
    }()

    go func() {
        fmt.Println("Import Starting")
        response, err := http.Post("http://localhost:1323/discounts/import", "application/json", twoR)
        if err != nil {
            fmt.Println(err)
        } else {
            fmt.Println(response.Body)
        }
        done <- true
    }()

    mw := io.MultiWriter(oneW, twoW)
    io.Copy(mw, c.Request().Body)

    <-done
    <-done

    return c.String(200, "Imported")
}

The output in console is:

Product Starting
Import Starting

The issue in the OP's code is that the http.Post calls never detect the EOF of the io.Reader they are given.

That happens because the write half of each pipe is never closed, so the read half never returns io.EOF.

Regarding the OP's comment that closing the read half of the pipe produces unexpected errors: that is the wrong end to close. Reading from (or writing to) a pipe whose read half has been closed fails with io.ErrClosedPipe.

In this situation, the write half must be closed right after the content has been copied.

The handler should be changed to:

func ImportAggregate(c echo.Context) error {
    oneR, oneW := io.Pipe()
    twoR, twoW := io.Pipe()

    done := make(chan bool, 2)

    // Start the consumers first: pipes are synchronous, so the io.Copy
    // below blocks until something reads from the other end.
    go func() {
        fmt.Println("Product Starting")
        response, err := http.Post("http://localhost:1323/products/import", "application/json", oneR)
        if err != nil {
            fmt.Println(err)
        } else {
            fmt.Println(response.Body)
            response.Body.Close() // release the connection
        }
        done <- true
    }()

    go func() {
        fmt.Println("Import Starting")
        response, err := http.Post("http://localhost:1323/discounts/import", "application/json", twoR)
        if err != nil {
            fmt.Println(err)
        } else {
            fmt.Println(response.Body)
            response.Body.Close() // release the connection
        }
        done <- true
    }()

    mw := io.MultiWriter(oneW, twoW)
    io.Copy(mw, c.Request().Body)
    // Closing the write halves makes the read halves return io.EOF,
    // which lets both http.Post calls finish sending their bodies.
    oneW.Close()
    twoW.Close()

    <-done
    <-done

    return c.String(200, "Imported")
}
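
The EOF behaviour can be checked in isolation, without any HTTP involved. The following is a minimal sketch; copyThroughPipe is a hypothetical helper written for this illustration, and it uses CloseWithError so that a copy failure would also reach the reader. If the close call is removed, io.ReadAll never returns, which is the same hang as in the question.

```go
package main

import (
    "fmt"
    "io"
    "strings"
)

// copyThroughPipe (hypothetical helper) streams src through an io.Pipe
// and returns what the reading side received.
func copyThroughPipe(src string) (string, error) {
    r, w := io.Pipe()

    go func() {
        _, err := io.Copy(w, strings.NewReader(src))
        // With a nil error this behaves like Close: the reader gets io.EOF.
        // A non-nil error is handed to the reader instead.
        w.CloseWithError(err)
    }()

    // io.ReadAll returns only once the read half reports EOF or an error.
    data, err := io.ReadAll(r)
    return string(data), err
}

func main() {
    got, err := copyThroughPipe(`{"id": 1}`)
    fmt.Printf("%q %v\n", got, err)
}
```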

Side notes beyond the OP's question:

  • the error returned by io.Copy must be checked in order to detect a transmission error.

  • there is no need to close the read half of the pipe; http.Post closes the request body itself once it is done with it.

  • the goroutines that consume the pipes must be started before the incoming request body is copied. Pipes are synchronous, so io.Copy would otherwise block forever waiting for a reader on the other end.

  • the done channel does not need to be buffered (with a capacity of 2); an unbuffered channel works as well, because the handler receives from it twice.

  • one way to forward errors from the outgoing requests to the handler's response is to use a channel of type chan error, receive from it twice, and report the first error encountered.
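
The notes about checking io.Copy and forwarding errors over a chan error could be combined roughly as follows. This is only a sketch under assumptions: fanOut and demo are hypothetical names, the endpoints are throwaway httptest servers rather than the OP's routes, treating any status of 400 or above as an error is a choice made for the example, and a copy error is given priority over request errors.

```go
package main

import (
    "fmt"
    "io"
    "net/http"
    "net/http/httptest"
    "strings"
)

// fanOut (hypothetical helper) streams body to every URL and returns the
// first error seen, either from the copy or from one of the POST requests.
func fanOut(body io.Reader, urls []string) error {
    errs := make(chan error, len(urls))
    writers := make([]io.Writer, 0, len(urls))
    closers := make([]*io.PipeWriter, 0, len(urls))

    // Start one consumer per URL before copying anything.
    for _, u := range urls {
        r, w := io.Pipe()
        writers = append(writers, w)
        closers = append(closers, w)
        go func(u string, r *io.PipeReader) {
            resp, err := http.Post(u, "application/json", r)
            if err != nil {
                errs <- err
                return
            }
            defer resp.Body.Close()
            if resp.StatusCode >= 400 {
                errs <- fmt.Errorf("%s: unexpected status %s", u, resp.Status)
                return
            }
            errs <- nil
        }(u, r)
    }

    // Check the copy error, then close every write half so the readers
    // see io.EOF (or the copy error, if there was one).
    _, copyErr := io.Copy(io.MultiWriter(writers...), body)
    for _, w := range closers {
        w.CloseWithError(copyErr)
    }

    // Receive exactly one result per request and keep the first error.
    firstErr := copyErr
    for range urls {
        if err := <-errs; err != nil && firstErr == nil {
            firstErr = err
        }
    }
    return firstErr
}

// demo (hypothetical) runs fanOut against two local test servers that
// drain the request body and answer with the given status codes.
func demo(statusA, statusB int) error {
    newServer := func(status int) *httptest.Server {
        return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
            io.Copy(io.Discard, r.Body)
            w.WriteHeader(status)
        }))
    }
    a, b := newServer(statusA), newServer(statusB)
    defer a.Close()
    defer b.Close()
    return fanOut(strings.NewReader(`{"id": 1}`), []string{a.URL, b.URL})
}

func main() {
    fmt.Println("both succeed:", demo(http.StatusOK, http.StatusOK))
    fmt.Println("one fails:", demo(http.StatusOK, http.StatusInternalServerError))
}
```

In an echo handler, the value returned by fanOut could then decide between c.String(200, "Imported") and an error response.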