Golang HTTP request worker pool

I'm trying to build a worker pool / job queue system to handle as many HTTP requests as possible on each API endpoint. I looked into this example and got it working just fine, except that I stumbled on a problem: I don't understand how to expand the pool / job queue to different endpoints.

For the sake of the scenario, let's sketch a Golang HTTP server that has a million requests/min across different endpoints and request types (GET, POST, etc.).

How can I expand on this concept? Should I create different worker pools and jobs for each endpoint? Or can I create different jobs, enter them into the same queue, and have the same pool handle them?

I want to maintain simplicity: when I create a new API endpoint, I don't want to have to create a new worker pool, so I can focus just on the API. But performance is also very much in mind.

The code I'm trying to build on is taken from the example linked earlier; here is a GitHub gist from somebody else with this code.

One thing up front: If you are running an HTTP server (Go's standard server anyway), you cannot control the number of goroutines without stopping and restarting the server. Each request starts at least one goroutine, and there's nothing you can do about that. The good news is that this is usually not a problem, since goroutines are so lightweight. However, it is perfectly reasonable that you want to keep the number of goroutines that are doing hard work under control.

You can put any value into a channel, including functions. So if the goal is to only have to write code in the HTTP handlers, let the jobs be closures -- the workers don't know (or care) what they are working on.

package main

import (
    "encoding/json"
    "io/ioutil"
    "net/http"
)

var largePool chan func()
var smallPool chan func()

func main() {
    // Start two different sized worker pools (e.g., for different workloads).
    // Cancellation and graceful shutdown omitted for brevity.

    largePool = make(chan func(), 100)
    smallPool = make(chan func(), 10)

    for i := 0; i < 100; i++ {
            go func() {
                    for f := range largePool {
                            f()
                    }
            }()
    }

    for i := 0; i < 10; i++ {
            go func() {
                    for f := range smallPool {
                            f()
                    }
            }()
    }

    http.HandleFunc("/endpoint-1", handler1)
    http.HandleFunc("/endpoint-2", handler2) // naming things is hard, okay?

    http.ListenAndServe(":8080", nil)
}

func handler1(w http.ResponseWriter, r *http.Request) {
    // Imagine a JSON body containing a URL that we are expected to fetch.
    // Light work that doesn't consume many of *our* resources and can be done
    // in bulk, so we put it in the large pool.
    var job struct{ URL string }

    if err := json.NewDecoder(r.Body).Decode(&job); err != nil {
            http.Error(w, err.Error(), http.StatusBadRequest)
            return
    }

    go func() {
            largePool <- func() {
                    http.Get(job.URL)
                    // Do something with the response
            }
    }()

    w.WriteHeader(http.StatusAccepted)
}

func handler2(w http.ResponseWriter, r *http.Request) {
    // The request body is an image that we want to do some fancy processing
    // on. That's hard work; we don't want to do too many of them at once,
    // so we put those jobs in the small pool.

    b, err := ioutil.ReadAll(r.Body)
    if err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
    }

    go func() {
            smallPool <- func() {
                    processImage(b)
            }
    }()
    w.WriteHeader(http.StatusAccepted)
}

func processImage(b []byte) {}

This is a very simple example to get the point across. It doesn't matter much how you set up your worker pools; you just need a clever job definition. In the example above it is a closure, but you could also define a Job interface, for instance.

type Job interface {
    Do()
}

var largePool chan Job
var smallPool chan Job

Now, I wouldn't call the whole worker pool approach "simple". You said your goal is to limit the number of goroutines (that are doing work). That doesn't require workers at all; it only needs a limiter. Here is the same example as above, but using channels as semaphores to limit concurrency.

package main

import (
    "encoding/json"
    "io/ioutil"
    "net/http"
)

var largePool chan struct{}
var smallPool chan struct{}

func main() {
    largePool = make(chan struct{}, 100)
    smallPool = make(chan struct{}, 10)

    http.HandleFunc("/endpoint-1", handler1)
    http.HandleFunc("/endpoint-2", handler2)

    http.ListenAndServe(":8080", nil)
}

func handler1(w http.ResponseWriter, r *http.Request) {
    var job struct{ URL string }

    if err := json.NewDecoder(r.Body).Decode(&job); err != nil {
            http.Error(w, err.Error(), http.StatusBadRequest)
            return
    }

    go func() {
            // Block until there are fewer than cap(largePool) light-work
            // goroutines running.
            largePool <- struct{}{}
            defer func() { <-largePool }() // Let everyone know that we are done

            http.Get(job.URL)
    }()

    w.WriteHeader(http.StatusAccepted)
}

func handler2(w http.ResponseWriter, r *http.Request) {
    b, err := ioutil.ReadAll(r.Body)
    if err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
    }

    go func() {
            // Block until there are fewer than cap(smallPool) hard-work
            // goroutines running.
            smallPool <- struct{}{}
            defer func() { <-smallPool }() // Let everyone know that we are done

            processImage(b)
    }()

    w.WriteHeader(http.StatusAccepted)
}

func processImage(b []byte) {}

It's not clear why you need a worker pool at all. Wouldn't goroutines be enough?

If you are limited by resources, you can consider implementing rate limiting. If not, why not simply spawn goroutines as needed?

The best way to learn is to study how others do good stuff.

Have a look at https://github.com/valyala/fasthttp

Fast HTTP package for Go. Tuned for high performance. Zero memory allocations in hot paths. Up to 10x faster than net/http.

They are claiming:

serving up to 200K rps from more than 1.5M concurrent keep-alive connections per physical server

That is quite impressive, and I doubt you can do any better with a pool / job queue.

As answered before, in your server each request handler will run in at least one goroutine.

But you can still use a worker pool for back-end parallel tasks where necessary. For example, suppose one of your HTTP handler functions triggers calls to other external APIs and "aggregates" their results together, and the order of the calls does not matter. This is one scenario where you can leverage a worker pool: distribute the work and have it run in parallel, dispatching each task to a worker goroutine:

Sample code snippet:

    // build empty response
    capacity := config.GetIntProperty("defaultListCapacity")
    list := model.NewResponseList(make([]model.Response, 0, capacity), 1, 1, 0)

    // search providers
    providers := getProvidersByCountry(country)

    // create a slice of jobResult outputs
    jobOutputs := make([]<-chan job.JobResult, 0)

    // distribute work
    for i := 0; i < len(providers); i++ {
        job := search(providers[i], m)
        if job != nil {
            jobOutputs = append(jobOutputs, job.ReturnChannel)
            // Push each job onto the queue.
            GetInstance().JobQueue <- *job
        }
    }

    // Consume the merged output from all jobs
    out := job.Merge(jobOutputs...)
    for r := range out {
        if r.Error == nil {
            mergeSearchResponse(list, r.Value.(*model.ResponseList))
        }
    }
    return list

- complete example of a worker pool running "generic" tasks asynchronously: https://github.com/guilhebl/go-offer/blob/master/offer/repo.go

- worker pool lib used: https://github.com/guilhebl/go-worker-pool