Why does this receiver goroutine refuse to terminate when the connection is closed?

This runs as expected, but then randomly, somewhere between every 20 and every 10,000 times it is called, a receiver will fail to shut down, which causes a goroutine leak and eventually drives the CPU to 100%.

Note: if I log all errors, I see a read-on-closed-connection error when the conn.SetReadDeadline is commented out. When the deadline is used, I see an i/o timeout as the error.

This ran for 10k cycles, where the main process starts 11 pairs of these senders/receivers and they process 1000 jobs before the main process sends the shutdown signal. That setup ran overnight for more than 6 hours without any issue, right up to the 10k-cycle mark, but this morning I can't get it to run more than 20 cycles without a receiver being flagged as failing to shut down and exit.
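For context, here is a rough sketch of the worker and channel plumbing, with field shapes inferred from how they are used below rather than the exact definitions (the conn is reached both directly on w and as w.c, so it is presumably embedded as well):

package main

import "net"

// Assumed shapes, inferred from usage in the snippets below; not the real definitions.

type job struct{ /* job payload, not shown */ }

type update struct {
    id int // which worker produced the result; other fields not shown
}

type worker struct {
    *net.UDPConn               // promoted methods: w.Close, w.WriteToUDP, w.SetReadDeadline
    c    *net.UDPConn          // the same conn, reached as w.c in the read loops
    id   int
    addr *net.UDPAddr          // destination for WriteToUDP
    job  chan job              // worker-designated jobs
}

type channel struct {
    quit    chan struct{} // closed by main to broadcast shutdown
    stopped chan int      // debug: goroutines confirm they have exited
    spawner chan job      // shared first-come-first-served job queue
    update  chan *update  // processed results back to main
}

Shutdown is broadcast with close(ch.quit), which releases every case <-ch.quit at once; ch.stopped lets main count the confirmations.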

func sender(w worker, ch channel) {

    var j job
    for {
        select {
        case <-ch.quit: // shutdown broadcast, exit
            w.Close()
            ch.stopped <- w.id // debug, send stop confirmed
            return

        case j = <-w.job: // worker designated jobs
        case j = <-ch.spawner: // FCFS jobs
        }

        ... prepare job ...

        w.WriteToUDP(buf, w.addr)
    }
}
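The "... prepare job ..." step is elided above; purely as an illustration of the kind of packet being written, it is roughly equivalent to packing a DNS question, since the dnsmessage parsing at the end of this post suggests the jobs are DNS lookups. prepareJob, the host argument, and the use of w.id as the message ID are invented for this sketch, not my actual code:

import "golang.org/x/net/dns/dnsmessage"

// prepareJob is an illustrative stand-in for the elided "prepare job" step:
// it packs a single A-record question into a wire-format DNS query.
func prepareJob(w worker, host string) ([]byte, error) {
    msg := dnsmessage.Message{
        Header: dnsmessage.Header{ID: uint16(w.id), RecursionDesired: true},
        Questions: []dnsmessage.Question{{
            Name:  dnsmessage.MustNewName(host), // fully qualified, with trailing dot
            Type:  dnsmessage.TypeA,
            Class: dnsmessage.ClassINET,
        }},
    }
    return msg.Pack()
}

In the sender loop that would look something like buf, err := prepareJob(w, j.host), with j.host standing in for whatever the job actually carries, followed by the existing w.WriteToUDP(buf, w.addr).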

func receiver(w worker, ch channel) {

    deadline := 100 * time.Millisecond
    buf := make([]byte, messageSize) // read buffer
out:
    for {
        w.SetReadDeadline(time.Now().Add(deadline))
        // blocking read, should release on close (or deadline)
        n, err := w.c.Read(buf)

        select {
        case <-ch.quit: // shutdown broadcast, exit
            ch.stopped <- w.id + 100 // debug, receiver stop confirmed
            return
        default:
        }

        if n == 0 || err != nil {
            continue
        }
        update := &update{id: w.id}

         ... process update logic ...

        select {
        case <-ch.quit: // shutting down
            break out
        case ch.update <- update:
        }
    }
}

I need a reliable way to get the receiver to shut down when it gets either the shutdown broadcast OR the conn is closed. Functionally, closing the connection should be enough, and it is the preferred method according to the Go package documentation; see the Conn interface (Close is documented to unblock any blocked Read).
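For reference, the pattern the net.Conn documentation points to is a read loop that is stopped purely by closing the connection: Close unblocks the pending Read, which returns an error, and the loop exits. A minimal sketch of that shape, assuming a fixed message size; Go 1.12 has no net.ErrClosed, so anything that is not a timeout is treated as closed/fatal:

import (
    "net"
    "time"
)

// Minimal sketch of the close-to-stop pattern from the net.Conn docs.
// The caller stops it by calling c.Close(), then waits on done.
func readLoop(c *net.UDPConn, updates chan<- []byte, done chan<- struct{}) {
    defer close(done)
    buf := make([]byte, 512) // assumed message size
    for {
        c.SetReadDeadline(time.Now().Add(100 * time.Millisecond))
        n, err := c.Read(buf)
        if err != nil {
            if ne, ok := err.(net.Error); ok && ne.Timeout() {
                continue // deadline expired, keep polling
            }
            return // conn closed (or another fatal error): exit the goroutine
        }
        out := make([]byte, n)
        copy(out, buf[:n])
        updates <- out // assumes a consumer drains updates until done is closed
    }
}

With this shape the ch.quit broadcast is not strictly needed inside the reader at all; closing the conn is the shutdown signal.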

I upgraded to the most recent Go, which is 1.12.1, with no change. Running on macOS in development and CentOS in production.

Has anyone run into this problem? If so, how did you reliably fix it?


Possible Solution

My very verbose and icky solution, which seems to work as a workaround, is to do this:

1) Start the sender in a goroutine, as above (unchanged).

2) Start the receiver in a goroutine, as below:

func receive(w worker, ch channel) {

    request := make(chan []byte, 1)
    reader := make(chan []byte, 1)

    defer func() {
        close(request) // exit signaling
        w.c.Close()    // exit signaling
        //close(reader)
    }()

    go func() {

        // untried scenario, in case we still have leaks -> 100% cpu:
        // we may need to be totally reliant on closing request or on ch.quit
        // defer w.c.Close()

        deadline := 100 * time.Millisecond
        var n int
        var err error

        for buf := range request {
            for {
                select {
                case <-ch.quit: // shutdown signal
                    return
                default:
                }
                w.c.SetReadDeadline(time.Now().Add(deadline))
                n, err = w.c.Read(buf)
                if err != nil { // timeout or close
                    continue
                }
                break
            }
            select {
            case <-ch.quit: // shutdown signal
                return
            case reader <- buf[:n]:
                //default:
            }
        }
    }()

    var buf []byte

out:
    for {

        request <- make([]byte, messageSize)

        select {
        case <-ch.quit: // shutting down
            break out
        case buf = <-reader:
        }

        update := &update{id: w.id}

      ... process update logic ...


        select {
        case <-ch.quit: // shutting down
            break out
        case ch.update <- update:
        }
    }
}

My question is: why does this horrendous version 2, which spawns a new goroutine to read from the blocking c.Read(buf), seem to work more reliably, meaning it does not leak when the shutdown signal is sent, when the much simpler first version didn't? It seems to be essentially the same thing, since both hang on the same blocking c.Read(buf).

Downvoting my question is NOT helpful when this is a legitimate and verifiably repeatable issue; the question remains unanswered.

Thanks everyone for the responses.

So. There was never a stack trace. In fact, I got NO errors at all, not a race detection or anything, and it wasn't deadlocked; a goroutine just would not shut down and exit, and it wasn't consistently reproducible. I've been running the same data for two weeks.
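(Aside, in case it helps anyone chasing something similar: even without a crash you can force a dump of every live goroutine's stack to see exactly where the stuck one is sitting. dumpGoroutines below is a hypothetical helper, not something from my code.)

import (
    "os"
    "runtime/pprof"
)

// dumpGoroutines writes the stack of every live goroutine to stderr,
// which shows where a goroutine that refuses to exit is actually sitting.
func dumpGoroutines() {
    pprof.Lookup("goroutine").WriteTo(os.Stderr, 2)
}

Sending the process SIGQUIT (kill -QUIT <pid>) produces a similar dump with no code changes.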

When the goroutine failed to report that it was exiting, it would simply spin out of control and drive the CPU to 100%, but only AFTER all the others had exited and the system moved on. I never saw memory grow. CPU would gradually tick up to 200%, 300%, 400%, and that's when the system had to be rebooted.

I logged when a leak was happening, and it was always a different receiver. I'd get one leak after 380 prior successful runs (of 23 pairs of goroutines running in unison), the next time 1,832 runs before one receiver leaked, the next time only 23, with the exact same data being chewed on from the same starting point. The leaked receiver just spun out of control, but only after its 22 companions had all shut down and exited successfully and the system had moved on to the next batch. It would not fail consistently, other than being guaranteed to leak at some point.

After many days, numerous rewrites, and a million log statements before/after every action, this finally seems to be what the issue was, and after digging through the library I'm not sure exactly why, nor WHY it only happens randomly.

For whatever reason, the golang.org/x/net/dns/dnsmessage library will randomly freak out if you parse the header and go straight to skipping questions without reading a question first. No idea why that matters; skipping questions is supposed to mean you don't care about that section and just want it marked as processed, and it works fine for literally a million messages in a row, but then it doesn't. You seem to be required to read a question BEFORE you can SkipAllQuestions, since that appears to have been the solution. I'm 18,525 batches in, and adding that turned the leaks off.

var p dnsmessage.Parser
h, err := p.Start(buf[:n])
if err != nil {
    continue // what!?
}

switch {
case h.RCode == dnsmessage.RCodeSuccess:
    q, err := p.Question() // should only have one question
    if q.Type != w.Type || err != nil {
        continue // what!?, impossible
    }
    // Note: if you do NOT do the above first, you're asking for pain! (tr)
    if err := p.SkipAllQuestions(); err != nil {
        continue // what!?
    }
    // Do not count as "received" until we have passed above point and
    // validated that response had a question that we could skip...
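For completeness, the full flow looks roughly like this: Start, read ONE question, then SkipAllQuestions, then walk the answers. Everything past the question section is illustrative, assuming the responses are A records; it is not my actual "process update logic":

import (
    "net"

    "golang.org/x/net/dns/dnsmessage"
)

// parseReply walks a DNS response as described above: parse the header,
// read one question (and check its type), skip the rest of the question
// section, then collect A-record answers. Returns ok=false on anything odd.
func parseReply(buf []byte, want dnsmessage.Type) (ips []net.IP, ok bool) {
    var p dnsmessage.Parser
    h, err := p.Start(buf)
    if err != nil || h.RCode != dnsmessage.RCodeSuccess {
        return nil, false
    }
    q, err := p.Question() // read a question BEFORE skipping, per the fix above
    if err != nil || q.Type != want {
        return nil, false
    }
    if err := p.SkipAllQuestions(); err != nil {
        return nil, false
    }
    for {
        r, err := p.Answer()
        if err == dnsmessage.ErrSectionDone {
            break
        }
        if err != nil {
            return nil, false
        }
        if a, isA := r.Body.(*dnsmessage.AResource); isA {
            ips = append(ips, net.IP(a.A[:]))
        }
    }
    return ips, true
}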