发送到频道时避免比赛条件?

go version go1.11.2 darwin/amd64

I have the following code sample, created for SO demo purposes:

package main

import (
    ...
)

type T struct {
    ctx context.Context
    ch1 chan string
}

func New(ctx context.Context) *T {
    t := &T{ctx: ctx}
    go t.run(2)
    return t

}

func (t *T) run(workers int) {
    t.ch1 = make(chan string)
    done := make(chan struct{})

    go func() {
        <-t.ctx.Done()
        close(done)
        close(t.ch1)
    }()

    for i := 0; i < workers; i++ {
        go func() {
            for {
                select {
                case <-done:
                    return
                case m, ok := <-t.ch1:
                    if ok {
                        t.process(done, m)
                    }
                }
            }
        }()
    }
}

func (t *T) process(done <-chan struct{}, s string) {
    select {
    case <-done:
        return
    default:
        log.Printf("processing %s", s)
        time.Sleep(time.Millisecond * 200)
    }
}

func (t *T) Read() <-chan string {
    return t.ch1
}

func (t *T) Write(s string) error {
    select {
    case <-t.ctx.Done():
        return errors.New("consumer is closed today")
    case t.ch1 <- s:
        return nil
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    t := New(ctx)

    go func() {
        for m := range t.Read() {
            log.Printf("got %s", m)
        }
        <-ctx.Done()
    }()

    for i := 0; i < 10; i++ {
        t.Write(strconv.Itoa(i))
    }
    cancel()
}

When I build and run this with the race detector, it throws the following data race:

go build -race ./test/ && ./test
==================
WARNING: DATA RACE
Read at 0x00c0000b6030 by goroutine 7:
  main.main.func1()
      /redacted/test/app.go:60 +0x42

Previous write at 0x00c0000b6030 by goroutine 6:
  main.(*T).run()
      /redacted/test/app.go:24 +0x6a

Goroutine 7 (running) created at:
  main.main()
      /redacted/test/app.go:76 +0xbc

Goroutine 6 (running) created at:
  main.New()
      /redacted/test/app.go:18 +0xcd
  main.main()
      /redacted/test/app.go:74 +0x86
==================
==================
WARNING: DATA RACE
Read at 0x00c0000b6030 by main goroutine:
  main.(*T).Write()
      /redacted/test/app.go:67 +0x8a
  main.main()
      /redacted/test/app.go:84 +0xdc

Previous write at 0x00c0000b6030 by goroutine 6:
  main.(*T).run()
      /redacted/test/app.go:24 +0x6a

Goroutine 6 (running) created at:
  main.New()
      /redacted/test/app.go:18 +0xcd
  main.main()
      /redacted/test/app.go:74 +0x86
==================
2019/01/20 10:48:51 got 0
2019/01/20 10:48:51 got 3
2019/01/20 10:48:51 processing 1
2019/01/20 10:48:51 processing 2
2019/01/20 10:48:51 got 4
2019/01/20 10:48:51 got 5
2019/01/20 10:48:51 got 6
2019/01/20 10:48:51 got 7
2019/01/20 10:48:51 got 8
2019/01/20 10:48:51 got 9
Found 2 data race(s)

The problem I'm getting is that I can't seem to find a way to have a user input something in to a channel, without exposing any channel for writes, without a race. How would this be possible? Is there a better pattern for this that I am missing?

I suggest making the following changes:

  • assign to ch1 in New to avoid the race of reading and writing to t.ch1 in multiple goroutines
  • only close ch1 once all the calls to Write have finished to avoid "send on closed channel" panics
  • use a sync.WaitGroup to wait for all the processing goroutines to finish after writing all the values (so that the program doesn't exit before processing has finished)

Bringing these changes together, here's how it would look:

package main

import (
    "log"
    "strconv"
    "sync"
    "time"
)

type T struct {
    // ch1 receives the values to process
    ch1 chan string

    // wg is used to wait for the workers to stop
    wg sync.WaitGroup
}

func New() *T {
    t := &T{
        ch1: make(chan string),
    }
    go t.run(2)
    return t
}

func (t *T) run(workers int) {
    // add the workers to the WaitGroup
    t.wg.Add(workers)

    for i := 0; i < workers; i++ {
        go func() {
            // process values from the channel until it closes
            // and then signal to the WaitGroup that we're done
            defer t.wg.Done()
            for m := range t.ch1 {
                t.process(m)
            }
        }()
    }
}

// Stop is called after we're done calling Write and we want to stop the
// processing once all values have been processed
func (t *T) Stop() {
    // close t.ch1 so that the workers know to stop processing
    close(t.ch1)

    // wait for the workers to all finish before returning
    t.wg.Wait()
}

func (t *T) process(s string) {
    log.Printf("processing %s", s)
    time.Sleep(time.Millisecond * 200)
}

func (t *T) Write(s string) {
    t.ch1 <- s
}

func main() {
    // start the main loop
    t := New()

    // write 10 values
    for i := 0; i < 10; i++ {
        t.Write(strconv.Itoa(i))
    }

    // stop the loop, which will wait for processing to finish before returning
    t.Stop()
}