I'm trying to use channels to pass data to and back from go routines. At some point during execution the process freezes up. I haven't been able to determine when or why via Delve. What exactly am I doing wrong here?
func handler(w http.ResponseWriter, r *http.Request) {
records := liaison.Parse(r) // Parse incoming JSON into a slice.
nRecords := len(records)
in, out := make(chan map[string]string), make(chan map[string]string)
// Start a go routine for each CPU core.
for i := 0; i < nCores; i++ {
go clean(in, out)
}
// Pass each map in the slice to the input channel.
for _, record := range records {
in <- record
}
// Replace each index of the records slice with cleaned data.
for i := 0; i < nRecords; i++ {
cleaned := <-out
records[i] = cleaned
}
close(in)
close(out)
liaison.Respond(w, &records)
}
func clean(in, out chan map[string]string) {
record := <-in
// run cleaning rules on input record.
out <- record
}
The program uses unbuffered channels. Communication on an unbuffered channel does not succeed until the both the sender and receiver are ready.
If nRecords
is greater than nCores
, then the program will deadlock: The cleaner goroutines cannot send to out
until the handler goroutine receives from out
. The handler goroutine cannot receive because it blocks on sending to in
.
If nRecords
is less than nCores
, the cleaners that didn't get work will panic on send to out
. When close(in)
is called from the handler goroutine, the cleaners that didn't get work will receive a zero value from <-in
. The cleaner goroutines will process that zero value and attempt to send to out
. Concurrently with this, the handler goroutine closes out
. The application panics because send on a closed channel is not allowed.
Here's a fix:
func handler(w http.ResponseWriter, r *http.Request) {
records := liaison.Parse(r) // Parse incoming JSON into a slice.
nRecords := len(records)
in, out := make(chan map[string]string), make(chan map[string]string)
// Start a go routine for each CPU core.
for i := 0; i < nCores; i++ {
go clean(in, out)
}
// Start a goroutine to feed the data. This this allows
// the handler goroutine to continue to receiving on out.
go func() {
for _, record := range records {
in <- record
}
// Close the channel. This causes the cleaners to exit
// from the for / range loops.
close(in)
}()
// Replace each index of the records slice with cleaned data.
for i := 0; i < nRecords; i++ {
cleaned := <-out
records[i] = cleaned
}
liaison.Respond(w, &records)
}
func clean(in, out chan map[string]string) {
// use for / range to read records until in is closed.
for record := range in {
// run cleaning rules on input record.
out <- record
}
}
An alternative approach is to use to change in
to a buffered channel with space for all records:
func handler(w http.ResponseWriter, r *http.Request) {
records := liaison.Parse(r) // Parse incoming JSON into a slice.
nRecords := len(records)
in := make(chan map[string]string, nRecords) // <-- note second argument
out := make(chan map[string]string)
// Start a go routine for each CPU core.
for i := 0; i < nCores; i++ {
go clean(in, out)
}
for _, record := range records {
// this will not block because cap(in) == len(records).
in <- record
}
close(in)
// Replace each index of the records slice with cleaned data.
for i := 0; i < nRecords; i++ {
cleaned := <-out
records[i] = cleaned
}
liaison.Respond(w, &records)
}