Golang:新手-主从并发

I'm having an issue trying to implement this (all goroutines asleep - deadlock!) Here's the gist of the code:

var workers = runtime.NumCPU()

func main() {
    jobs := make(chan *myStruct, workers)
    done := make(chan *myStruct, workers)

    go produceWork(file_with_jobs, jobs)
    for i := 0; i < runtime.NumCPU(); i++ {
        go Worker(jobs, done)
    }
    consumeWork(done)
}

func produceWork(vf string, jobs chan *utils.DigSigEntries) {
    defer close(jobs)

    // load file with jobs
    file, err := ini.LoadFile(vf)

    // get data for processing
    for data, _ := range file {
        // ...
        jobs <- &myStruct{data1, data2, data3, false}
    }
}

func Worker(in, out chan *myStruct) {
    for {
        item, open := <-in
        if !open {
            break
        }

        process(item)
        out <- item
    }
    // close(out)   --> tried closing the out channel, but then not all items are processed
    //                  though no panics occur.
}

func process(item *myStruct) {
    //...modify the item
    item.status = true
}

func consumeWork(done chan *myStruct) {
    for val := range done {
        if !val.status {
            fmt.Println(val)
        }
    }
}

I'm mainly trying to understand how to do this without using the sync/Wait stuff - just pure channels - is this possible? The goal of this routine is to have a single producer load up items that are processed by N workers - appreciate any pointers / help.

You can, as siritinga suggested, use a third signalling or counter channel, eg signal chan boolean, where the produceWork goroutine would add a value before each job entered into the jobs channel. So, an equal count of values will be passed to signal as to jobs:

func produceWork(vf string, jobs chan *utils.DigSigEntries, signal chan boolean) {
    defer close(jobs)

    // load file with jobs
    file, err := ini.LoadFile(vf)

    // get data for processing
    for data, _ := range file {
        // ...
        signal <- true
        jobs <- &myStruct{data1, data2, data3, false}
    }

    close(signal)
}

The consume would then start by reading from the signal channel. If there is a value, it can be certain there will be a value to read from the out channel (once a worker has passed it on). If the signal is closed, then all is done. We can close the remaining done channel:

func consumeWork(done chan *myStruct, signal chan boolean) {
    for _ := range signal {
        val <- done
        if !val.status {
            fmt.Println(val)
        }
    }

    close(done)
} 

While this is possible, I would not really recommend it. It doesn't make the code more clear than when using a sync.WaitGroup. After all, the signal channel would basically only work as a counter. A WaitGroup would have the same purpose and would do it cheaper.

But your question was not about how to solve the problem, but rather if it was possible to do it with pure channels.

Sorry, i didn't notice that you wanted to skip /sync :/ I'll leave the answer, maybe someone is looking for this.

import (
    "sync"
)


func main() {
    jobs := make(chan *myStruct, workers)
    done := make(chan *myStruct, workers)

    var workerWg sync.WaitGroup   // waitGroup for workers
    var consumeWg sync.WaitGroup  // waitGroup for consumer
    consumeWg.Add(1) // add one active Consumer

    for i := 0; i < runtime.NumCPU(); i++ {
        go Worker(&workerWg, jobs, done)
        workerWg.Add(1)
    }
    go consumeWork(&consumeWg, done)

    produceWork(file_with_jobs, jobs)

    close(jobs)
    workerWg.Wait()

    close(done)
    consumeWg.Wait()

}

func produceWork(vf string, jobs chan *utils.DigSigEntries) {

    // load file with jobs
    file, err := ini.LoadFile(vf)

    // get data for processing
    for data, _ := range file {
        // ...
        jobs <- &myStruct{data1, data2, data3, false}
    }
}


func Worker(wg *sync.WaitGroup, done chan *myStruct) {

    defer wg.Done()
    for job := range jobs {

        result := process(job)
        out <- result
    }

   // close(out)   --> tried closing the out channel, but then not all items are processed
   //                  though no panics occur.
}

func process(item *myStruct) {
    //...modify the item
    item.status = true
}

func consumeWork(wg *sync.WaitGroup, done chan *myStruct) {
    defer wg.Done()
    for val := range done {
        if !val.status {
            fmt.Println(val)
        }
    }
}