退出工作池中的通道

What I eventually want to accomplish is to dynamically scale my workers up OR down, depending on the workload.

The code below successfully parses data when a Task is coming through w.Channel

func (s *Storage) StartWorker(w *app.Worker) {

    go func() {

        for {

            w.Pool <- w.Channel // register current worker to the worker pool

            select {

            case task := <-w.Channel: // received a work request, do some work

                time.Sleep(task.Delay)

                fmt.Println(w.WorkerID, "processing task:", task.TaskName)

                w.Results <- s.ProcessTask(w, &task)

            case <-w.Quit:
                fmt.Println("Closing channel for", w.WorkerID)
                return
            }

        }
    }()

}

The blocking point here is the line below.

w.Pool <- w.Channel

In that sense, if I try to stop a worker(s) in any part of my program with:

w.Quit <- true

the case <-w.Quit: is blocked and never receives until there's another incoming Task on w.Channel (and I guess select statement here is random for each case selection).

So how can I stop a channel(worker) independently?

See below sample code, it declares a fanout function that is reponsible to size up/down the workers.

It works by using timeouts to detect that new workers has ended or are required to spawn.

there is an inner loop to ensure that each item is processed before moving on, blocking the source when it is needed.

package main

import (
    "fmt"
    "io"
    "log"
    "net"
    "os"
)

func main() {
    input := make(chan string)
    fanout(input)
}

func fanout() {
    workers := 0
    distribute := make(chan string)
    workerEnd := make(chan bool)
    for i := range input {
        done := false 
        for done {
            select {
            case distribute<-i:
                done = true
            case <-workerEnd:
                workers--
            default:
                if workers <10 {
                    workers++
                    go func(){
                        work(distribute)
                        workerEnd<-true
                    }()
                }
            }   
        }
    }
}

func work(input chan string) {
    for  {
        select {
        case i := <-input:
            <-time.After(time.Millisecond)
        case <-time.After(time.Second):
            return
        }

    }
}