What I eventually want to accomplish is to dynamically scale my workers up OR down, depending on the workload.
The code below successfully parses data when a Task is coming through w.Channel
func (s *Storage) StartWorker(w *app.Worker) {
go func() {
for {
w.Pool <- w.Channel // register current worker to the worker pool
select {
case task := <-w.Channel: // received a work request, do some work
time.Sleep(task.Delay)
fmt.Println(w.WorkerID, "processing task:", task.TaskName)
w.Results <- s.ProcessTask(w, &task)
case <-w.Quit:
fmt.Println("Closing channel for", w.WorkerID)
return
}
}
}()
}
The blocking point here is the line below.
w.Pool <- w.Channel
In that sense, if I try to stop a worker(s) in any part of my program with:
w.Quit <- true
the case <-w.Quit:
is blocked and never receives until there's another incoming Task on w.Channel (and I guess select statement here is random for each case selection).
So how can I stop a channel(worker) independently?
See below sample code, it declares a fanout function that is reponsible to size up/down the workers.
It works by using timeouts to detect that new workers has ended or are required to spawn.
there is an inner loop to ensure that each item is processed before moving on, blocking the source when it is needed.
package main
import (
"fmt"
"io"
"log"
"net"
"os"
)
func main() {
input := make(chan string)
fanout(input)
}
func fanout() {
workers := 0
distribute := make(chan string)
workerEnd := make(chan bool)
for i := range input {
done := false
for done {
select {
case distribute<-i:
done = true
case <-workerEnd:
workers--
default:
if workers <10 {
workers++
go func(){
work(distribute)
workerEnd<-true
}()
}
}
}
}
}
func work(input chan string) {
for {
select {
case i := <-input:
<-time.After(time.Millisecond)
case <-time.After(time.Second):
return
}
}
}