I have following code which contains:
Code:
package main
import (
"log"
"os"
"os/signal"
"syscall"
"time"
)
func user_interupt_register() (writer_stop, reader_stop chan struct{}) {
writer_stop = make(chan struct{}, 1)
reader_stop = make(chan struct{}, 1)
signal_channel := make(chan os.Signal)
signal.Notify(signal_channel, syscall.SIGINT, syscall.SIGTERM)
go func() {
for sig := range signal_channel {
log.Println("Signal received: ", sig)
reader_stop <- struct{}{}
log.Println("Signal sent")
break
}
close(signal_channel)
log.Println("Closing User Interupt Register go routing")
}()
return writer_stop, reader_stop
}
func writer(item_id *uint32, data_channel chan uint32) {
log.Println("[WRITER] Item ID: ", *item_id)
data_channel <- *item_id
*item_id++
}
func reader(reader_stop, writer_stop chan struct{}, data_channel chan uint32) {
var flag = true
for flag {
select {
case <-reader_stop:
writer_stop <- struct{}{}
flag = false
case data, ok := <-data_channel:
if ok {
log.Println("[READER] Item ID: ", data)
}
default:
time.Sleep(1 * time.Second)
log.Println("[READER] No ID to print")
continue
}
}
log.Println("About to close(reader_stop)")
close(reader_stop)
log.Println("close(reader_stop) finished")
}
func main() {
log.Println("Starting Application")
data_channel := make(chan uint32)
defer close(data_channel)
writer_stop, reader_stop := user_interupt_register()
log.Println("Start Timer")
ticker := time.NewTicker(2 * time.Second)
go reader(reader_stop, writer_stop, data_channel)
var item_id uint32
item_id = 1
var flag = true
for flag {
select {
case <-ticker.C:
writer(&item_id, data_channel)
case <-writer_stop:
log.Println("About to stop timer")
ticker.Stop()
log.Println("Timer stopped")
flag = false
}
}
log.Println("About to close(writer_stop)")
close(writer_stop)
log.Println("close(writer_stop) finished")
log.Println("Exiting Application")
}
When I am stopping it occasionally I am getting following outputs:
2016/07/19 16:02:46 Starting Application
2016/07/19 16:02:46 Start Timer
2016/07/19 16:02:47 [READER] No ID to print
2016/07/19 16:02:48 [WRITER] Item ID: 1
2016/07/19 16:02:48 [READER] No ID to print
2016/07/19 16:02:48 [READER] Item ID: 1
2016/07/19 16:02:49 [READER] No ID to print
2016/07/19 16:02:50 [WRITER] Item ID: 2
2016/07/19 16:02:50 [READER] No ID to print
2016/07/19 16:02:50 [READER] Item ID: 2
2016/07/19 16:02:51 [READER] No ID to print
^C2016/07/19 16:02:52 Signal received: interrupt
2016/07/19 16:02:52 Signal sent
2016/07/19 16:02:52 Closing User Interupt Register go routing
2016/07/19 16:02:52 [WRITER] Item ID: 3
2016/07/19 16:02:52 [READER] No ID to print
2016/07/19 16:02:52 About to close(reader_stop)
2016/07/19 16:02:52 close(reader_stop) finished
^Cpanic: send on closed channel
goroutine 5 [running]:
panic(0x4c1d00, 0xc8200980b0)
/usr/local/go/src/runtime/panic.go:481 +0x3e6
os/signal.process(0x7ffa8f64d078, 0xc8200980a0)
/usr/local/go/src/os/signal/signal.go:176 +0x17a
os/signal.loop()
/usr/local/go/src/os/signal/signal_unix.go:22 +0x75
created by os/signal.init.1
/usr/local/go/src/os/signal/signal_unix.go:28 +0x37
However most of times it stops normally with following output.
2016/07/19 16:03:13 Starting Application
2016/07/19 16:03:13 Start Timer
2016/07/19 16:03:14 [READER] No ID to print
2016/07/19 16:03:15 [WRITER] Item ID: 1
2016/07/19 16:03:15 [READER] No ID to print
2016/07/19 16:03:15 [READER] Item ID: 1
2016/07/19 16:03:16 [READER] No ID to print
^C2016/07/19 16:03:16 Signal received: interrupt
2016/07/19 16:03:16 Signal sent
2016/07/19 16:03:16 Closing User Interupt Register go routing
2016/07/19 16:03:17 [WRITER] Item ID: 2
2016/07/19 16:03:17 [READER] No ID to print
2016/07/19 16:03:17 [READER] Item ID: 2
2016/07/19 16:03:17 About to close(reader_stop)
2016/07/19 16:03:17 close(reader_stop) finished
2016/07/19 16:03:17 About to stop timer
2016/07/19 16:03:17 Timer stopped
2016/07/19 16:03:17 About to close(writer_stop)
2016/07/19 16:03:17 close(writer_stop) finished
2016/07/19 16:03:17 Exiting Application
So what is wrong? How can I be sure that it will stop normally stop all go routine and close channels?
What's happening in your code is that as soon as signal is sent, everything works until the end of the reader
function. That is, signal is sent to writer_stop
(statement writer_stop <- struct{}{}
gets executed.
The problem is with your select
in main. You are reading for a close value on writer_stop
in a case
.
select {
case <-ticker.C:
writer(&item_id, data_channel)
case <-writer_stop:
log.Println("About to stop timer")
ticker.Stop()
log.Println("Timer stopped")
flag = false
}
Due to nature of select
, if more than one cases
are ready to be selected, select
selects one at random.
Now in your select
above, after the call to writer_stop <- struct{}{}
has occurred, somehow due to erroneous synchronization, case <-ticker.C:
is selected and writer
gets called.
Since reader
has returned at this point, and it was the only method reading from data_channel
, writer
blocks at data_channel <- *item_id
, causing your program to hang.
The error panic: send on closed channel
occurs because you hit ^C
again and the signal channel has been closed.
You can fix this in several way. One simple solution is to add buffer to data_channel:
data_channel := make(chan uint32, 1)
Here's one way to do this:
func user_interupt_register() (writer_stop, reader_stop chan struct{}) {
writer_stop = make(chan struct{}, 1)
// same as before
go func() {
sig := <-signal_channel
writer_stop <- struct{}{} // change to writer_stop
// same as before
}()
return writer_stop, reader_stop
}
func reader(data_channel chan uint32) {
for item := range data_channel {
log.Println("[READER] Item ID: ", item)
}
}
func main() {
log.Println("Starting Application")
data_channel := make(chan uint32)
// defer close(data_channel)
writer_stop, _ := user_interupt_register()
log.Println("Start Timer")
ticker := time.NewTicker(2 * time.Second)
readTicker := time.NewTicker(1 * time.Second)
go reader(data_channel)
go func() { // to print intermittent message
for _ = range readTicker.C {
data_channel <- 0 // some insignificant value
}
}()
var item_id uint32 = 1
for _ = range ticker.C {
select {
case <-writer_stop:
data_channel <- item_id
close(data_channel)
ticker.Stop()
readTicker.Stop()
return
default:
data_channel <- item_id
item_id++
}
}
close(writer_stop)
}
Do note that this'll require a through testing.
IMO you should change your reader/writer structure. You do not need to stop the reader only the writer.
The problem with the current structure is that there might be items that will be written and never read. That is probably why you don't want to make data_channel a buffered channel. But still one item might not get read that was already written to the channel.
You only need a stop channel for the writer. The writer stops if that channel signals and closes the data channel. The reader has no select, just a for item := range data_channel
. If the data_channel is closed, reader will empty the existing item(s) in it and stop by itself.
Replace this:
func reader(reader_stop, writer_stop chan struct{}, data_channel chan uint32) {
var flag = true
for flag {
select {
case <-reader_stop:
writer_stop <- struct{}{}
flag = false
case data, ok := <-data_channel:
if ok {
log.Println("[READER] Item ID: ", data)
}
default:
time.Sleep(1 * time.Second)
log.Println("[READER] No ID to print")
continue
}
}
log.Println("About to close(reader_stop)")
close(reader_stop)
log.Println("close(reader_stop) finished")
}
with:
func reader(writer_stop chan struct{}, data_channel chan uint32) {
for item := range data_channel {
log.Println("[READER] Item ID: ", data)
}
}
Remove:
defer close(data_channel)
and add it here:
case <-writer_stop:
close(data_channel)
log.Println("About to stop timer")
ticker.Stop()
log.Println("Timer stopped")
flag = false
Then you will need something that waits for the reader to finish. Check out sync.WaitGroup
. Better way would be to put the writer in the go routine and put the reader in the main routine. That way your main routine will stop if the reader is done (which in turn stops if it doesn't receive any more values from the writer - by the writer closing the channel).
Here my working suggestion with as little changes as possible:
package main
import (
"log"
"os"
"os/signal"
"syscall"
"time"
)
func user_interupt_register() (writer_stop chan struct{}) {
writer_stop = make(chan struct{}, 1)
signal_channel := make(chan os.Signal)
signal.Notify(signal_channel, syscall.SIGINT, syscall.SIGTERM)
go func() {
for sig := range signal_channel {
log.Println("Signal received: ", sig)
writer_stop <- struct{}{}
log.Println("Signal sent")
break
}
close(signal_channel)
log.Println("Closing User Interupt Register go routing")
}()
return
}
func writer(item_id *uint32, data_channel chan uint32, writer_stop chan struct{}) {
log.Println("Start Timer")
ticker := time.NewTicker(2 * time.Second)
var flag = true
for flag {
select {
case <-ticker.C:
log.Println("[WRITER] Item ID: ", *item_id)
data_channel <- *item_id
*item_id++
case <-writer_stop:
close(data_channel)
log.Println("About to stop timer")
ticker.Stop()
log.Println("Timer stopped")
flag = false
}
}
}
func main() {
log.Println("Starting Application")
data_channel := make(chan uint32)
writer_stop := user_interupt_register()
var item_id uint32
item_id = 1
go writer(&item_id, data_channel, writer_stop)
for data := range data_channel {
log.Println("[READER] Item ID: ", data)
}
log.Println("About to close(writer_stop)")
close(writer_stop)
log.Println("close(writer_stop) finished")
log.Println("Exiting Application")
}
You will not have the log.Println("[READER] No ID to print")
message but otherwise it should produce the same result.
Note: the other code needs to be adjusted accordingly... like removing the writer_stop channel.