优雅地终止基于计时器的编写器,并进入例程进行读取器处理

I have following code which contains:

  • ticker based writer
  • go routine for reading
  • data channel for writer and reader communication
  • stop_writer and stop_reader channels for stopping writer and reader gracefully
  • signal handler in order to handle "Ctrl-C" user input

Code:

package main

import (
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"
)

func user_interupt_register() (writer_stop, reader_stop chan struct{}) {
    writer_stop = make(chan struct{}, 1)
    reader_stop = make(chan struct{}, 1)

    signal_channel := make(chan os.Signal)
    signal.Notify(signal_channel, syscall.SIGINT, syscall.SIGTERM)

    go func() {
        for sig := range signal_channel {
            log.Println("Signal received: ", sig)
            reader_stop <- struct{}{}
            log.Println("Signal sent")
            break
        }

        close(signal_channel)
        log.Println("Closing User Interupt Register go routing")
    }()

    return writer_stop, reader_stop
}

func writer(item_id *uint32, data_channel chan uint32) {
    log.Println("[WRITER] Item ID: ", *item_id)
    data_channel <- *item_id
    *item_id++
}

func reader(reader_stop, writer_stop chan struct{}, data_channel chan uint32) {
    var flag = true
    for flag {
        select {
        case <-reader_stop:
            writer_stop <- struct{}{}
            flag = false
        case data, ok := <-data_channel:
            if ok {
                log.Println("[READER] Item ID: ", data)
            }
        default:
            time.Sleep(1 * time.Second)
            log.Println("[READER] No ID to print")
            continue
        }
    }

    log.Println("About to close(reader_stop)")
    close(reader_stop)
    log.Println("close(reader_stop) finished")
}

func main() {
    log.Println("Starting Application")

    data_channel := make(chan uint32)
    defer close(data_channel)

    writer_stop, reader_stop := user_interupt_register()

    log.Println("Start Timer")
    ticker := time.NewTicker(2 * time.Second)

    go reader(reader_stop, writer_stop, data_channel)

    var item_id uint32
    item_id = 1

    var flag = true
    for flag {
        select {
        case <-ticker.C:
            writer(&item_id, data_channel)
        case <-writer_stop:
            log.Println("About to stop timer")
            ticker.Stop()
            log.Println("Timer stopped")
            flag = false
        }
    }

    log.Println("About to close(writer_stop)")
    close(writer_stop)
    log.Println("close(writer_stop) finished")
    log.Println("Exiting Application")
}

When I am stopping it occasionally I am getting following outputs:

2016/07/19 16:02:46 Starting Application
2016/07/19 16:02:46 Start Timer
2016/07/19 16:02:47 [READER] No ID to print
2016/07/19 16:02:48 [WRITER] Item ID:  1
2016/07/19 16:02:48 [READER] No ID to print
2016/07/19 16:02:48 [READER] Item ID:  1
2016/07/19 16:02:49 [READER] No ID to print
2016/07/19 16:02:50 [WRITER] Item ID:  2
2016/07/19 16:02:50 [READER] No ID to print
2016/07/19 16:02:50 [READER] Item ID:  2
2016/07/19 16:02:51 [READER] No ID to print
^C2016/07/19 16:02:52 Signal received:  interrupt
2016/07/19 16:02:52 Signal sent
2016/07/19 16:02:52 Closing User Interupt Register go routing
2016/07/19 16:02:52 [WRITER] Item ID:  3
2016/07/19 16:02:52 [READER] No ID to print
2016/07/19 16:02:52 About to close(reader_stop)
2016/07/19 16:02:52 close(reader_stop) finished
^Cpanic: send on closed channel

goroutine 5 [running]:
panic(0x4c1d00, 0xc8200980b0)
    /usr/local/go/src/runtime/panic.go:481 +0x3e6
os/signal.process(0x7ffa8f64d078, 0xc8200980a0)
    /usr/local/go/src/os/signal/signal.go:176 +0x17a
os/signal.loop()
    /usr/local/go/src/os/signal/signal_unix.go:22 +0x75
created by os/signal.init.1
    /usr/local/go/src/os/signal/signal_unix.go:28 +0x37

However most of times it stops normally with following output.

2016/07/19 16:03:13 Starting Application
2016/07/19 16:03:13 Start Timer
2016/07/19 16:03:14 [READER] No ID to print
2016/07/19 16:03:15 [WRITER] Item ID:  1
2016/07/19 16:03:15 [READER] No ID to print
2016/07/19 16:03:15 [READER] Item ID:  1
2016/07/19 16:03:16 [READER] No ID to print
^C2016/07/19 16:03:16 Signal received:  interrupt
2016/07/19 16:03:16 Signal sent
2016/07/19 16:03:16 Closing User Interupt Register go routing
2016/07/19 16:03:17 [WRITER] Item ID:  2
2016/07/19 16:03:17 [READER] No ID to print
2016/07/19 16:03:17 [READER] Item ID:  2
2016/07/19 16:03:17 About to close(reader_stop)
2016/07/19 16:03:17 close(reader_stop) finished
2016/07/19 16:03:17 About to stop timer
2016/07/19 16:03:17 Timer stopped
2016/07/19 16:03:17 About to close(writer_stop)
2016/07/19 16:03:17 close(writer_stop) finished
2016/07/19 16:03:17 Exiting Application

So what is wrong? How can I be sure that it will stop normally stop all go routine and close channels?

What's happening in your code is that as soon as signal is sent, everything works until the end of the reader function. That is, signal is sent to writer_stop (statement writer_stop <- struct{}{} gets executed.

The problem is with your select in main. You are reading for a close value on writer_stop in a case.

    select {
    case <-ticker.C:
        writer(&item_id, data_channel)
    case <-writer_stop:
        log.Println("About to stop timer")
        ticker.Stop()
        log.Println("Timer stopped")
        flag = false
    }

Due to nature of select, if more than one cases are ready to be selected, select selects one at random.

Now in your select above, after the call to writer_stop <- struct{}{} has occurred, somehow due to erroneous synchronization, case <-ticker.C: is selected and writer gets called.

Since reader has returned at this point, and it was the only method reading from data_channel, writer blocks at data_channel <- *item_id, causing your program to hang.

The error panic: send on closed channel occurs because you hit ^C again and the signal channel has been closed.

You can fix this in several way. One simple solution is to add buffer to data_channel:

data_channel := make(chan uint32, 1)

Here's one way to do this:

func user_interupt_register() (writer_stop, reader_stop chan struct{}) {
    writer_stop = make(chan struct{}, 1)
    // same as before

    go func() {
        sig := <-signal_channel
        writer_stop <- struct{}{} // change to writer_stop
        // same as before
    }()

    return writer_stop, reader_stop
}

func reader(data_channel chan uint32) {
    for item := range data_channel {
        log.Println("[READER] Item ID: ", item)
    }
}

func main() {
    log.Println("Starting Application")

    data_channel := make(chan uint32)
    // defer close(data_channel)

    writer_stop, _ := user_interupt_register()

    log.Println("Start Timer")
    ticker := time.NewTicker(2 * time.Second)
    readTicker := time.NewTicker(1 * time.Second)

    go reader(data_channel)

    go func() { // to print intermittent message
        for _ = range readTicker.C {
            data_channel <- 0 // some insignificant value
        }
    }()

    var item_id uint32 = 1

    for _ = range ticker.C {
        select {
        case <-writer_stop:
            data_channel <- item_id
            close(data_channel)
            ticker.Stop()
            readTicker.Stop()
            return
        default:
            data_channel <- item_id
            item_id++
        }
    }

    close(writer_stop)
}

Do note that this'll require a through testing.

IMO you should change your reader/writer structure. You do not need to stop the reader only the writer.

The problem with the current structure is that there might be items that will be written and never read. That is probably why you don't want to make data_channel a buffered channel. But still one item might not get read that was already written to the channel.

You only need a stop channel for the writer. The writer stops if that channel signals and closes the data channel. The reader has no select, just a for item := range data_channel. If the data_channel is closed, reader will empty the existing item(s) in it and stop by itself.

Replace this:

func reader(reader_stop, writer_stop chan struct{}, data_channel chan uint32) {
    var flag = true
    for flag {
        select {
        case <-reader_stop:
            writer_stop <- struct{}{}
            flag = false
        case data, ok := <-data_channel:
            if ok {
                log.Println("[READER] Item ID: ", data)
            }
        default:
            time.Sleep(1 * time.Second)
            log.Println("[READER] No ID to print")
            continue
        }
    }

    log.Println("About to close(reader_stop)")
    close(reader_stop)
    log.Println("close(reader_stop) finished")
}

with:

func reader(writer_stop chan struct{}, data_channel chan uint32) {
    for item := range data_channel {
      log.Println("[READER] Item ID: ", data)
    }
}

Remove:

defer close(data_channel)

and add it here:

case <-writer_stop:
   close(data_channel)
   log.Println("About to stop timer")
   ticker.Stop()
   log.Println("Timer stopped")
   flag = false

Then you will need something that waits for the reader to finish. Check out sync.WaitGroup. Better way would be to put the writer in the go routine and put the reader in the main routine. That way your main routine will stop if the reader is done (which in turn stops if it doesn't receive any more values from the writer - by the writer closing the channel).

Here my working suggestion with as little changes as possible:

package main

import (
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"
)

func user_interupt_register() (writer_stop chan struct{}) {
    writer_stop = make(chan struct{}, 1)

    signal_channel := make(chan os.Signal)
    signal.Notify(signal_channel, syscall.SIGINT, syscall.SIGTERM)

    go func() {
        for sig := range signal_channel {
            log.Println("Signal received: ", sig)
            writer_stop <- struct{}{}
            log.Println("Signal sent")
            break
        }

        close(signal_channel)
        log.Println("Closing User Interupt Register go routing")
    }()

    return
}

func writer(item_id *uint32, data_channel chan uint32, writer_stop chan struct{}) {
    log.Println("Start Timer")
    ticker := time.NewTicker(2 * time.Second)

    var flag = true
    for flag {
        select {
        case <-ticker.C:
            log.Println("[WRITER] Item ID: ", *item_id)
            data_channel <- *item_id
            *item_id++
        case <-writer_stop:
            close(data_channel)
            log.Println("About to stop timer")
            ticker.Stop()
            log.Println("Timer stopped")
            flag = false
        }
    }
}

func main() {
    log.Println("Starting Application")

    data_channel := make(chan uint32)

    writer_stop := user_interupt_register()

    var item_id uint32
    item_id = 1

    go writer(&item_id, data_channel, writer_stop)

    for data := range data_channel {
        log.Println("[READER] Item ID: ", data)
    }

    log.Println("About to close(writer_stop)")
    close(writer_stop)
    log.Println("close(writer_stop) finished")
    log.Println("Exiting Application")
}

You will not have the log.Println("[READER] No ID to print") message but otherwise it should produce the same result.

Note: the other code needs to be adjusted accordingly... like removing the writer_stop channel.