Let's say we want to run some computations in parallel, but we have to guarantee that the results come out in the same order in which the computations were submitted.
This can be done, for example, like this:
https://play.golang.org/p/jQbo0EVLzvX
package main

import (
	"fmt"
	"time"
)

func main() {
	orderPutChans := make([]chan bool, 8)
	orderGetChans := make([]chan bool, 8)
	doneChans := make([]chan bool, 8)
	for i := 0; i < 8; i++ {
		orderPutChans[i] = make(chan bool, 1)
		orderGetChans[i] = make(chan bool)
		doneChans[i] = make(chan bool)
	}
	srcCh := make(chan int)
	dstCh := make(chan int)
	for i := 0; i < 8; i++ {
		go func(j int) {
			myGetCh := orderGetChans[j]
			nextGetCh := orderGetChans[(j+1)%8]
			myPutCh := orderPutChans[j]
			nextPutCh := orderPutChans[(j+1)%8]
			for {
				<-myGetCh // wait for permission to read from srcCh
				v, ok := <-srcCh
				if !ok {
					k := (j + 1) % 8
					if orderGetChans[k] != nil {
						orderGetChans[k] <- true
					}
					orderGetChans[j] = nil
					break
				}
				nextGetCh <- true // let the next worker read
				time.Sleep(1000)
				v *= v
				<-myPutCh // wait for permission to write to dstCh
				dstCh <- v
				nextPutCh <- true // let the next worker write
			}
			doneChans[j] <- true
		}(i)
	}
	go func() {
		for i := 0; i < 8; i++ {
			<-doneChans[i]
		}
		close(dstCh)
	}()
	orderGetChans[0] <- true
	orderPutChans[0] <- true
	go func() {
		for i := 0; i < 100; i++ {
			srcCh <- i
		}
		close(srcCh)
	}()
	for vv := range dstCh {
		fmt.Println(vv)
	}
}
The idea is to use channels to pass around the permission to read from the source channel and to write to the destination channel. The code is messy, though. Is there a cleaner way to achieve this in Go?
Edit: I'm not asking for "simple" replacements such as using chan struct{} or using close on doneChans instead of doneChans[i] <- true.
Edit2:
A much simpler approach (at least as far as code is concerned) would be to have a results array: the producer sends each value together with an index (taken modulo the number of workers), and each goroutine writes its result to results[j]. A WaitGroup waits until all goroutines are done with one batch (of many batches), and then we iterate through the results and send them to the destination channel. (Maybe not so good due to false sharing?)
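If I understand correctly, this is a version of your code written in a "pipeline" style. The pipeline has a number of stages: a source goroutine that sends indexed values, a set of workers that square them, a merge stage that combines the workers' result channels into one, and an order stage that puts the results back in order.
Here is the code; it uses the indexed-pair style that you mention in the edits to your original question.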
package main

import (
	"fmt"
	"sync"
)

type idxPair struct {
	idx, val int
}

func main() {
	// add a done channel; closing it is a way to stop the whole pipeline.
	done := make(chan struct{})
	defer close(done)
	// create srcCh, this is where the values go into the pipeline
	srcCh := make(chan idxPair)
	// create a slice of result channels, one for each of the workers
	const numWorkers = 8
	resChans := make([]<-chan idxPair, numWorkers)
	// waitgroup to wait for all the workers to stop
	var wg sync.WaitGroup
	wg.Add(numWorkers)
	// start the workers, passing them each the src channel,
	// collecting the result channels they return
	for i := 0; i < numWorkers; i++ {
		resChans[i] = worker(done, &wg, srcCh)
	}
	// start a single goroutine to send values into the pipeline
	// all values are sent with an index, to be pieced back into order at the end.
	go func() {
		defer close(srcCh)
		// start at 0 so that every slot of the ordered results is filled
		for i := 0; i < 100; i++ {
			srcCh <- idxPair{idx: i, val: i}
		}
	}()
	// merge all the result channels into a single results channel
	// this channel is unordered.
	mergedCh := merge(done, resChans...)
	// order the values coming from mergedCh according to the idxPair.idx field.
	orderedResults := order(100, mergedCh)
	// all result channels are closed by now, so this returns promptly
	wg.Wait()
	// iterate over each of the ordered results
	for _, v := range orderedResults {
		fmt.Println(v)
	}
}
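// order collects n results and places each one at the position given by its idx.
// The parameter is named n so that it does not shadow the builtin len.
func order(n int, res <-chan idxPair) []int {
	results := make([]int, n)
	// collect all the values to order them
	for r := range res {
		results[r.idx] = r.val
	}
	return results
}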
func worker(done <-chan struct{}, wg *sync.WaitGroup, src <-chan idxPair) <-chan idxPair {
	res := make(chan idxPair)
	go func() {
		defer wg.Done()
		defer close(res)
		sendValue := func(pair idxPair) {
			v := pair.val
			v *= v
			ip := idxPair{idx: pair.idx, val: v}
			select {
			case res <- ip:
			case <-done:
			}
		}
		for v := range src {
			sendValue(v)
		}
	}()
	return res
}
// example and explanation here: https://blog.golang.org/pipelines
func merge(done <-chan struct{}, cs ...<-chan idxPair) <-chan idxPair {
	var wg sync.WaitGroup
	out := make(chan idxPair)
	output := func(c <-chan idxPair) {
		defer wg.Done()
		for n := range c {
			select {
			case out <- n:
			case <-done:
				return
			}
		}
	}
	wg.Add(len(cs))
	for _, c := range cs {
		go output(c)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}
The reason I think this is slightly cleaner, and not just "different for the sake of it", is that the order stage can easily be optimised to send values through a channel as they are received, etc.