多次使用时,TCP连接返回“管道中断”错误

This question relates to go and its net package.

I wrote a simple TCP server that handles some RPC calls. The client uses a chan net.Conn to manage all TCP connections on the client side. The server runs with a TCP listener.

here's the code: client:

package server

import (
    "errors"
    "log"
    "net"
)

var tcpPool chan net.Conn

// NewClient replaces the package-level pool with a fresh buffered channel of
// capacity connections and fills it with that many TCP (IPv4) connections to
// address.
//
// Any dial failure is logged and raised as a panic via log.Panic; connections
// dialed before the failure stay in the pool.
func NewClient(connections int, address string) {
	pool := make(chan net.Conn, connections)
	tcpPool = pool

	for remaining := connections; remaining > 0; remaining-- {
		c, dialErr := net.Dial("tcp4", address)
		if dialErr != nil {
			log.Panic(dialErr)
		}
		pool <- c
	}
}

// SendMessage borrows a connection from the pool, writes msg to it and returns
// the reply read back from the same connection. The connection is handed back
// to the pool when the call ends.
//
// A reply is treated as complete at the first Read that yields fewer than
// bufSize bytes. A failed or short write is logged and raised as a panic via
// log.Panic. If reading fails before any reply byte arrived, the underlying
// read error (for example io.EOF when the peer closed the connection) is
// returned so the caller can see why there was no reply.
//
// NOTE(review): a connection whose Write or Read failed is still returned to
// the pool, so a later call may pick up a dead connection; replacing it needs
// the dial address, which is not available here.
func SendMessage(msg []byte) ([]byte, error) {
	conn := getConn()

	log.Println("check conn: ", conn)
	log.Println("msg: ", msg)

	defer releaseConn(conn)

	// send message
	n, err := conn.Write(msg)
	if err != nil {
		log.Panic(err)
	}
	if n < len(msg) {
		log.Panic(errors.New("Message did not send in full"))
	}

	// receiving a message
	var inBytes []byte
	// One scratch buffer is enough: append copies the bytes out of it.
	b := make([]byte, bufSize)
	for {
		res, err := conn.Read(b)
		log.Println("server sends >>>>>>>>>>>>: ", res)
		// A Read may return data together with an error; keep the data first.
		inBytes = append(inBytes, b[:res]...)
		if err != nil {
			if len(inBytes) == 0 {
				// Nothing was received: report the real cause instead of
				// hiding it behind a generic "empty buffer" error.
				return []byte{}, err
			}
			// Some data arrived before the error; return what we have.
			break
		}
		// message finished.
		if res < bufSize {
			break
		}
	}

	// check replied message
	if len(inBytes) == 0 {
		return []byte{}, errors.New("empty buffer error")
	}
	log.Println("SendMessage gets: ", inBytes)
	return inBytes, nil
}

// releaseConn puts conn back into the shared pool, blocking until the pool
// has room for it. The returned error is always nil.
func releaseConn(conn net.Conn) error {
	log.Println("return conn to pool")
	tcpPool <- conn
	return nil
}

// getConn takes the next connection out of the shared pool, blocking until
// one is available.
func getConn() (conn net.Conn) {
	log.Println("Take one from pool")
	conn = <-tcpPool
	return conn
}

server

func StartTCPServer(network, addr string) error {
    listener, err := net.Listen(network, addr)
    if err != nil {
        return errors.Wrapf(err, "Unable to listen on address %s
", addr)
    }
    log.Println("Listen on", listener.Addr().String())
    defer listener.Close()
    for {
        log.Println("Accept a connection request.")
        conn, err := listener.Accept()
        if err != nil {
            log.Println("Failed accepting a connection request:", err)
            continue
        }
        log.Println("Handle incoming messages.")
        go onConn(conn)
    }
}

// onConn receives a TCP connection, reads one request from it, decodes the
// request and writes a reply.
//
// The deferred function closes conn when onConn returns and there is no outer
// loop, so each accepted connection serves a single request and is then
// closed by the server.
func onConn(conn net.Conn) {
    inBytes := make([]byte, 0)
    defer func() {
        if e := recover(); e != nil {
            // later log
            // NOTE(review): log.Panic panics with a string value, so this
            // type assertion looks like it will not match the panics raised
            // below - confirm.
            if err, ok := e.(error); ok {
                println("recover", err.Error())
            }
        }
        // Runs on every exit path, including after a recovered panic.
        conn.Close()
    }()
    // Load the message: keep reading bufSize-sized chunks until a Read fails
    // or returns fewer than bufSize bytes, which is taken as end of message.
    for {
        buf := make([]byte, bufSize)
        res, err := conn.Read(buf)
        log.Println("server reading: ", res)
        inBytes = append(inBytes, buf[:res]...)
        if err != nil || res < bufSize {
            break
        }
    }

    // Decode the outer RPC envelope.
    var req RPCRequest
    err := json.Unmarshal(inBytes, &req)
    if err != nil {
        log.Panic(err)
    }
    log.Println("rpc request: ", req)

    // Decode the query carried in req.Query.
    var query UserRequest
    err = json.Unmarshal(req.Query, &query)
    if err != nil {
        log.Panic(err)
    }
    log.Println("rpc request query: ", query)

    // call method to process request
    // good now we can proceed to function call
    // some actual function calls gets a output
    // outBytes, err := json.Marshal(out)
    // NOTE(review): outBytes is not declared in the visible code (the line
    // above is commented out), and the result of Write is not checked.
    conn.Write(outBytes)
}

I think this is very standard, but for some reason I can only send a message from the client side once; the 2nd and 3rd attempts then start to show some irregularity.

1st ---> success, gets a response. 2nd ---> the client can send but nothing comes back; the logs on the server side show no incoming message. 3rd ---> if I send from the client side one more time, it shows a broken pipe error.

There are some bad ways of handling things here. First, the signal that the message from the server has finished should depend on io.EOF, not on the length:

    // message finished.
    if res < 512 {
        break
    }

Instead of this, the reader returning io.EOF is the only signal that shows the message has finished. Second, a chan already blocks by its nature, so there is no need to use select. By the way, you really need to start a goroutine to release the connection. The same applies to getConn (no select is needed):

// releaseConn hands conn back to the shared pool without blocking the caller:
// the send happens on a separate goroutine, which waits until the pool can
// take the connection.
func releaseConn(conn net.Conn) {
	giveBack := func(c net.Conn) {
		tcpPool <- c
	}
	go giveBack(conn)
}

// getConn blocks until the shared pool has a connection and returns it.
func getConn() net.Conn {
	return <-tcpPool
}

Third, the listener should not be closed; the code below is bad:

defer listener.Close()

The most important reason is on the client side: res, err := conn.Read(b) receives the reply from the server. When there is nothing to reply, it blocks rather than returning io.EOF or some other error. This means you can't box a long-lasting communication into a single send() function. You can use sendmsg() to do one thing — send — but never use sendmsg() to handle the reply. You can handle the reply like this:

// receive carries the messages produced by ReceiveMessage to whoever
// consumes them.
var receive chan string

// init allocates the receive channel with room for ten pending messages.
func init() {
	const backlog = 10
	receive = make(chan string, backlog)
}
// ReceiveMessage reads from con until the peer closes the connection or a
// read fails, publishing what it has received on the receive channel.
//
// Data is read in chunks of up to 512 bytes and accumulated; after every
// successful read the whole accumulated text so far (not just the newest
// chunk) is printed and sent on receive. io.EOF ends the loop silently, any
// other read error is printed first.
func ReceiveMessage(con net.Conn) {
	var (
		chunk    = make([]byte, 512)
		received = make([]byte, 0, 1000)
	)
	for {
		n, readErr := con.Read(chunk)
		switch {
		case readErr == io.EOF:
			return
		case readErr != nil:
			fmt.Println(readErr.Error())
			return
		}
		received = append(received, chunk[:n]...)
		text := string(received)
		fmt.Println("receive msg from server:" + text)
		receive <- text
	}
}

I found several problems in your code, but I can't tell which one leads to your failure. This is my code, based on what you wrote with some fixes applied. client.go:

package main

import (
    "fmt"
    "io"
    "log"
    "net"
)

var tcpPool chan net.Conn
// receive carries the messages produced by ReceiveMessage to main.
var receive chan string

// init allocates the receive channel with room for ten pending messages.
func init() {
	const backlog = 10
	receive = make(chan string, backlog)
}
// NewClient replaces the package-level pool with a fresh buffered channel of
// capacity connections and fills it with that many TCP connections to
// address.
//
// Any dial failure is logged and raised as a panic via log.Panic; connections
// dialed before the failure stay in the pool.
func NewClient(connections int, address string) {
	pool := make(chan net.Conn, connections)
	tcpPool = pool

	for remaining := connections; remaining > 0; remaining-- {
		c, dialErr := net.Dial("tcp", address)
		if dialErr != nil {
			log.Panic(dialErr)
		}
		pool <- c
	}
}

// SendMessage writes msg to con.
//
// It returns nil once the whole message has been written, or the write error
// wrapped with context. The error is returned rather than raised as a panic
// so that callers checking the result (as main does) can actually handle a
// failed send.
func SendMessage(con net.Conn, msg []byte) error {
	// send message
	if _, err := con.Write(msg); err != nil {
		return fmt.Errorf("send message: %w", err)
	}
	return nil
}

// ReceiveMessage reads from con until the peer closes the connection or a
// read fails, publishing what it has received on the receive channel.
//
// Data is read in chunks of up to 512 bytes and accumulated; after every
// successful read the whole accumulated text so far (not just the newest
// chunk) is printed and sent on receive. io.EOF ends the loop silently, any
// other read error is printed first.
func ReceiveMessage(con net.Conn) {
	var (
		chunk    = make([]byte, 512)
		received = make([]byte, 0, 1000)
	)
	for {
		n, readErr := con.Read(chunk)
		switch {
		case readErr == io.EOF:
			return
		case readErr != nil:
			fmt.Println(readErr.Error())
			return
		}
		received = append(received, chunk[:n]...)
		text := string(received)
		fmt.Println("receive msg from server:" + text)
		receive <- text
	}
}

// getConn blocks until the shared pool has a connection and returns it.
func getConn() net.Conn {
	return <-tcpPool
}

// main fills the pool with 20 connections to localhost:8101, sends one
// greeting over a pooled connection, then prints every message that
// ReceiveMessage publishes on the receive channel. It never returns once the
// greeting has been sent.
func main() {
	NewClient(20, "localhost:8101")
	con := <-tcpPool

	if sendErr := SendMessage(con, []byte("hello, i am client")); sendErr != nil {
		fmt.Println(sendErr.Error())
		return
	}

	// Replies are read on a separate goroutine and delivered through receive.
	go ReceiveMessage(con)
	for {
		fmt.Println(<-receive)
	}
}

server.go

package main

import (
    "fmt"
    "io"
    "net"
)

// StartTCPServer listens on addr using the given network and serves every
// accepted connection with onConn.
//
// Each connection is handled on its own goroutine. onConn only returns when
// its peer disconnects, so running it inline would keep the loop from
// accepting any other client while one is still connected.
//
// It returns the listen error if the listener cannot be created. Once
// listening it loops forever: a failed Accept is printed and retried.
func StartTCPServer(network, addr string) error {
	listener, err := net.Listen(network, addr)
	if err != nil {
		return err
	}
	for {
		conn, err := listener.Accept()
		if err != nil {
			fmt.Println(err.Error())
			continue
		}
		go onConn(conn)
	}
}

// onConn serves one client connection: it answers every chunk received with
// "hello" until the peer disconnects or an I/O error occurs, then closes the
// connection.
//
// Received bytes are accumulated, and the whole accumulated text so far is
// printed after each read. io.EOF (peer closed) ends the loop silently; any
// other read or write error is printed first.
func onConn(conn net.Conn) {
	// Close on every exit path; otherwise each finished client leaks its
	// connection.
	defer conn.Close()

	inBytes := make([]byte, 0)
	// One scratch buffer is enough: append copies the bytes out of it.
	buf := make([]byte, 512)
	for {
		res, err := conn.Read(buf)
		if err != nil {
			if err != io.EOF {
				fmt.Println(err.Error())
			}
			return
		}
		inBytes = append(inBytes, buf[:res]...)

		fmt.Println("receive from client:" + string(inBytes))
		// Stop serving once the reply can no longer be delivered.
		if _, err := conn.Write([]byte("hello")); err != nil {
			fmt.Println(err.Error())
			return
		}
	}
}

// main runs the TCP server on port 8101 and prints the error if the server
// fails to start.
func main() {
	err := StartTCPServer("tcp", ":8101")
	if err == nil {
		return
	}
	fmt.Println(err.Error())
}

This works with no errors. By the way, I can't see where, on either the client side or the server side, you call con.Close(). It's necessary to close it. This means that once a connection has been taken from the pool, you don't put it back. When you think a connection is finished, close it and build a new connection to fill the pool rather than putting the old one back, because it's a fatal operation to put a closed connection back into the pool.