I have the following function which spins off a given amount of go routines
func (r *Runner) Execute() {
var wg sync.WaitGroup
wg.Add(len(r.pipelines))
for _, p := range r.pipelines {
go executePipeline(p, &wg)
}
wg.Wait()
errs := ....//contains list of errors reported by any/all go routines
}
I was thinking there might be some way with channels, but I can't seem to figure it out.
One way to do this is using mutexes if you can make executePipeline retuen errors:
// ...
for _, p := range r.pipelines {
go func(p pipelineType) {
if err := executePipeline(p, &wg); err != nil {
mu.Lock()
errs = append(errs, err)
mu.UnLock()
}
}(p)
}
To use a channel, you can have a separate goroutine listning for errors:
errCh := make(chan error)
go func() {
for e := range errCh {
errs = append(errs, e)
}
}
and in the Execute
function, make the following changes:
// ...
wg.Add(len(r.pipelines))
for _, p := range r.pipelines {
go func(p pipelineType) {
if err := executePipeline(p, &wg); err != nil {
errCh <- err
}
}(p)
}
wg.Wait()
close(errCh)
You can always use @zerkms method listed above if the number of goroutines is not high.
instead of returning error from executePipleline
and using a anonymous function wrapper, you can always make above changes within the function itself.
You can use channels as @Kaveh Shahbazian suggested:
func (r *Runner) Execute() {
pipelineChan := makePipeline(r.pipelines)
for cnt := 0; cnt < len(r.pipelines); cnt++{
//recieve from channel
p := <- pipelineChan
//do something with the result
}
}
func makePipeline(pipelines []pipelineType) <-chan pipelineType{
pipelineChan := make(chan pipelineType)
go func(){
for _, p := range pipelines {
go func(p pipelineType){
pipelineChan <- executePipeline(p)
}(p)
}
}()
return pipelineChan
}
Please see this example: https://gist.github.com/steven-ferrer/9b2eeac3eed3f7667e8976f399d0b8ad