I'm trying to implement a mapreduce-like method in Golang. My design is as follows:
Map workers pull items off a mapper input channel and output to a mapper output channel
The mapper output channel is then read by a single goroutine. This routine maintains a map of previously-seen key-value pairs. If the next item from the mapper output has a matching key, it sends both the new and old values with matching keys to a reduce-input channel. Reduce workers read from that channel and send each reduced value back to the mapper output channel.
This leads to a circular dependency between the mapper output and the reduce input, and I do not know how to signal that the mapper output is complete (and when it is safe to close the channel).
What is the best way of breaking this cyclic dependency or knowing when to close channels with such cyclical behavior?
The code below has a deadlock with the map output channel and the reduce input channel waiting on each other.
// MapFn is the user-supplied map step: it takes one input value and returns
// a key followed by a value.
type MapFn func(int) (int, int)
// ReduceFn is the user-supplied reduce step: it folds two values into one.
type ReduceFn func(int, int) int
// kvPair is a key k together with a value v.
type kvPair struct {
	k, v int
}
// reducePair is a key k together with two values, v1 and v2, recorded under
// that key.
type reducePair struct {
	k, v1, v2 int
}
// MapReduce starts nMappers goroutines that apply mapFn to values read from
// inputMapChan and nReducers goroutines that apply reduceFn to pairs read
// from reduceInputChan. Both kinds of worker write their results to
// outputMapChan, which the calling goroutine drains into outputMapMap.
//
// NOTE: as written this function does not return. outputMapChan is never
// closed, so the final range loop over it never ends and the return statement
// is not reached. The mappers are not the only senders on outputMapChan (the
// reducers send on it too), so "mappers finished" alone does not mean the
// channel can be closed.
func MapReduce(mapFn MapFn, reduceFn ReduceFn, input []int, nMappers int, nReducers int) (map[int]int, error) {
// Both worker-facing channels are buffered to len(input); the reduce input
// channel is unbuffered.
inputMapChan := make(chan int, len(input))
outputMapChan := make(chan *kvPair, len(input))
reduceInputChan := make(chan *reducePair)
// outputMapMap holds, per key, the one value currently waiting for a partner.
outputMapMap := make(map[int]int)
// Feeder: fills inputMapChan, then closes it so the mappers' range loops end.
go func() {
// NOTE: a single-variable range over a slice yields the index, so this sends
// 0..len(input)-1 rather than the elements of input.
for v := range input {
inputMapChan <- v
}
close(inputMapChan)
}()
// Mappers: each one exits once inputMapChan is closed and drained.
for i := 0; i < nMappers; i++ {
go func() {
for v := range inputMapChan {
// The v declared here shadows the loop variable v for the rest of the body.
k, v := mapFn(v)
outputMapChan <- &kvPair{k, v}
}
}()
}
// Reducers: reduceInputChan is never closed, so these goroutines never exit.
for i := 0; i < nReducers; i++ {
go func() {
for v := range reduceInputChan {
reduceValue := reduceFn(v.v1, v.v2)
// The reduced value goes back onto the same channel the mappers write to.
outputMapChan <- &kvPair{v.k, reduceValue}
}
}()
}
// Collector: runs on the calling goroutine. The loop ends only when
// outputMapChan is closed, which never happens in this function.
for v := range outputMapChan {
key := v.k
value := v.v
other, ok := outputMapMap[key]
if ok {
// A value is already stored for this key: remove it and hand both values to
// a reducer. This send blocks until a reducer receives it.
delete(outputMapMap, key)
reduceInputChan <- &reducePair{key, value, other}
} else {
// First value seen for this key (since the last reduce): store it.
outputMapMap[key] = value
}
}
return outputMapMap, nil
}
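Try this version, in which the collecting goroutine itself works out when no more values can arrive instead of waiting for the output channel to be closed: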
package main
import "fmt"
import "sync"
import "sync/atomic"
import "runtime"
import "math/rand"
import "time"
// MapFn is the user-supplied map step: it turns one input value into a
// key/value pair. MapReduce dereferences the result, so it must not be nil.
type MapFn func(input int) *kvPair

// ReduceFn is the user-supplied reduce step: it folds two values that share a
// key into one.
type ReduceFn func(a int, b int) int

// kvPair is a value v tagged with its key k, on its way to the collector.
type kvPair struct {
	k int
	v int
}

// reducePair is one reduce job: two values, v1 and v2, that share key k.
type reducePair struct {
	k  int
	v1 int
	v2 int
}

// MapReduce applies mapFn to every element of input using nMappers
// goroutines, then repeatedly combines values that share a key with reduceFn
// using nReducers goroutines, until exactly one value per key is left. It
// returns that key -> value map.
//
// The order in which values of one key are paired depends on goroutine
// scheduling, so the result is only deterministic when reduceFn is
// commutative and associative. reduceFn is called with the newly arrived
// value first and the previously stored value second.
//
// An error is returned when nMappers or nReducers is less than 1. All
// goroutines started by MapReduce have exited by the time it returns.
func MapReduce(mapFn MapFn, reduceFn ReduceFn, input []int, nMappers int, nReducers int) (map[int]int, error) {
	if nMappers < 1 || nReducers < 1 {
		return nil, fmt.Errorf("mapreduce: need at least one mapper and one reducer (got %d mappers, %d reducers)", nMappers, nReducers)
	}

	outputMapMap := make(map[int]int)
	if len(input) == 0 {
		return outputMapMap, nil
	}

	inputMapChan := make(chan int, len(input))
	outputMapChan := make(chan *kvPair, len(input))
	reduceInputChan := make(chan *reducePair)

	// inputMapChan has room for the whole input, so it can be filled and
	// closed up front without blocking and without a feeder goroutine.
	for _, v := range input {
		inputMapChan <- v
	}
	close(inputMapChan)

	// inFlight is the number of values that currently exist outside
	// outputMapMap: unmapped inputs, values queued on outputMapChan, and
	// values held in reduce jobs. The collector and the reducers both update
	// it, hence the atomic operations. When it reaches zero every value is at
	// rest in the map and nothing more can arrive on outputMapChan.
	inFlight := int64(len(input))

	var workers sync.WaitGroup
	workers.Add(nMappers + nReducers)

	for i := 0; i < nMappers; i++ {
		go func() {
			defer workers.Done()
			for v := range inputMapChan {
				outputMapChan <- mapFn(v)
			}
		}()
	}

	for i := 0; i < nReducers; i++ {
		go func() {
			defer workers.Done()
			for job := range reduceInputChan {
				reduced := reduceFn(job.v1, job.v2)
				// Two values have become one. Decrement BEFORE sending: if the
				// collector could receive and store the result first, its own
				// decrement would not see zero and it would wait forever.
				atomic.AddInt64(&inFlight, -1)
				outputMapChan <- &kvPair{job.k, reduced}
			}
		}()
	}

	// Collector: runs on the calling goroutine and is the only goroutine that
	// touches outputMapMap. At most len(input) values exist at any time and
	// the collector holds two of them while it dispatches a job, so
	// outputMapChan (capacity len(input)) is never full at that moment and a
	// reducer is always able to finish and pick up the next job.
	for {
		kv := <-outputMapChan
		if other, ok := outputMapMap[kv.k]; ok {
			// The stored value leaves the map and is in flight again.
			delete(outputMapMap, kv.k)
			atomic.AddInt64(&inFlight, 1)
			reduceInputChan <- &reducePair{kv.k, kv.v, other}
			continue
		}
		outputMapMap[kv.k] = kv.v
		if atomic.AddInt64(&inFlight, -1) == 0 {
			break
		}
	}

	// No more reduce jobs will be produced: release the reducers and wait for
	// every worker so that no goroutine outlives this call.
	close(reduceInputChan)
	workers.Wait()
	return outputMapMap, nil
}
// main is a small driver: it prints the CPU count, runs MapReduce with two
// mappers and two reducers over a random permutation of 0..999999 using mp
// and rdc, and prints the elapsed time followed by the resulting map.
func main() {
	fmt.Println("NumCPU =", runtime.NumCPU())

	// The clock starts before the input is generated, so the printed duration
	// covers rand.Perm as well as MapReduce.
	start := time.Now()
	values := rand.Perm(1000000)
	// Small fixed input for debugging:
	//values = []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 12, 13, 1, 16, 2}

	result, err := MapReduce(mp, rdc, values, 2, 2)
	if err != nil {
		panic(err)
	}

	fmt.Println(time.Since(start)) // the original author noted 883ms here
	fmt.Println(result)
	fmt.Println("done.")
}
// mp is the demo map function: the key is the low three bits of input and
// the value is input shifted right by three bits.
func mp(input int) *kvPair {
	pair := new(kvPair)
	pair.k = input & 7
	pair.v = input >> 3
	return pair
}
// rdc is the demo reduce function: it shifts b left by three bits and ORs a
// into the result. ORing in a zero changes nothing, so a == 0 needs no
// special case.
func rdc(a int, b int) int {
	return b<<3 | a
}