Go queue processing with retry on failure

We have a bunch of files to be uploaded to a remote blob store after processing.

Currently, the frontend (PHP) creates a Redis list of these files and assigns it a unique ID, called the JobID. It then pushes that ID to a beanstalk tube, where it is received by a Go process. The Go process uses a library called Go workers to handle each job ID, much as net/http handles each request. For each job ID, it retrieves the Redis list and starts processing the files.

However, only one file is processed at a time. Since the operation is I/O bound rather than CPU bound, intuition suggests it would be beneficial to use a goroutine per file.

However, we also want to retry uploads on failure and track the number of items processed per job. We cannot start an unbounded number of goroutines, because a single job can contain around 10k files and hundreds of such jobs can be sent per second at peak times. What would be the correct approach?

NB: We can change the technology stack a bit if needed (such as swapping out beanstalkd for something else).

You can limit the number of goroutines by using a buffered channel whose capacity is the maximum number of goroutines you want running at once. A send on the channel blocks once it is full, and each goroutine receives from it when it finishes, which frees a slot so a new goroutine can start.

Example:

package main

import (
    "fmt"
    "sync"
)

var (
    concurrent    = 5
    semaphoreChan = make(chan struct{}, concurrent)
)

func doWork(wg *sync.WaitGroup, item int) {
    // block while full
    semaphoreChan <- struct{}{}

    go func() {
        defer func() {
            // read to release a slot
            <-semaphoreChan
            wg.Done()
        }()
        // This is where your work actually gets done
        fmt.Println(item)
    }()
}

func main() {
    // we need this for the example so that we can block until all goroutines finish
    var wg sync.WaitGroup
    wg.Add(10)

    // start the work
    for i := 0; i < 10; i++ {
        doWork(&wg, i)
    }

    // block until all work is done
    wg.Wait()
}

Go Playground link: https://play.golang.org/p/jDMYuCe7HV

Inspired by this Golang UK Conference talk: https://youtu.be/yeetIgNeIkc?t=1413