I have a channel that will receive bursts of writes to it. I want to wait until a burst of sends on the channel have finished before triggering an action.
I have looked at this gist, however, it will send on the output every interval
if there is data in the buffer:
func debounceChannel(interval time.Duration, output chan int) chan int {
input := make(chan int)
go func() {
var buffer int
var ok bool
// We do not start waiting for interval until called at least once
buffer, ok = <-input
// If channel closed exit, we could also close output
if !ok {
return
}
// We start waiting for an interval
for {
select {
case buffer, ok = <-input:
// If channel closed exit, we could also close output
if !ok {
return
}
case <-time.After(interval):
// Interval has passed and we have data, so send it
output <- buffer
// Wait for data again before starting waiting for an interval
buffer, ok = <-input
if !ok {
return
}
// If channel is not closed we have more data and start waiting for interval
}
}
}()
return input
}
In my case, I want to wait until there is no longer any data being sent on the input channel for this burst before triggering or sending on the output.
How do I achieve this?
This is what I ended up implementing as my debouncer:
func Debounce(lull time.Duration, in chan struct{}, out chan struct{}) {
go func() {
var last int64 = 0
for {
select {
case <-in:
last = time.Now().Unix()
case <-time.Tick(lull):
if last != 0 && time.Now().Unix() >= last+int64(lull.Seconds()) {
last = 0
out <- struct{}{}
}
}
}
}()
}
It takes a lull time which is the duration where if we do not receive on the input, then we assume there is a break in the bursts of data. There are 2 channels, 1 input and 1 output. The bursts of data arrives on the input, and for each burst, we write to the output channel at the end of the burst.
The implementation is extremely simplistic. I just store the current unix timestamp every time I receive from the input channel. Then, I have a ticker ticking with a duration of the lull time. All this does is check to see if we've exceeded the wait time for the last burst. If so, it resets last
to 0 an emits an event on the output channel.
Here's some code using the debounce function with a lull time of 2 seconds which sends random bursts on the input channel:
func main() {
out := make(chan struct{})
in := make(chan struct{})
Debounce(2*time.Second, in, out)
// Generating bursts of input data
go func(in chan struct{}) {
for {
select {
case <-time.Tick(1 * time.Second):
in <- struct{}{}
fmt.Println("Sending!")
shouldSleep := rand.Intn(2)
if shouldSleep == 1 {
time.Sleep(5 * time.Second)
}
}
}
}(in)
// Listening for output events
go func(out chan struct{}) {
for _ = range out {
fmt.Println("Got an event!")
}
}(out)
// Do not let main terminate.
done := make(chan struct{})
<-done
}
Sounds like you need synchronization between goroutines, perhaps along this line.
func main() {
// Create a channel for our input
input := make(chan int, 1)
// Create another for synchronization between main and forked goroutines
done := make(chan bool)
go func() {
// block-wait for received value
<-input
// do some more things here
// when done, send signal to the main goroutine
done <- true
}()
// Do something while wait for the forked goroutine
// this block until `<-done`
<-done
close(mychan)
}
This post explains quite clear about synchronization using channels and sync group.