I want to create a fan-in function that uses multiple goroutines and returns a channel. Here is my code:
package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

var wg, wg2 sync.WaitGroup

func main() {
	final := talk(boring("Joe"), boring("Ann"))
	for i := 0; i < 10; i++ {
		fmt.Println(<-final)
	}
	fmt.Println("You are both boring I'm leaving")
}

func talk(input1, input2 <-chan string) <-chan string {
	out := make(chan string)
	go func() {
		wg.Add(1)
		for {
			out <- <-input1
		}
	}()
	go func() {
		wg.Add(1)
		for {
			out <- <-input2
		}
	}()
	wg.Done()
	close(out)
	return out
}

func boring(msg string) <-chan string {
	c := make(chan string)
	for i := 0; i < 5; i++ {
		c <- fmt.Sprintf("%s%d\n", msg, i)
		time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
	}
	return c
}
But I get an error when I run the code above:
all goroutines are asleep - deadlock
I have tried closing the channels, but I still get the error. I have also tried assigning the channels returned by boring to variables Joe and Ann and then passing those to the talk function for multiplexing, still with no success. I am new to Go and learning channels, and I am not clear on this concept.
Instead of wait groups, you can use select: https://tour.golang.org/concurrency/5
The select statement lets a goroutine wait on multiple communication operations. A select blocks until one of its cases can run, then it executes that case. It chooses one at random if multiple are ready.
package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
	final := talk(boring("Joe"), boring("Ann"))
	for i := 0; i < 10; i++ {
		fmt.Println(<-final)
	}
	fmt.Println("You are both boring I'm leaving")
}

func talk(input1, input2 <-chan string) <-chan string {
	c := make(chan string)
	go func() {
		for {
			select {
			case s := <-input1:
				c <- s
			case s := <-input2:
				c <- s
			}
		}
	}()
	return c
}

func boring(msg string) <-chan string {
	c := make(chan string)
	go func() {
		for i := 0; i < 5; i++ {
			c <- fmt.Sprintf("%s: %d", msg, i)
			time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
		}
	}()
	return c
}
Edit:
In your example, the boring function does not use a goroutine for its repeated sends on the channel. It runs on the main goroutine before talk has started receiving, so the very first send blocks forever, because (https://tour.golang.org/concurrency/2):
By default, sends and receives block until the other side is ready. This allows goroutines to synchronize without explicit locks or condition variables.
Also, wg.Done() needs to be called inside the goroutine.
I got it working by making the changes above: https://play.golang.org/p/YN0kfBO6iT
You cannot stop a goroutine from the outside. I suggest signaling the goroutine to quit, with something along these lines:
stop := make(chan bool)
go func() {
	for {
		select {
		case <-stop:
			return
		default:
			// Do other stuff
		}
	}
}()
// Do stuff
// Quit goroutine
stop <- true