RabbitMQ multi-worker pattern

I'm trying to find a good method to consume asynchronously from an input queue, process the content using several workers and then publish to an output queue. So far I've tried a number of examples, most recently using the code from here and here as inspiration.

My current code doesn't appear to be doing what it should, however: increasing the number of workers doesn't increase performance (msg/s consumed or published), and the number of goroutines remains fairly static whilst running.

main:

func main() {
    maxWorkers := 10

    // channel for jobs
    in := make(chan []byte)
    out := make(chan []byte)

    // start workers
    wg := &sync.WaitGroup{}
    wg.Add(maxWorkers)
    for i := 1; i <= maxWorkers; i++ {
        log.Println(i)
        defer wg.Done()
        go processor(in, out)
    }

    // add jobs
    go collector(in)
    go sender(out)

    // wait for workers to complete
    wg.Wait()
}

The collector is basically the example from the RabbitMQ site with a goroutine that collects messages from the queue and places them on the 'in' channel:

forever := make(chan bool)
go func() {
    for d := range msgs {
        in <- d.Body
        d.Ack(false)
    }
}()
log.Printf("[*] Waiting for messages. To exit press CTRL+C")
<-forever

The processor receives an 'in' and 'out' channel, unmarshals JSON, performs a series of regexes and then places the output into the 'out' channel:

func processor(in chan []byte, out chan []byte) {

    var (
    // list of regexes declared here
    )

    for {
        body := <-in

        jsonIn := &Data{}
        err := json.Unmarshal(body, jsonIn)
        if err != nil {
            log.Fatalln("Failed to decode:", err)
        }

        content := jsonIn.Content

        //process regexes using:
        //jsonIn.a = r1.FindAllString(content, -1)

        jsonOut, _ := json.Marshal(jsonIn)

        out <- jsonOut
    }
}

And finally the sender is simply the code from the RabbitMQ site, setting up a connection, reading from the 'out' channel and then publishing to an RMQ queue:

for {
    jsonOut := <-out

    err = ch.Publish(
        "",     // exchange
        q.Name, // routing key
        false,  // mandatory
        false,  // immediate
        amqp.Publishing{
            DeliveryMode: amqp.Persistent,
            ContentType:  "text/json",
            Body:         []byte(jsonOut),
        })
    failOnError(err, "Failed to publish a message")

}

This is a pattern that I'll be using quite a lot, so I'm spending a lot of time trying to find something that works correctly (and well). Any advice or help would be appreciated (and in case it isn't obvious, I'm new to Go).

There are a couple of things that jump out:

Done within main function

wg.Add(maxWorkers)
for i := 1; i <= maxWorkers; i++ {
    log.Println(i)
    defer wg.Done()
    go processor(in, out)
}

The defer here is executed when main returns, so it's not actually indicating when processing is complete; in fact, since the deferred Done calls can only run after wg.Wait() has already unblocked, the Wait will never return. I don't think this will have an effect on the performance profile of your program, though.

To address this you could pass wg *sync.WaitGroup into your processor so that each worker can indicate when it's done.
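For example, a minimal sketch (this assumes the collector closes the in channel when it has finished consuming, so the range loop below can exit):

wg := &sync.WaitGroup{}
wg.Add(maxWorkers)
for i := 1; i <= maxWorkers; i++ {
    go processor(wg, in, out)
}
wg.Wait() // now genuinely waits for the workers to finish

func processor(wg *sync.WaitGroup, in chan []byte, out chan []byte) {
    defer wg.Done()        // signal completion when this worker returns
    for body := range in { // exits once 'in' is closed by the collector
        // ... unmarshal, run the regexes, marshal, as before ...
        out <- body
    }
}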

CPU-Bound Processing

Parsing messages and running regexes is a CPU-intensive workload. How many cores does your machine have? How is throughput affected if you run your program on two separate machines: does it double? What if you double the number of cores? What about running your program with 1 processor worker vs 2: does that double throughput? Are you maxing out your local RabbitMQ instance? Is it the bottleneck?
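As a first experiment (this is just an assumption to test, not something your numbers confirm yet), you could size the worker pool to the machine's core count and measure from there:

maxWorkers := runtime.NumCPU() // one CPU-bound worker per core as a baseline
log.Printf("starting %d workers", maxWorkers)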

Setting up benchmarking and load-testing harnesses should let you run experiments to see where your bottlenecks are :)

For queue-based services it's pretty easy to set up a test harness that fills RabbitMQ with a set backlog and benchmarks how fast you can process those messages, or a load generator that sends x messages/second to RabbitMQ so you can observe whether you keep up.
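As an illustration, a minimal backlog filler might look like the following sketch (it reuses the ch, q, Data and failOnError from your code; the message count and payload here are placeholders):

// Fill the queue with a fixed backlog, then time how long your workers take to drain it.
for i := 0; i < 100000; i++ {
    body, err := json.Marshal(&Data{Content: "sample payload"})
    failOnError(err, "Failed to marshal test message")

    err = ch.Publish(
        "",     // exchange
        q.Name, // routing key
        false,  // mandatory
        false,  // immediate
        amqp.Publishing{
            ContentType: "application/json",
            Body:        body,
        })
    failOnError(err, "Failed to publish test message")
}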

Does RabbitMQ give you good visibility into message-processing throughput? If not, I frequently add a counter to the Go code and then log the overall averaged throughput on an interval to get a rough idea of performance:

start := time.Now()
updateInterval := time.Tick(1 * time.Second)
numIn := 0
for {
    select {
    case <-updateInterval:
        log.Printf("IN - Count: %d", numIn)
        log.Printf("IN - Throughput: %.0f events/second",
            float64(numIn)/time.Since(start).Seconds())
    case d := <-msgs:
        numIn++
        in <- d.Body
        d.Ack(false)
    }
}