Deadlock with channels in a worker pool

I am playing around with channels by building a worker pool of 1000 workers. Currently I am getting the following error:

fatal error: all goroutines are asleep - deadlock!

Here is my code:

package main

import "fmt"
import "time"


func worker(id int, jobs <-chan int, results chan<- int) {
    for j := range jobs {
        fmt.Println("worker", id, "started  job", j)
        time.Sleep(time.Second)
        fmt.Println("worker", id, "finished job", j)
        results <- j * 2
    }
}

func main() {
    jobs := make(chan int, 100)
    results := make(chan int, 100)

    for w := 1; w <= 1000; w++ {
        go worker(w, jobs, results)
    }

    for j := 1; j < 1000000; j++ {
        jobs <- j
    }
    close(jobs)
    fmt.Println("==========CLOSED==============")

    for i:=0;i<len(results);i++ {
        <-results
    }
}

Why is this happening? I am still new to Go and I am hoping to make sense of this.

The problem is that your channels are filling up. The main() routine tries to put all jobs into the jobs channel before reading any results. But the results channel only has space for 100 results before any further write to it will block, so all the workers will eventually block waiting for space in this channel. That space will never come, because main() has not started reading from results yet. With every worker stuck, nobody drains jobs either, so main() blocks on jobs <- j once that buffer is full, and the runtime reports a deadlock because every goroutine is asleep.
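To see the buffer limit in isolation, here is a minimal sketch. The helper name sendsBeforeBlocking is made up for illustration; it uses a non-blocking send (select with a default case) to count how many values a buffered channel accepts while nothing is receiving:

```go
package main

import "fmt"

// sendsBeforeBlocking is a hypothetical helper: it counts how many sends
// succeed on a buffered channel that has no receiver.
func sendsBeforeBlocking(capacity int) int {
    ch := make(chan int, capacity)
    n := 0
    for {
        select {
        case ch <- n:
            n++ // the buffer still had room
        default:
            // a plain `ch <- n` would block here forever
            return n
        }
    }
}

func main() {
    fmt.Println(sendsBeforeBlocking(100)) // prints 100
}
```

In the question's program the 101st send on results is a plain blocking send, so the worker simply parks there.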

To quickly fix this, you can either make jobs big enough to hold all jobs, so the main() function can continue to the reading phase; or you can make results big enough to hold all results, so the workers can output their results without blocking.
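As a rough sketch of the second quick fix, the version below sizes results to hold every result. The names double and runPool are assumptions for illustration, and the worker is a stripped-down stand-in for the one in the question (no printing, no sleep) so that it finishes quickly:

```go
package main

import "fmt"

// double is a simplified stand-in for the worker in the question.
func double(jobs <-chan int, results chan<- int) {
    for j := range jobs {
        results <- j * 2
    }
}

// runPool is a hypothetical helper: it pushes jobs 1..numJobs through
// numWorkers workers and returns the sum of all results.
func runPool(numWorkers, numJobs int) int {
    jobs := make(chan int, 100)
    // Quick fix: results can hold every result, so no worker ever blocks
    // on a send, even though nothing is read until all jobs are queued.
    results := make(chan int, numJobs)

    for w := 0; w < numWorkers; w++ {
        go double(jobs, results)
    }
    for j := 1; j <= numJobs; j++ {
        jobs <- j
    }
    close(jobs)

    // Read exactly one result per job.
    sum := 0
    for i := 0; i < numJobs; i++ {
        sum += <-results
    }
    return sum
}

func main() {
    fmt.Println(runPool(10, 1000)) // prints 1001000
}
```

The cost of this fix is that the buffer grows with the number of jobs.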

A nicer approach is to make another goroutine to fill up the jobs queue, so main() can go straight to reading results:

func main() {
    jobs := make(chan int, 100)
    results := make(chan int, 100)

    for w := 1; w <= 1000; w++ {
        go worker(w, jobs, results)
    }

    go func() {
        for j := 1; j < 1000000; j++ {
            jobs <- j
        }
        close(jobs)
        fmt.Println("==========CLOSED==============")
    }()

    for i := 1; i < 1000000; i++ {
        <-results
    }
}

Note that I had to change the final for loop to a fixed number of iterations; otherwise it might terminate before all the results have been read, because len(results) only counts the values sitting in the channel's buffer at that moment.
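A small sketch of why the len-based loop from the question stops early, even in the friendliest case where every value is already buffered. The helper name drainWithLenLoop is made up for illustration:

```go
package main

import "fmt"

// drainWithLenLoop is a hypothetical helper: it queues `queued` values and
// then reads them with the loop from the question, returning how many
// values were actually received.
func drainWithLenLoop(queued int) int {
    results := make(chan int, queued)
    for i := 0; i < queued; i++ {
        results <- i
    }

    received := 0
    // len(results) is re-evaluated on every iteration and shrinks with
    // each receive, so the loop stops while values are still buffered.
    for i := 0; i < len(results); i++ {
        <-results
        received++
    }
    return received
}

func main() {
    fmt.Println(drainWithLenLoop(10)) // prints 5
    fmt.Println(drainWithLenLoop(0))  // prints 0
}
```

The second call mirrors what can happen in a real pool: if no worker has produced a result yet when the loop starts, it reads nothing at all.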

The following code:

    for j := 1; j < 1000000; j++ {
        jobs <- j
    }

should run in a separate goroutine, since all the workers will block waiting for the main goroutine to receive on the results channel, while the main goroutine is stuck in that loop.

While Thomas' answer is basically correct, here is my version, which is IMO more idiomatic Go and also works with unbuffered channels (it needs "sync" added to the imports):

func main() {
    jobs := make(chan int)
    results := make(chan int)

    var wg sync.WaitGroup

    // You could set the WaitGroup's count with a single call here, but that is
    // error-prone: if you change the loop's bound you could forget to change
    // the count. So call wg.Add inside the loop instead.
    // wg.Add(1000)
    for w := 1; w <= 1000; w++ {
        wg.Add(1)
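        go func(id int) {
            defer wg.Done()
            // w is passed as an argument: before Go 1.22 a closure would
            // share the single loop variable across all goroutines
            worker(id, jobs, results)
        }(w)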
    }

    go func() {
        for j := 1; j < 2000; j++ {
            jobs <- j
        }
        close(jobs)
        fmt.Println("==========CLOSED==============")
    }()

    // in this goroutine we wait until all "producer" goroutines are done,
    // then close the results channel so that the consumer loop stops
    go func() {
        wg.Wait()
        close(results)
    }()

    for i := range results {
        fmt.Print(i, " ")
    }
    fmt.Println("==========DONE==============")
}

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()

    for j := range jobs {
        fmt.Println("worker", id, "started  job", j)
        time.Sleep(10 * time.Millisecond)
        fmt.Println("worker", id, "finished job", j)
        results <- j * 2
    }
}

func main() {
    jobs := make(chan int, 100)
    results := make(chan int, 100)

    wg := new(sync.WaitGroup)
    wg.Add(1000)

    for w := 1; w <= 1000; w++ {
        go worker(w, jobs, results, wg)
    }

    go func() {
        wg.Wait()
        close(results)
    }()

    go func() {
        for j := 1; j < 1000000; j++ {
            jobs <- j
        }
        close(jobs)
    }()

    sum := 0
    for v := range results {
        sum += v
    }

    fmt.Println("==========CLOSED==============")
    fmt.Println("sum", sum)
}