Closing a Go channel and synchronizing goroutines

I'm unable to terminate my WaitGroup in Go and consequently can't exit the range loop. Can anybody tell me why? Or suggest a better way of limiting the number of goroutines while still being able to exit when the channel is closed?

Most examples I have seen deal with a fixed, known number of values sent on the channel, but here the number of values changes dynamically as a result of other processes.

The print statements ("done !") in the example are printed, showing that testValProducer runs the right number of times, but the code never reaches ("--EXIT --"), which means wg.Wait is still blocking somehow.

type TestValContainer chan string

func StartFunc() {
    testValContainer := make(TestValContainer)
    go func() { testValContainer <- "string val 1" }()
    go func() { testValContainer <- "string val 2" }()
    go func() { testValContainer <- "string val 3" }()
    go func() { testValContainer <- "string val 4" }()
    go func() { testValContainer <- "string val 5" }()
    go func() { testValContainer <- "string val 6" }()
    go func() { testValContainer <- "string val 7" }()
    wg := sync.WaitGroup{}

    // limit the number of worker goroutines
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go func() {
            v := i
            fmt.Printf("launching %v", i)
            for str := range testValContainer {
                testValProducer(str, &wg)
            }
            fmt.Println(v, "--EXIT --") // never called
        }()
    }

    wg.Wait()
    close(testValContainer)
}


func get(url string){
    http.Get(url)
    ch <- getUnvisited()
}


func testValProducer(testStr string, wg *sync.WaitGroup){
    doSomething(testStr)
    fmt.Println("done !") // called
    wg.Done() // NO EFFECT??
}

I might do something like this; it keeps everything easy to follow. I define a structure that implements a semaphore to control the number of active goroutines being spun up, and that lets me read from the channel as values come in.

package main

import (
    "fmt"
    "sync"
)

type TestValContainer struct {
    wg   sync.WaitGroup
    sema chan struct{}
    data chan int
}

func doSomething(number int) {
    fmt.Println(number)
}

func main() {
    // the semaphore limits us to 10 goroutines at a time
    tvc := TestValContainer{
        sema: make(chan struct{}, 10),
        data: make(chan int),
    }

    for i := 0; i <= 100; i++ {
        tvc.wg.Add(1)
        go func(i int) {
            tvc.sema <- struct{}{}
            defer func() {
                <-tvc.sema
                tvc.wg.Done()
            }()

            tvc.data <- i
        }(i)
    }
    // wait in the background so that waiting and closing the channel don't
    // block the for loop below
    go func() {
        tvc.wg.Wait()
        close(tvc.data)
    }()
    // get channel results
    for res := range tvc.data {
        doSomething(res)
    }

}

In your example you have two errors:

  1. You are calling wg.Done inside the loop in each worker thread rather than at the end of the worker thread (right before it completes). The calls to wg.Done must be matched one-to-one with the calls to wg.Add(1).
  2. With that fixed, there is a deadlock: the main thread is waiting for the worker threads to complete, while the worker threads are waiting for the input channel to be closed by the main thread.

The logic will be cleaner and easier to understand if you separate the producer side from the consumer side more clearly. Run a separate goroutine for each side. Example:

// Producer side (only write and close allowed).
go func() {
    testValContainer <- "string val 1"
    testValContainer <- "string val 2"
    testValContainer <- "string val 3"
    testValContainer <- "string val 4"
    testValContainer <- "string val 5"
    testValContainer <- "string val 6"
    testValContainer <- "string val 7"
    close(testValContainer) // Signals that production is done.
}()

// Consumer side (only read allowed).
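for i := 0; i < 3; i++ {
    wg.Add(1)
    // Pass i as an argument so each worker gets its own copy
    // (before Go 1.22 the closure would share a single loop variable).
    go func(v int) {
        defer wg.Done()
        fmt.Printf("launching %v\n", v)
        for str := range testValContainer {
            doSomething(str)
        }
        fmt.Println(v, "--EXIT --")
    }(i)
}
wg.Wait()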
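Assembled into one self-contained program, the producer/consumer split looks roughly like this. It is a sketch: doSomething is a stand-in that just prints its argument, and process is a hypothetical wrapper that also counts how many values the workers handled, so the result can be checked.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// doSomething is a placeholder for the question's doSomething.
func doSomething(s string) {
	fmt.Println("done:", s)
}

// process sends all values from a single producer goroutine (which closes
// the channel when finished) and consumes them with `workers` goroutines.
// It returns the number of values the workers processed.
func process(values []string, workers int) int64 {
	testValContainer := make(chan string)
	var processed int64

	// Producer side (only write and close allowed).
	go func() {
		for _, s := range values {
			testValContainer <- s
		}
		close(testValContainer) // signals that production is done
	}()

	// Consumer side (only read allowed).
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func(v int) {
			defer wg.Done() // exactly one Done per worker
			fmt.Printf("launching %v\n", v)
			for str := range testValContainer { // ends once the producer closes
				doSomething(str)
				atomic.AddInt64(&processed, 1)
			}
			fmt.Println(v, "--EXIT --")
		}(i)
	}
	wg.Wait()
	return atomic.LoadInt64(&processed)
}

func main() {
	values := []string{
		"string val 1", "string val 2", "string val 3", "string val 4",
		"string val 5", "string val 6", "string val 7",
	}
	n := process(values, 3)
	fmt.Println("processed", n, "values")
}
```

The "done:" and "--EXIT --" lines interleave in whatever order the scheduler picks, but every worker reaches "--EXIT --" because the producer, not the code waiting on wg, owns the close.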

If the items are being produced from some other source, potentially a collection of goroutines, you still need either: 1) a separate goroutine (or logic somewhere) that oversees that production and calls close once it's done, or 2) to make your main thread wait for the production side to complete (e.g. with a WaitGroup tracking the producer goroutines) and close the channel before it waits for the consumption side.
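As a sketch of option 2, assuming the producers are one-shot goroutines like the seven in the question: the hypothetical run function below uses one WaitGroup for producers and another for consumers. The consumers have to be started first; with an unbuffered channel, waiting on the producers before anyone reads would block forever.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// run is a hypothetical illustration of option 2: wait for the producers,
// close the channel, then wait for the consumers. Returns values consumed.
func run(values []string, workers int) int64 {
	ch := make(chan string)
	var consumed int64

	// Start the consumers first so the producers' sends can complete.
	var consumers sync.WaitGroup
	for i := 0; i < workers; i++ {
		consumers.Add(1)
		go func() {
			defer consumers.Done()
			for range ch {
				atomic.AddInt64(&consumed, 1)
			}
		}()
	}

	// One producer goroutine per value, each with its own Add/Done pair.
	var producers sync.WaitGroup
	for _, v := range values {
		producers.Add(1)
		go func(s string) {
			defer producers.Done()
			ch <- s
		}(v)
	}

	producers.Wait() // every send has completed...
	close(ch)        // ...so closing is safe and ends the consumers' range loops
	consumers.Wait()
	return atomic.LoadInt64(&consumed)
}

func main() {
	values := []string{
		"string val 1", "string val 2", "string val 3", "string val 4",
		"string val 5", "string val 6", "string val 7",
	}
	fmt.Println("consumed", run(values, 3))
}
```

The ordering producers.Wait, close, consumers.Wait is the important part: close happens exactly once, after the last send and before anyone waits on the consumer side.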

If you think about it, no matter how you arrange the logic, you need some "side-channel" way of detecting, in one single synchronised place, that no more messages are being produced. Otherwise you can never know when the channel should be closed.

In other words, you can't wait for the range loops on the consumer side to complete in order to trigger the close: those loops only complete once the channel has been closed, which is a catch-22.
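One way to keep that single synchronised place tidy is to package option 1 so the producer side owns the close entirely. In this sketch, produce and consume are hypothetical helpers: produce starts the producers plus one overseer goroutine, which is the only code that calls close, and hands back a receive-only channel, so a consumer cannot close it by mistake.

```go
package main

import (
	"fmt"
	"sync"
)

// produce starts one goroutine per value and a single overseer goroutine.
// The overseer is the one place that knows production is finished, so it
// alone closes the channel. Callers get a receive-only channel.
func produce(values []string) <-chan string {
	out := make(chan string)
	var wg sync.WaitGroup
	for _, v := range values {
		wg.Add(1)
		go func(s string) {
			defer wg.Done()
			out <- s
		}(v)
	}
	go func() {
		wg.Wait()  // all producers have delivered their value
		close(out) // the single, synchronised place where close happens
	}()
	return out
}

// consume ranges over the channel until the overseer closes it.
func consume(in <-chan string) []string {
	var got []string
	for s := range in {
		got = append(got, s)
	}
	return got
}

func main() {
	got := consume(produce([]string{"string val 1", "string val 2", "string val 3"}))
	fmt.Println(len(got), "values received") // prints "3 values received"
}
```

The consumer never has to decide when the channel should be closed; it only reacts to the close, which avoids the catch-22.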