Algorithm for running at most one concurrent thread per unique ID

I have a Go web application which needs to execute a given section of code in only one goroutine per unique ID. The scenario is that requests come in with various IDs that represent a sort of transaction. A certain subset of the operations on these needs to be guaranteed to run only one at a time for a given ID (and other competing requests should block until the prior one working on/for that ID is done).

I can think of a few ways to do this, but the bookkeeping seems tricky: keep a global mutex to lock access to a map of which concurrent requests are in flight, use a per-ID mutex or counter from there, make sure it doesn't deadlock, and then garbage collect (or carefully reference count) old request entries. I can do this, but it sounds error-prone.

Is there a pattern or something in the standard library that can be easily used to good effect in this case? Didn't see anything obvious.

EDIT: One thing I think was confusing in my explanation above is the use of the word "transaction". In my case each of these does not need an explicit close - it's just an identifier to associate multiple operations with. Since I don't have an explicit "close" or "end" concept for these, I might receive 3 requests within the same second, with each operation taking 2 seconds - and I need to serialize those, because running them concurrently would wreak havoc; but then I might get a request a week later with that same ID, and it would refer to the same set of operations (the ID is just the PK on a table in a database).

You've got a good start with the locked global map. You can have a worker per "transaction" and handlers send requests to them over channels, using a locked map to keep track of the channels. Workers can close transactions when they receive a special request. You don't want dangling transactions to become a problem, so you should probably arrange to get an artificial close request sent after a timeout.

That isn't the only way, though it might be convenient. If you only need to make certain requests wait while their transaction is being worked on elsewhere, there is probably a construction with a map of *sync.Mutexes, rather than channels talking to worker goroutines, that has better resource use. (There's now code for that approach, more or less, in bgp's answer.)

An example of the channel approach is below; besides serializing work within each transaction, it demonstrates how you might do graceful shutdown with close and a sync.WaitGroup for a setup like this, and timeouts. It's on the Playground.

package main

import (
    "fmt"
    "log"
    "sync"
    "time"
)

// Req represents a request. In real use, if there are many kinds of
// requests, it might be or contain an interface value that can point
// to one of several different concrete structs.
type Req struct {
    id      int
    payload string // just for demo
    // ...
}

// Worker represents worker state.
type Worker struct {
    id   int
    reqs chan *Req
    // ...
}

var tasks = map[int]chan *Req{}
var tasksLock sync.Mutex

const TimeoutDuration = 100 * time.Millisecond // to demonstrate; in reality higher

// for graceful shutdown, you probably want to be able to wait on all workers to exit
var tasksWg sync.WaitGroup

func (w *Worker) Work() {
    defer func() {
        tasksLock.Lock()
        delete(tasks, w.id)
        if r := recover(); r != nil {
            log.Println("worker panic (continuing):", r)
        }
        tasksLock.Unlock()
        tasksWg.Done()
    }()
    for req := range w.reqs {
        // ...do work...
        fmt.Println("worker", w.id, "handling request", req)
        if req.payload == "close" {
            fmt.Println("worker", w.id, "quitting because of a close req")
            // caveat: between this return and the deferred delete above,
            // Handle() could look up this channel and block sending on it
            // while holding tasksLock, which the deferred cleanup also
            // needs - a deadlock. Doing this kind of close inside Handle,
            // under tasksLock (see its comment), avoids that window.
            return
        }
    }
    fmt.Println("worker", w.id, "quitting since its channel was closed")
}

// Handle dispatches the Request to a Worker, creating one if needed.
func (r *Req) Handle() {
    tasksLock.Lock()
    defer tasksLock.Unlock()
    id := r.id
    reqs := tasks[id]
    if reqs == nil {
        // making a buffered channel here would let you queue up
        // n tasks for a given ID before the Handle() call
        // blocks
        reqs = make(chan *Req)
        tasks[id] = reqs
        w := &Worker{
            id:   id,
            reqs: reqs,
        }
        tasksWg.Add(1)
        go w.Work()
        // demo-only: unconditionally close a fixed time after creation;
        // a real version would reset this timer on activity instead
        time.AfterFunc(TimeoutDuration, func() {
            tasksLock.Lock()
            if reqs := tasks[id]; reqs != nil {
                close(reqs)
                delete(tasks, id)
            }
            tasksLock.Unlock()
        })
    }
    // you could close(reqs) if you get a request that means
    // 'end the transaction' with no further info. I'm only
    // using close for graceful shutdown, though.
    reqs <- r
}

// Shutdown asks the workers to shut down and waits.
func Shutdown() {
    tasksLock.Lock()
    for id, ch := range tasks {
        close(ch)
        // delete so timers, etc. won't see a ghost of a task
        delete(tasks, id)
    }
    // must unlock b/c workers can't finish shutdown
    // until they can remove themselves from maps
    tasksLock.Unlock()
    tasksWg.Wait()
}

func main() {
    fmt.Println("Hello, playground")
    reqs := []*Req{
        {id: 1, payload: "foo"},
        {id: 2, payload: "bar"},
        {id: 1, payload: "baz"},
        {id: 1, payload: "close"},
        // worker 2 will get closed because of timeout
    }
    for _, r := range reqs {
        r.Handle()
    }
    time.Sleep(75 * time.Millisecond)
    r := &Req{id: 3, payload: "quux"}
    r.Handle()
    fmt.Println("worker 2 should get closed by timeout")
    time.Sleep(75 * time.Millisecond)
    fmt.Println("worker 3 should get closed by shutdown")
    Shutdown()
}

keep a global mutex to lock access to a map of which concurrent requests are in flight, use a per-ID mutex or counter from there, make sure it doesn't deadlock, and then garbage collect (or carefully reference count) old request entries

That seems overly complicated. Here is how I would do it:

  • All map stuff should be handled by one goroutine (your dispatcher) so you don't have to deal with locking. This assumes that the work time is much greater than the dispatch time. The dispatcher tracks a channel and a counter per ID (in a map, obviously).
  • The only complication is how to handle the race of "goroutine thinks it's done working on ID" vs "dispatcher just found more work". The answer is that the worker requests to be cleaned up, but the dispatcher decides if the cleanup request is possible or not.

So here is how the code would work (a rough sketch in Go follows the walkthrough):

1) The dispatch process reads from a single input channel. It gets two types of requests: "New Work" (from the outside), and "Done Work" (from the worker). Both requests include an ID.

2) Dispatcher gets a "New Work" message: look up the ID in the map. If you find a channel + a count, send the work down the channel and increment the count. (*) If you find nothing, create a new channel + count in the map, send the work down the channel (also incrementing the count), then create a worker (goroutine) reading on that channel.

3) The worker goroutine will obviously pull that "New Work" from the channel and do the work. When done, it will send a "Done Work" request to the Dispatcher.

4) Dispatcher gets a "Done Work" message: look up the ID in the map and find the channel + counter. Decrement the counter. If it's zero, send a "Quit" message to the worker and delete the entry in the map.

5) If the worker goroutine gets a "Quit" message (instead of a work message), it simply exits. (Note that there is a tiny race where a 2nd worker on that ID could be created while the old one is exiting. But the old one will only be processing the quit message, so it doesn't matter. The old worker will clean itself up, including the old channel.)

If your requests are slow enough, there will only be one entry in the map at a time. The other extreme is if your requests to the same ID are fast enough, the channel for that ID will stay active (just the counter will go up and down).

(*) NOTE: If you make your channels 5 deep and 6 messages get queued up, the dispatcher will stall. Go channels have a fixed capacity, so you can't grow one in place; if that's a concern, the dispatcher can keep its own per-ID queue (e.g., a slice) and feed each worker's channel from it.
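
Here is a rough sketch of that recipe, to make the flow concrete. It's my illustration, not tested production code: the names (msg, workItem, entry, dispatch, worker), the demo payloads, and the channel depths are all made up.

package main

import (
    "fmt"
    "sync"
    "time"
)

// msg is what flows into the dispatcher: new work from the outside
// (work != ""), or a "Done Work" notification from a worker (work == "").
type msg struct {
    id   int
    work string
}

// workItem is what a worker receives; quit tells it to exit.
type workItem struct {
    work string
    quit bool
}

// entry is the dispatcher's per-ID state: the worker's channel plus a
// count of items sent but not yet acknowledged as done.
type entry struct {
    ch    chan workItem
    count int
}

func dispatch(in chan msg, wg *sync.WaitGroup) {
    entries := map[int]*entry{} // only this goroutine touches the map, so no locking
    for m := range in {
        e := entries[m.id]
        if m.work != "" { // "New Work"
            if e == nil {
                e = &entry{ch: make(chan workItem, 5)} // 5 deep, per the (*) note
                entries[m.id] = e
                wg.Add(1)
                go worker(m.id, e.ch, in, wg)
            }
            e.count++
            e.ch <- workItem{work: m.work} // stalls the dispatcher if the channel is full
        } else { // "Done Work"
            e.count--
            if e.count == 0 {
                e.ch <- workItem{quit: true}
                delete(entries, m.id) // a later request for this ID gets a fresh worker
            }
        }
    }
}

func worker(id int, ch chan workItem, done chan msg, wg *sync.WaitGroup) {
    defer wg.Done()
    for item := range ch {
        if item.quit {
            return // step 5: after quit this goroutine only exits
        }
        fmt.Printf("worker %d: %s\n", id, item.work)
        time.Sleep(10 * time.Millisecond) // stand-in for the real work
        done <- msg{id: id}               // "Done Work", empty payload
    }
}

func main() {
    var wg sync.WaitGroup
    in := make(chan msg, 16) // buffered so workers' done messages rarely block
    go dispatch(in, &wg)
    for _, m := range []msg{{1, "a"}, {2, "b"}, {1, "c"}, {1, "d"}} {
        in <- m
    }
    time.Sleep(100 * time.Millisecond) // crude demo drain; real code would signal shutdown
    wg.Wait()
}

Because the dispatcher owns the map outright, there is no mutex anywhere; the counter-plus-quit handshake is what resolves the cleanup race described in steps 4 and 5.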

After reviewing the other two answers (each of which had good points - much appreciated), I came up with the following. It's relatively simple: a map of Mutexes, with another Mutex to synchronize access to the map:

Playground is here: https://play.golang.org/p/r1lzHml2QJ

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

type RequestHandler struct {
    locker             sync.Mutex
    concurrentRequests map[string]*sync.Mutex
}

func (h *RequestHandler) DoSomething(id string) {

    // obtain a lock
    var idLock *sync.Mutex
    needlock := true
    for needlock {

        h.locker.Lock()
        if h.concurrentRequests == nil {
            h.concurrentRequests = make(map[string]*sync.Mutex)
        }

        idLock = h.concurrentRequests[id]

        // this is the only way to "acquire a lock": to be the one creating it - otherwise if there already
        // is a lock we just block until it unlocks and then try again, assuming it will be removed from the
        // map any moment and we'll get back to this line again and get to create it
        if idLock == nil {
            // in this case we are creating the lock, we can safely lock it here without risk of deadlock
            idLock = &sync.Mutex{}
            h.concurrentRequests[id] = idLock
            needlock = false
            idLock.Lock()
            h.locker.Unlock()
        } else {
            h.locker.Unlock()
            idLock.Lock()
            idLock.Unlock()
        }

        // then try again if we didn't get a lock
    }

    defer func() {
        // sync access to map
        h.locker.Lock()
        delete(h.concurrentRequests, id) // remove this lock from the set (assigning nil would leak map entries)
        h.locker.Unlock()

        // we're done, let the next guy access this id
        idLock.Unlock()
    }()

    // DO THE WORK HERE

    fmt.Printf("Starting work on id '%s'...\n", id)

    // hard at work...
    time.Sleep(time.Duration(rand.Intn(50)) * time.Millisecond)

    fmt.Printf("Ending work on id   '%s'...\n", id)

}

func main() {

    reqHandler := &RequestHandler{}

    for i := 0; i < 1000; i++ {
        go reqHandler.DoSomething(fmt.Sprintf("%v", rand.Intn(20)))
        time.Sleep(5 * time.Millisecond)
    }

    // give the last goroutines a chance to finish before main returns;
    // a real program would use a sync.WaitGroup instead of sleeping
    time.Sleep(100 * time.Millisecond)

}