How do I wait for all work to finish before closing the channel?

I am creating an email sending application in Go.

I have run into a problem when an email cannot be sent successfully. In that case I want to put the email back on a Go channel, but the channel is already closed.

How can I prevent the channel from being closed before all tasks have completed?

This is the function with this problem:

func worker(toSend chan combo, tried chan combo, s smptInfo, wg *sync.WaitGroup) {
    for try := range toSend {

        startSend := time.Now()
        delegate := make(chan bool, 1)
        go sendEmailSSH(try, delegate, s, try.line)
        select {
        case res := <-delegate:
            if res == true {
                try.success = res
                tried <- try
            } else {
                toSend <- try // if the send was not successful, put it back on the channel
            }
        case <-time.After(smtpTimeout):
            toSend <- try // if the send timed out, put it back on the channel
        }

        Pauza(startSend) // if we are sending faster than the rate limit, wait
    }

    wg.Done()
}

The complete application can be found on GitHub.

Here's the approach I would suggest. I have removed a few SMTP-specific parts for simplicity; you can plug them back in.

1. Sample code:

package main

import (
    "fmt"
    "log"
    "math/rand"
    "sync"
    "time"
)

type combo struct {
    success bool
    toEmail string
}

func sendEmailSSH(try combo, delegate chan bool) {

    defer close(delegate)

    r := rand.Intn(7)
    time.Sleep(time.Duration(r) * time.Second)
    if r%2 == 0 {
        fmt.Printf("sendEmailSSH: failing randomly after %d seconds: %+v\n", r, try)

        delegate <- false
    } else {
        fmt.Printf("sendEmailSSH: successful after %d seconds: %+v\n", r, try)

        delegate <- true

    }

}

func sendEmailSSHWrapper(try combo, toRetry chan combo) {

    // do something with try
    delegate := make(chan bool, 1)
    smtpTimeout := 3 // if no success within 3 seconds, mark it as failed

    go sendEmailSSH(try, delegate)


    go func() {
        select {
        case res := <-delegate:
            if res == true {
                try.success = res
            } else {
                toRetry <- try // if the send was not successful, add it to the retry channel
            }
        case <-time.After(time.Duration(smtpTimeout) * time.Second):
            fmt.Printf("sendEmailSSHWrapper: failing due to timeout: %+v\n", try)

            toRetry <- try // if send timeout, add to retry channel
        }

    }()

}

// where we actually do the work
func worker(toSend chan combo, wg *sync.WaitGroup, id int) {

    toRetry := make(chan combo, 5)
    retryDone := false

    for try := range toSend {
        go sendEmailSSHWrapper(try, toRetry)
    }

    for !retryDone {
        select {

        case try, ok := <-toRetry:
            if !ok {
                log.Println("toRetry is already closed")
                retryDone = true
                continue
            }
            fmt.Printf("worker %d: picking up %+v for retry\n", id, try)
            go sendEmailSSHWrapper(try, toRetry)
        case <-time.After(time.Duration(15) * time.Second):
            fmt.Printf("worker %d: no data in toRetry(%d) channel for 15 seconds. closing toRetry now\n", id, id)
            close(toRetry)
            toRetry = nil
            retryDone = true
        }

    }

    wg.Done()
}

func main() {

    var wg sync.WaitGroup // keep track of the workers

    toSend := make(chan combo) // to the workers

    // emailList := []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"}
    emailList := []string{"a", "b", "c", "d", "e", "f"}


    // initialize n workers

    for i := 0; i < 5; i++ {
        wg.Add(1)
        go worker(toSend, &wg, i)
    }

    var count int = 0

    for i := 0; i < len(emailList); i++ {
        count++
        toSend <- combo{toEmail: emailList[i], success: false}
    }

    close(toSend)
    wg.Wait()

    fmt.Println("Send emails Done!")
}

Try it here: https://play.golang.org/p/W7ur2wr3HeD

2. What I have done:

  1. As soon as main is done adding data, it closes the toSend channel, because main will not use it any further.
  2. Create a wrapper around the sendEmailSSH function. Calling go sendEmailSSH(try, delegate, s, try.line) the way you do now gains little: right after starting the goroutine, the worker blocks on the single-element delegate channel, which is as good as a serial call.
  3. Instead, I call go sendEmailSSHWrapper(try, toRetry), which internally calls the actual sendEmailSSH function and monitors the delegate channel.
  4. sendEmailSSH randomly fails a few requests, for the purpose of demonstration.
  5. sendEmailSSHWrapper monitors its own delegate channel for failures and also checks for a timeout (I set it to 3 seconds for demonstration).
  6. The worker first drains the toSend channel completely and then moves on to the toRetry channel. It keeps working on toRetry until it sees no data on that channel for 15 seconds (again, kept high for demonstration).

3. What you will need to add:

  1. You should really implement some mechanism that limits how many times you retry any given email address. Otherwise you might be stuck in the for loop forever because of a faulty email address or some unexpected scenario (for example, repeated timeouts due to latency). You could use a map to track retry counts per entry. Or, even simpler, add a retryCount int field to your combo struct, increment it on every retry, and check it before each retry.

  2. Note that both your approach and mine retry on timeout. Are you sure that when a timeout happens, the email will never be delivered? If not, you should really be using a context with cancellation, or some other way to stop the actual goroutine when it takes too long, before retrying. Otherwise, on a bad day with high latency, you might time out a lot and keep retrying, while in reality you would be spamming the user with a lot of emails.
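The retryCount idea from point 1 could look roughly like this. It is only a sketch: maxRetries and the requeue helper are hypothetical names, and the limit of 3 is an arbitrary choice.

```go
package main

import "fmt"

// maxRetries is a hypothetical limit; pick whatever suits your setup.
const maxRetries = 3

// combo mirrors the struct from the sample code, plus a retryCount field.
type combo struct {
	success    bool
	toEmail    string
	retryCount int
}

// requeue increments the retry counter and puts the job back on toRetry,
// unless the limit has already been reached. It reports whether the job
// was requeued.
func requeue(try combo, toRetry chan<- combo) bool {
	if try.retryCount >= maxRetries {
		return false
	}
	try.retryCount++
	toRetry <- try
	return true
}

func main() {
	toRetry := make(chan combo, 1)
	job := combo{toEmail: "a"}
	for {
		// Pretend every attempt fails, so the job is requeued until the limit.
		if !requeue(job, toRetry) {
			fmt.Printf("giving up on %q after %d retries\n", job.toEmail, job.retryCount)
			break
		}
		job = <-toRetry
	}
}
```

In the sample code, sendEmailSSHWrapper would call something like requeue instead of sending to toRetry directly, and log or collect the addresses it gives up on.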