How to handle errors in a worker pool using WaitGroup?

I have a problem using sync.WaitGroup and select together. If you look at the following HTTP request pool, you will notice that an error is never reported: wg.Wait() blocks, and nothing reads from the errors channel anymore.

package pool

import (
    "fmt"
    "log"
    "net/http"
    "sync"
)

var (
    MaxPoolQueue  = 100
    MaxPoolWorker = 10
)

type Pool struct {
    wg *sync.WaitGroup

    queue  chan *http.Request
    errors chan error
}

func NewPool() *Pool {
    return &Pool{
        wg: &sync.WaitGroup{},

        queue:  make(chan *http.Request, MaxPoolQueue),
        errors: make(chan error),
    }
}

func (p *Pool) Add(r *http.Request) {
    p.wg.Add(1)

    p.queue <- r
}

func (p *Pool) Run() error {
    for i := 0; i < MaxPoolWorker; i++ {
        go p.doWork()
    }

    select {
    case err := <-p.errors:
        return err
    default:
        p.wg.Wait()
    }

    return nil
}

func (p *Pool) doWork() {
    for r := range p.queue {
        fmt.Printf("Request to %s\n", r.Host)

        p.wg.Done()

        _, err := http.DefaultClient.Do(r)

        if err != nil {
            log.Println(err) // log.Fatal here would exit the process before the send below runs

            p.errors <- err
        } else {
            fmt.Printf("no error\n")
        }
    }
}

Source can be found here

How can I still use WaitGroup but also get errors from goroutines?

I found the answer myself while writing the question, and since I think it is an interesting case I would like to share it with you.

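The trick to using sync.WaitGroup and chan together is to take this select:

select {
case err := <-p.errors:
    return err
default:
    p.wg.Wait()
}

and wrap it in a for loop:

for {
    select {
    case err := <-p.errors:
        return err
    default:
        p.wg.Wait()
    }
}

In this case the select checks for errors on every pass instead of only once. Keep in mind that a select with a default case never blocks, so the loop polls p.errors between calls to p.wg.Wait() rather than waiting on it, and as written it only returns when an error arrives.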

It looks a bit like the fail-fast mechanism enabled by the Tomb library (Tomb V2 GoDoc):

The tomb package handles clean goroutine tracking and termination.

If any of the tracked goroutines returns a non-nil error, or the Kill or Killf method is called by any goroutine in the system (tracked or not), the tomb Err is set, Alive is set to false, and the Dying channel is closed to flag that all tracked goroutines are supposed to willingly terminate as soon as possible.

Once all tracked goroutines terminate, the Dead channel is closed, and Wait unblocks and returns the first non-nil error presented to the tomb via a result or an explicit Kill or Killf method call, or nil if there were no errors.

You can see an example in this playground (extract):

// start runs all the given functions concurrently
// until either they all complete or one returns an
// error, in which case it returns that error.
//
// The functions are passed a channel which will be closed
// when the function should stop.
func start(funcs []func(stop <-chan struct{}) error) error {
    var tomb tomb.Tomb
    var wg sync.WaitGroup
    allDone := make(chan struct{})
    // Start all the functions.
    for _, f := range funcs {
        f := f
        wg.Add(1)
        go func() {
            defer wg.Done()
            if err := f(tomb.Dying()); err != nil {
                tomb.Kill(err)
            }
        }()
    }
    // Start a goroutine to wait for them all to finish.
    go func() {
        wg.Wait()
        close(allDone)
    }()

    // Wait for them all to finish, or one to fail
    select {
    case <-allDone:
    case <-tomb.Dying():
    }
    tomb.Done()
    return tomb.Err()
}