I'm working on picking up a few of Go's concurrency patterns. I looked at implementing background workers using goroutines and input/output channels, and noticed that when I send new jobs to the receiving channel (essentially enqueuing new jobs), I have to do it in a goroutine or the scheduling gets messed up. Meaning, instead of this:
for _, jobData := range dataSet {
	input <- jobData
}
I have to do this:
go func() {
	for _, jobData := range dataSet {
		input <- jobData
	}
}()
For something more concrete, I played with some nonsense code (here it is in the Go Playground):
package main

import (
	"log"
	"runtime"
)

func doWork(data int) (result int) {
	// ... some 'heavy' computation
	result = data * data
	return
}

// do the processing of the input and return
// results on the output channel
func Worker(input, output chan int) {
	for data := range input {
		output <- doWork(data)
	}
}

func ScheduleWorkers() {
	input, output := make(chan int), make(chan int)
	for i := 0; i < runtime.NumCPU(); i++ {
		go Worker(input, output)
	}
	numJobs := 20
	// THIS DOESN'T WORK
	// and crashes the program
	/*
		for i := 0; i < numJobs; i++ {
			input <- i
		}
	*/
	// THIS DOES
	go func() {
		for i := 0; i < numJobs; i++ {
			input <- i
		}
	}()
	results := []int{}
	for i := 0; i < numJobs; i++ {
		// read off results
		result := <-output
		results = append(results, result)
		// do stuff...
	}
	log.Printf("Result: %#v\n", results)
}

func main() {
	ScheduleWorkers()
}
I'm trying to wrap my head around this subtle difference - help is appreciated. Thanks.
Your ScheduleWorkers function sends, in the main goroutine (i.e. the one that runs the main() function, in which the program starts), a value via input. A Worker receives it and tries to send another value via output. But nobody is receiving from output at that point, so that Worker can't go on, and the main goroutine sends the next value to another Worker.
Repeat this reasoning for each Worker. You have runtime.NumCPU() workers, which is probably fewer than numJobs. Let's say that runtime.NumCPU() == 4, so you have 4 workers. At the end, you have successfully sent 4 values, one to each Worker. Since nobody is reading from output, all Workers are stuck trying to send, so they can't accept more data via input, and the fifth input <- i will hang. At this point every goroutine is waiting; that's the deadlock.
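To see the stall without crashing the program, here is a rough sketch that gives up on a send after a short timeout instead of blocking forever. The function name acceptedSends and the 200ms patience value are made up for illustration, and the worker goroutines are deliberately left blocked (leaked) to mimic the broken version.

```go
package main

import (
	"fmt"
	"time"
)

// acceptedSends starts numWorkers workers that square their input, never
// reads from output, and reports how many sends on input went through
// before one of them stalled. (Hypothetical helper, for illustration only.)
func acceptedSends(numWorkers, numJobs int) int {
	const patience = 200 * time.Millisecond // assumed to be "long enough"

	input, output := make(chan int), make(chan int)
	for i := 0; i < numWorkers; i++ {
		go func() {
			for data := range input {
				// Blocks forever: nobody receives from output.
				output <- data * data
			}
		}()
	}

	for i := 0; i < numJobs; i++ {
		select {
		case input <- i:
			// A free worker took the value.
		case <-time.After(patience):
			// No worker is free any more; this is where the
			// original loop would hang.
			return i
		}
	}
	return numJobs
}

func main() {
	fmt.Println("sends accepted:", acceptedSends(4, 20))
}
```

With 4 workers and 20 jobs this should report 4 accepted sends: one per Worker, after which every Worker is stuck on output.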
You will notice that, if you launch 20 or more Workers instead of runtime.NumCPU(), the program works. That's because the main goroutine can send everything it wants via input, since there are enough workers to receive all of the values.
If, instead of all of this, you put the input <- i loop in another goroutine, as in your successful example, the main goroutine (in which ScheduleWorkers runs) can go on and start reading from output. So, each time this new goroutine sends a value, a worker sends another via output, the main goroutine receives it, and the worker can receive another value. Nobody waits forever, and the program succeeds.
This is because channel operations in Go block by default.
When you send a value on an unbuffered channel, the send blocks until a receiver takes that value off the channel.
Channels can be buffered by adding a "capacity".
For example:
make(chan int, 20) // Make a buffered channel of int with capacity 20
From the Go spec:
The capacity, in number of elements, sets the size of the buffer in the channel. If the capacity is greater than zero, the channel is asynchronous: communication operations succeed without blocking if the buffer is not full (sends) or not empty (receives), and elements are received in the order they are sent. If the capacity is zero or absent, the communication succeeds only when both a sender and receiver are ready.
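A small sketch of what that means in practice: it counts how many sends go through on a channel that nobody is receiving from, using a select with a default case so a send that would block is detected instead of hanging. The function name sendsBeforeBlocking is invented for this example.

```go
package main

import "fmt"

// sendsBeforeBlocking creates a channel with the given capacity and counts
// how many values can be sent before a send would block. Nothing ever
// receives from the channel. (Hypothetical helper, for illustration only.)
func sendsBeforeBlocking(capacity int) int {
	ch := make(chan int, capacity)
	sent := 0
	for {
		select {
		case ch <- sent:
			sent++ // the value fit into the buffer
		default:
			// The send would block: the buffer is full, or the
			// channel is unbuffered and no receiver is ready.
			return sent
		}
	}
}

func main() {
	fmt.Println(sendsBeforeBlocking(20), sendsBeforeBlocking(0)) // prints "20 0"
}
```

With capacity 20, all 20 jobs fit without a receiver; with capacity 0, not even the first send can complete on its own.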
You can get your original function working by using buffered channels instead of unbuffered channels, but wrapping the send loop in a goroutine is probably a better approach, as the sending and receiving then actually happen concurrently.
From Effective Go (Read this document in full! It's probably the most linked document in Go answers on Stack Overflow):
Receivers always block until there is data to receive. If the channel is unbuffered, the sender blocks until the receiver has received the value. If the channel has a buffer, the sender blocks only until the value has been copied to the buffer; if the buffer is full, this means waiting until some receiver has retrieved a value.
If you use buffered channels that are large enough to hold every job, then you're just filling up the channel, moving on, and then draining it again, rather than sending and receiving concurrently.
Example:
Change
input, output := make(chan int), make(chan int)
To
input, output := make(chan int, 20), make(chan int, 20)
Please note that for this kind of task, a sync.WaitGroup might be an alternative way to get it done, that is, if you require all data to be processed before you continue.
Read up on it in the documentation of the sync package: http://golang.org/pkg/sync#WaitGroup
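As a rough sketch of how that could look (one possible arrangement, not the only one): the workers register with a sync.WaitGroup, input is closed once all jobs are sent, and output is closed once every worker has finished, so the reader can simply range over output. The function name squareAll is made up for this example, and it assumes numWorkers is at least 1.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// squareAll squares every job using numWorkers goroutines and returns the
// results sorted (workers finish in no particular order).
// (Hypothetical helper, for illustration only; assumes numWorkers >= 1.)
func squareAll(jobs []int, numWorkers int) []int {
	input, output := make(chan int), make(chan int)

	var wg sync.WaitGroup
	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for data := range input {
				output <- data * data
			}
		}()
	}

	// Send the jobs from a separate goroutine, then close input so the
	// workers' range loops end.
	go func() {
		for _, job := range jobs {
			input <- job
		}
		close(input)
	}()

	// Close output once every worker has returned, which ends the
	// range loop below.
	go func() {
		wg.Wait()
		close(output)
	}()

	results := []int{}
	for result := range output {
		results = append(results, result)
	}
	sort.Ints(results)
	return results
}

func main() {
	fmt.Println(squareAll([]int{0, 1, 2, 3, 4}, 3)) // prints "[0 1 4 9 16]"
}
```

Here the caller no longer needs to know numJobs when reading results; the closed output channel signals that all data has been processed.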