I am creating an email sending application in Go.
I run into a problem when an email cannot be sent successfully. In that case I want to put the email back into the Go channel so that it is retried, but by then the channel is already closed.
How can I prevent the channel from being closed before all tasks have completed?
This is the function with the problem:
func worker(toSend chan combo, tried chan combo, s smptInfo, wg *sync.WaitGroup) {
	for try := range toSend {
		startSend := time.Now()
		delegate := make(chan bool, 1)
		go sendEmailSSH(try, delegate, s, try.line)
		select {
		case res := <-delegate:
			if res == true {
				try.success = res
				tried <- try
			} else {
				toSend <- try // if the send was not successful, return it to the channel
			}
		case <-time.After(smtpTimeout):
			toSend <- try // if the send timed out, return it to the channel
		}
		Pauza(startSend) // if the send was faster than the send limit, wait
	}
	wg.Done()
}
The complete application can be found on GitHub.
Here's the approach I would suggest. I have removed a few SMTP-specific parts for simplicity. You can plug them in again.
1. Sample code:
package main

import (
	"fmt"
	"log"
	"math/rand"
	"sync"
	"time"
)

type combo struct {
	success bool
	toEmail string
}

func sendEmailSSH(try combo, delegate chan bool) {
	defer close(delegate)
	r := rand.Intn(7)
	time.Sleep(time.Duration(r) * time.Second)
	if r%2 == 0 {
		fmt.Printf("sendEmailSSH: failing randomly after %d seconds: %+v\n", r, try)
		delegate <- false
	} else {
		fmt.Printf("sendEmailSSH: successful after %d seconds: %+v\n", r, try)
		delegate <- true
	}
}

func sendEmailSSHWrapper(try combo, toRetry chan combo) {
	// do something with try
	delegate := make(chan bool, 1)
	smtpTimeout := 3 // if no success within 3 seconds, mark it as failed
	go sendEmailSSH(try, delegate)
	go func() {
		select {
		case res := <-delegate:
			if res {
				try.success = res
			} else {
				toRetry <- try // if the send was not successful, add to retry channel
			}
		case <-time.After(time.Duration(smtpTimeout) * time.Second):
			fmt.Printf("sendEmailSSHWrapper: failing due to timeout: %+v\n", try)
			toRetry <- try // if the send timed out, add to retry channel
		}
	}()
}

// where we actually do the work
func worker(toSend chan combo, wg *sync.WaitGroup, id int) {
	toRetry := make(chan combo, 5)
	retryDone := false
	for try := range toSend {
		go sendEmailSSHWrapper(try, toRetry)
	}
	for !retryDone {
		select {
		case try, ok := <-toRetry:
			if !ok {
				log.Println("toRetry is already closed")
				retryDone = true
				continue
			}
			fmt.Printf("worker %d: picking up %+v for retry\n", id, try)
			go sendEmailSSHWrapper(try, toRetry)
		case <-time.After(time.Duration(15) * time.Second):
			fmt.Printf("worker %d: no data in toRetry(%d) channel for 15 seconds. closing toRetry now\n", id, id)
			close(toRetry)
			toRetry = nil
			retryDone = true
		}
	}
	wg.Done()
}

func main() {
	var wg sync.WaitGroup      // keep track of the workers
	toSend := make(chan combo) // to the workers
	// emailList := []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"}
	emailList := []string{"a", "b", "c", "d", "e", "f"}
	// initialize n workers
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go worker(toSend, &wg, i)
	}
	var count int = 0
	for i := 0; i < len(emailList); i++ {
		count++
		toSend <- combo{toEmail: emailList[i], success: false}
	}
	close(toSend)
	wg.Wait()
	fmt.Println("Send emails Done!")
}
Try it here: https://play.golang.org/p/W7ur2wr3HeD
2. What I have done:
- The toSend channel is closed once main has queued every email. It won't be used any further.
- go sendEmailSSH(try, delegate, s, try.line) is not very optimised: right after starting the goroutine, the worker blocks on a channel of size one, which is as good as a serial call.
- The worker now starts go sendEmailSSHWrapper(try, toRetry), which internally calls the actual sendEmailSSH function and monitors the delegate channel.
- sendEmailSSH randomly fails a few requests, for the purpose of demonstration.
- sendEmailSSHWrapper monitors delegate, its own channel, for failures, and also checks for a timeout (I kept it at 3 seconds for demonstration).
- The worker first drains the toSend channel and then moves to the toRetry channel. It keeps working on it until it sees no data on this channel for 15 seconds (again, kept high for demonstration).
3. What you will need to add:
You should really implement some mechanism so that each email address is retried only a limited number of times. Otherwise you might be stuck in the for loop forever because of a faulty email address or an unexpected scenario (for example, repeated timeouts due to latency). One option is a map that tracks the retry count per entry. Even simpler, add a retryCount int field to your combo struct, increment it on every retry, and check it before retrying.
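As a rough illustration of the retryCount idea, here is a minimal sketch. The maxRetries constant and the requeue helper are hypothetical names made up for this example, and the cap of 3 is arbitrary. It only shows the counting logic, not the actual sending.

```go
package main

import "fmt"

// maxRetries is a hypothetical cap chosen for this sketch.
const maxRetries = 3

// combo mirrors the struct from the sample code, plus the suggested
// retryCount field.
type combo struct {
	success    bool
	toEmail    string
	retryCount int
}

// requeue (hypothetical helper) puts a failed email back on toRetry unless
// it has used up its retries. It reports whether the email was queued again.
func requeue(try combo, toRetry chan<- combo) bool {
	if try.retryCount >= maxRetries {
		return false // give up; the caller can log or record the failure
	}
	try.retryCount++
	toRetry <- try
	return true
}

func main() {
	toRetry := make(chan combo, 1)
	try := combo{toEmail: "a"}
	// Simulate an address that fails on every attempt.
	for requeue(try, toRetry) {
		try = <-toRetry
		fmt.Printf("retry %d for %s\n", try.retryCount, try.toEmail)
	}
	fmt.Printf("giving up on %s after %d retries\n", try.toEmail, try.retryCount)
}
```

In the sample code above, sendEmailSSHWrapper could call such a helper wherever it currently does toRetry <- try.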
Note that both your approach and mine retry on timeout. Are you sure that when a timeout happens, the email will never be delivered? If not, you should really use a context with cancellation, or some other way to stop the actual goroutine when it takes too long, before retrying. Otherwise, on a bad day with high latency, you might time out a lot and keep retrying while the earlier attempts still go through, and in reality you would be spamming the user with a lot of emails.