I am having trouble structuring my goroutines and channels. My select statement keeps quitting before all goroutines are finished. I know the problem is where I am sending the done signal. Where should I send the done signal?
// startWorker collects results until a value arrives on quit.
//
// Leads received on ok are stored in the back-fill list, values received on
// err are stored in the bad-lead list. When quit fires, the bad leads are
// logged through logBl (only if verbose is set), a summary with the elapsed
// time and both counts is logged, and the function returns.
//
// NOTE(review): wg.Done is deferred here, but no matching wg.Add for the
// worker is visible in this function - confirm the caller accounts for it.
func startWorker(ok chan LeadRes, err chan LeadResErr, quit chan int, verbose bool, wg *sync.WaitGroup) {
	defer wg.Done()

	var collected ProcessResults
	log.Info("Starting . . .")
	began := time.Now()

	for {
		select {
		case <-quit:
			// Shutdown requested: optionally dump the failures, then summarise.
			if verbose {
				log.Info("Logging errors from unprocessed leads . . .")
				logBl(collected.BadLeads)
			}
			summary := log.Fields{
				"time-elapsed":                time.Since(began),
				"number-of-unprocessed-leads": len(collected.BadLeads),
				"number-of-backfilled-leads":  len(collected.BackFill),
			}
			log.WithFields(summary).Info("Done")
			return
		case failure := <-err:
			collected.BadLeads = append(collected.BadLeads, failure)
		case good := <-ok:
			collected.BackFill = append(collected.BackFill, good.Lead)
		}
	}
}
// BackFillParallel starts one ProcessLead goroutine per lead and then runs
// startWorker on the calling goroutine to collect what they produce.
//
// The goroutine handling the final element of leads is started with
// done=true; every other goroutine gets done=false.
//
// NOTE(review): the goroutines run concurrently, so the one started for the
// last lead is not guaranteed to be the last one to finish.
func BackFillParallel(leads []Lead, verbose bool) {
	var wg sync.WaitGroup
	gl, bl, d := getChans()

	last := len(leads) - 1
	for idx := range leads {
		wg.Add(1)
		// Arguments are evaluated here, at the go statement, so each goroutine
		// receives its own lead and its own done flag.
		go ProcessLead(leads[idx], gl, bl, d, idx == last, &wg)
	}

	startWorker(gl, bl, d, verbose, &wg)
}
// ProcessLead checks the metadata of every payload on lead and reports the
// outcome on exactly one result channel: a LeadResErr on c2 as soon as a
// payload cannot be base64-decoded or repaired, otherwise a LeadRes on c1.
//
// If done is true, a value is sent on c3 after the result has been reported,
// on both the success and the error path. wg.Done is called last, when the
// function returns.
//
// NOTE(review): only payloads whose metadata failed to unmarshal and was then
// rewritten by FixMDStr are kept in lead.Payload; payloads whose metadata
// already unmarshals cleanly are dropped. Confirm this is intended.
func ProcessLead(lead Lead, c1 chan LeadRes, c2 chan LeadResErr, c3 chan int, done bool, wg *sync.WaitGroup) {
	defer wg.Done()
	// Deferred after wg.Done, so it runs before it (defers are LIFO). This
	// keeps the quit signal on every return path, including early error exits.
	defer func() {
		if done {
			c3 <- 0
		}
	}()

	var payloads []Payload
	for _, p := range lead.Payload {
		decMDStr, err := base64.StdEncoding.DecodeString(p.MetaData)
		if err != nil {
			// Stop here: continuing would unmarshal invalid bytes and report
			// the same lead as both failed and successful.
			c2 <- LeadResErr{lead, err.Error()}
			return
		}

		var decMetadata Metadata
		if json.Unmarshal(decMDStr, &decMetadata) == nil {
			continue
		}

		goodMetadata, err := FixMDStr(string(decMDStr))
		if err != nil {
			c2 <- LeadResErr{lead, err.Error()}
			return
		}
		p.MetaData = goodMetadata
		payloads = append(payloads, p)
	}

	lead.Payload = payloads
	c1 <- LeadRes{lead}
}
First, a comment on the main problem I see with the code: you are passing a `done` variable to the last `ProcessLead` call, which `ProcessLead` in turn uses to stop your worker via the `quit` channel. The problem with this is that the "last" `ProcessLead` call may finish BEFORE other `ProcessLead` calls, because they are executed in parallel.
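Think of your problem as a pipeline. You have 3 steps:
1. Iterate over all leads and start a goroutine for each one.
2. The goroutines process their leads in parallel.
3. Collect the results.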
After spreading out in step 2, the simplest way to synchronise is the `WaitGroup`. As already mentioned, you are not calling `wg.Wait()`, and if you did, you would currently create a deadlock with your collecting routine. You need another goroutine that separates the synchronisation from the collecting routine for this to work.
Here is how that could look (sorry for removing some code so that I could see the structure better):
// BackFillParallel starts one ProcessLead goroutine per lead and collects
// their output with startWorker on the calling goroutine.
//
// A separate goroutine owns the WaitGroup: it launches the ProcessLead
// goroutines, waits for all of them, and then closes the quit channel d so
// that startWorker returns only after every lead has been processed.
func BackFillParallel(leads []Lead, verbose bool) {
	gl, bl, d := make(chan LeadRes), make(chan LeadResErr), make(chan int)

	// Additional goroutine with wg.Wait() and closing the quit channel. It
	// must not run on the calling goroutine: ProcessLead blocks on its channel
	// sends until startWorker below receives them.
	go func() {
		var wg sync.WaitGroup
		for _, lead := range leads {
			wg.Add(1)
			// lead is evaluated at the go statement, so every goroutine gets
			// its own copy.
			go ProcessLead(lead, gl, bl, &wg)
		}
		wg.Wait()
		// Stop the worker after all other goroutines are done. If your
		// channels have buffers you might want to make sure there is nothing
		// left in the buffer before closing.
		close(d) // you can simply close a quit channel; just make sure to close it only once
	}()

	// startWorker now runs in parallel to wg.Wait() and close(d).
	startWorker(gl, bl, d, verbose)
}
// startWorker prints every value received on ok and err until quit yields a
// value (or is closed), at which point it returns. verbose is not used here.
func startWorker(ok chan LeadRes, err chan LeadResErr, quit chan int, verbose bool) {
	for {
		select {
		case <-quit:
			return
		case failure := <-err:
			fmt.Println(failure)
		case result := <-ok:
			fmt.Println(result)
		}
	}
}
// ProcessLead checks the metadata of every payload on lead and reports the
// outcome on exactly one channel: a LeadResErr on c2 as soon as a payload
// cannot be base64-decoded or repaired, otherwise a LeadRes on c1.
// wg.Done is called when the function returns.
//
// NOTE(review): only payloads whose metadata failed to unmarshal and was then
// rewritten by FixMDStr are kept in lead.Payload; payloads whose metadata
// already unmarshals cleanly are dropped. Confirm this is intended.
func ProcessLead(lead Lead, c1 chan LeadRes, c2 chan LeadResErr, wg *sync.WaitGroup) {
	defer wg.Done()

	var payloads []Payload
	for _, p := range lead.Payload {
		decMDStr, err := base64.StdEncoding.DecodeString(p.MetaData)
		if err != nil {
			// Stop here: continuing would unmarshal invalid bytes and report
			// the same lead as both failed and successful.
			c2 <- LeadResErr{lead, err.Error()}
			return
		}

		var decMetadata Metadata
		if json.Unmarshal(decMDStr, &decMetadata) == nil {
			continue
		}

		goodMetadata, err := FixMDStr(string(decMDStr))
		if err != nil {
			c2 <- LeadResErr{lead, err.Error()}
			return
		}
		p.MetaData = goodMetadata
		payloads = append(payloads, p)
	}

	lead.Payload = payloads
	c1 <- LeadRes{lead}
}
As mentioned in a comment, you might run into trouble if you have buffered channels. The complication comes from the two output channels you have (one for `LeadRes` and one for `LeadResErr`). You could avoid this with the following structure:
// BackFillParallel processes every lead in its own goroutine, gathers all
// results and errors, and prints them once everything has been collected.
// verbose is not used here.
func BackFillParallel(leads []Lead, verbose bool) {
	gl, bl := make(chan LeadRes), make(chan LeadResErr)

	// One goroutine that blocks until all ProcessLead calls are done and then
	// closes both output channels. Closing is what ends the range loops in the
	// collectors; without it they would wait forever.
	go func() {
		var workers sync.WaitGroup
		for _, lead := range leads {
			workers.Add(1)
			// lead is evaluated at the go statement, so every goroutine gets
			// its own copy.
			go ProcessLead(lead, gl, bl, &workers)
		}
		workers.Wait()
		close(gl)
		close(bl)
	}()

	// The calling goroutine blocks until all results and errors are collected.
	var (
		collectors sync.WaitGroup
		res        []LeadRes
		errs       []LeadResErr
	)
	collectors.Add(2) // one for resCollector, one for errCollector
	// The slices are passed by pointer: append may reallocate, and a slice
	// passed by value would leave res and errs empty here.
	go resCollector(&collectors, gl, &res)
	go errCollector(&collectors, bl, &errs)
	collectors.Wait()

	fmt.Println(res, errs) // these two variables now hold the results
}

// resCollector appends every value received on ok to *res until ok is closed,
// then calls wg.Done.
func resCollector(wg *sync.WaitGroup, ok chan LeadRes, res *[]LeadRes) {
	defer wg.Done()
	for lead := range ok {
		*res = append(*res, lead)
	}
}

// errCollector appends every value received on ok to *res until ok is closed,
// then calls wg.Done.
func errCollector(wg *sync.WaitGroup, ok chan LeadResErr, res *[]LeadResErr) {
	defer wg.Done()
	for err := range ok {
		*res = append(*res, err)
	}
}
// ProcessLead function as in "First improvement"