I am trying to write a sequential processing pipeline in Go using a simple fake example. It traverses a fake directory and runs a conversion on each entry. A string channel is shared between the two functions: after the first function writes a value, the second function reads it.
It seems to me that it only works (and works sequentially, too) when I place the go keyword in front of the WalkFakeDirectoy function, as shown in the code example below (playground).
I would really appreciate it if anyone could explain how this works.
package main
import (
	"fmt"
	"strings"
	// "sync" // only needed if the WaitGroup lines below are uncommented; an unused import does not compile
	"time"
)
func main() {
	done := make(chan int)
	path := make(chan string) // unbuffered channel shared by both functions
	defer close(done)
	//var wg sync.WaitGroup - Not working too
	//wg.Add(1)
	fmt.Println("walking file path")
	go WalkFakeDirectoy(done, path)
	//wg.Add(1)
	ConvertToUpperCase(path, done)
	//wg.Wait()
	fmt.Println("done!")
	//time.Sleep(2000) // Not working
}
func ConvertToUpperCase(files chan string, done chan int) {
	// range keeps receiving until files is closed
	for file := range files {
		fmt.Println("converting data data", strings.ToUpper(file))
	}
}
func WalkFakeDirectoy(done chan int, path chan<- string) {
	func() {
		list := []string{"abc", "def", "fgh", "ijk", "mnn"}
		for _, file := range list {
			fmt.Println("getting data", file)
			path <- file
			time.Sleep(3000) // time.Duration counts nanoseconds: this sleeps 3µs, not 3s
		}
	}()
}
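On why the go keyword is required: path is created with make(chan string), so it is unbuffered, and a send on an unbuffered channel blocks until another goroutine is ready to receive. Called without go, WalkFakeDirectoy blocks on its first path <- file, main never reaches ConvertToUpperCase, and the runtime reports a deadlock. With go, the walker runs concurrently and each send completes when the consumer receives, so the two functions roughly take turns. A minimal, self-contained illustration follows (handOff is a made-up name for this sketch); it also shows that a buffered channel lets a send complete without a waiting receiver, as long as the buffer has room.

```go
package main

import "fmt"

// handOff (hypothetical helper) contrasts an unbuffered send, which needs a
// concurrently running receiver, with a buffered send, which completes
// immediately while the buffer has room.
func handOff() (string, string) {
	unbuffered := make(chan string)
	// Sending on unbuffered from this goroutine with no receiver running
	// would block forever, so the send happens in its own goroutine.
	go func() {
		unbuffered <- "abc" // waits until the receive below is ready
	}()
	first := <-unbuffered

	buffered := make(chan string, 1)
	buffered <- "def" // completes at once: the buffer has room for one value
	second := <-buffered

	return first, second
}

func main() {
	first, second := handOff()
	fmt.Println("received", first, "and", second)
}
```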
This Go blog post about pipelines should have enough information to build your own. The gist of it is this code sample:
package main
import "fmt"
// gen sends each of nums on a new channel, then closes it.
func gen(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		for _, n := range nums {
			out <- n
		}
		close(out)
	}()
	return out
}
// sq receives ints from in, sends their squares on out, and closes out once in is exhausted.
func sq(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in {
			out <- n * n
		}
		close(out)
	}()
	return out
}
func sample_pipeline() {
	// Set up the pipeline.
	c := gen(2, 3)
	out := sq(c)
	out2 := sq(out)
	// Consume the output.
	for o := range out2 {
		fmt.Println(o) // prints 16, then 81
	}
}
func main() {
	sample_pipeline()
}
sq is a pipeline stage: it takes a channel of inputs and returns a channel of outputs (the squares of the inputs). sample_pipeline sets up a two-stage pipeline (sq applied twice) and connects a generator that emits two values to it.
Note in particular how shutdown is handled. Each pipeline stage is a goroutine that does the stage's work: wait for new data on its input channel, process it, and send the result on its output channel. When a stage's input channel is closed and drained (the range loop over it ends), the stage closes its own output channel. Closing a channel is the canonical way of signaling "this channel is done" in Go.
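Applying that pattern to the program in the question gives something like the sketch below. It assumes it is acceptable to change the function signatures so that each stage returns its output channel, like gen and sq do; the lowercase names walkFakeDirectory and convertToUpperCase are illustrative adaptations of the question's functions, not taken from the blog post. The question's version never closes path, so the range in ConvertToUpperCase keeps waiting after the fifth value; once the walker goroutine has returned, the runtime stops the program with "fatal error: all goroutines are asleep - deadlock!" and "done!" is never printed. Here each stage closes its output when it is finished, so the range in main ends and the program exits normally. The done channel is dropped because nothing in the original reads from it.

```go
package main

import (
	"fmt"
	"strings"
)

// walkFakeDirectory plays the role of gen: a goroutine sends each fake
// file name and then closes the channel.
func walkFakeDirectory(files ...string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out) // tells downstream range loops that no more values are coming
		for _, f := range files {
			fmt.Println("getting data", f)
			out <- f
		}
	}()
	return out
}

// convertToUpperCase plays the role of sq: one stage, one goroutine,
// closing its output once its input is exhausted.
func convertToUpperCase(in <-chan string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for f := range in {
			out <- strings.ToUpper(f)
		}
	}()
	return out
}

func main() {
	fmt.Println("walking file path")
	upper := convertToUpperCase(walkFakeDirectory("abc", "def", "fgh", "ijk", "mnn"))
	// The loop ends when convertToUpperCase closes its output channel.
	for f := range upper {
		fmt.Println("converting data", f)
	}
	fmt.Println("done!")
}
```

Because values are handed over one at a time on unbuffered channels, the "getting data" and "converting data" lines interleave, and the exact interleaving may vary between runs; the converted values themselves always arrive in input order.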