处理多个WebSocket连接

I'm trying to create a program that will connect to several servers though gorilla web-sockets. I currently have a program that will iterate over a list of server addresses and create a new goroutine that will create its own Websocket.conn and handle reading and writing.

The problem is that every time a new goroutine is created the previous goroutines are blocked and only the last one can continue. I believe this is because the gorilla websocket library is blocking each gorotutine, but I might be mistaken.

I have tried putting a timer in the server list iterator and each goroutine will work perfectly but then the moment a new goroutine is made with another address the previous goroutine is blocked.

The relevant bits of my code:

In my main.go

for _, server := range servers {
  go control(ctx, server, port)
}

In control()


func control(ctx context.Context, server, port string) { 
  url := url.URL{
    Scheme: "ws",
    Host: server + ":" + port,
    Path: "",
  }
  conn, _, err := websocket.DefaultDialer.Dial(url.String(), nil)
  if err != nil {
    panic(err)
  }
  defer conn.Close()
  go sendHandler(ctx, conn)
  go readHandler(ctx, conn)
}

readHandler(ctx context.Context, conn *websocket.Con) {
  for {
    _, p, err := conn.ReadMessage(); if err != nil {
      panic(err)
    }
    select {
      case <-ctx.Done():
        goto TERM
      default:
        // do nothing
    }
  }
  TERM:
  // do termination  
}

sendHandler(ctx context.Context, conn *websocket.Con) {
  for _, msg := range msges {
    err = conn.WriteMessage(websocket.TextMessage, msg)
    if err != nil {
      panic(err)
    }
  }
  <-ctx.Done()
}

I removed the parts where I add waitgroups and other unnecessary pieces of code.

So what I expect is for there to be 3n goroutines running (where n is the number of servers) without blocking but right now I see only 3 goroutines running which are the ones called by the last iteration of the server list.

Thanks!

EDIT 14/06/2019:

I spent some time making a small working example and in the example the bug did not occur - none of the threads blocked each other. I'm still unsure what was causing it but here is my small working example:

main.go

package main

import (
    "context"
    "fmt"
    "os"
    "time"
    "os/signal"
    "syscall"
    "sync"
    "net/url"
    "github.com/gorilla/websocket"
    )

func main() {
    servers := []string{"5555","5556", "5557"}
    comms := make(chan os.Signal, 1)
    signal.Notify(comms, os.Interrupt, syscall.SIGTERM)

    ctx := context.Background()
    ctx, cancel := context.WithCancel(ctx)
    var wg sync.WaitGroup

    for _, server := range servers {
        wg.Add(1)
        go control(server,
                   ctx,
                   &wg)
    }

    <-comms
    cancel()
    wg.Wait()
}

func control(server string, ctx context.Context, wg *sync.WaitGroup) {
    fmt.Printf("Started control for %s
", server)
    url := url.URL {
        Scheme: "ws",
        Host: "0.0.0.0" + ":" + server,
        Path: "",
    }
    conn, _, err := websocket.DefaultDialer.Dial(url.String(), nil)
    if err != nil {
        panic(err)
    }
    defer conn.Close()

    var localwg sync.WaitGroup

    localwg.Add(1)
    go sendHandler(ctx, conn, &localwg, server)
    localwg.Add(1)
    go readHandler(ctx, conn, &localwg, server)

    <- ctx.Done()
    localwg.Wait()
    wg.Done()
    return
}

func sendHandler(ctx context.Context, conn *websocket.Conn, wg *sync.WaitGroup, server string) {
    for i := 0; i < 50; i++ {
        err := conn.WriteMessage(websocket.TextMessage, []byte("ping"))
        if err != nil {
            panic(err)
        }
        fmt.Printf("sent msg to %s
", server)
        time.Sleep(1 * time.Second)
    }
    <- ctx.Done()
    wg.Done()
}

func readHandler(ctx context.Context, conn *websocket.Conn, wg *sync.WaitGroup, server string) {
    for {

        select {

            case <- ctx.Done():
                wg.Done()
                return
            default:
                _, p, err :=  conn.ReadMessage()
                if err != nil {
                    wg.Done()
                    fmt.Println("done")
                }
                fmt.Printf("Got [%s] from %s
", string(p), server)
        }
    }
}

I tested it with dpallot's simple-websocket-server by a server on 5555, 5556 and 5557 respectively.

This part of your code is causing the problem:

conn, _, err := websocket.DefaultDialer.Dial(url.String(), nil)
if err != nil {
    panic(err)
}
defer conn.Close()
go sendHandler(ctx, conn)
go readHandler(ctx, conn)

You create the connection, defer the close of it, start two other goroutines and then end the function. The function end closes the socket due to your defer.