Need help understanding how libchan works

I'm trying to use the libchan library to send messages between machines using a go channel-like transport.

From what I've gathered, the rough idea is this:

  1. You have a SPDY client that sends a serialized command object to an address over TCP. This command object contains a libchan channel, created with Pipe(), through which the response is sent.
  2. When the server receives an incoming connection, it waits for a command object. When it gets one, it sends a response through the Pipe contained in the object.

Here's my point of confusion. For a channel to persist between two machines, they'd have to share memory, or at least share an abstraction that connects the two of them. From my reading of the libchan codebase, I have no idea how this could be possible.

Here's a snippet from the example in the repo:

// client

    receiver, remoteSender := libchan.Pipe()
    command := &RemoteCommand{
        Cmd:        os.Args[1],
        Args:       os.Args[2:],
        Stdin:      os.Stdin,
        Stdout:     os.Stdout,
        Stderr:     os.Stderr,
        StatusChan: remoteSender,
    }

    err = sender.Send(command)
    if err != nil {
        log.Fatal(err)
    }
    err = receiver.Receive(response)
    if err != nil {
        log.Fatal(err)
    }

    os.Exit(response.Status)

and the server:

// server
t := spdy.NewTransport(p)

        go func() {
            for {
                receiver, err := t.WaitReceiveChannel()
                if err != nil {
                    log.Print("receiver error")
                    log.Print(err)
                    break
                }
                log.Print("about to spawn receive proc")
                go func() {
                    for {
                        command := &RemoteReceivedCommand{}
                        err := receiver.Receive(command)
                        returnResult := &CommandResponse{}
                        if res != nil {
                            if exiterr, ok := res.(*exec.ExitError); ok {
                                returnResult.Status = exiterr.Sys().(syscall.WaitStatus).ExitStatus()
                            } else {
                                log.Print("res")
                                log.Print(res)
                                returnResult.Status = 10
                            }
                        }
                        err = command.StatusChan.Send(returnResult)

The point I'm trying to home in on is this:

libchan.Pipe()

According to the source, this returns a channel. One reference is kept on the client, and the other is sent to the server. This channel is then used to pipe values from the latter to the former. How does this actually work in practice?

Full code for client and server

First, it's good to know that all Pipe() does is make a channel and return the in-memory sender/receiver pair.

From inmem.go:

// Pipe returns an inmemory Sender/Receiver pair.
func Pipe() (Receiver, Sender) {
    c := make(chan interface{})
    return pReceiver(c), pSender(c)
}
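To make that concrete, here is a minimal, self-contained sketch of the same idea: two small types wrapped around one shared Go channel. The names `NewPipe`, `chanSender`, `chanReceiver` and `errClosed` are made up for this illustration, and `Receive` is simplified to return the value rather than decode into a destination as libchan does, so treat it as an approximation of the concept rather than libchan's code.

```go
package main

import (
	"errors"
	"fmt"
)

// Sender and Receiver are simplified stand-ins for the libchan interfaces
// (assumption: the real ones have richer signatures).
type Sender interface {
	Send(v interface{}) error
	Close() error
}

type Receiver interface {
	Receive() (interface{}, error)
}

// Both ends are just named views of the same underlying Go channel.
type chanSender chan interface{}
type chanReceiver chan interface{}

var errClosed = errors.New("pipe closed")

func (s chanSender) Send(v interface{}) error { s <- v; return nil }
func (s chanSender) Close() error             { close(s); return nil }

func (r chanReceiver) Receive() (interface{}, error) {
	v, ok := <-r
	if !ok {
		return nil, errClosed
	}
	return v, nil
}

// NewPipe returns the two ends of one in-memory channel (hypothetical name).
func NewPipe() (Receiver, Sender) {
	c := make(chan interface{})
	return chanReceiver(c), chanSender(c)
}

func main() {
	r, s := NewPipe()
	go func() {
		s.Send("hello")
		s.Close()
	}()
	v, err := r.Receive()
	fmt.Println(v, err)
	_, err = r.Receive()
	fmt.Println(err)
}
```

Nothing here crosses a process boundary: both ends only work because they live in the same address space, which is exactly why the in-memory pipe on its own cannot connect two machines.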

Then you can look in inmem_test.go for a simple end-to-end example.

This struct is the equivalent of RemoteCommand from the demo.

type InMemMessage struct {
    Data   string
    Stream io.ReadWriteCloser
    Ret    Sender
}

In TestInmemRetPipe(), a simple client and server are created.

The client creates a local sender/receiver pair using Pipe(), while the server simply uses the libchan.Sender interface in the InMemMessage struct.

Note that the client and server are functions which receive a Sender or Receiver as an argument respectively. More on this in the next code snippet.

func TestInmemRetPipe(t *testing.T) {
    client := func(t *testing.T, w Sender) {
        ret, retPipe := Pipe()
        message := &InMemMessage{Data: "hello", Ret: retPipe}

        err := w.Send(message)
        if err != nil {
            t.Fatal(err)
        }
        msg := &InMemMessage{}
        err = ret.Receive(msg)
        if err != nil {
            t.Fatal(err)
        }

        if msg.Data != "this better not crash" {
            t.Fatalf("%#v", msg)
        }

    }
    server := func(t *testing.T, r Receiver) {
        msg := &InMemMessage{}
        err := r.Receive(msg)
        if err != nil {
            t.Fatal(err)
        }

        if msg.Data != "hello" {
            t.Fatalf("Wrong message:\n\tExpected: %s\n\tActual: %s", "hello", msg.Data)
        }
        if msg.Ret == nil {
            t.Fatal("Message Ret is nil")
        }

        message := &InMemMessage{Data: "this better not crash"}
        if err := msg.Ret.Send(message); err != nil {
            t.Fatal(err)
        }
    }
    SpawnPipeTestRoutines(t, client, server)

}

SpawnPipeTestRoutines() executes the client and server functions. In this function, another sender/receiver pair is instantiated via Pipe().

In the demo application, the role Pipe() plays here (connecting the client and server instances) is filled by a network transport instead.

func SpawnPipeTestRoutines(t *testing.T, s SendTestRoutine, r ReceiveTestRoutine) {
    end1 := make(chan bool)
    end2 := make(chan bool)

    receiver, sender := Pipe()

    go func() {
        defer close(end1)
        s(t, sender)
        err := sender.Close()
        if err != nil {
            t.Fatalf("Error closing sender: %s", err)
        }
    }()

    go func() {
        defer close(end2)
        r(t, receiver)
    }()
    ...

In the demo application, the connection is made by calling Transport.NewSendChannel() on the client and Transport.WaitReceiveChannel() on the server, which return a libchan.Sender and a libchan.Receiver respectively. These implementations carry the "pipe" over the network instead of over an in-memory Go channel.

From client.go:

sender, err := transport.NewSendChannel()
...
err = sender.Send(command)

From server.go:

receiver, err := t.WaitReceiveChannel()
...
err := receiver.Receive(command)

In both cases, the prerequisite transport configuration is done beforehand (i.e. binding to sockets, utilizing TLS, etc.).

It's probably also worth noting that the spdy library used here is part of the libchan distribution, which is why it provides libchan primitives.