使用通道同步多个goroutine

I need to start a number of workers with single task queue and single result queue. Each worker should be started in different goroutine. And I need to wait till all workers will be finished and task queue will be empty before exiting from program. I have prepare small example for goroutine synchronization. The main idea was that we count tasks in queue and waiting for all workers to finish jobs. But current implementation sometime miss values. Why this happends and how to solve the problem? The sample code:

import (
    "fmt"
    "os"
    "os/signal"
    "strconv"
)

const num_workers = 5

type workerChannel chan uint64

// Make channel for tasks
var workCh workerChannel
// Make channel for task counter
var cntChannel chan int

// Task counter
var tskCnt int64

// Worker function
func InitWorker(input workerChannel, result chan string, num int) {
    for {
        select {
        case inp := <-input:
            getTask()
            result <- ("Worker " + strconv.Itoa(num) + ":" + strconv.FormatUint(inp, 10))
        }
    }
}

// Function to manage task counter
// should be in uniq goroutine
func taskCounter(inp chan int) {
    for {
        val := <-inp
        tskCnt += int64(val)
    }
}

// Put pask to the queue
func putTask(val uint64) {
    func() {
        fmt.Println("Put ", val)
        cntChannel <- int(1)
        workCh <- val
    }()
}

// Get task from queue
func getTask() {
    func() {
        cntChannel <- int(-1)
    }()
}

func main() {
// Init service channels
    abort := make(chan os.Signal)
    done := make(chan bool)

// init queue for results
    result := make(chan string)

// init task queue
    workCh = make(workerChannel)

// start some workers
    for i := uint(0); i < num_workers; i++ {
        go InitWorker(workCh, result, int(i))
    }

// init counter for synchro
    cntChannel = make(chan int)
    go taskCounter(cntChannel)

// goroutine that put some tasks into queue
    go func() {
        for i := uint(0); i < 21; i++ {
            putTask(uint64(i))
        }

        // wait for processing all tasks and close application
        for len(cntChannel) != 0 {}
        for tskCnt != 0 {}
        for len(workCh) != 0 {}
        for len(result) != 0 {}

        // send signal for close
        done <- true
    }()

    signal.Notify(abort, os.Interrupt)
    for {
        select {
        case <-abort:
            fmt.Println("Aborted.")
            os.Exit(0)

        // print results
        case res := <-result:
            fmt.Println(res)

        case <-done:
            fmt.Println("Done")
            os.Exit(0)
        }
    }
}

Use sync.WaitGroup to wait for goroutines to complete. Close channels to cause loops reading on channels to exit.

package main

import (
    "fmt"
    "sync"
)

type workerChannel chan uint64

const num_workers = 5

func main() {

    results := make(chan string)
    workCh := make(workerChannel)

    // Start workers
    var wg sync.WaitGroup
    wg.Add(num_workers)
    for i := 0; i < num_workers; i++ {
        go func(num int) {
            defer wg.Done()
            // Loop processing work until workCh is closed
            for w := range workCh {
                results <- fmt.Sprintf("worker %d, task %d", num, w)
            }

        }(i)
    }

    // Close result channel when workers are done
    go func() {
        wg.Wait()
        close(results)
    }()

    // Send work to be done
    go func() {
        for i := 0; i < 21; i++ {
            workCh <- uint64(i)
        }
        // Closing the channel causes workers to break out of loop
        close(workCh)
    }()

    // Process results. Loop exits when result channel is closed.
    for r := range results {
        fmt.Println(r)
    }
}

https://play.golang.org/p/ZifpzsP6fNv

I suggest using close(chan) for this kind of tasks.

WaitGroup version.

package main

import (
    "log"
    "sync"
)

func worker(in chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    for i := range in {
        log.Println(i)
    }

}

func main() {
    in := make(chan int)
    lc := 25
    maxValue := 30
    wg := sync.WaitGroup{}
    wg.Add(lc)
    for i := 0; i < lc; i++ {
        go worker(in, &wg)
    }

    for c := 0; c <= maxValue; c++ {
        in <- c
    }
    close(in)
    wg.Wait()
}

Channel version

package main

import (
    "log"
    "os"
)

func worker(in chan int, end chan struct{}) {
    defer func() { end <- struct{}{} }()
    for i := range in {
        log.Println(i)
    }

}

func main() {
    in := make(chan int)
    lc := 25
    maxValue := 30
    end := make(chan struct{})
    var fin int
    go func() {
        for {
            <-end
            fin++
            log.Println(`fin`, fin)
            if fin == lc {
                break
            }
        }
        close(end)
        os.Exit(0)
    }()
    for i := 0; i < lc; i++ {
        go worker(in, end)
    }

    for c := 0; c <= maxValue; c++ {
        in <- c
    }
    close(in)
    <-make(chan struct{})
}