Is there a way to update unread data that has been sent to a channel with more up-to-date data?
I have a goroutine (producer) with a channel that provides progress updates to another goroutine (consumer). In some scenarios, the progress can update much faster than the consumer consumes the update messages.
This causes me issues as I can either:
As an example, I might have something like this:
Progress reporting goroutine: Posts "1%" to channel
Progress reporting goroutine: Posts "2%" to channel
Progress reporting goroutine: Posts "3%" to channel
Progress consuming goroutine: Reads "1%", "2%" and "3%" from channel. "1%" and "2%" are outdated information.
Is there any way to update unread channel data? Or is there a better way of going about this issue?
You can store the value in a global variable protected with a sync.RWMutex.
It holds the current progress: the generator updates it, and the consumer reads and displays it.
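A minimal sketch of that idea, assuming the progress is a plain integer percentage. The Progress type and its Set and Get methods are illustrative names, not something from the question:

```go
package main

import (
	"fmt"
	"sync"
)

// Progress holds the latest progress value behind a read/write lock.
// The type and method names are hypothetical, chosen for this sketch.
type Progress struct {
	mu      sync.RWMutex
	percent int
}

// Set overwrites the stored value; older values are simply replaced.
func (p *Progress) Set(v int) {
	p.mu.Lock()
	p.percent = v
	p.mu.Unlock()
}

// Get returns the most recently stored value.
func (p *Progress) Get() int {
	p.mu.RLock()
	defer p.mu.RUnlock()
	return p.percent
}

func main() {
	var p Progress
	var wg sync.WaitGroup
	wg.Add(1)
	go func() { // producer: updates as fast as it likes
		defer wg.Done()
		for i := 1; i <= 100; i++ {
			p.Set(i)
		}
	}()
	wg.Wait()
	fmt.Println(p.Get()) // prints 100
}
```

Because the consumer reads the variable instead of a queue, it can never see an outdated value, but it also has to decide for itself when to look (for example, on a timer).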
You can also make a non-blocking write to a channel with a buffer length of 1:
var c = make(chan struct{}, 1)
select {
case c <- struct{}{}:
default:
}
This way the sender either adds one element to the channel or does nothing if it is full.
The reader treats this empty struct as a signal that it should fetch the updated value from the global variable.
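Putting the two pieces together might look like the sketch below. The names publish, latest, progress and updated are assumptions made for illustration. The value is stored before the signal is raised, so a woken consumer always finds a value at least as new as the one that triggered the signal:

```go
package main

import (
	"fmt"
	"sync"
)

var (
	mu       sync.RWMutex
	progress int                      // shared "latest value"
	updated  = make(chan struct{}, 1) // signal: progress has changed
)

// publish stores v and raises the signal without ever blocking.
func publish(v int) {
	mu.Lock()
	progress = v
	mu.Unlock()
	select {
	case updated <- struct{}{}: // the consumer will be woken up
	default: // a signal is already pending; it covers this update too
	}
}

// latest returns the current value.
func latest() int {
	mu.RLock()
	defer mu.RUnlock()
	return progress
}

func main() {
	done := make(chan struct{})
	go func() { // consumer: wakes on a signal, then reads the newest value
		defer close(done)
		for range updated {
			v := latest()
			fmt.Printf("progress: %d%%\n", v)
			if v == 100 {
				return
			}
		}
	}()
	for i := 1; i <= 100; i++ {
		publish(i)
	}
	<-done
}
```

How many lines the consumer prints depends on scheduling; the only guarantee is that intermediate values may be skipped and the final 100% is seen.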
Another way: Updatable channel
var c = make(chan int, 1)
select {
case c <- value: // channel was empty: the fresh value is now queued
default: // channel is full: remove the stale value, taking care not to block on our own channel
	select {
	case <-c: // discarded the stale value, so there is room for the fresh one
		c <- value
	default: // the consumer has just read it: skip, so we do not block
	}
}
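As a variation on the fragment above, the same idea can be wrapped in a helper that retries instead of skipping, so the fresh value is not lost when the consumer empties the channel between the two steps. This is only a sketch: sendLatest is a hypothetical name, and it assumes the channel was created with capacity 1.

```go
package main

import "fmt"

// sendLatest is a hypothetical helper. It assumes c has capacity 1 and
// keeps trying until value is the pending element in the buffer.
func sendLatest(c chan int, value int) {
	for {
		select {
		case c <- value: // there was room: value is now the pending update
			return
		default: // buffer full: try to discard the stale value, then retry
			select {
			case <-c:
			default: // the consumer emptied the buffer in the meantime
			}
		}
	}
}

func main() {
	c := make(chan int, 1)
	for i := 1; i <= 3; i++ {
		sendLatest(c, i) // nothing reads yet, so each call replaces the last
	}
	fmt.Println(<-c) // prints 3
}
```

Neither inner operation blocks, so the helper cannot deadlock on its own channel; the cost is that a producer may loop more than once under contention.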
Just clear the channel each time before sending a value to it.
In other words, when you send 3%, 3% becomes the only value in the channel.
Make the channel with a buffer length of 1, so that a single <-ch is enough to clear it.
Edit: use a select with a default case to clear the channel with <-ch, so that it does not block if the previous value has already been read.
This assumes a single producer; with several, another send can slip in between the clear and ch <- v, which then blocks until the consumer reads.
package main

import "fmt"

func main() {
	// channel buffer must be 1 in this case
	var ch = make(chan int, 1)

	// when channel was not consumed and you want to update value at channel
	produceToChannel(ch, 1)
	produceToChannel(ch, 2)
	fmt.Println(consumeFromChannel(ch)) // prints 2

	// when channel was already consumed and you are producing new value
	produceToChannel(ch, 3)
	consumeFromChannel(ch)
	produceToChannel(ch, 4)
	fmt.Println(consumeFromChannel(ch)) // prints 4
}

func produceToChannel(ch chan int, v int) {
	select {
	case <-ch:
	default:
	}
	ch <- v
}

func consumeFromChannel(ch chan int) int {
	return <-ch
}
How about a concurrent map that stores versions for all your inbound objects, with, say, 0 being the default version?
import "sync"

// Versions maps an object ID to the latest version produced for it.
var Versions sync.Map

type Data struct {
	Payload interface{}
	Version int
	ID      int
}

// produceData and hasNewVersion are placeholders that are not defined here.
func produce(c chan Data) {
	for {
		data := produceData()
		if hasNewVersion(data) {
			Versions.Store(data.ID, data.Version)
		}
		c <- data
	}
}

func consume(c chan Data) {
	for val := range c {
		// Process only the latest version; anything older is stale.
		if ver, ok := Versions.Load(val.ID); ok && ver.(int) == val.Version {
			// process
		}
	}
}
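For a version that runs on its own, here is a small sketch of the same approach. It assumes every published item is newer than the previous one for the same ID, so the placeholder helpers are not needed; publish and isCurrent are hypothetical names introduced for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

type Data struct {
	Payload interface{}
	Version int
	ID      int
}

// versions maps an object ID to the newest version produced so far.
var versions sync.Map

// publish records d as the newest version for its ID, then queues it.
// Assumption: callers always publish increasing versions per ID.
func publish(c chan<- Data, d Data) {
	versions.Store(d.ID, d.Version)
	c <- d
}

// isCurrent reports whether d is still the newest version of its object.
func isCurrent(d Data) bool {
	ver, ok := versions.Load(d.ID)
	return ok && ver.(int) == d.Version
}

func main() {
	c := make(chan Data, 3)
	for v := 1; v <= 3; v++ {
		publish(c, Data{Payload: fmt.Sprintf("%d%%", v), Version: v, ID: 1})
	}
	close(c)
	for d := range c {
		if !isCurrent(d) {
			continue // stale: a newer version was produced after this one
		}
		fmt.Println(d.Payload) // prints 3%
	}
}
```

Note that stale items still travel through the channel and are only discarded on the consumer side, so the channel needs enough buffer (or a fast enough consumer) to keep the producer from blocking.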