I'm having an issue trying to implement this ("all goroutines are asleep - deadlock!"). Here's the gist of the code:
var workers = runtime.NumCPU()

func main() {
    jobs := make(chan *myStruct, workers)
    done := make(chan *myStruct, workers)
    go produceWork(file_with_jobs, jobs)
    for i := 0; i < runtime.NumCPU(); i++ {
        go Worker(jobs, done)
    }
    consumeWork(done)
}
func produceWork(vf string, jobs chan *myStruct) {
    defer close(jobs)
    // load file with jobs
    file, err := ini.LoadFile(vf)
    if err != nil {
        return
    }
    // get data for processing
    for data := range file {
        // ...
        jobs <- &myStruct{data1, data2, data3, false}
    }
}
func Worker(in, out chan *myStruct) {
    for {
        item, open := <-in
        if !open {
            break
        }
        process(item)
        out <- item
    }
    // close(out) --> tried closing the out channel, but then not all items are processed
    // though no panics occur.
}
func process(item *myStruct) {
    // ...modify the item
    item.status = true
}
func consumeWork(done chan *myStruct) {
    for val := range done {
        if !val.status {
            fmt.Println(val)
        }
    }
}
I'm mainly trying to understand how to do this without using the sync/WaitGroup stuff - just pure channels - is this possible? The goal of this routine is to have a single producer load up items that are processed by N workers - appreciate any pointers / help.
You can, as siritinga suggested, use a third signalling or counter channel, e.g. signal chan bool, where the produceWork goroutine adds a value before each job it puts into the jobs channel. That way, an equal number of values is passed to signal as to jobs:
func produceWork(vf string, jobs chan *myStruct, signal chan bool) {
    defer close(jobs)
    // load file with jobs
    file, err := ini.LoadFile(vf)
    if err != nil {
        return
    }
    // get data for processing
    for data := range file {
        // ...
        signal <- true // announce a job before it is queued
        jobs <- &myStruct{data1, data2, data3, false}
    }
    close(signal)
}
The consumer would then start by reading from the signal channel. If there is a value, it can be certain there will also be a value to read from the done channel (once a worker has passed it on). If signal is closed, then all is done and we can close the remaining done channel:
func consumeWork(done chan *myStruct, signal chan bool) {
    for range signal {
        val := <-done
        if !val.status {
            fmt.Println(val)
        }
    }
    // every produced job has been received at this point,
    // so no worker can still be sending on done
    close(done)
}
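Wiring it together, main could then look roughly like this (just a sketch, reusing the myStruct, workers and file_with_jobs placeholders from your question together with the two functions above):

func main() {
    jobs := make(chan *myStruct, workers)
    done := make(chan *myStruct, workers)
    // the extra counter channel: one value per job produced
    signal := make(chan bool, workers)

    go produceWork(file_with_jobs, jobs, signal)
    for i := 0; i < workers; i++ {
        go Worker(jobs, done)
    }
    // returns once signal is closed and every result has been read
    consumeWork(done, signal)
}

Since consumeWork reads exactly one value from done for every value it received on signal, no worker can still be sending on done by the time it is closed.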
While this is possible, I would not really recommend it. It doesn't make the code any clearer than using a sync.WaitGroup. After all, the signal channel basically only acts as a counter; a WaitGroup would serve the same purpose and do it more cheaply. But your question was not about how to solve the problem, rather whether it was possible with pure channels.
Sorry, I didn't notice that you wanted to skip sync :/ I'll leave the answer anyway, maybe someone is looking for this.
import (
    "fmt"
    "runtime"
    "sync"
)

func main() {
    jobs := make(chan *myStruct, workers)
    done := make(chan *myStruct, workers)

    var workerWg sync.WaitGroup  // waitGroup for workers
    var consumeWg sync.WaitGroup // waitGroup for consumer

    consumeWg.Add(1) // add one active consumer
    for i := 0; i < runtime.NumCPU(); i++ {
        workerWg.Add(1) // register the worker before starting it
        go Worker(&workerWg, jobs, done)
    }
    go consumeWork(&consumeWg, done)

    produceWork(file_with_jobs, jobs)
    close(jobs)      // no more jobs: lets the workers' range loops end
    workerWg.Wait()  // wait until no worker can still send on done
    close(done)      // now it is safe to close done
    consumeWg.Wait() // wait for the consumer to drain done
}
func produceWork(vf string, jobs chan *myStruct) {
    // load file with jobs
    file, err := ini.LoadFile(vf)
    if err != nil {
        return
    }
    // get data for processing
    for data := range file {
        // ...
        jobs <- &myStruct{data1, data2, data3, false}
    }
}
func Worker(wg *sync.WaitGroup, jobs, out chan *myStruct) {
    defer wg.Done()
    for job := range jobs {
        process(job)
        out <- job
    }
    // no need to close out here: main closes it once all workers are done
}
func process(item *myStruct) {
    // ...modify the item
    item.status = true
}
func consumeWork(wg *sync.WaitGroup, done chan *myStruct) {
    defer wg.Done()
    for val := range done {
        if !val.status {
            fmt.Println(val)
        }
    }
}