尝试Recv返回频道已关闭但仍处于打开状态

I am trying to write a function in Go which monitors a channel and logs what is sent through it.

func monitorChannel(inChannel, outChannel reflect.Value, fid int64, cond *sync.Cond) {
    for {       
    cond.L.Lock()
    var toLog reflect.Value
    var ok bool
    for toLog, ok = inChannel.TryRecv() ; !toLog.IsValid(); { // while no value received
        if !ok {
            cond.L.Unlock()
            return
        }
        cond.Wait()
    }
    outChannel.Send(toLog)
    logMessage("a", "b", inChannel.Interface(), toLog.Interface(), fid)
    cond.L.Unlock()
}

}

This function is supposed to receive from inChannel, log the message sent and send it through outChannel. Since I want to be able to log bi-directional channels, I call this function twice for each channel I want to log, swapping inChannel and outChannel. The lock is to keep the two goroutines from passing messages between each other. "fid" is just the id of the log file.

But when I run the following test code, I get a deadlock :

errsIn := make(chan int64)
errsOut := make(chan int64)
cond := sync.NewCond(&sync.Mutex{})
go monitorChannel(reflect.ValueOf(errsIn), reflect.ValueOf(errsOut), fid, cond)
go monitorChannel(reflect.ValueOf(errsOut), reflect.ValueOf(errsIn), fid,  cond)
errsIn <- 1
if <-errsOut != 1 {
    t.Fatal("lost value through channel send")
}
errsOut <- 1
if <-errsIn != 1 {
    t.Fatal("lost value through channel send")
}

It seems as if TryRecv is returning false on its second return value even though I haven't closed the channel. Why is this? What should I do about it?

I am running go 1.0.3 on Windows 8 64 bit.

EDIT

I later discovered that TryRecv has a somewhat confusing behaviour and managed to make a generalized version of the function using the reflect package and two sync.Locker's. I still think that jnml's solution is more elegant, but if anyone has experienced similar problems with TryRecv, take a look at the comment in the middle of the function.

func passOnAndLog(in, out reflect.Value, l1, l2 sync.Locker) {
    for {
        l1.Lock()
        val, ok := in.TryRecv()
        for !val.IsValid() { // while nothing received
            l1.Unlock()
            time.Sleep(time.Nanosecond) // pausing current thread
            l1.Lock()
            val, ok = in.TryRecv()
        }
        // if val.IsValid() == true  and ok == false ,the channel is closed
        // if val.IsValid() == false and ok == false ,the channel is open but we received nothing
        // if val.IsValid() == true  and ok == true  ,we received an actual value from the open channel
        // if val.IsValid() == false and ok == true  ,we have no idea what happened  
        if !ok {
            return
        }
        l1.Unlock()
        l2.Lock() // don't want the other thread to receive while I am sending
        out.Send(val)
        LogValue(val) // logging

        l2.Unlock()
    }
}

The reflection based solution is too convoluted for me to figure out, being lazy, if it is correct and or feasible at all. (I suspect it is not, but only by intuition.)

I would approach the task in a simpler, although non-generic way. Let's have a channel which will be used by some producer(s) to write to it and will be used by some consumer(s) to read from it.

c := make(chan T, N)

It's possible to monitor this channel using a small helper function, like for example:

func monitored(c chan T) chan T {
        m := make(chan T, M)
        go func() {
                for v := range c {
                        m <- v
                        logMessage(v)
                }
                close(m)
        }()
        return m
}

Now it is enough to:

mc := monitored(c)

and

  • Pass c to producers(s), but mc to consumers(s).
  • Close c when done to not leak goroutines.

Warning: Above code was not tested at all.