This is doing my head in, I cant figure out how to solve it;
The code in the answer below (see url) works brilliantly to process the tasks, but the workers will die once that tasks list is empty, I want them to stay alive and somehow notify the main code that they are out of work so I can fetch more jobs to fill the tasks list with tasks
How would you define a pool of goroutines to be executed at once in Golang?
Using user:Jsor example code from below, I try to create a simple program, but I am confused.
import (
"fmt"
"strconv"
)
//workChan - read only that delivers work
//requestChan - ??? what is this
func Worker(myid string, workChan <- chan string, requestChan chan<- struct{}) {
for {
select {
case work := <-workChan:
fmt.Println("Channel: " + myid + " do some work: " + work)
case requestChan <- struct{}{}:
//hm? how is the requestChan used?
}
}
}
func Logic(){
workChan := make(chan string)
requestChan := make(chan struct{})
//Create the workers
for i:=1; i < 5; i++ {
Worker( strconv.Itoa( i), workChan, requestChan)
}
//Give the workers some work
for i:=100; i < 115; i++ {
workChan<- "workid"+strconv.Itoa( i)
}
}
This is what the select
statement is for.
func Worker(workChan chan<- Work, requestChan chan<- struct{}) {
for {
select {
case work := <-workChan:
// Do work
case requestChan <- struct{}{}:
}
}
}
This worker will run forever and ever. If work is available, it will pull it from the worker channel. If there's nothing left it will send a request.
Not that since it runs forever and ever, if you want to be able to kill a worker you need to do something else. One possibility is to always check ok
with workChan and if that channel is closed quit the function. Another option is to use an individual quit channel for each worker.
Compared to the other solution you posted, you just need (first) not to close the channel, and just keep feeding items to it.
Then you need to answer the following question: is it absolutely necessary that (a) you fetch the next X items from your queue only once one of the workers has “nothing more to do” (or, what is the same, once the first X items are either fully processed, or assigned to a worker); or (b) is it okay if you keep the second set of X items in memory, and go feeding them to the workers as new work items are needed?
As I understand it, only (a) needs the requestChan you’re wondering about (see below). For (b), something as simple as the following would suffice:
# B version
type WorkItem int
const (
N = 5 // Number of workers
X = 15 // Number of work items to get from the infinite queue at once
)
func Worker(id int, workChan <-chan WorkItem) {
for {
item := <-workChan
doWork(item)
fmt.Printf("Worker %d processes item #%v
", id, item)
}
}
func Dispatch(workChan chan<- WorkItem) {
for {
items := GetNextFromQueue(X)
for _, item := range items {
workChan <- item
fmt.Printf("Dispatched item #%v
", item)
}
}
}
func main() {
workChan := make(chan WorkItem) // Shared amongst all workers; could make it buffered if GetNextFromQueue() is slow.
// Start N workers.
for i := 0; i < N; i++ {
go Worker(i, workChan)
}
// Dispatch items to the workers.
go Dispatch(workChan)
time.Sleep(20 * time.Second) // Ensure main(), and our program, finish.
}
(I’ve uploaded to the Playground a full working solution for (b).)
As for (a), the workers change to say: do work, or if there’s no more work, tell the dispatcher to get more via the reqChan communication channel. That “or” is implemented via select. Then, the dispatcher waits on reqChan before making another call to GetNextFromQueue()
. It’s more code, but ensures the semantics that you might be interested in. (The previous version is overall simpler, though.)
# A version
func Worker(id int, workChan <-chan WorkItem, reqChan chan<- int) {
for {
select {
case item := <-workChan:
doWork(item)
fmt.Printf("Worker %d processes item #%v
", id, item)
case reqChan <- id:
fmt.Printf("Worker %d thinks they requested more work
", id)
}
}
}
func Dispatch(workChan chan<- WorkItem, reqChan <-chan int) {
for {
items := GetNextFromQueue(X)
for _, item := range items {
workChan <- item
fmt.Printf("Dispatched item #%v
", item)
}
id := <-reqChan
fmt.Printf("Polling the queue in Dispatch() at the request of worker %d
", id)
}
}
(I’ve also uploaded to the Playground a full working solution for (a).)