Goroutines, channels and the select statement

I am having trouble structuring my goroutines and channels. My select statement keeps quitting before all goroutines are finished. I know the problem is where I am sending the done signal. Where should I send the done signal?

// startWorker collects results on the calling goroutine until a value can be
// received from quit.
//
// Values received on ok have their Lead appended to the back-fill list, and
// values received on err are appended to the bad-lead list. Once quit fires,
// the bad leads are logged when verbose is set, a summary entry (elapsed time
// and both counts) is written, and the function returns. wg.Done is deferred,
// so it runs when the function returns.
func startWorker(ok chan LeadRes, err chan LeadResErr, quit chan int, verbose bool, wg *sync.WaitGroup) {
	defer wg.Done()

	log.Info("Starting . . .")
	began := time.Now()

	var collected ProcessResults
collect:
	for {
		select {
		case res := <-ok:
			collected.BackFill = append(collected.BackFill, res.Lead)
		case failure := <-err:
			collected.BadLeads = append(collected.BadLeads, failure)
		case <-quit:
			// Stop receiving; nothing further is read from ok or err.
			break collect
		}
	}

	if verbose {
		log.Info("Logging errors from unprocessed leads . . .")
		logBl(collected.BadLeads)
	}

	summary := log.Fields{
		"time-elapsed":                time.Since(began),
		"number-of-unprocessed-leads": len(collected.BadLeads),
		"number-of-backfilled-leads":  len(collected.BackFill),
	}
	log.WithFields(summary).Info("Done")
}

// BackFillParallel starts one ProcessLead goroutine per lead and then runs
// startWorker on the calling goroutine to collect what they send.
//
// The channels come from getChans. Only the goroutine started for the final
// element of leads is passed done=true; the flag follows slice position, not
// the order in which the goroutines finish.
func BackFillParallel(leads []Lead, verbose bool) {
	var wg sync.WaitGroup
	gl, bl, d := getChans()

	lastIdx := len(leads) - 1
	for idx, lead := range leads {
		wg.Add(1)
		// Arguments are evaluated here, so each goroutine gets its own lead and flag.
		go ProcessLead(lead, gl, bl, d, idx == lastIdx, &wg)
	}

	startWorker(gl, bl, d, verbose, &wg)
}

// ProcessLead re-examines the metadata of every payload on lead and reports
// the outcome over the supplied channels.
//
// For each payload the metadata is base64-decoded and parsed as JSON into a
// Metadata value. Payloads that parse cleanly are left out of the result;
// payloads that fail to parse get their metadata replaced with the output of
// FixMDStr and are kept. Decode errors and FixMDStr errors are sent on c2 as
// LeadResErr values. Finally the lead, carrying only the kept payloads, is
// sent on c1, and when done is true a 0 is sent on c3. wg.Done runs when the
// function returns.
func ProcessLead(lead Lead, c1 chan LeadRes, c2 chan LeadResErr, c3 chan int, done bool, wg *sync.WaitGroup) {
	defer wg.Done()

	var repaired []Payload
	for _, payload := range lead.Payload {
		raw, decodeErr := base64.StdEncoding.DecodeString(payload.MetaData)
		if decodeErr != nil {
			// NOTE(review): the error is reported but the payload is still
			// processed below — confirm this is intended.
			c2 <- LeadResErr{lead, decodeErr.Error()}
		}

		var parsed Metadata
		if json.Unmarshal(raw, &parsed) == nil {
			// Metadata already parses; this payload is not carried over.
			continue
		}

		fixed, fixErr := FixMDStr(string(raw))
		if fixErr != nil {
			c2 <- LeadResErr{lead, fixErr.Error()}
		}
		payload.MetaData = fixed
		repaired = append(repaired, payload)
	}

	lead.Payload = repaired
	c1 <- LeadRes{lead}

	if !done {
		return
	}
	c3 <- 0
}

First, a comment on the main problem I see with the code:

You pass a done flag to the ProcessLead call for the last lead, and ProcessLead uses it to stop your worker via the quit channel. The problem is that this "last" ProcessLead call may finish BEFORE other ProcessLead calls, because they all run in parallel.

First improvement

Think of your problem as a pipeline. You have 3 steps:

  1. going through all the leads and starting a routine for each one
  2. the routines process their lead
  3. collecting the results

After spreading out in step 2, the simplest way to synchronise is the WaitGroup. As already mentioned, you never call wg.Wait(), and if you did, you would currently create a deadlock with your collecting routine: with unbuffered channels, the main goroutine would block in Wait before startWorker begins receiving, so no ProcessLead call could ever complete its send. You need another goroutine separating the sync from the collecting routine for this to work.

Here is how that could look (sorry for removing some code, so I could better see the structure):

// BackFillParallel fans leads out to one ProcessLead goroutine each and
// collects their output with startWorker on the calling goroutine.
//
// A separate supervising goroutine owns the WaitGroup. It starts the
// ProcessLead goroutines, waits for all of them, and then closes the quit
// channel d so that startWorker returns. Running the wait in its own
// goroutine lets startWorker receive while the producers are still sending.
func BackFillParallel(leads []Lead, verbose bool) {
	gl, bl, d := make(chan LeadRes), make(chan LeadResErr), make(chan int)
	// additional goroutine with wg.Wait() and closing the quit channel
	go func(d chan int) {
		var wg sync.WaitGroup
		// The index is not needed; naming it without using it does not compile.
		for _, lead := range leads {
			wg.Add(1)
			go ProcessLead(lead, gl, bl, &wg)
		}
		wg.Wait()
		// stop routine after all other routines are done
		// if your channels have buffers you might want make sure there is nothing in the buffer before closing
		close(d) // you can simply close a quit channel. just make sure to only close it once
	}(d)

	// now startworker is running parallel to wg.Wait() and close(d)
	startWorker(gl, bl, d, verbose)
}

// startWorker prints every value received on ok and on err until quit becomes
// readable (a value is sent on it or it is closed), then returns.
// verbose is accepted but not used in this version.
func startWorker(ok chan LeadRes, err chan LeadResErr, quit chan int, verbose bool) {
	for {
		// select picks among ready cases at random, so case order carries no priority.
		select {
		case <-quit:
			return
		case failure := <-err:
			fmt.Println(failure)
		case res := <-ok:
			fmt.Println(res)
		}
	}
}

// ProcessLead re-examines the metadata of every payload on lead and reports
// the outcome over the supplied channels.
//
// For each payload the metadata is base64-decoded and parsed as JSON into a
// Metadata value. Payloads that parse cleanly are left out of the result;
// payloads that fail to parse get their metadata replaced with the output of
// FixMDStr and are kept. Decode errors and FixMDStr errors are sent on c2 as
// LeadResErr values. Finally the lead, carrying only the kept payloads, is
// sent on c1. wg.Done runs when the function returns, i.e. after all sends.
func ProcessLead(lead Lead, c1 chan LeadRes, c2 chan LeadResErr, wg *sync.WaitGroup) {
	defer wg.Done()

	var repaired []Payload
	for _, payload := range lead.Payload {
		raw, decodeErr := base64.StdEncoding.DecodeString(payload.MetaData)
		if decodeErr != nil {
			// NOTE(review): the error is reported but the payload is still
			// processed below — confirm this is intended.
			c2 <- LeadResErr{lead, decodeErr.Error()}
		}

		var parsed Metadata
		if json.Unmarshal(raw, &parsed) == nil {
			// Metadata already parses; this payload is not carried over.
			continue
		}

		fixed, fixErr := FixMDStr(string(raw))
		if fixErr != nil {
			c2 <- LeadResErr{lead, fixErr.Error()}
		}
		payload.MetaData = fixed
		repaired = append(repaired, payload)
	}

	lead.Payload = repaired
	c1 <- LeadRes{lead}
}

Suggested Solution

As noted in the code comment above, you might run into trouble if you have buffered channels. The complication comes from the two output channels you have (for LeadRes and LeadResErr). You could avoid this with the following structure:

// BackFillParallel processes every lead in its own goroutine and gathers the
// successes and failures into two slices, which are printed once everything
// has been collected.
//
// Producer side: a supervising goroutine starts one ProcessLead per lead,
// waits for all of them, and then closes both output channels. Closing is
// what lets the collecting range loops terminate; without it they would
// block forever and the final Wait would deadlock.
//
// Consumer side: two goroutines drain gl and bl into res and errs. They are
// closures so that the appends land in this function's variables; handing
// the slices to another function by value would discard the appended
// elements. Each slice is written by exactly one goroutine and read only
// after collectors.Wait returns.
//
// verbose is accepted but not used in this version.
func BackFillParallel(leads []Lead, verbose bool) {
	gl, bl := make(chan LeadRes), make(chan LeadResErr)

	// one goroutine that blocks until all ProcessLead functions are done
	go func(gl chan LeadRes, bl chan LeadResErr) {
		var wg sync.WaitGroup
		for _, lead := range leads {
			wg.Add(1)
			go ProcessLead(lead, gl, bl, &wg)
		}
		wg.Wait()
		// All senders have returned, so closing here is safe and tells the
		// collectors that no more values are coming.
		close(gl)
		close(bl)
	}(gl, bl)

	// main routine blocks until all results and errors are collected
	var (
		collectors sync.WaitGroup
		res        []LeadRes
		errs       []LeadResErr
	)
	collectors.Add(2) // one for the result collector, one for the error collector
	go func() {
		defer collectors.Done()
		for r := range gl {
			res = append(res, r)
		}
	}()
	go func() {
		defer collectors.Done()
		for e := range bl {
			errs = append(errs, e)
		}
	}()
	collectors.Wait()

	fmt.Println(res, errs) // in these two variables you will have the results.
}

// resCollector receives from ok until the channel is closed and then calls
// wg.Done (deferred).
//
// Each received value is appended to res. Because res is passed by value, the
// appends update only this function's copy of the slice header; the caller's
// slice variable is not reassigned.
func resCollector(wg *sync.WaitGroup, ok chan LeadRes, res []LeadRes) {
	defer wg.Done()
	for {
		lead, open := <-ok
		if !open {
			return
		}
		res = append(res, lead)
	}
}

// errCollector receives from ok until the channel is closed and then calls
// wg.Done (deferred).
//
// Each received value is appended to res. Because res is passed by value, the
// appends update only this function's copy of the slice header; the caller's
// slice variable is not reassigned.
func errCollector(wg *sync.WaitGroup, ok chan LeadResErr, res []LeadResErr) {
	defer wg.Done()
	for {
		failure, open := <-ok
		if !open {
			return
		}
		res = append(res, failure)
	}
}

// ProcessLead function as in "First improvement"