I am trying to work with concurrency and channels in Go. The problem that I am facing is mainly the idea of concurrency, so I am not discarding that the following logic is wrong or should be changed.
I have a buffered channel that has a buffer size of 'N' and it also represents the number of goroutines that will be created. All of the routines reads form One Channel and Writes in another One channel, and the main goroutine will be printing the values from the final channel.
1 input channel --- N goroutines looking and adding to input and output --- 1 output channel
The problem is that i always hit deadlock, since I don't know how to close a channel that is feeding itself, and does not know when it will stop, so I cannot close the output channel either.
The code is the following example:
package main
const count = 3
const finalNumber = 100
// There will be N routines running and reading from the one read channel
// The finalNumber is not known, in this examples is 100, but in the main problem will keep self feeding until the operation gives a wrong output
// readingRoutine will feed read channel and the print channel
func readingRoutine(read, print chan int) {
for i := range read {
print <- i
if i < finalNumber && i+count < finalNumber {
read <- i + count
}
}
}
// This is the main routine that will be printing the values from the print channel
func printingRoutine(print chan int) {
for i := range print {
println(i)
}
}
func main() {
read := make(chan int, count)
print := make(chan int, count)
// Feed count numbers into the buffered channel
for i := 0; i < count; i++ {
read <- i
}
// count go routines will be processing the read channel
for i := 0; i < count; i++ {
go readingRoutine(read, print)
}
printingRoutine(print)
}
In this example it should print all numbers from 0 to 100 and finish. Thanks
You can use a sync.WaitGroup
for waiting things done, say wg := &sync.WaitGroup{}
As you are trying to print finalNumber
times, you shall call wg.Add(finalNumber)
and then in print()
, each time you have done a print, call wg.Done()
.
Spawn another goroutine to wait wg.Wait()
and then close the read
channel and print
channel.
func printingRoutine(print chan int,wg *sync.WaitGroup) {
for i := range print {
println(i)
wg.Done()
}
}
func main() {
read := make(chan int, count)
print := make(chan int, count)
wg := &sync.WaitGroup{}
wg.Add(finalNumber)
// Feed count numbers into the buffered channel
for i := 0; i < count; i++ {
read <- i
}
// count go routines will be processing the read channel
for i := 0; i < count; i++ {
go readingRoutine(read, print)
}
go func() {
wg.Wait()
close(read)
close(print)
}()
printingRoutine(print,wg)
}
Playground: https://play.golang.org/p/BMSfz03egx0
I usually find that if you're having real problems making a design pattern or idea work, you're doing it wrong. In this case the idea of a self-feeding routine knowing when it should close itself.
I think what you're looking for is the idea of a worker pool
.
Essentially you have a channel that contains your set of work
, and then a number of workers
in the form of go routines that read jobs from that channel and do something with them until all work has been completed.
In the following example I am using the package gopool to run 3 concurrent workers which are being fed by a 4th go routine.
I wait for all workers to close themselves, which is caused by the input work channel being closed.
// create the channel to store the jobs
// notice that we can only have 5 jobs in the channel at one time
workChan := make(chan int, 5)
// here we define what should be happening in each of our workers.
// each worker will be running concurrently
var work gopool.WorkFunc = func(ctx context.Context) error {
for {
select {
case <-ctx.Done():
// this is just a get out clause in case we want to prematurely stop the workers while there is still work to do
return ctx.Err()
case work, ok := <-workChan:
if !ok {
// we get here if the work channel has been closed
return nil
}
// do something with work here
fmt.Println(work)
}
}
}
// this func defines how many workers we want to be running concurrently
var workerCount gopool.WorkerCountFunc = func() uint64 {
return 3
}
// here we define a new worker pool
p := gopool.NewPool("test", work, workerCount, nil, context.TODO())
// here we start the worker pool
cancel, err := p.StartOnce()
if err != nil {
panic(err)
}
// the workers are now running and waiting for jobs
// we'll defer the cancel to make sure that the pool will be closed eventually
defer cancel()
// now we'll start a go routine to feed the workers
// it does this by adding to the workChan, and closes workChan when there is no more work to do
// when the workChan is closed the workers know that they should quit
go func(workChan chan<- int, min int, max int) {
for i := min; i <= max; i++ {
workChan <- i
}
close(workChan)
}(workChan, 3, 200)
// now we wait for the pool to be finished with it's work
<-p.Done()
fmt.Println("all work has been done")
Example output:
$ go run main.go
4
6
7
8
9
10
3
5
13
14
15
16
17
18
12
20
21
19
23
24
25
22
27
28
11
30
31
26
33
29
35
36
34
38
39
32
41
42
37
44
43
46
47
40
49
50
51
52
48
54
53
56
57
58
45
60
61
62
63
64
65
55
59
68
69
66
71
72
73
70
75
76
77
67
79
80
74
82
83
81
85
84
87
88
89
78
91
90
93
94
92
96
97
95
99
98
101
102
103
104
100
106
107
108
109
105
111
112
110
114
115
116
117
118
119
120
86
122
123
124
125
126
127
121
129
113
131
128
133
134
130
136
137
132
139
138
141
140
143
144
145
146
147
148
149
150
135
151
153
142
155
156
157
158
159
160
161
162
163
164
152
154
167
165
166
170
171
172
173
174
169
176
177
178
179
180
168
182
183
184
181
185
187
188
189
190
175
186
193
191
195
194
197
196
199
198
200
192
all work has been done