Updating unread channel data

Is there a way to replace unread data that has been sent to a channel with more up-to-date data?

I have a goroutine (producer) with a channel that provides progress updates to another goroutine (consumer). In some scenarios, the progress can update much faster than the consumer consumes the update messages.

This causes me issues as I can either:

  • Block on sending data to the channel. This means that if the consumer is slow to read data, the progress-updating goroutine blocks completely, which it shouldn't.
  • Not block on sending, and skip progress updates when the channel is full. This means the consumer is always reading old, out-of-date data.

As an example, I might have something like this:

Progress reporting goroutine: Posts "1%" to channel
Progress reporting goroutine: Posts "2%" to channel
Progress reporting goroutine: Posts "3%" to channel
Progress consuming goroutine: Reads "1%", "2%" and "3%" from channel. "1%" and "2%" are outdated information.

Is there any way to update unread channel data? Or is there a better way of going about this issue?

You can store the progress value in a global variable protected by a sync.RWMutex. The producer updates it; the consumer reads it and displays it.
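A minimal sketch of that idea, assuming the progress is a plain int. The Progress type and its Set and Get methods are illustrative names, not something from the question, and the value is wrapped in a struct rather than a bare global only to keep the example self-contained:

```go
package main

import (
	"fmt"
	"sync"
)

// Progress holds the latest progress value behind an RWMutex.
// The type and method names are hypothetical.
type Progress struct {
	mu      sync.RWMutex
	percent int
}

// Set overwrites the stored value; older values are simply replaced.
func (p *Progress) Set(v int) {
	p.mu.Lock()
	p.percent = v
	p.mu.Unlock()
}

// Get returns whatever value was stored most recently.
func (p *Progress) Get() int {
	p.mu.RLock()
	defer p.mu.RUnlock()
	return p.percent
}

func main() {
	var p Progress
	for i := 1; i <= 3; i++ {
		p.Set(i) // the producer never waits for the consumer
	}
	fmt.Println(p.Get()) // prints 3
}
```

The consumer only ever sees the newest value, but it has to decide for itself when to look, for example by polling on a timer.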

You can also do a non-blocking send on a channel with a buffer length of 1:

var c = make(chan struct{}, 1)
select {
case c <- struct{}{}:
default:
}

This way the sender either adds one element to the channel or does nothing if it is full.

The reader treats this empty struct as a signal that it should fetch the updated value from the global variable.
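Putting the two pieces together might look roughly like this. The names publish, latest, progress and updated are made up for the sketch, and the progress value is assumed to be an int:

```go
package main

import (
	"fmt"
	"sync"
)

var (
	mu       sync.RWMutex
	progress int                      // latest value, guarded by mu
	updated  = make(chan struct{}, 1) // at most one pending "something changed" signal
)

// publish stores v and raises the signal without ever blocking.
func publish(v int) {
	mu.Lock()
	progress = v
	mu.Unlock()
	select {
	case updated <- struct{}{}:
	default: // a signal is already pending; the consumer will pick up v anyway
	}
}

// latest waits for a signal and then reads the current value.
func latest() int {
	<-updated
	mu.RLock()
	defer mu.RUnlock()
	return progress
}

func main() {
	publish(1)
	publish(2)
	publish(3)
	fmt.Println(latest()) // prints 3
}
```

The value is written before the signal is sent, so a consumer that wakes up on the signal always finds a value at least as new as the one that triggered it.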

Another way: Updatable channel

var c = make(chan int, 1)
select {
case c <- value: // channel was empty - ok

default: // channel is full - remove the stale value, taking care not to block on our own channel
    select {
    case <-c: // discard the stale value and put in a fresh one
        c <- value
    default: // the consumer has just read it - skip so we don't block
    }
}
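Wrapped in a function, the same pattern can be exercised end to end. This is only a sketch: sendLatest is a hypothetical helper name, and it assumes the channel has a capacity of 1 and that a single goroutine does all the sending (with several senders the inner send could block).

```go
package main

import "fmt"

// sendLatest tries to leave value as the only element in c.
// Assumes cap(c) == 1 and exactly one sending goroutine.
func sendLatest(c chan int, value int) {
	select {
	case c <- value: // channel was empty
	default: // channel is full
		select {
		case <-c: // drop the stale value...
			c <- value // ...and store the fresh one
		default:
			// the consumer emptied the channel in the meantime;
			// this value is skipped, as in the snippet above
		}
	}
}

func main() {
	c := make(chan int, 1)
	for pct := 1; pct <= 3; pct++ {
		sendLatest(c, pct)
	}
	fmt.Println(<-c) // prints 3
}
```

Note that a value is dropped in the rare case where the consumer reads between the two selects; the next update will still get through.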

Just clear the channel each time before sending a value to it.

In other words, when you send 3%, 3% becomes the only value in channel.

Give the channel a buffer length of 1, so a single <-ch is enough to clear it.

Edit: use a select with a default case to clear the channel with <-ch, so that it does not block when the previous value has already been read.

package main

import "fmt"

func main() {
    // channel buffer must be 1 in this case
    var ch = make(chan int, 1)

    // the channel has not been consumed yet and you want to replace its value
    produceToChannel(ch, 1)
    produceToChannel(ch, 2)
    fmt.Println(consumeFromChannel(ch)) // prints 2

    // the channel has already been consumed and you are producing a new value
    produceToChannel(ch, 3)
    consumeFromChannel(ch)
    produceToChannel(ch, 4)
    fmt.Println(consumeFromChannel(ch)) // prints 4
}

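// produceToChannel replaces any unread value in ch with v.
// It assumes ch has a buffer of 1 and a single producer.
func produceToChannel(ch chan int, v int) {
    select {
    case <-ch: // discard the stale value, if any
    default: // channel is already empty
    }
    ch <- v
}

func consumeFromChannel(ch chan int) int {
    return <-ch
}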

How about a concurrent map that stores the latest version of each inbound object, with, say, 0 as the default version?

import "sync"

// Versions maps each object ID to its latest known version.
var Versions sync.Map

type Data struct {
    Payload interface{}
    Version int
    ID      int
}

// produceData and hasNewVersion are assumed to be defined elsewhere.
func produce(c chan Data) {
    for {
        data := produceData()
        if hasNewVersion(data) {
            Versions.Store(data.ID, data.Version)
        }
        c <- data
    }
}

func consume(c chan Data) {
    for val := range c {
        // process only items that still carry the latest version
        if ver, ok := Versions.Load(val.ID); ok && ver.(int) == val.Version {
            // process
        }
    }
}
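To see the version check in action, here is a small self-contained sketch. Update, publish and drain are stand-in names invented for the example, and the channel is given a buffer of 3 so that the producer can run ahead without a consumer; with a smaller buffer the send would still block.

```go
package main

import (
	"fmt"
	"sync"
)

// Update is an illustrative stand-in for the Data type above.
type Update struct {
	ID      int
	Version int
	Payload string
}

var versions sync.Map // ID -> latest Version (int)

// publish records u as the newest version of its ID, then queues it.
func publish(c chan<- Update, u Update) {
	versions.Store(u.ID, u.Version)
	c <- u
}

// drain reads every queued update and returns only those still current.
func drain(c <-chan Update) []Update {
	var fresh []Update
	for u := range c {
		if v, ok := versions.Load(u.ID); ok && v.(int) == u.Version {
			fresh = append(fresh, u)
		}
	}
	return fresh
}

func main() {
	c := make(chan Update, 3)
	publish(c, Update{ID: 1, Version: 1, Payload: "1%"})
	publish(c, Update{ID: 1, Version: 2, Payload: "2%"})
	publish(c, Update{ID: 1, Version: 3, Payload: "3%"})
	close(c)
	for _, u := range drain(c) {
		fmt.Println(u.Payload) // prints 3%
	}
}
```

The stale messages still travel through the channel; the consumer just discards them cheaply instead of processing them.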