种族暂停了一组goroutines

I have a bunch of goroutines doing something in a loop. I want to be able to pause all of them, run some arbitrary code, then resume them. The way I attempted to do this is probably not idiomatic (and I'd appreciate a better solution), but I can't understand why it doesn't work.

Stripped down to the essentials (driver code at the bottom):

type looper struct {
    pause  chan struct{}
    paused sync.WaitGroup
    resume chan struct{}
}

func (l *looper) loop() {
    for {
        select {
        case <-l.pause:
            l.paused.Done()
            <-l.resume
        default:
            dostuff()
        }
    }
}

func (l *looper) whilePaused(fn func()) {
    l.paused.Add(32)
    l.resume = make(chan struct{})
    close(l.pause)
    l.paused.Wait()
    fn()
    l.pause = make(chan struct{})
    close(l.resume)
}

I spin up 32 goroutines all running loop(), then call whilePaused 100 times in a row, and everything seems to work… but if I run it with -race, it tells me that there's a race on l.resume between writing it in whilePaused (l.resume = make(chan struct{})) and reading it in loop (<-l.resume).

I don't understand why this happens. According to The Go Memory Model, that close(l.pause) should happen before the <-l.pause in every loop goroutine. This should mean the make(chan struct{}) value is visible as the value of l.resume in all of those loop goroutines, in the same way the string "hello world" is visible as the value of a in the f goroutine in the docs example.


Some additional information that might be relevant:

  • If I replace l.resume with an unsafe.Pointer and access the chan struct{} value with atomic.LoadPointer in loop and atomic.StorePointer in whilePaused, the race goes away. This seems to be providing the exact same acquire-release ordering that the channel is already supposed to provide?

  • If I add a time.Sleep(10 * time.Microsecond) between the l.paused.Done() and <-l.resume, the program usually deadlocks after calling fn one or two times.

  • If I add a fmt.Printf(".") instead, the program prints 28 .s, calls the first function, prints another 32 .s, then hangs (or, occasionally, calls the second function, then prints another 32 .s and hangs).


Here's the rest of my code, in case you want to run the whole thing:

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

// looper code from above

var n int64    
func dostuff() {
    atomic.AddInt64(&n, 1)
}

func main() {
    l := &looper{
        pause: make(chan struct{}),
    }
    var init sync.WaitGroup
    init.Add(32)
    for i := 0; i < 32; i++ {
        go func() {
            init.Done()
            l.loop()
        }()
    }
    init.Wait()
    for i := 0; i < 100; i++ {
        l.whilePaused(func() { fmt.Printf("%d ", i) })
    }
    fmt.Printf("
%d
", atomic.LoadInt64(&n))
}

This is because after the thread does l.paused.Done(), the other thread is able to go around the loop and assign l.resume again

Here is sequence of operations

Looper thread    |    Pauser thread
------------------------------------
l.paused.Done()  |   
                 |   l.paused.Wait()
                 |   l.pause = make(chan struct{})
                 |   round the loop
                 |   l.paused.Add(numThreads)
<- l.resume      |   l.resume = make(chan struct{})   !!!RACE!!