Concurrent slice access in Go

I'm doing some stream processing in Go and got stuck trying to figure out how to do this the "Go way" without locks.

This contrived example shows the problem I'm facing.

  • We get one thing at a time.
  • There is a goroutine which buffers them into a slice called things.
  • When things becomes full (len(things) == 100), it is processed somehow and reset.
  • There are n concurrent goroutines that need to access things before it's full.
  • Access to the "incomplete" things from other goroutines is not predictable.
  • Neither doSomethingWithPartial nor doSomethingWithComplete needs to mutate things.

Code:

var m sync.Mutex
var count int64
things := make([]int64, 0, 100)

// slices of data are constantly being generated and used
go func() {
  for {
    m.Lock()
    if len(things) == 100 {
      // doSomethingWithComplete does not modify things
      doSomethingWithComplete(things)
      things = make([]int64, 0, 100)
    }
    things = append(things, count)
    m.Unlock()
    count++
  }
}()

// doSomethingWithPartial needs to access the things before they're ready
for {
  m.Lock()
  // doSomethingWithPartial does not modify things
  doSomethingWithPartial(things)
  m.Unlock()
}

Questions:

  1. The readers never modify things. Does that mean I can remove the mutex and expect it to still work? (I assume not, since the writer goroutine still appends to and reassigns the slice.)

  2. How can I refactor this to use channels instead of a mutex?

Edit: Here's the solution I came up with that does not use a mutex:

package main

import (
    "fmt"
    "sync"
    "time"
)

func Incrementor() chan int {
    ch := make(chan int)
    go func() {
        count := 0
        for {
            ch <- count
            count++
        }
    }()
    return ch
}

type Foo struct {
    things   []int
    requests chan chan []int
    stream   chan int
    C        chan []int
}

func NewFoo() *Foo {
    foo := &Foo{
        things:   make([]int, 0, 100),
        requests: make(chan chan []int),
        stream:   Incrementor(),
        C:        make(chan []int),
    }
    go foo.Launch()
    return foo
}

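// Launch owns f.things: it is the only goroutine that reads or writes the
// field, so no lock is needed.
func (f *Foo) Launch() {
    for {
        select {
        case ch := <-f.requests:
            // A reader asked for the current partial batch.
            ch <- f.things
        case thing := <-f.stream:
            if len(f.things) == 100 {
                // The batch is full: hand it off and start a new one.
                f.C <- f.things
                f.things = make([]int, 0, 100)
            }
            f.things = append(f.things, thing)
        }
    }
}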

func (f *Foo) Things() []int {
    ch := make(chan []int)
    f.requests <- ch
    return <-ch
}

func main() {

    foo := NewFoo()

    var wg sync.WaitGroup
    wg.Add(10)

    for i := 0; i < 10; i++ {
        go func(i int) {
            time.Sleep(time.Millisecond * time.Duration(i) * 100)
            things := foo.Things()
            fmt.Println("got things:", len(things))
            wg.Done()
        }(i)
    }

    go func() {
        for range foo.C {
            // do something with things
        }
    }()

    wg.Wait()
}

Note that the "Go way" here is probably just to use a mutex. It's fun to work out how to do it with channels, but a mutex is simpler and easier to reason about for this particular problem.
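
For comparison, here is a minimal sketch of what the mutex version might look like when the slice and its lock are wrapped in one type. The names Buffer, NewBuffer, Add and Snapshot are hypothetical and do not appear in the code above. It assumes a sync.RWMutex is appropriate because readers never mutate things, and it hands readers a copy of the partial batch, which is a conservative choice rather than a requirement.

```go
package main

import (
	"fmt"
	"sync"
)

// Buffer is a hypothetical wrapper that guards the things slice with a lock.
type Buffer struct {
	mu     sync.RWMutex
	things []int64
	limit  int
}

// NewBuffer returns an empty Buffer whose batches hold limit items.
func NewBuffer(limit int) *Buffer {
	return &Buffer{things: make([]int64, 0, limit), limit: limit}
}

// Add appends v. If the current batch was already full, that batch is
// returned to the caller and a fresh one is started first; otherwise
// Add returns nil.
func (b *Buffer) Add(v int64) (full []int64) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if len(b.things) == b.limit {
		full = b.things
		b.things = make([]int64, 0, b.limit)
	}
	b.things = append(b.things, v)
	return full
}

// Snapshot returns a copy of the partial batch, so the caller can use it
// after the lock has been released.
func (b *Buffer) Snapshot() []int64 {
	b.mu.RLock()
	defer b.mu.RUnlock()
	out := make([]int64, len(b.things))
	copy(out, b.things)
	return out
}

func main() {
	buf := NewBuffer(100)

	// A few concurrent readers of the partial batch.
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = buf.Snapshot()
		}()
	}

	// The single writer; completed batches are processed outside the lock.
	batches := 0
	for n := int64(0); n < 250; n++ {
		if full := buf.Add(n); full != nil {
			batches++
		}
	}
	wg.Wait()

	fmt.Println("complete batches:", batches, "partial length:", len(buf.Snapshot()))
	// prints "complete batches: 2 partial length: 50"
}
```

Because Add returns the full batch instead of processing it under the lock, the equivalent of doSomethingWithComplete no longer blocks the readers while it runs.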