Golang：关闭具有循环依赖的通道（channel）

I'm trying to implement a mapreduce-like method in Golang. My design is as follows:

  • Map workers pull items off a mapper input channel and output to a mapper output channel

  • The mapper output channel is then read by a single goroutine. This routine maintains a map of previously-seen key-value pairs. If the next item from the mapper output has a matching key, it sends both the new and old values with matching keys to a reduce-input channel.

  • Reduce workers read from the reduce-input channel, reduce each pair of values to a single key-value pair, and submit the result back to the same mapper output channel.

This leads to a circular dependency between the mapper output and the reduce input, and I now do not know how to signal that the mapper output is complete (and close the channel).

What is the best way of breaking this cyclic dependency or knowing when to close channels with such cyclical behavior?

The code below has a deadlock with the map output channel and the reduce input channel waiting on each other.

type (
    // MapFn turns one input value into two ints, which MapReduce treats as
    // a key and a value, in that order.
    MapFn func(input int) (int, int)

    // ReduceFn combines two values into a single value.
    ReduceFn func(a, b int) int
)

// kvPair carries one value together with the key it belongs to.
type kvPair struct {
    k, v int
}

// reducePair is one unit of reduce work: a key and the two values that are
// to be combined for it.
type reducePair struct {
    k, v1, v2 int
}

// MapReduce is the version from the question. Mappers read inputMapChan and
// write to outputMapChan; reducers read reduceInputChan and also write to
// outputMapChan; the loop at the bottom pairs up values that share a key and
// hands each pair to the reducers.
//
// As written it cannot complete normally: nothing in this function closes
// outputMapChan, so the final range loop blocks once the channel is drained,
// and reduceInputChan is never closed either, so the reducer goroutines never
// leave their range loops. The only return statement yields a nil error.
func MapReduce(mapFn MapFn, reduceFn ReduceFn, input []int, nMappers int, nReducers int) (map[int]int, error) {
    inputMapChan := make(chan int, len(input))
    outputMapChan := make(chan *kvPair, len(input))
    // Unbuffered: a send only completes when a reducer is ready to receive.
    reduceInputChan := make(chan *reducePair)
    // Holds, per key, the one value that has not been paired up yet.
    outputMapMap := make(map[int]int)
    // Feed the mappers, then close the channel so their range loops end.
    go func() {
        // NOTE(review): a single-variable range over a slice yields the
        // index, so this sends 0..len(input)-1, not the elements of input.
        for v := range input {
            inputMapChan <- v
        }
        close(inputMapChan)
    }()
    // Mapper workers: one mapped pair is emitted per received input.
    for i := 0; i < nMappers; i++ {
        go func() {
            for v := range inputMapChan {
                // This v shadows the loop variable; it is the mapped value.
                k, v := mapFn(v)
                outputMapChan <- &kvPair{k, v}
            }
        }()
    }
    // Reducer workers: each result goes back onto outputMapChan, which is
    // what creates the cycle between the two channels.
    for i := 0; i < nReducers; i++ {
        go func() {
            for v := range reduceInputChan {
                reduceValue := reduceFn(v.v1, v.v2)
                outputMapChan <- &kvPair{v.k, reduceValue}
            }
        }()
    }
    // Collector: runs in the calling goroutine. This loop only ends when
    // outputMapChan is closed, which never happens here.
    for v := range outputMapChan {
        key := v.k
        value := v.v
        other, ok := outputMapMap[key]
        if ok {
            // A value for this key is already waiting: remove it and send
            // both values off to be reduced.
            delete(outputMapMap, key)
            reduceInputChan <- &reducePair{key, value, other}
        } else {
            outputMapMap[key] = value
        }
    }
    return outputMapMap, nil
}

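Try this. It waits for the mappers with a sync.WaitGroup and tracks the number of in-flight reductions with an atomic counter. Once the mappers are done, nothing is in flight and the output channel is empty, no further values can arrive, so the collector can stop: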

package main

import "fmt"
import "sync"
import "sync/atomic"
import "runtime"
import "math/rand"
import "time"

// MapFn maps a single input value to the key/value pair it contributes.
// It must not return nil: MapReduce dereferences the result.
type MapFn func(input int) *kvPair

// ReduceFn folds two values that share a key into one value.
type ReduceFn func(a int, b int) int

// kvPair carries one value together with the key it belongs to.
type kvPair struct {
    k int
    v int
}

// reducePair is one unit of reduce work: two values that share the key k.
type reducePair struct {
    k  int
    v1 int
    v2 int
}

// MapReduce applies mapFn to every element of input using nMappers
// goroutines and folds all values that share a key with reduceFn using
// nReducers goroutines. It returns one value per distinct key.
//
// Values are paired in the order they happen to arrive, so the result is
// only deterministic when reduceFn is insensitive to grouping and order.
//
// An error is returned, with a nil map, if nMappers or nReducers is less
// than one. All goroutines started here have been told to stop, and the
// reducers have exited, by the time MapReduce returns.
func MapReduce(mapFn MapFn, reduceFn ReduceFn, input []int, nMappers int, nReducers int) (map[int]int, error) {
    if nMappers < 1 || nReducers < 1 {
        return nil, fmt.Errorf("mapreduce: need at least one mapper and one reducer, got %d and %d", nMappers, nReducers)
    }

    // Every input yields exactly one mapped value and every reduction turns
    // two values into one, so at most len(input) values exist at any time.
    // With this capacity a send on outputMapChan therefore never blocks,
    // which is what keeps the mapper/reducer/collector cycle deadlock-free.
    inputMapChan := make(chan int, len(input))
    outputMapChan := make(chan *kvPair, len(input))
    reduceInputChan := make(chan *reducePair)
    outputMapMap := make(map[int]int)

    // The buffer holds the whole input, so it can be filled inline.
    for _, v := range input {
        inputMapChan <- v
    }
    close(inputMapChan)

    var mappers sync.WaitGroup
    mappers.Add(nMappers)
    for i := 0; i < nMappers; i++ {
        go func() {
            defer mappers.Done()
            for v := range inputMapChan {
                outputMapChan <- mapFn(v)
            }
        }()
    }
    // mappersDone is closed once every mapped value is on outputMapChan.
    mappersDone := make(chan struct{})
    go func() {
        mappers.Wait()
        close(mappersDone)
    }()

    // inFlight counts reductions that were dispatched but whose result may
    // not be on outputMapChan yet. wake nudges the collector after each
    // decrement so it re-checks its exit condition without spinning.
    var inFlight int64
    wake := make(chan struct{}, 1)

    var reducers sync.WaitGroup
    reducers.Add(nReducers)
    for i := 0; i < nReducers; i++ {
        go func() {
            defer reducers.Done()
            for p := range reduceInputChan {
                // Publish the result before decrementing, so inFlight == 0
                // implies every result is already visible on the channel.
                outputMapChan <- &kvPair{p.k, reduceFn(p.v1, p.v2)}
                atomic.AddInt64(&inFlight, -1)
                select {
                case wake <- struct{}{}:
                default: // a wake-up is already pending; that one suffices
                }
            }
        }()
    }
    // Let the reducers leave their range loops and wait for them, so no
    // goroutine outlives this call.
    defer func() {
        close(reduceInputChan)
        reducers.Wait()
    }()

    // Collector: the work is finished once the mappers are done, no
    // reduction is in flight and nothing is left to read. The operands are
    // evaluated in that order on purpose; see the comment in the reducers.
    mapping := true
    for mapping || atomic.LoadInt64(&inFlight) != 0 || len(outputMapChan) != 0 {
        select {
        case <-mappersDone:
            mapping = false
            mappersDone = nil // a nil channel is never selected again
        case <-wake:
            // Nothing to do: the loop condition is re-evaluated.
        case v := <-outputMapChan:
            if other, ok := outputMapMap[v.k]; ok {
                // A value for this key is already waiting: pair them up.
                delete(outputMapMap, v.k)
                atomic.AddInt64(&inFlight, 1)
                reduceInputChan <- &reducePair{v.k, v.v, other}
            } else {
                outputMapMap[v.k] = v.v
            }
        }
    }
    return outputMapMap, nil
}

// main runs MapReduce over a random permutation of 0..999999 with two mappers
// and two reducers, then prints the elapsed time and the resulting map.
func main() {
    fmt.Println("NumCPU =", runtime.NumCPU())

    // The clock starts before the input is generated, so the printed
    // duration covers rand.Perm as well as MapReduce (the original author
    // noted roughly 883ms).
    start := time.Now()
    data := rand.Perm(1000000)
    // For a quick manual check, swap in a small input instead, e.g.
    // data = []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 12, 13, 1, 16, 2}

    result, err := MapReduce(mp, rdc, data, 2, 2)
    if err != nil {
        panic(err)
    }

    fmt.Println(time.Since(start))
    fmt.Println(result)
    fmt.Println("done.")
}

// mp is the demo map function: the low three bits of input become the key
// and input shifted right by three becomes the value.
func mp(input int) *kvPair {
    pair := new(kvPair)
    pair.k = input & 7
    pair.v = input >> 3
    return pair
}
// rdc is the demo reduce function: it shifts b left by three bits and ORs a
// into the result. OR-ing with zero changes nothing, so a needs no special
// case.
func rdc(a int, b int) int {
    return b<<3 | a
}