频道过早终止

I am prototyping a series of go routines for a pipeline that each perform a transformation. The routines are terminating before all the data has passed through.

I have checked Donavan and Kernighan book and Googled for solutions.

Here is my code:

package main

import (
    "fmt"
    "sync"
)

func main() {
    a1 := []string{"apple", "apricot"}

    chan1 := make(chan string)
    chan2 := make(chan string)
    chan3 := make(chan string)

    var wg sync.WaitGroup

    go Pipe1(chan2, chan1, &wg)
    go Pipe2(chan3, chan2, &wg)
    go Pipe3(chan3, &wg)

    func (data []string) {
        defer wg.Done()
        for _, s := range data {
            wg.Add(1)
            chan1 <- s
        }
        go func() {
            wg.Wait()
            close(chan1)
        }()
    }(a1)
}

func Pipe1(out chan<- string, in <-chan string, wg *sync.WaitGroup) {
    defer wg.Done()
    for s := range in {
        wg.Add(1)
        out <- s + "s are"
    }
}
func Pipe2(out chan<- string, in <-chan string, wg *sync.WaitGroup) {
    defer wg.Done()
    for s := range in {
        wg.Add(1)
        out <- s + " good for you"
    }
}
func Pipe3(in <-chan string, wg *sync.WaitGroup) {
    defer wg.Done()
    for s := range in {
        wg.Add(1)
        fmt.Println(s)
    }
}


My expected output is:

apples are good for you
apricots are good for you

The results of running main are inconsistent. Sometimes I get both lines. Sometimes I just get the apples. Sometimes nothing is output.

As Adrian already pointed out, your WaitGroup.Add and WaitGroup.Done calls are mismatched. However, in cases like this the "I am done" signal is typically given by closing the output channel. WaitGroups are only necessary if work is shared between several goroutines (i.e. several goroutines consume the same channel), which isn't the case here.

package main

import (
    "fmt"
)

func main() {
    a1 := []string{"apple", "apricot"}

    chan1 := make(chan string)
    chan2 := make(chan string)
    chan3 := make(chan string)

    go func() {
        for _, s := range a1 {
            chan1 <- s
        }

        close(chan1)
    }()

    go Pipe1(chan2, chan1)
    go Pipe2(chan3, chan2)

    // This range loop terminates when chan3 is closed, which Pipe2 does after
    // chan2 is closed, which Pipe1 does after chan1 is closed, which the
    // anonymous goroutine above does after it sent all values.
    for s := range chan3 {
        fmt.Println(s)
    }
}

func Pipe1(out chan<- string, in <-chan string) {
    for s := range in {
        out <- s + "s are"
    }

    close(out) // let caller know that we're done
}

func Pipe2(out chan<- string, in <-chan string) {
    for s := range in {
        out <- s + " good for you"
    }

    close(out) // let caller know that we're done
}

Try it on the playground: https://play.golang.org/p/d2J4APjs_lL

You're calling wg.Wait in a goroutine, so main is allowed to return (and therefore your program exits) before the other routines have finished. This would cause the behavior your see, but taking out of a goroutine alone isn't enough.

You're also misusing the WaitGroup in general; your Add and Done calls don't relate to one another, and you don't have as many Dones as you have Adds, so the WaitGroup will never finish. If you're calling Add in a loop, then every loop iteration must also result in a Done call; as you have it now, you defer wg.Done() before each of your loops, then call Add inside the loop, resulting in one Done and many Adds. This code would need to be significantly revised to work as intended.