How to efficiently chain channels together?

I'm trying to create a message hub in Go. Messages arrive on different channels that are stored in a map[uint32]chan []float64. I run an endless loop over the map and check whether each channel has a message. If it does, I write it to the common client's write channel together with the incoming channel's id. It works fine, but it uses all of the CPU, and other processes are throttled.

UPD: Items are added to and removed from the map dynamically by another function.

I'm thinking of limiting CPU for this app through Docker, but maybe there is a more elegant way?

My code:

    func (c *Client) accumHandler() {
        for !c.stop {
            c.channels.Range(func(key, value interface{}) bool {
                select {
                case message := <-value.(chan []float64):
                    mess := map[uint32]interface{}{key.(uint32): message}

                    select {
                    case c.send <- mess:
                    }
                default:
                }
                return !c.stop
            })
        }
    }

If I'm reading the cards correctly, you are trying to pass a slice of floats to a common channel along with a channel identifier. I assume you are doing this so that you can hand a separate channel to each publisher while your consumer only has to track a single channel.

It turns out that you don't need to loop over the channels to see when one of them has a value. You can chain channels together inside goroutines, so no busy wait is necessary. Something like this should suit your purposes (again, if I'm reading the cards correctly). Look for the all-caps comment for the way around your busy loop.

package main

import (
  "fmt"
  "math/rand"
  "time"
)

var num_publishes = 3

func main() {
  num_publishers := 10
  single_consumer := make(chan []float64)

  for i:=0;i<num_publishers;i+=1 {
    c := make(chan []float64)

    // connect channel to your single consumer channel
    go func() { for { single_consumer <- <-c } }() // THIS IS PROBABLY WHAT YOU DIDN'T KNOW ABOUT

    // send the channel to the publisher
    go publisher(c, i*100)
  }

  // dumb consumer example
  for i:=0;i<num_publishers*num_publishes;i+=1 {
    fmt.Println(<-single_consumer)
  }
}

func publisher(c chan []float64, publisher_id int) {
  dummy := []float64{
    float64(publisher_id+1),
    float64(publisher_id+2),
    float64(publisher_id+3),
  }
  for i:=0;i<num_publishes;i+=1 {
    time.Sleep(time.Duration(rand.Intn(10000)) * time.Millisecond)
    c <- dummy
  }
}

It is eating all of your CPU because you are continuously cycling through the map checking for messages, so even when there are no messages to process, the CPU (or at least one thread or core) is running flat out. You need blocking sends and receives on the channels!

I assume you are doing this because you don't know how many channels there will be and therefore can't just select on all of the input channels. A better pattern is to start a separate goroutine for each input channel you currently store in the map. Each goroutine should loop, blocking on a receive from its input channel and, on receiving a message, doing a blocking send to a single client channel that all of the goroutines share.

The question isn't complete, so I can't give exact code, but you're going to have goroutines that look something like this:

type Message struct {
    id      uint32
    message []float64
}

func receiverGoroutine(id uint32, input chan []float64, output chan Message) {
    for {
        message := <- input
        output <- Message{id: id, message: message}
    }
}

func clientGoroutine(c *Client, input chan Message) {
    for {
        message := <-input
        _ = message // placeholder so the sketch compiles; do stuff with message here
    }
}

(You'll need to add some "done" channels as well though)
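As a rough illustration of the "done" channel idea, here is a minimal sketch. The names `forward`, `ID` and `Payload` are made up for this example and are not from the question. It assumes a forwarder should exit either when its input channel is closed or when a shared `done` channel is closed, and that it must not stay blocked on the send if the client has gone away.

```go
package main

import "fmt"

// Message pairs a payload with the id of the channel it arrived on.
// (Hypothetical type for this sketch.)
type Message struct {
	ID      uint32
	Payload []float64
}

// forward copies values from input to output, tagging each with id.
// It blocks while idle, and returns when done or input is closed.
func forward(id uint32, input <-chan []float64, output chan<- Message, done <-chan struct{}) {
	for {
		select {
		case <-done:
			return
		case payload, ok := <-input:
			if !ok {
				return // input was closed by its publisher
			}
			// Also watch done here so a vanished consumer cannot
			// leave this goroutine stuck on the send forever.
			select {
			case output <- Message{ID: id, Payload: payload}:
			case <-done:
				return
			}
		}
	}
}

func main() {
	done := make(chan struct{})
	output := make(chan Message)
	input := make(chan []float64)

	go forward(7, input, output, done)

	input <- []float64{1, 2, 3}
	msg := <-output
	fmt.Println(msg.ID, msg.Payload) // prints: 7 [1 2 3]

	close(done) // tells the forwarder to exit
}
```

Closing `done` is a broadcast: every goroutine selecting on it wakes up, so one `close` can stop any number of forwarders.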

Elsewhere you will start them with code like this:

clientChan := make(chan Message)
go clientGoroutine(client, clientChan)

for i := 0; i < max; i++ {
    input := make(chan []float64) // keep a reference so the matching publisher can send on it
    go receiverGoroutine(uint32(i), input, clientChan)
}

Or you can just start the client routine and then add the others as they are needed rather than in a loop up front - depends on your use case.
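Since the question says channels are added and removed at run time, one possible shape for the "add them as needed" variant is sketched below. `Hub`, `NewHub`, `Add` and `Remove` are hypothetical names, not part of the asker's code, and the sketch assumes each input channel gets its own stop channel, tracked in a mutex-protected map.

```go
package main

import (
	"fmt"
	"sync"
)

// Message pairs a payload with the id of the channel it arrived on.
type Message struct {
	ID      uint32
	Payload []float64
}

// Hub (hypothetical) fans any number of input channels into Output.
type Hub struct {
	mu     sync.Mutex
	stops  map[uint32]chan struct{} // one stop channel per forwarder
	Output chan Message
}

func NewHub() *Hub {
	return &Hub{stops: make(map[uint32]chan struct{}), Output: make(chan Message)}
}

// Add starts a forwarding goroutine for input.
// Adding an id that already exists replaces the old forwarder.
func (h *Hub) Add(id uint32, input <-chan []float64) {
	stop := make(chan struct{})
	h.mu.Lock()
	if old, ok := h.stops[id]; ok {
		close(old)
	}
	h.stops[id] = stop
	h.mu.Unlock()

	go func() {
		for {
			select {
			case <-stop:
				return
			case payload, ok := <-input:
				if !ok {
					return
				}
				select {
				case h.Output <- Message{ID: id, Payload: payload}:
				case <-stop:
					return
				}
			}
		}
	}()
}

// Remove stops the forwarder for id, if there is one.
func (h *Hub) Remove(id uint32) {
	h.mu.Lock()
	defer h.mu.Unlock()
	if stop, ok := h.stops[id]; ok {
		close(stop)
		delete(h.stops, id)
	}
}

func main() {
	hub := NewHub()
	a := make(chan []float64)
	hub.Add(1, a)

	a <- []float64{0.5}
	msg := <-hub.Output
	fmt.Println(msg.ID, msg.Payload) // prints: 1 [0.5]

	hub.Remove(1)
}
```

Every forwarder blocks while idle, so the CPU cost is proportional to message traffic rather than to the number of registered channels.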