Using input and output channels in Go

I'm trying to use channels to pass data to goroutines and get the results back. At some point during execution the process freezes. I haven't been able to determine when or why using Delve. What exactly am I doing wrong here?

func handler(w http.ResponseWriter, r *http.Request) {
    records := liaison.Parse(r) // Parse incoming JSON into a slice.
    nRecords := len(records)
    in, out := make(chan map[string]string), make(chan map[string]string)

    // Start a goroutine for each CPU core.
    for i := 0; i < nCores; i++ {
        go clean(in, out)
    }

    // Pass each map in the slice to the input channel.
    for _, record := range records {
        in <- record
    }

    // Replace each index of the records slice with cleaned data.
    for i := 0; i < nRecords; i++ {
        cleaned := <-out
        records[i] = cleaned
    }

    close(in)
    close(out)

    liaison.Respond(w, &records)
}

func clean(in, out chan map[string]string) {
    record := <-in
    // run cleaning rules on input record.
    out <- record
}

The program uses unbuffered channels. A send or receive on an unbuffered channel does not complete until both the sender and the receiver are ready.

If nRecords is greater than nCores, the program deadlocks. Each cleaner goroutine receives one record and then blocks sending to out, because the handler goroutine is not yet receiving from out. The handler goroutine cannot get to the receive loop because it is blocked sending the next record to in, and no cleaner is left to receive it.

If nRecords is less than nCores, the cleaners that didn't get work can panic on the send to out. When the handler goroutine calls close(in), the idle cleaners receive a zero value (a nil map) from <-in. They process that zero value and attempt to send it to out. Concurrently, the handler goroutine closes out. The application panics because sending on a closed channel is not allowed.
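Both channel rules involved in this case can be checked with a small standalone program. This is a sketch; the function names recvAfterClose and sendAfterClose are made up for the illustration.

```go
package main

import "fmt"

// recvAfterClose (hypothetical helper) receives from a closed, empty channel.
// The receive returns immediately with the zero value and ok == false.
func recvAfterClose() (map[string]string, bool) {
	ch := make(chan map[string]string)
	close(ch)
	v, ok := <-ch
	return v, ok
}

// sendAfterClose (hypothetical helper) sends on a closed channel and
// returns the text of the recovered panic.
func sendAfterClose() (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	ch := make(chan map[string]string, 1)
	close(ch)
	ch <- nil // panics: the channel is closed
	return "no panic"
}

func main() {
	v, ok := recvAfterClose()
	fmt.Println(v == nil, ok) // nil map, ok is false
	fmt.Println(sendAfterClose())
}
```

The two-value receive form (v, ok := <-ch) is one way for a worker to tell a real record from the zero value produced by a closed channel; a for / range loop over the channel does the same check implicitly.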

Here's a fix:

func handler(w http.ResponseWriter, r *http.Request) {
    records := liaison.Parse(r) // Parse incoming JSON into a slice.
    nRecords := len(records)
    in, out := make(chan map[string]string), make(chan map[string]string)

    // Start a goroutine for each CPU core.
    for i := 0; i < nCores; i++ {
        go clean(in, out)
    }

    // Start a goroutine to feed the data. This allows
    // the handler goroutine to continue receiving on out.
    go func() {
        for _, record := range records {
            in <- record
        }
        // Close the channel. This causes the cleaners to exit
        // from the for / range loops.
        close(in)
    }()

    // Replace each index of the records slice with cleaned data.
    for i := 0; i < nRecords; i++ {
        cleaned := <-out
        records[i] = cleaned
    }

    liaison.Respond(w, &records)
}

func clean(in, out chan map[string]string) {
    // use for / range to read records until in is closed.
    for record := range in {
        // run cleaning rules on input record.
        out <- record
    }
}
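For experimenting outside an HTTP handler, the same pattern can be written as a standalone program. This is only a sketch under stated assumptions: the liaison package and nCores are not shown in the question, so the example uses runtime.NumCPU() for the worker count, a made-up cleanAll function in place of the handler, and whitespace trimming as a stand-in cleaning rule.

```go
package main

import (
	"fmt"
	"runtime"
	"strings"
)

// clean reads records until in is closed. Trimming whitespace is a
// placeholder (assumption) for the real cleaning rules.
func clean(in <-chan map[string]string, out chan<- map[string]string) {
	for record := range in {
		for k, v := range record {
			record[k] = strings.TrimSpace(v)
		}
		out <- record
	}
}

// cleanAll (hypothetical name) fans records out to nWorkers cleaners and
// collects exactly len(records) results.
func cleanAll(records []map[string]string, nWorkers int) []map[string]string {
	in, out := make(chan map[string]string), make(chan map[string]string)
	for i := 0; i < nWorkers; i++ {
		go clean(in, out)
	}

	// Feed from a separate goroutine so this goroutine is free to receive.
	go func() {
		for _, record := range records {
			in <- record
		}
		close(in) // lets every cleaner leave its for / range loop
	}()

	cleaned := make([]map[string]string, 0, len(records))
	for range records {
		cleaned = append(cleaned, <-out)
	}
	return cleaned
}

func main() {
	records := []map[string]string{
		{"name": "  alice "},
		{"name": "bob  "},
		{"name": " carol"},
	}
	cleaned := cleanAll(records, runtime.NumCPU())
	fmt.Println(len(cleaned), "records cleaned")
}
```

Results arrive in completion order, which is not necessarily the input order. The sketch collects them into a separate slice rather than writing back by index; if the original order matters, each record would need to carry its index.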

An alternative approach is to change in to a buffered channel with space for all records:

func handler(w http.ResponseWriter, r *http.Request) {
    records := liaison.Parse(r) // Parse incoming JSON into a slice.
    nRecords := len(records)
    in := make(chan map[string]string, nRecords) // <-- note second argument
    out := make(chan map[string]string)

    // Start a goroutine for each CPU core.
    for i := 0; i < nCores; i++ {
        go clean(in, out)
    }

    for _, record := range records {
        // this will not block because cap(in) == len(records).
        in <- record
    }
    close(in)

    // Replace each index of the records slice with cleaned data.
    for i := 0; i < nRecords; i++ {
        cleaned := <-out
        records[i] = cleaned
    }

    liaison.Respond(w, &records)
}