Sharing a large amount of data between goroutines in Golang

I need to read structure fields that are set from another goroutine. As far as I know, doing so directly, even when knowing for sure that there will be no concurrent access (the write finished before the read occurred, signaled via a chan struct{}), may result in stale data.

Will sending a pointer to the structure (created in the 1st goroutine, modified in the 2nd, read by the 3rd) resolve the possible staleness issue, considering I can guarantee no concurrent access?

I would like to avoid copying, as the structure is big and contains a huge bytes.Buffer that is filled in the 2nd goroutine and that I need to read from the 3rd.

There is an option for locking, but it seems like overkill considering I know that there will be no concurrent access.
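
For reference, here is a minimal sketch of the hand-off I have in mind (the Big type and the channel names are just illustrative):

package main

import (
    "bytes"
    "fmt"
)

// Big stands in for the real structure; the bytes.Buffer is the part
// I want to avoid copying.
type Big struct {
    buf bytes.Buffer
}

func main() {
    b := &Big{}                    // created in the 1st goroutine (main)
    written := make(chan struct{}) // signals "the writer is done"
    read := make(chan struct{})    // signals "the reader is done"

    go func() { // 2nd goroutine: fills the struct
        b.buf.WriteString("filled by the writer")
        close(written)
    }()

    go func() { // 3rd goroutine: reads only after the signal
        <-written // the channel receive makes the write above visible here
        fmt.Println(b.buf.String())
        close(read)
    }()

    <-read
}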

There are many answers to this, and it depends on your data structure and program logic.

see: How to lock/synchronize access to a variable in Go during concurrent goroutines?
and: How to use RWMutex in Golang?

1- using Stateful Goroutines and channels
2- using sync.Mutex (see the sketch right after this list)
3- using sync/atomic
4- using sync.WaitGroup
5- using program logic (Semaphore)
...
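
Before the full samples below, here is a minimal sketch of options 2 and 4 (the Big type and its fields are only illustrative): embed a sync.Mutex in the struct and take the lock around every access. The lock also settles the staleness question, because a mutex provides the same happens-before guarantees as a channel.

package main

import (
    "fmt"
    "sync"
)

// Big is an illustrative shared structure guarded by its own mutex.
type Big struct {
    mu  sync.Mutex
    buf []byte
}

func main() {
    b := &Big{}
    var wg sync.WaitGroup

    wg.Add(1)
    go func() { // writer goroutine
        defer wg.Done()
        b.mu.Lock()
        b.buf = append(b.buf, "filled by the writer"...)
        b.mu.Unlock()
    }()

    wg.Wait() // option 4: wait until the writer has finished

    b.mu.Lock() // option 2: the lock also guarantees we see the write
    fmt.Println(string(b.buf))
    b.mu.Unlock()
}

Option 3 (sync/atomic) fits single machine-word values such as flags and counters, not a big struct like this.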


1: Stateful Goroutines and channels:
I simulated a very similar sample (imagine you want to read from one SSD and write to another SSD with a different speed):
In this sample code one goroutine (named write) does some job, prepares data and fills the big struct, and another goroutine (named read) takes data from the big struct and then does its own job. The manager goroutine guarantees no concurrent access to the same data, and communication between the three goroutines is done with channels. In your case you can use pointers for the channel data, or a global struct like in this sample.
The output will be like this:
mean= 36.6920166015625 stdev= 6.068973186592054

I hope this helps you get the idea.
Working sample code:

package main

import (
    "fmt"
    "math"
    "math/rand"
    "runtime"
    "sync"
    "time"
)

type BigStruct struct {
    big     []uint16
    rpos    int
    wpos    int
    full    bool
    empty   bool
    stopped bool
}

func main() {
    wg.Add(1)
    go write()
    go read()
    go manage()
    runtime.Gosched()
    stopCh <- <-time.After(5 * time.Second) // after 5 seconds, tell the writer to stop
    wg.Wait()
    mean := Mean(hist)
    stdev := stdDev(hist, mean)
    fmt.Println("mean=", mean, "stdev=", stdev)
}

const N = 1024 * 1024 * 1024

var wg sync.WaitGroup
var stopCh chan time.Time = make(chan time.Time)

var hist []int = make([]int, 65536)

var s *BigStruct = &BigStruct{empty: true,
    big: make([]uint16, N), //2GB
}

var rc chan uint16 = make(chan uint16)
var wc chan uint16 = make(chan uint16)

// next advances a ring-buffer position by one, wrapping around at N.
func next(pos int) int {
    pos++
    if pos >= N {
        pos = 0
    }
    return pos
}

// manage owns the shared struct: it is the only goroutine that touches s,
// so no locking is needed. It accepts new values from the writer via wc
// and hands buffered values to the reader via rc.
func manage() {
    dataReady := false
    var data uint16
    for {
        if !dataReady && !s.empty {
            dataReady = true
            data = s.big[s.rpos]
            s.rpos = next(s.rpos)
            s.empty = s.rpos == s.wpos
            s.full = next(s.wpos) == s.rpos
        }
        if dataReady {
            select {
            case rc <- data:
                dataReady = false
            default:
                runtime.Gosched()
            }
        }
        if !s.full && !s.stopped {
            select {
            case d, ok := <-wc:
                if !ok {
                    // The writer closed wc: accept no more data, just drain.
                    s.stopped = true
                } else {
                    s.big[s.wpos] = d
                    s.wpos = next(s.wpos)
                    s.empty = s.rpos == s.wpos
                    s.full = next(s.wpos) == s.rpos
                }
            default:
                runtime.Gosched()
            }
        }
        if s.stopped && s.empty && !dataReady {
            wg.Done()
            return
        }
    }
}

// read simulates the consumer: it builds a histogram of the values.
func read() {
    for {
        d := <-rc
        hist[d]++
    }
}

// write simulates the producer: it sends random values to the manager
// until the 5-second timer fires, then closes wc so that only the
// manager ever touches the shared struct.
func write() {
    for {
        select {
        case <-stopCh:
            close(wc)
            return
        case wc <- uint16(rand.Intn(65536)):
        }
    }
}

func stdDev(data []int, mean float64) float64 {
    sum := 0.0
    for _, d := range data {
        sum += math.Pow(float64(d)-mean, 2)
    }
    variance := sum / float64(len(data)-1)
    return math.Sqrt(variance)
}
func Mean(data []int) float64 {
    sum := 0.0
    for _, d := range data {
        sum += float64(d)
    }
    return sum / float64(len(data))
}

5: Another way (faster) for some use cases:
Here is another way to use a shared data structure for the read job / write job / processing job that was separated in the first sample; now the same 3 jobs are done without channels and without a mutex, which is safe because everything runs in a single goroutine.

working sample:

package main

import (
    "fmt"
    "math"
    "math/rand"
    "time"
)

type BigStruct struct {
    big     []uint16
    rpos    int
    wpos    int
    full    bool
    empty   bool
    stopped bool
}

// manage does the read job, the write job and the stop bookkeeping
// sequentially in a single goroutine, so no synchronization is needed.
func manage() {
    for {
        if !s.empty {
            hist[s.big[s.rpos]]++ // sample read job of arbitrary duration
            nextPtr(&s.rpos)
        }
        if !s.full && !s.stopped {
            s.big[s.wpos] = uint16(rand.Intn(65536)) // sample write job of arbitrary duration
            nextPtr(&s.wpos)
        }
        if s.stopped {
            if s.empty {
                return
            }
        } else {
            s.stopped = time.Since(t0) >= 5*time.Second
        }
    }
}

func main() {
    t0 = time.Now()
    manage()
    mean := Mean(hist)
    stdev := StdDev(hist, mean)
    fmt.Println("mean=", mean, "stdev=", stdev)
    d0 := time.Since(t0)
    fmt.Println(d0) //5.8523347s
}

var t0 time.Time

const N = 100 * 1024 * 1024

var hist []int = make([]int, 65536)

var s *BigStruct = &BigStruct{empty: true,
    big: make([]uint16, N), //200MB
}

func next(pos int) int {
    pos++
    if pos >= N {
        pos = 0
    }
    return pos
}
// nextPtr advances a ring-buffer position in place, wrapping at N,
// and refreshes the empty/full flags.
func nextPtr(pos *int) {
    *pos++
    if *pos >= N {
        *pos = 0
    }

    s.empty = s.rpos == s.wpos
    s.full = next(s.wpos) == s.rpos
}

func StdDev(data []int, mean float64) float64 {
    sum := 0.0
    for _, d := range data {
        sum += math.Pow(float64(d)-mean, 2)
    }
    variance := sum / float64(len(data)-1)
    return math.Sqrt(variance)
}
func Mean(data []int) float64 {
    sum := 0.0
    for _, d := range data {
        sum += float64(d)
    }
    return sum / float64(len(data))
}

To prevent concurrent modifications to a struct while retaining the ability to read it, you would typically embed a sync.RWMutex. This is no exception: you can simply lock your struct for writes while it is in transit and unlock it at a point in time of your convenience.

package main

import (
    "fmt"
    "sync"
    "time"
)

// Big simulates your big struct
type Big struct {
    sync.RWMutex
    value string
}

// pump uses a goroutine to take the slice of pointers to Big,
// locks the underlying structs and sends the pointers to
// the locked instances of Big downstream
func pump(bigs []*Big) chan *Big {

    // We make the channel buffered for illustration purposes
    c := make(chan *Big, 3)

    go func() {
        for _, big := range bigs {
            // We lock the struct before sending it to the channel
            // so it can not be changed via pointer while in transit
            big.Lock()
            c <- big
        }
        close(c)
    }()

    return c
}

// sink receives pointers to the locked instances of Big,
// reads them and unlocks them
func sink(c chan *Big) {

    for big := range c {
        fmt.Println(big.value)
        time.Sleep(1 * time.Second)
        big.Unlock()

    }
}

// modify tries to acquire the locks on the instances and modify them
func modify(bigs []*Big) {
    for _, big := range bigs {

        big.Lock()
        big.value = "modified"
        big.Unlock()
    }
}

func main() {

    bigs := []*Big{{value: "Foo"}, {value: "Bar"}, {value: "Baz"}}
    c := pump(bigs)

    // For the sake of this example, we wait until all entries are
    // sent into the channel and hence are locked
    time.Sleep(1 * time.Second)

    // Now we try to modify concurrently before we even start to read
    // the struct of which the pointers were sent into the channel
    go modify(bigs)
    sink(c)

    // We use sleep here to keep waiting for modify() to finish simple.
    // Usually, you'd use a sync.WaitGroup
    time.Sleep(1 * time.Second)

    for _, big := range bigs {
        fmt.Println(big.value)
    }

}

Run on playground