I have a list of strings
elems := [n]string{...}
I want to perform a simple mapReduce operation, such that I
Map every string to a different string, let's say string -> $string
Reduce all the strings to one string with a separator, e.g. {s1, s2, s3} -> s1@s2@s3
all in all: {s1, s2, s3} -> $s1@$s2@$s3
What's the best way to do this?
I'm looking for efficiency and readability
Bonus points if it's generic enough to work not only on strings
For the mapping step you won't have much choice other than iterating over each string. If the transform is time-consuming and you need the speed, you can consider splitting the job across goroutines. For the reduce step you can use the strings.Join function, which takes a separator and normally performs that part efficiently. The size of the dataset is also a consideration: for larger lists you may want to benchmark strings.Join against your own customized algorithm and decide whether multiple goroutines/channels are worth it.
If you don't need to do the two steps separately, the end result can be achieved simply with strings.Join():
package main

import (
	"fmt"
	"strings"
)

func main() {
	a := []string{"a", "b", "c"}
	p := "$"
	fmt.Println(p + strings.Join(a, "@"+p))
}
prints $a@$b@$c
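If you do want the two steps separate, a minimal sketch (reusing the "$" prefix and "@" separator from the question) is to build the mapped slice first and then let strings.Join handle the reduce:
package main

import (
	"fmt"
	"strings"
)

func main() {
	elems := []string{"s1", "s2", "s3"}

	// map step: string -> $string
	mapped := make([]string, len(elems))
	for i, s := range elems {
		mapped[i] = "$" + s
	}

	// reduce step: join with the separator
	fmt.Println(strings.Join(mapped, "@")) // $s1@$s2@$s3
}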
Go is explicitly NOT a functional programming language.
You map and reduce using a for loop.
a := []string{"a", "b", "c"}

result := "initvalue"
for n, s := range a {
	// note: string(n) would give the rune with code point n, not "0", "1", ...;
	// strconv.Itoa(n) converts the index to its decimal text form
	result += s + strconv.Itoa(n)
}
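Applied to the question's example, a minimal single-pass sketch (map and reduce fused into one loop, using the "$" prefix and "@" separator from the question) could look like this:
package main

import (
	"fmt"
	"strings"
)

func main() {
	elems := []string{"s1", "s2", "s3"}

	// one pass: transform each element and append it to the running result
	var b strings.Builder
	for i, s := range elems {
		if i > 0 {
			b.WriteString("@")
		}
		b.WriteString("$" + s)
	}
	fmt.Println(b.String()) // $s1@$s2@$s3
}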
If your map functions are not going to perform any IO (they just do some computation), making them concurrent will almost certainly make things slower, and even if they do some IO, you should benchmark. Concurrency does not necessarily make things faster and sometimes adds unnecessary complication. In many cases a simple for loop is sufficient.
If the map functions are IO-bound, or do computation heavy enough to benefit from running concurrently, solutions can vary. For example NATS can be used to go beyond one machine and distribute the workload.
This is a relatively simple sample; the reduce phase is not multistage and is blocking:
package mapreduce // e.g. save as mapreduce_test.go so `go test` picks up Test01

import (
	"fmt"
	"strings"
	"sync"
	"testing"

	"github.com/stretchr/testify/assert"
)

type elem struct {
	index int
	value interface{}
}
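// feed streams the input elements on a channel, tagging each value with its
// original index, and closes the channel once every element has been sent.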
func feed(elems []interface{}) <-chan elem {
	result := make(chan elem)
	go func() {
		for k, v := range elems {
			e := elem{
				index: k,
				value: v,
			}
			result <- e
		}
		close(result)
	}()
	return result
}
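// mapf applies mapFunc to every element received on input and forwards the
// results on its own output channel, closing it when input is drained.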
func mapf(
	input <-chan elem,
	mapFunc func(elem) elem) <-chan elem {
	result := make(chan elem)
	go func() {
		for e := range input {
			eres := mapFunc(e)
			result <- eres
		}
		close(result)
	}()
	return result
}
// reducef collects all mapped elements, restores their original order using
// the index, and applies reduceFunc to the ordered slice. It blocks until
// the input channel is closed.
func reducef(
	input <-chan elem,
	reduceFunc func([]interface{}) interface{}) interface{} {
	buffer := make(map[int]interface{})
	l := 0
	for v := range input {
		buffer[v.index] = v.value
		if v.index > l {
			l = v.index
		}
	}
	data := make([]interface{}, l+1)
	for k, v := range buffer {
		data[k] = v
	}
	return reduceFunc(data)
}
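// fanOutIn starts mapCount concurrent mappers that all read from the same
// feed, fans their outputs into a single channel, and blocks in reducef
// until the final result is ready.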
func fanOutIn(
	elemFeed <-chan elem,
	mapFunc func(elem) elem, mapCount int,
	reduceFunc func([]interface{}) interface{}) interface{} {
	MR := make(chan elem)
	wg := &sync.WaitGroup{}
	for i := 0; i < mapCount; i++ {
		mapResult := mapf(elemFeed, mapFunc)
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range mapResult {
				MR <- v
			}
		}()
	}
	go func() {
		wg.Wait()
		close(MR)
	}()
	return reducef(MR, reduceFunc)
}
func Test01(t *testing.T) {
	elemFeed := feed([]interface{}{1, 2, 3})
	finalResult := fanOutIn(
		elemFeed,
		func(e elem) elem {
			return elem{
				index: e.index,
				value: fmt.Sprintf("[%v]", e.value),
			}
		},
		3,
		func(sl []interface{}) interface{} {
			strRes := make([]string, len(sl))
			for k, v := range sl {
				strRes[k] = v.(string)
			}
			return strings.Join(strRes, ":")
		})
	assert.Equal(t, "[1]:[2]:[3]", finalResult)
}
And since it uses interface{} as the element type, it can be generalized to element types other than string.
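As a sketch of that, a second test in the same file could push the question's strings through the same pipeline (the Test02 name and the "$"/"@" choices come from the question's example, not from the helpers above):
func Test02(t *testing.T) {
	elemFeed := feed([]interface{}{"s1", "s2", "s3"})
	finalResult := fanOutIn(
		elemFeed,
		func(e elem) elem {
			// map: string -> $string
			return elem{
				index: e.index,
				value: "$" + e.value.(string),
			}
		},
		3,
		func(sl []interface{}) interface{} {
			// reduce: join everything with "@"
			strRes := make([]string, len(sl))
			for k, v := range sl {
				strRes[k] = v.(string)
			}
			return strings.Join(strRes, "@")
		})
	assert.Equal(t, "$s1@$s2@$s3", finalResult)
}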