I have a function which receives tasks and puts them into a channel. Every task has an ID, some properties, and a channel where the result will be placed. It looks like this:
task.Result = make(chan *TaskResult)
queue <- task
result := <-task.Result
sendResponse(result)
Another goroutine takes a task from the channel, processes it and puts the result into the task's channel:
task := <-queue
task.Result <- doExpensiveComputation(task)
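For reference, a minimal self-contained sketch of this setup could look like the following; the exact Task/TaskResult fields and the stub bodies of doExpensiveComputation and sendResponse are assumptions, not the actual code:

package main

import (
	"fmt"
	"time"
)

type TaskResult struct {
	Value string
}

type Task struct {
	ID     string
	Result chan *TaskResult
}

var queue = make(chan *Task)

// doExpensiveComputation stands in for the real heavy work.
func doExpensiveComputation(t *Task) *TaskResult {
	time.Sleep(100 * time.Millisecond)
	return &TaskResult{Value: "result for " + t.ID}
}

func sendResponse(r *TaskResult) {
	fmt.Println(r.Value)
}

// handle is the receiving side: it queues the task and waits for its result.
func handle(task *Task) {
	task.Result = make(chan *TaskResult)
	queue <- task
	result := <-task.Result
	sendResponse(result)
}

// worker is the processing side: it takes tasks and answers them one by one.
func worker() {
	for task := range queue {
		task.Result <- doExpensiveComputation(task)
	}
}

func main() {
	go worker()
	handle(&Task{ID: "42"})
}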
This code works fine. But now I want to coalesce tasks in the queue. Task processing is a very expensive operation, so I want to process all the tasks in the queue that share the same ID only once. I see two ways of doing it.
The first one is not to put tasks with the same ID into the queue, so when a duplicate task arrives it will wait for its copy to complete. Here is pseudo-code:
if newTask in queue {
	existing := queue.getById(newTask.ID)
	existing.waitForComplete()
	sendResponse(existing.ProcessingResult)
} else {
	queue.enqueue(newTask)
}
So, I can implement it using a Go channel and a map for random access, plus some synchronization means like a mutex. What I don't like about this way is that I have to carry both the map and the channel around the code and keep their contents synchronized.
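For concreteness, the first way might be sketched roughly like this, assuming the Task and queue definitions from above plus import "sync"; the pending type, the inFlight map and the submit helper are made-up names, not existing APIs:

// pending represents the first task seen for a given ID; duplicates wait
// on done and then reuse result instead of being queued again.
type pending struct {
	done   chan struct{}
	result *TaskResult
}

var (
	dedupMu  sync.Mutex
	inFlight = make(map[string]*pending)
)

// submit queues the task only if no identical task is already in flight;
// otherwise it waits for the in-flight copy to complete and reuses its result.
func submit(task *Task) *TaskResult {
	dedupMu.Lock()
	if p, ok := inFlight[task.ID]; ok {
		dedupMu.Unlock()
		<-p.done // wait for the copy to complete
		return p.result
	}
	p := &pending{done: make(chan struct{})}
	inFlight[task.ID] = p
	dedupMu.Unlock()

	task.Result = make(chan *TaskResult)
	queue <- task
	p.result = <-task.Result

	dedupMu.Lock()
	delete(inFlight, task.ID)
	dedupMu.Unlock()
	close(p.done) // release all waiting duplicates
	return p.result
}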
The second way is to put all the tasks into the queue, but to extract the task and all the tasks with the same ID from the queue when the result arrives, and then send the result to all of them. Here is pseudo-code:
someTask := queue.dequeue()
result := doExpensiveComputation(someTask)
someTask.Result <- result
moreTasks := queue.getAllWithID(someTask.ID)
for _, theSameTask := range moreTasks {
	theSameTask.Result <- result
}
And I have an idea how to implement this using chan + map + mutex in the same way as above.
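A possible shape of that chan + map + mutex combination: since a Go channel does not support removing arbitrary elements, this sketch coalesces duplicates at enqueue time and keeps all waiting tasks in a map instead of literally pulling them out of the channel. The names enqueue, work, waitMu and waiting are made up, and import "sync" is assumed:

// waiting groups queued tasks by ID so that duplicates can all be answered
// from a single computation.
var (
	waitMu  sync.Mutex
	waiting = make(map[string][]*Task)
)

// enqueue records every task, but only the first task with a given ID is
// actually sent to the queue channel.
func enqueue(task *Task) {
	waitMu.Lock()
	first := len(waiting[task.ID]) == 0
	waiting[task.ID] = append(waiting[task.ID], task)
	waitMu.Unlock()
	if first {
		queue <- task
	}
}

// work processes one task at a time and broadcasts the result to every task
// with the same ID, including the one that was actually computed.
func work() {
	for someTask := range queue {
		result := doExpensiveComputation(someTask)

		waitMu.Lock()
		moreTasks := waiting[someTask.ID]
		delete(waiting, someTask.ID)
		waitMu.Unlock()

		for _, theSameTask := range moreTasks {
			theSameTask.Result <- result
		}
	}
}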
And here is the question: are there some built-in or existing data structures which I could use for such a problem? Are there other (better) ways of doing this?
If I understand the problem correctly, the simplest solution that comes to mind is adding a middle layer between the task senders (putting into queue) and the workers (taking from queue). This layer, probably a single goroutine, would be responsible for storing current tasks (by ID) and broadcasting the results to every matching task.
Pseudo code:
go func() {
	active := make(map[TaskID][]Task)
	for {
		select {
		case task := <-queue:
			tasks := active[task.ID]
			// No tasks with such ID yet, start the heavy work
			if len(tasks) == 0 {
				worker <- task
			}
			// Save task for the result
			active[task.ID] = append(active[task.ID], task)
		case r := <-response:
			// Broadcast to all tasks with this ID
			for _, task := range active[r.ID] {
				task.Result <- r.Result
			}
			// Forget them, so a later task with the same ID starts fresh
			delete(active, r.ID)
		}
	}
}()
No mutexes needed and probably no need to carry anything around either; workers simply need to put all their results into this middle layer, which then routes the responses correctly. You could even easily add caching here if there's a chance that clashing IDs arrive some time apart.
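For completeness, the worker side of this layered design could look roughly like this; the Response type and the worker and response channel declarations are assumptions that simply follow the pseudo code above:

// Response carries a finished computation back to the middle layer, which
// then fans it out to every task waiting on that ID.
type Response struct {
	ID     TaskID
	Result *TaskResult
}

var (
	worker   = make(chan Task)     // middle layer -> workers
	response = make(chan Response) // workers -> middle layer
)

// runWorker processes tasks handed over by the middle layer and reports
// every result back on the response channel.
func runWorker() {
	for task := range worker {
		response <- Response{ID: task.ID, Result: doExpensiveComputation(task)}
	}
}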
Edit: I had this dream where the above code caused a deadlock. If you send a lot of requests at once and choke the worker channel there's a serious problem: this middle-layer routine is stuck on worker <- task, waiting for a worker to finish, but all the workers will probably be blocked on their send to the response channel (because our routine can't collect it). Playable proof.
One could think of adding some buffers to the channels, but this is not a proper solution (unless you can design the system in such a way that the buffer will never fill up). There are a few ways of solving this problem; for example, you can run a separate routine for collecting responses, but then you would need to protect the active map with a mutex. Doable. You could also put worker <- task into a select, which would try to send a task to a worker, receive a new task (if there is nothing to send), or collect a response. One can take advantage of the fact that a nil channel is never ready for communication (it is ignored by select), so you can alternate between receiving and sending tasks within a single select. Example:
go func() {
	var next Task     // received task which needs to be passed to a worker
	in := queue       // incoming channel (new tasks) -- active
	var out chan Task // outgoing channel (to workers) -- inactive
	for {
		select {
		case t := <-in:
			next = t              // store task, so we can pass it to a worker
			in, out = nil, worker // deactivate incoming channel, activate outgoing
		case out <- next:
			in, out = queue, nil // deactivate outgoing channel, activate incoming
		case r := <-response:
			collect <- r
		}
	}
}()
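As a sketch only, here is one way the active map from the first example and the nil channel trick could be combined in a single routine, so that neither a mutex nor the worker <- task deadlock comes back; the pending slice and the re-evaluation of out after every select step are assumptions of this sketch:

go func() {
	active := make(map[TaskID][]Task)
	var pending []Task // first tasks of each ID, still waiting for a free worker
	var next Task
	var out chan Task // nil while there is nothing to hand over

	for {
		select {
		case task := <-queue:
			if len(active[task.ID]) == 0 {
				pending = append(pending, task) // new ID, needs a worker
			}
			active[task.ID] = append(active[task.ID], task)
		case out <- next:
			pending = pending[1:] // handed over to a worker
		case r := <-response:
			for _, task := range active[r.ID] {
				task.Result <- r.Result // broadcast to all tasks with this ID
			}
			delete(active, r.ID)
		}
		// The outgoing channel is active only when there is a task to send,
		// so this routine never blocks on a busy worker.
		if len(pending) > 0 {
			next, out = pending[0], worker
		} else {
			out = nil
		}
	}
}()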