Serializing goroutines (parallel, but with guaranteed order)

Suppose we want to run some computations in parallel, but the results must come out in the same order as the computations went in.

This can be done, for example, like this:

https://play.golang.org/p/jQbo0EVLzvX

package main

import (
    "fmt"
    "time"
)

func main() {
    orderPutChans := make([]chan bool, 8)
    orderGetChans := make([]chan bool, 8)
    doneChans := make([]chan bool, 8)

    for i := 0; i < 8; i++ {
        orderPutChans[i] = make(chan bool, 1)
        orderGetChans[i] = make(chan bool)
        doneChans[i] = make(chan bool)
    }

    srcCh := make(chan int)
    dstCh := make(chan int)

    for i := 0; i < 8; i++ {
        go func(j int) {
            myGetCh := orderGetChans[j]
            nextGetCh := orderGetChans[(j+1) % 8]
            myPutCh := orderPutChans[j]
            nextPutCh := orderPutChans[(j+1) % 8]

            for {
                // wait for permission to read from srcCh
                <-myGetCh

                v, ok := <-srcCh

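                if !ok {
                    // Mark this worker as finished before handing the token on,
                    // so the last worker reliably sees nil and does not block
                    // on a send that nobody will receive.
                    orderGetChans[j] = nil
                    k := (j + 1) % 8
                    if orderGetChans[k] != nil {
                        orderGetChans[k] <- true
                    }

                    break
                }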

                nextGetCh <- true

                time.Sleep(time.Microsecond) // simulate some work

                v *= v

                // wait for permission to write to dstCh
                <-myPutCh

                dstCh <- v

                nextPutCh <- true
            }

            doneChans[j] <- true
        }(i)
    }

    go func() {
        for i := 0; i < 8; i++ {
            <-doneChans[i]
        }
        close(dstCh)
    }()

    orderGetChans[0] <- true
    orderPutChans[0] <- true

    go func() {
        for i := 0; i < 100; i++ {
            srcCh <- i
        }
        close(srcCh)
    }()

    for vv := range dstCh {
        fmt.Println(vv)
    }
}

Here channels are used to pass read and write permission from one goroutine to the next. The code is messy. Is there a cleaner way to achieve this in Go?

Edit: I'm not asking for "simple" replacements such as using chan struct{}, or closing doneChans instead of sending doneChans[i] <- true.

Edit2:

A much simpler approach (at least as far as the code is concerned) would be to keep a results array. The producer sends each value together with an index (taken modulo the number of workers), each goroutine writes its result to results[j], and a WaitGroup waits until the current batch is done. The results are then sent to the destination channel in index order, and the next batch starts. (Maybe not so good due to false sharing?)

If I understand correctly, this is a version of your code written in a "pipeline" style. The pipeline has several stages:

  1. Sending the src values
  2. Workers that do work on the received src values, each sending to its own results channel
  3. Merging the slice of results channels from the workers into a single unordered channel
  4. Ordering the unordered values from the unordered merged channel

Here is the code. It uses the indexed-pair style that you mention in the edits to your original question.

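package main

import (
    "fmt"
    "sync"
)

// idxPair carries a value together with its position in the input sequence.
type idxPair struct {
    idx, val int
}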

func main() {
    // add a done channel, an ability to stop the world by closing this.
    done := make(chan struct{})
    defer close(done)

    // create srcChan, this will be where the values go into the pipeline
    srcCh := make(chan idxPair)

    // create a slice of result channels, one for each of the go workers
    const numWorkers = 8
    resChans := make([]<-chan idxPair, numWorkers)

    // waitgroup to wait for all the workers to stop
    var wg sync.WaitGroup
    wg.Add(numWorkers)

    // start the workers, passing them each the src channel,
    // collecting the result channels they return
    for i := 0; i < numWorkers; i++ {
        resChans[i] = worker(done, &wg, srcCh)
    }

    // start a single goroutine to send values into the pipeline
    // all values are sent with an index, to be pieced back into order at the end.
    go func() {
        defer close(srcCh)
        for i := 0; i < 100; i++ {
            srcCh <- idxPair{idx: i, val: i}
        }
    }()

    // merge all the results channels into a single results channel
    // this channel is unordered.
    mergedCh := merge(done, resChans...)

    // order the values coming from mergedCh according to the idxPair.idx field.
    orderedResults := order(100, mergedCh)

    // iterate over each of the ordered results
    for _, v := range orderedResults {
        fmt.Println(v)
    }
}

func order(n int, res <-chan idxPair) []int {
    results := make([]int, n)

    // collect all the values to order them
    for r := range res {
        results[r.idx] = r.val
    }

    return results
}

func worker(done <-chan struct{}, wg *sync.WaitGroup, src <-chan idxPair) <-chan idxPair {
    res := make(chan idxPair)

    go func() {
        defer wg.Done()
        defer close(res)
        sendValue := func(pair idxPair) {
            v := pair.val
            v *= v
            ip := idxPair{idx: pair.idx, val: v}
            select {
            case res <- ip:
            case <-done:
            }
        }

        for v := range src {
            sendValue(v)
        }
    }()

    return res
}


// example and explanation here: https://blog.golang.org/pipelines
func merge(done <-chan struct{}, cs ...<-chan idxPair) <-chan idxPair {
    var wg sync.WaitGroup
    out := make(chan idxPair)

    output := func(c <-chan idxPair) {
        defer wg.Done()
        for n := range c {
            select {
            case out <- n:
            case <-done:
                return
            }
        }
    }
    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }

    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

I think this is slightly cleaner, and not just "different for the sake of it", because:

  1. You can model and implement each of the stages independently. The order stage, for example, can easily be optimised to send values through a channel as they are received.
  2. It's much more composable. Instead of one large function that operates on a number of channels stored in slices, you can do async work on elements and leave the ordering as something else's responsibility. This promotes reuse.