Here is a small example program with the basic architecture/flow that I am trying to get working. How do I get all the numbers and "end" messages to print out? I have tried putting close statements here and there, but it either doesn't work, or I get panics about trying to close an already closed channel...
package main
import (
"fmt"
"time"
)
// main wires three foo workers to a single bar consumer and feeds the workers
// the numbers 1 through 9 in round-robin order.
//
// Every channel is unbuffered, so each send blocks until the matching worker
// has taken the value. main returns right after the final send: it closes no
// channel and does not wait for any of the goroutines it started.
func main() {
	out := make(chan uint)
	go bar(out)

	in1, in2, in3 := make(chan uint), make(chan uint), make(chan uint)
	go foo(in1, out)
	go foo(in2, out)
	go foo(in3, out)

	// Deal the values out in the order in1, in2, in3, in1, ...
	inputs := []chan uint{in1, in2, in3}
	for n := uint(1); n <= 9; n++ {
		inputs[(n-1)%3] <- n
	}
}
// foo is a worker stage. It prints "foo start", then doubles every value it
// receives on c and sends the result on d. When c has been closed and drained
// it prints "foo end" and returns. foo never closes d.
func foo(c chan uint, d chan uint) {
	fmt.Println("foo start")
	for {
		n, ok := <-c
		if !ok {
			// c was closed and holds no more values.
			break
		}
		time.Sleep(time.Nanosecond) // same one-nanosecond pause as time.Sleep(1)
		d <- 2 * n
	}
	fmt.Println("foo end")
}
// bar is the single consumer of d. It prints "bar start", then one
// "bar received N" line for every value read from d, and finally "bar end"
// once d has been closed and drained.
func bar(d chan uint) {
	fmt.Println("bar start")
	for stuff := range d {
		// The line break must be written as the \n escape: an interpreted
		// string literal may not contain a raw newline.
		fmt.Printf("bar received %d\n", stuff)
	}
	fmt.Println("bar end")
}
The output I am getting looks like this. Notice the last set of numbers and the "end" outputs are missing.
foo start
bar start
foo start
foo start
bar received 6
bar received 2
bar received 4
bar received 12
bar received 8
bar received 10
In my actual program, each "foo" function is doing filtering and a bunch of heavy string regexp stuff. And I need the "bar" function, because it has the job of reordering based on a timestamp, and serializing printing, so output doesn't get interlaced.
Your program is exiting before all goroutines are done. You need to wait for both the `foo` and `bar` goroutines to finish before returning from `main`.
The usual way of doing this is by using a `sync.WaitGroup`, but since `main` isn't the producer for the `d` channel, you will have to ensure that all sends on that channel are finished before closing it, which takes a second WaitGroup (or equivalent).
// fooWG counts the running foo goroutines; each foo calls Done when it returns.
var fooWG sync.WaitGroup

// barWG counts the running bar goroutine; bar calls Done when it returns.
var barWG sync.WaitGroup
// main runs a two-stage pipeline (three foo workers feeding one bar consumer)
// and shuts it down in order, so that every goroutine has returned before
// main does.
//
// Shutdown sequence: close the worker inputs, wait on fooWG until every foo
// has returned (so nothing can still be sending on the shared channel), then
// close the shared channel and wait on barWG for bar to return.
func main() {
	out := make(chan uint)
	barWG.Add(1)
	go bar(out)

	inputs := []chan uint{make(chan uint), make(chan uint), make(chan uint)}
	fooWG.Add(len(inputs))
	for _, in := range inputs {
		go foo(in, out)
	}

	// Hand out 1..9 round-robin: first input, second, third, first, ...
	for n := uint(1); n <= 9; n++ {
		inputs[(n-1)%3] <- n
	}

	// Closing an input ends the range loop of the foo reading from it.
	for _, in := range inputs {
		close(in)
	}
	fooWG.Wait()

	// No foo is left to send, so the shared channel can be closed safely.
	close(out)
	barWG.Wait()
}
// foo is a worker stage. It prints "foo start", doubles each value received on
// c and sends it on d, and prints "foo end" once c is closed and drained.
// fooWG.Done is deferred so the wait group is released when foo returns.
func foo(c chan uint, d chan uint) {
	defer fooWG.Done()

	fmt.Println("foo start")
	for {
		n, ok := <-c
		if !ok {
			// c was closed and holds no more values.
			break
		}
		time.Sleep(time.Nanosecond) // same one-nanosecond pause as time.Sleep(1)
		d <- 2 * n
	}
	fmt.Println("foo end")
}
func bar(d chan uint) {
defer barWG.Done()
fmt.Println("bar start")
for stuff := range d {
fmt.Printf("bar received %d
", stuff)
}
fmt.Println("bar end")
}
JimB's answer definitely works, but it adds more complexity than the code actually needs. A simple `complete` channel would suffice to synchronize this code through completion.
Also, with channel synchronization, the `time.Sleep(1)` call is no longer needed for functionality:
package main
import (
"fmt"
"time"
)
// main runs three foo workers feeding one bar consumer and uses the complete
// channel to learn when bar has handled every value.
//
// bar sends one signal on complete per value it prints, plus a final signal
// after d is closed. main only starts reading those signals after all nine
// inputs have been sent, so complete must be buffered. With an unbuffered
// channel bar blocks on its first signal, the workers then block sending on d,
// and main deadlocks on a later input send.
//
// Note: main waits for bar but not for the foo goroutines after closing their
// inputs, so their "foo end" lines may not be printed before the program exits.
func main() {
	// Number of values fed to the workers; bar signals once per value.
	const inputs = 9

	d := make(chan uint)
	// Room for every per-value signal plus bar's final one, so bar never
	// has to wait for main to read.
	complete := make(chan bool, inputs+1)
	go bar(d, complete)
	c1 := make(chan uint)
	c2 := make(chan uint)
	c3 := make(chan uint)
	go foo(c1, d)
	go foo(c2, d)
	go foo(c3, d)
	c1 <- 1
	c2 <- 2
	c3 <- 3
	c1 <- 4
	c2 <- 5
	c3 <- 6
	c1 <- 7
	c2 <- 8
	c3 <- 9
	// If you know the number of inputs, count them to ensure completion.
	for i := 0; i < inputs; i++ {
		<-complete
	}
	// Every value has reached bar, so no worker is still sending on d and
	// all four channels can be closed.
	close(c1)
	close(c2)
	close(c3)
	close(d)
	// Verify bar is done and closed correctly.
	<-complete
	close(complete)
}
// foo is a worker stage. It prints "foo start", doubles each value received on
// c and sends it on d, and prints "foo end" once c is closed and drained.
// foo never closes d.
func foo(c chan uint, d chan uint) {
	fmt.Println("foo start")
	// The two-value receive reports open == false once c is closed and empty.
	for n, open := <-c; open; n, open = <-c {
		time.Sleep(time.Nanosecond) // Not needed for the program to function
		d <- 2 * n
	}
	fmt.Println("foo end")
}
// bar is the single consumer of d. It prints "bar start", then for every value
// read from d prints a "bar received N" line and sends one signal on cmp.
// After d has been closed and drained it prints "bar end" and sends one final
// signal on cmp. bar never closes cmp.
func bar(d chan uint, cmp chan bool) {
	fmt.Println("bar start")
	for stuff := range d {
		// The line break must be written as the \n escape: an interpreted
		// string literal may not contain a raw newline.
		fmt.Printf("bar received %d\n", stuff)
		cmp <- true
	}
	fmt.Println("bar end")
	// Tell the receiver that all output is done and d is closed.
	cmp <- true
}