go version go1.11.2 darwin/amd64
I have the following code sample, created for SO demo purposes:
package main
import (
...
)
// T is the consumer used in this demo: it holds the context that controls
// its lifetime and the channel its workers receive from.
type T struct {
// ctx is the context whose cancellation shuts the workers down (see run).
ctx context.Context
// ch1 carries the strings to process. New does not set it; run assigns it
// from the goroutine that New starts.
ch1 chan string
}
// New builds a T bound to ctx and launches run(2) on a separate goroutine.
// It returns without waiting for run, so t.ch1 may not have been assigned yet
// when the caller first uses the returned value.
func New(ctx context.Context) *T {
	t := new(T)
	t.ctx = ctx
	go t.run(2)
	return t
}
// run creates the work channel and starts `workers` goroutines that process
// values received from it until the context is cancelled.
//
// NOTE(review): New invokes run via `go t.run(2)`, so the assignment to t.ch1
// below is not ordered with respect to the reads of t.ch1 in Read and Write;
// this is the write that the race detector output reports.
func (t *T) run(workers int) {
t.ch1 = make(chan string)
// done is closed on cancellation and tells the workers to exit.
done := make(chan struct{})
go func() {
// Block until the context is cancelled, then signal shutdown.
<-t.ctx.Done()
close(done)
// NOTE(review): a Write call may still be attempting a send on t.ch1 when
// it is closed here, which would panic with "send on closed channel" — confirm.
close(t.ch1)
}()
for i := 0; i < workers; i++ {
go func() {
for {
select {
case <-done:
return
case m, ok := <-t.ch1:
// ok is false once t.ch1 has been closed; the zero value is skipped and
// the loop continues until the done case is selected.
if ok {
t.process(done, m)
}
}
}
}()
}
}
// process handles one value unless shutdown has already been signalled.
// If done is closed on entry it returns immediately; otherwise it logs the
// value and then sleeps for 200ms.
func (t *T) process(done <-chan struct{}, s string) {
	// Non-blocking check: bail out only if done is already closed.
	select {
	case <-done:
		return
	default:
	}

	log.Printf("processing %s", s)
	time.Sleep(200 * time.Millisecond)
}
// Read returns the current value of t.ch1 as a receive-only channel.
// NOTE(review): t.ch1 is assigned in run, which New starts on another
// goroutine, so this may return nil if called before that assignment happens.
func (t *T) Read() <-chan string {
return t.ch1
}
// Write offers s to the workers. It blocks until either the send on t.ch1
// succeeds, in which case it returns nil, or the context is cancelled, in
// which case it returns an error.
func (t *T) Write(s string) error {
	cancelled := t.ctx.Done()
	select {
	case <-cancelled:
		return errors.New("consumer is closed today")
	case t.ch1 <- s:
		return nil
	}
}
// main drives the demo: it starts a T, drains Read in a background goroutine,
// writes ten values and then cancels the context.
func main() {
ctx, cancel := context.WithCancel(context.Background())
t := New(ctx)
go func() {
// This goroutine receives from the same channel the workers do, so each
// written value is seen either by this loop or by a worker, not both.
for m := range t.Read() {
log.Printf("got %s", m)
}
<-ctx.Done()
}()
for i := 0; i < 10; i++ {
// NOTE(review): the error returned by Write is discarded here.
t.Write(strconv.Itoa(i))
}
// NOTE(review): main returns right after cancel without waiting for any
// of the goroutines started above.
cancel()
}
When I build and run this with the race detector, it throws the following data race:
go build -race ./test/ && ./test
==================
WARNING: DATA RACE
Read at 0x00c0000b6030 by goroutine 7:
main.main.func1()
/redacted/test/app.go:60 +0x42
Previous write at 0x00c0000b6030 by goroutine 6:
main.(*T).run()
/redacted/test/app.go:24 +0x6a
Goroutine 7 (running) created at:
main.main()
/redacted/test/app.go:76 +0xbc
Goroutine 6 (running) created at:
main.New()
/redacted/test/app.go:18 +0xcd
main.main()
/redacted/test/app.go:74 +0x86
==================
==================
WARNING: DATA RACE
Read at 0x00c0000b6030 by main goroutine:
main.(*T).Write()
/redacted/test/app.go:67 +0x8a
main.main()
/redacted/test/app.go:84 +0xdc
Previous write at 0x00c0000b6030 by goroutine 6:
main.(*T).run()
/redacted/test/app.go:24 +0x6a
Goroutine 6 (running) created at:
main.New()
/redacted/test/app.go:18 +0xcd
main.main()
/redacted/test/app.go:74 +0x86
==================
2019/01/20 10:48:51 got 0
2019/01/20 10:48:51 got 3
2019/01/20 10:48:51 processing 1
2019/01/20 10:48:51 processing 2
2019/01/20 10:48:51 got 4
2019/01/20 10:48:51 got 5
2019/01/20 10:48:51 got 6
2019/01/20 10:48:51 got 7
2019/01/20 10:48:51 got 8
2019/01/20 10:48:51 got 9
Found 2 data race(s)
The problem is that I can't find a way to let a user send values into a channel, without exposing the channel itself for writes, that doesn't cause a data race. How can this be done? Is there a better pattern for this that I am missing?
I suggest making the following changes:
- Initialise `ch1` in `New` to avoid the race of reading and writing `t.ch1` in multiple goroutines.
- Close `ch1` once all the calls to `Write` have finished, to avoid "send on closed channel" panics.
- Use a `sync.WaitGroup` to wait for all the processing goroutines to finish after writing all the values (so that the program doesn't exit before processing has finished).

Bringing these changes together, here's how it would look:
package main
import (
"log"
"strconv"
"sync"
"time"
)
// T hands values passed to Write over to a set of worker goroutines and lets
// the caller wait for them to finish via Stop.
type T struct {
// ch1 receives the values to process
ch1 chan string
// wg is used to wait for the workers to stop
wg sync.WaitGroup
}
// New creates a T with its channel already initialised and starts two workers.
//
// The channel is created here, before any goroutine is started, so later use
// of t.ch1 by Write, Stop and the workers does not race with its
// initialisation.
//
// run is called synchronously rather than with `go`: it only registers the
// workers with the WaitGroup and spawns them, so it returns immediately, and
// calling it inline guarantees that wg.Add has happened before any caller can
// reach the wg.Wait in Stop. With `go t.run(2)`, a Stop issued before the
// first Write completes could run Wait before (or concurrently with) Add,
// which sync.WaitGroup forbids and which would let Stop return while workers
// are still starting.
func New() *T {
	t := &T{
		ch1: make(chan string),
	}
	t.run(2)
	return t
}
// run registers `workers` goroutines with the WaitGroup and starts them.
// Each worker processes values from t.ch1 until the channel is closed and
// then marks itself done on the WaitGroup.
func (t *T) run(workers int) {
	// Register every worker up front, before any of them is started.
	t.wg.Add(workers)

	worker := func() {
		// Signal completion once the channel has been closed and drained.
		defer t.wg.Done()
		for m := range t.ch1 {
			t.process(m)
		}
	}

	for n := workers; n > 0; n-- {
		go worker()
	}
}
// Stop is called after we're done calling Write and we want to stop the
// processing once all values have been processed. It must be called at most
// once, and Write must not be called after it: closing an already-closed
// channel, or sending on a closed channel, panics.
func (t *T) Stop() {
// close t.ch1 so that the workers' range loops end and they stop processing
close(t.ch1)
// wait for the workers to all finish before returning
t.wg.Wait()
}
// process logs s and then sleeps for 200ms.
func (t *T) process(s string) {
	const workTime = 200 * time.Millisecond

	log.Printf("processing %s", s)
	time.Sleep(workTime)
}
// Write sends s to the workers. ch1 is unbuffered, so the call blocks until a
// worker receives the value. It must not be called after Stop, because a send
// on a closed channel panics.
func (t *T) Write(s string) {
t.ch1 <- s
}
// main starts the workers, feeds them the numbers 0-9 as strings and then
// shuts down, waiting for all processing to finish before the program exits.
func main() {
	// start the workers
	t := New()
	// Stop runs when main returns, i.e. after the last Write: it closes the
	// channel and waits for processing to finish.
	defer t.Stop()

	// write 10 values
	for n := 0; n < 10; n++ {
		t.Write(strconv.Itoa(n))
	}
}