I'm doing some stream processing in Go and got stuck trying to figure out how to do this the "Go way" without locks.
This contrived example shows the problem I'm facing.
- Data (a thing) is generated one at a time.
- Each thing is buffered in a slice, things.
- When things becomes full (len(things) == 100), it is processed somehow and reset.
- There are n concurrent goroutines that need to access things before it's full.
- Access to things from other goroutines is not predictable.
- Neither doSomethingWithPartial nor doSomethingWithComplete needs to mutate things.
Code:
var m sync.Mutex
var count int64
things := make([]int64, 0, 100)

// slices of data are constantly being generated and used
go func() {
    for {
        m.Lock()
        if len(things) == 100 {
            // doSomethingWithComplete does not modify things
            doSomethingWithComplete(things)
            things = make([]int64, 0, 100)
        }
        things = append(things, count)
        m.Unlock()
        count++
    }
}()

// doSomethingWithPartial needs to access the things before they're ready
for {
    m.Lock()
    // doSomethingWithPartial does not modify things
    doSomethingWithPartial(things)
    m.Unlock()
}
I know that the readers never modify the slice, so does that mean I can remove the mutex and expect it to still work? I assume not.
How can I refactor this to use channels instead of a mutex?
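For context on why dropping the lock is doubtful: a Go slice value is just a header (pointer, length, capacity) over a shared backing array, not an immutable snapshot. Here is a minimal sketch of that behavior, where sliceSharing is a made-up helper name used only for illustration:

```go
package main

import "fmt"

// sliceSharing is a hypothetical helper for illustration only.
// It returns the length seen through an earlier copy of the slice header,
// the length of the grown slice, and the first element seen through the copy.
func sliceSharing() (viewLen, fullLen int, first int64) {
	things := make([]int64, 0, 100)
	things = append(things, 1, 2, 3)

	view := things             // copies the header; the backing array is shared
	things = append(things, 4) // fits in capacity: writes past view's length
	things[0] = 99             // element writes are visible through view

	return len(view), len(things), view[0]
}

func main() {
	viewLen, fullLen, first := sliceSharing()
	fmt.Println(viewLen, fullLen, first) // prints "3 4 99"
}
```

The copy keeps its own length but sees element writes, so the header and the array can both change underneath a reader. If one goroutine reads things while another appends to it or reassigns it without synchronization, that is a data race, which running with the -race flag should be able to report.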
Edit: Here's the solution I came up with that does not use a mutex:
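package main

import (
    "fmt"
    "sync"
    "time"
)

// Incrementor returns a channel that yields 0, 1, 2, ... forever.
func Incrementor() chan int {
    ch := make(chan int)
    go func() {
        count := 0
        for {
            ch <- count
            count++
        }
    }()
    return ch
}

// Foo owns things; only the Launch goroutine reads or writes the field.
type Foo struct {
    things   []int
    requests chan chan []int // callers send a reply channel to get the partial slice
    stream   chan int        // incoming data, one thing at a time
    C        chan []int      // completed slices of 100 things are delivered here
}

func NewFoo() *Foo {
    foo := &Foo{
        things:   make([]int, 0, 100),
        requests: make(chan chan []int),
        stream:   Incrementor(),
        C:        make(chan []int),
    }
    go foo.Launch()
    return foo
}

// Launch serializes all access to f.things in a single goroutine.
func (f *Foo) Launch() {
    for {
        select {
        case ch := <-f.requests:
            // hand the current (possibly partial) slice to the requester
            ch <- f.things
        case thing := <-f.stream:
            if len(f.things) == 100 {
                // blocks until a consumer receives the full slice
                f.C <- f.things
                f.things = make([]int, 0, 100)
            }
            f.things = append(f.things, thing)
        }
    }
}

// Things returns the current partial slice; callers must not modify it.
func (f *Foo) Things() []int {
    ch := make(chan []int)
    f.requests <- ch
    return <-ch
}

func main() {
    foo := NewFoo()

    var wg sync.WaitGroup
    wg.Add(10)
    for i := 0; i < 10; i++ {
        go func(i int) {
            time.Sleep(time.Millisecond * time.Duration(i) * 100)
            things := foo.Things()
            fmt.Println("got things:", len(things))
            wg.Done()
        }(i)
    }

    // drain completed slices so Launch never blocks on foo.C
    go func() {
        for range foo.C {
            // do something with things
        }
    }()

    wg.Wait()
}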
It should be noted that the "Go way" is probably just to use a mutex for this. It's fun to work out how to do it with a channel, but a mutex is probably simpler and easier to reason about for this particular problem.
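For comparison, here is a rough sketch of what a mutex-based version might look like when the lock is wrapped in a type. Buffer, NewBuffer and Add are names invented for this sketch (they are not in the code above), and Things returns a copy so callers cannot touch the shared backing array:

```go
package main

import (
	"fmt"
	"sync"
)

// Buffer is a hypothetical mutex-guarded counterpart to Foo.
type Buffer struct {
	mu     sync.RWMutex
	things []int
}

func NewBuffer() *Buffer {
	return &Buffer{things: make([]int, 0, 100)}
}

// Add appends thing. If the buffer already holds 100 things, it returns
// that completed batch and starts a new one; otherwise it returns nil.
func (b *Buffer) Add(thing int) []int {
	b.mu.Lock()
	defer b.mu.Unlock()
	var full []int
	if len(b.things) == 100 {
		full = b.things
		b.things = make([]int, 0, 100)
	}
	b.things = append(b.things, thing)
	return full
}

// Things returns a copy of the current partial batch.
func (b *Buffer) Things() []int {
	b.mu.RLock()
	defer b.mu.RUnlock()
	return append([]int(nil), b.things...)
}

func main() {
	b := NewBuffer()

	// Readers may look at the partial batch at any time.
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = b.Things()
		}()
	}

	batches := 0
	for i := 0; i < 250; i++ {
		if full := b.Add(i); full != nil {
			batches++ // do something with the complete batch here
		}
	}
	wg.Wait()
	fmt.Println("complete batches:", batches, "partial:", len(b.Things()))
}
```

The RWMutex lets several readers take snapshots concurrently while the single writer holds the exclusive lock only for the append. Handing the completed batch back to the caller keeps the processing step outside the lock.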