“扇入”-一种“扇出”行为

Say, we have three methods to implement "fan in" behavior

func MakeChannel(tries int) chan int {
    ch := make(chan int)

    go func() {
        for i := 0; i < tries; i++ {
            ch <- i
        }
        close(ch)
    }()

    return ch
}

func MergeByReflection(channels ...chan int) chan int {
    length := len(channels)
    out := make(chan int)
    cases := make([]reflect.SelectCase, length)
    for i, ch := range channels {
        cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
    }
    go func() {
        for length > 0 {
            i, line, opened := reflect.Select(cases)
            if !opened {
                cases[i].Chan = reflect.ValueOf(nil)
                length -= 1
            } else {
                out <- int(line.Int())
            }
        }
        close(out)
    }()
    return out
}

func MergeByCode(channels ...chan int) chan int {
    length := len(channels)
    out := make(chan int)
    go func() {
        var i int
        var ok bool

        for length > 0 {
            select {
            case i, ok = <-channels[0]:
                out <- i
                if !ok {
                    channels[0] = nil
                    length -= 1
                }
            case i, ok = <-channels[1]:
                out <- i
                if !ok {
                    channels[1] = nil
                    length -= 1
                }
            case i, ok = <-channels[2]:
                out <- i
                if !ok {
                    channels[2] = nil
                    length -= 1
                }
            case i, ok = <-channels[3]:
                out <- i
                if !ok {
                    channels[3] = nil
                    length -= 1
                }
            case i, ok = <-channels[4]:
                out <- i
                if !ok {
                    channels[4] = nil
                    length -= 1
                }
            }
        }
        close(out)
    }()
    return out
}

func MergeByGoRoutines(channels ...chan int) chan int {
    var group sync.WaitGroup

    out := make(chan int)
    for _, ch := range channels {
        go func(ch chan int) {
            for i := range ch {
                out <- i
            }
            group.Done()
        }(ch)
    }
    group.Add(len(channels))
    go func() {
        group.Wait()
        close(out)
    }()
    return out
}

type MergeFn func(...chan int) chan int

func main() {
    length := 5
    tries := 1000000
    channels := make([]chan int, length)
    fns := []MergeFn{MergeByReflection, MergeByCode, MergeByGoRoutines}

    for _, fn := range fns {
        sum := 0
        t := time.Now()
        for i := 0; i < length; i++ {
            channels[i] = MakeChannel(tries)
        }
        for i := range fn(channels...) {
            sum += i
        }
        fmt.Println(time.Since(t))
        fmt.Println(sum)
    }
}

Results are (at 1 CPU, I have used runtime.GOMAXPROCS(1)):
19.869s (MergeByReflection)
2499997500000
8.483s (MergeByCode)
2499997500000
4.977s (MergeByGoRoutines)
2499997500000

Results are (at 2 CPU, I have used runtime.GOMAXPROCS(2)):
44.94s
2499997500000
10.853s
2499997500000
3.728s
2499997500000

  • I understand the reason why MergeByReflection is slowest, but what is about the difference between MergeByCode and MergeByGoRoutines?
  • And when we increase the CPU number why "select" clause (used MergeByReflection directly and in MergeByCode indirectly) becomes slower?

Here is a preliminary remark. The channels in your examples are all unbuffered, meaning they will likely block at put or get time.

In this example, there is almost no processing except channel management. The performance is therefore dominated by synchronization primitives. Actually, there is very little of this code that can be parallelized.

In the MergeByReflection and MergeByCode functions, select is used to listen to multiple input channels, but nothing is done to take in account the output channel (which may therefore block, while some event could be available on one of the input channels).

In the MergeByGoRoutines function, this situation cannot happen: when the output channel blocks, it does not prevent an other input channel to be read by another goroutine. There are therefore better opportunities for the runtime to parallelize the goroutines, and less contention on the input channels.

The MergeByReflection code is the slowest because it has the overhead of reflection, and almost nothing can be parallelized.

The MergeByGoRoutines function is the fastest because it reduces the contention (less synchronization is needed), and because output contention has a lesser impact on the input performance. It can therefore benefit of a small improvement when running with multiple cores (contrary to the two other methods).

There is so much synchronization activity with MergeByReflection and MergeByCode, that running on multiple cores negatively impacts the performance. You could have different performance by using buffered channels though.