同时,如何管理价值/状态并避免竞争状况

How to properly set/modify a value based on events/conditions that happen after process has started, while dealing with Goroutines without creating a race condition.

For example, the following "works (is buggy)" and output is:

ping, foo=true
ping, foo=false
ping, foo=true
ping, foo=true
ping, foo=true

https://play.golang.org/p/Y3FafF-nBc

package main

import "fmt"

type test struct {
    ch  chan string
    foo bool
}

func (t *test) run() {
    for {
        select {
        case v := <-t.ch:
            fmt.Printf("%+v, foo=%+v
", v, t.foo)
            t.foo = false
        default:
        }
    }
}

func (t *test) Ping() {
    t.ch <- "ping"
}

func New() *test {
    t := &test{
        ch: make(chan string),
    }
    go t.run()
    return t
}

func main() {
    t := New()
    for i := 0; i <= 10; i++ {
        if t.foo {
            t.Ping()
        }
        if i%3 == 0 {
            t.foo = true
        }
    }
}

But if compiled or run using the -race option, I get this output:

$ go run -race main.go
ping, foo=true
==================
WARNING: DATA RACE
Write at 0x00c4200761b8 by goroutine 6:
  main.(*test).run()
      /main.go:16 +0x1fb

Previous read at 0x00c4200761b8 by main goroutine:
  main.main()
      /main.go:37 +0x5e

Goroutine 6 (running) created at:
  main.New()
      /main.go:30 +0xd0
  main.main()
      /main.go:35 +0x33
==================
ping, foo=false
ping, foo=true
ping, foo=true
ping, foo=true
Found 1 data race(s)
exit status 66

Therefore, I would like to know what concurrency pattern could I use for being available to change the value of foo outside the gorutine and also inside the gorutine without creating a race condition.

You have some options:

  • Using atomic.Value : Sample (1)
  • Using sync.RWMutex : Sample (3)
  • Using sync/atomic : Sample (6)
  • Using only channels and goroutines : Sample (7)

Also see: Use a sync.Mutex or a channel?


1- You may use atomic.Value:

A Value provides an atomic load and store of a consistently typed value. Values can be created as part of other data structures. The zero value for a Value returns nil from Load. Once Store has been called, a Value must not be copied.

A Value must not be copied after first use.

Like this working sample:

// to test the panic use go build -race
package main

import (
    "fmt"
    "sync/atomic"
)

type test struct {
    ch chan string
    atomic.Value
}

func (t *test) run() {
    for {
        select {
        case v := <-t.ch:
            fmt.Printf("%+v, foo=%+v
", v, t.Load())
            t.Store(false)
        default:
        }
    }
}

func (self *test) Ping() {
    self.ch <- "ping"
}

func New() *test {
    t := &test{
        ch: make(chan string),
    }
    t.Store(false)
    go t.run()
    return t
}

func main() {
    t := New()
    for i := 0; i <= 10; i++ {
        if x, _ := t.Load().(bool); x {
            t.Ping()
        }
        //  time.Sleep(time.Second)
        if i%3 == 0 {
            t.Store(true)
        }
    }
}

output with go build -race:

ping, foo=true
ping, foo=false
ping, foo=false
ping, foo=false
ping, foo=false

2- A little improvment to func (t *test) run():

func (t *test) run() {
    for v := range t.ch {
        fmt.Printf("%+v, foo=%+v
", v, t.Load())
        t.Store(false)
    }
}

3- You may use sync.RWMutex and sync.WaitGroup, like this working sample:

// to test the panic use go build -race
package main

import (
    "fmt"
    "sync"
)

type test struct {
    ch  chan string
    foo bool
    sync.RWMutex
    sync.WaitGroup
}

func (t *test) run() {
    for v := range t.ch {
        t.Lock()
        r := t.foo
        t.foo = false
        t.Unlock()
        fmt.Printf("%+v, foo=%+v
", v, r)

    }
    t.Done()
}

func (self *test) Ping() {
    self.ch <- "ping"
}

func New() *test {
    t := &test{ch: make(chan string)}
    t.Add(1)
    go t.run()
    return t
}

func main() {
    t := New()
    for i := 0; i <= 10; i++ {
        t.RLock()
        r := t.foo
        t.RUnlock()
        if r {
            t.Ping()
        }
        //  time.Sleep(time.Second)
        if i%3 == 0 {
            t.Lock()
            t.foo = true
            t.Unlock()
        }
    }
    close(t.ch)
    t.Wait()
}

output with go build -race:

ping, foo=true
ping, foo=true
ping, foo=false
ping, foo=true
ping, foo=false
ping, foo=true

4- So let's follow this approach https://talks.golang.org/2013/bestpractices.slide#29:
Original Code:

package main

import (
    "fmt"
    "time"
)

type Server struct{ quit chan bool }

func NewServer() *Server {
    s := &Server{make(chan bool)}
    go s.run()
    return s
}

func (s *Server) run() {
    for {
        select {
        case <-s.quit:
            fmt.Println("finishing task")
            time.Sleep(time.Second)
            fmt.Println("task done")
            s.quit <- true
            return
        case <-time.After(time.Second):
            fmt.Println("running task")
        }
    }
}
func (s *Server) Stop() {
    fmt.Println("server stopping")
    s.quit <- true
    <-s.quit
    fmt.Println("server stopped")
}

func main() {
    s := NewServer()
    time.Sleep(2 * time.Second)
    s.Stop()
}

5- Let's simplify it:

package main

import (
    "fmt"
    "time"
)

var quit = make(chan bool)

func main() {
    go run()
    time.Sleep(2 * time.Second)
    fmt.Println("server stopping")

    quit <- true // signal to quit

    <-quit // wait for quit signal

    fmt.Println("server stopped")
}

func run() {
    for {
        select {
        case <-quit:
            fmt.Println("finishing task")
            time.Sleep(time.Second)
            fmt.Println("task done")
            quit <- true
            return
        case <-time.After(time.Second):
            fmt.Println("running task")
        }
    }
}

output:

running task
running task
server stopping
finishing task
task done
server stopped

6- Simplified version of your sample:

// to test the panic use go build -race
package main

import "fmt"
import "sync/atomic"

var ch = make(chan string)
var state int32

func main() {
    go run()
    for i := 0; i <= 10; i++ {
        if atomic.LoadInt32(&state) == 1 {
            ch <- "ping"
        }
        if i%3 == 0 {
            atomic.StoreInt32(&state, 1)
        }
    }
}

func run() {
    for v := range ch {
        fmt.Printf("%+v, state=%+v
", v, atomic.LoadInt32(&state))
        atomic.StoreInt32(&state, 0)
    }
}

output:

ping, state=1
ping, state=0
ping, state=1
ping, state=0
ping, state=1
ping, state=0

7- working sample with channels and without using Lock() (The Go Playground):

// to test the panic use go build -race
package main

import "fmt"

func main() {
    go run()
    for i := 0; i <= 10; i++ {
        signal <- struct{}{}
        if <-read {
            ping <- "ping"
        }
        if i%3 == 0 {
            write <- true
        }
    }
}

func run() {
    foo := false
    for {
        select {
        case <-signal:
            fmt.Println("signal", foo)
            read <- foo
        case foo = <-write:
            fmt.Println("write", foo)
        case v := <-ping:
            fmt.Println(v, foo)
            foo = false
        }
    }
}

var (
    ping   = make(chan string)
    signal = make(chan struct{})
    read   = make(chan bool)
    write  = make(chan bool)
)

output:

signal false
write true
signal true
ping true
signal false
signal false
write true
signal true
ping true
signal false
signal false
write true
signal true
ping true
signal false
signal false
write true
signal true
ping true

Use Mutexes

package main

import (
    "sync"
    "time"
    "fmt"
)

var myvar int
var mut sync.Mutex

func main() {
    for {
        go other()
        go printer()
        time.Sleep(time.Duration(1) * time.Second)
    }
}

func other() {
    mut.Lock()
    myvar = myvar +1
    mut.Unlock()
}

func printer() {
    mut.Lock()
    fmt.Println(myvar)
    mut.Unlock()
}

Run (with mutexes)

$ go build -race t1.go 
$ ./t1 
1
2
3
4
5
6
7
7
9
10

Run (without mutexes)

$ go build t2.go 
$ go build -race t2.go 
$ ./t2 
==================
WARNING: DATA RACE
Read at 0x000000580ce8 by goroutine 7:
  runtime.convT2E()
      /usr/local/go/src/runtime/iface.go:155 +0x0
  main.printer()
      /.../.../.../GOPATH/t2.go:23 +0x65

Previous write at 0x000000580ce8 by goroutine 6:
  main.other()
      /.../.../.../GOPATH/t2.go:19 +0x3d

Goroutine 7 (running) created at:
  main.main()
      /.../.../.../GOPATH/t2.go:13 +0x5a

Goroutine 6 (finished) created at:
  main.main()
      /.../.../.../GOPATH/t2.go:12 +0x42
==================
1
2