This might be a rookies mistake. I have a slice with a string value and a map of channels. For each string in the slice, a channel is created and a map entry is created for it, with the string as key.
I watch the channels and pass a value to one of them, which is never found.
package main
import (
"fmt"
"time"
)
type TestStruct struct {
Test string
}
var channelsMap map[string](chan *TestStruct)
func main() {
stringsSlice := []string{"value1"}
channelsMap := make(map[string](chan *TestStruct))
for _, value := range stringsSlice {
channelsMap[value] = make(chan *TestStruct, 1)
go watchChannel(value)
}
<-time.After(3 * time.Second)
testStruct := new(TestStruct)
testStruct.Test = "Hello!"
channelsMap["value1"] <- testStruct
<-time.After(3 * time.Second)
fmt.Println("Program ended")
}
func watchChannel(channelMapKey string) {
fmt.Println("Watching channel: " + channelMapKey)
for channelValue := range channelsMap[channelMapKey] {
fmt.Printf("Channel '%s' used. Passed value: '%s'
", channelMapKey, channelValue.Test)
}
}
Playground link: https://play.golang.org/p/IbucTqMjdGO
Output:
Watching channel: value1
Program ended
How do I execute something when the message is fed into the channel?
The shadowing in main of channelsMap mentioned above was a critical bug, but aside from that, the program was playing "Russian roulette" with the calls to time.After so that main wouldn't finish before the watcher goroutines did. This is unstable and unreliable, so I recommend the following approach using a channel to signal when all watcher goroutines are done:
package main
import (
"fmt"
)
type TestStruct struct {
Test string
}
var channelsMap map[string](chan *TestStruct)
func main() {
stringsSlice := []string{"value1", "value2", "value3"}
structsSlice := []TestStruct{
{"Hello1"},
{"Hello2"},
{"Hello3"},
}
channelsMap = make(map[string](chan *TestStruct))
// Signal channel to wait for watcher goroutines.
done := make(chan struct{})
for _, s := range stringsSlice {
channelsMap[s] = make(chan *TestStruct)
// Give watcher goroutines the signal channel.
go watchChannel(s, done)
}
for _, ts := range structsSlice {
for _, s := range stringsSlice {
channelsMap[s] <- &ts
}
}
// Close the channels so watcher goroutines can finish.
for _, s := range stringsSlice {
close(channelsMap[s])
}
// Wait for all watcher goroutines to finish.
for range stringsSlice {
<-done
}
// Now we're really done!
fmt.Println("Program ended")
}
func watchChannel(channelMapKey string, done chan<- struct{}) {
fmt.Println("Watching channel: " + channelMapKey)
for channelValue := range channelsMap[channelMapKey] {
fmt.Printf("Channel '%s' used. Passed value: '%s'
", channelMapKey, channelValue.Test)
}
done <- struct{}{}
}
(Go Playground link: https://play.golang.org/p/eP57Ru44-NW)
Of importance is the use of the done channel to let watcher goroutines signal that they're finished to main. Another critical part is the closing of the channels once you're done with them. If you don't close them, the range loops in the watcher goroutines will never end, waiting forever. Once you close the channel, the range loop exits and the watcher goruoutine can send on the done channel, signaling that it has finished working.
Finally, back in main, you have to receive on the done channel once for each watcher goroutine you created. Since the number of watcher goroutines is equal to the number of items in stringsSlice, you simply range over stringsSlice to receive the correct amount of times from the done channel. Once that's finished, the main function can exit with a guarantee that all watchers have finished.
There are many problems with your approach.
The first one is that you're redeclaring ("shadowing") the global variable channelsMap
in your main
function. (Had you completed at least some most basic intro to Go, you should have had no such problem.)
This means that your watchChannel
(actually, all the goroutines which execute that function) read the global channelsMap
while your main
function writes to its local channelsMap
.
What happens next, is as follows:
The range
statement in the watchChannel
has a simple map lookup expression as its source—channelsMap[channelMapKey]
.
In Go, this form of map lookup never fails, but if the map has no such key (or if the map is not initialized, that is, it's nil
), the so-called "zero value" of the appropriate type is returned.
Since the global channelsMap
is always empty, any call to watchChannel
performs a map lookup which always returns the zero value of type chan *TestStruct
. The zero value for any channel is nil
.
The range
statement executed over a nil
channel produces zero iterations. In other words, the for
loop in watchChannel
always executes zero times.
The more complex problem, still, is not shadowing of the global variable but rather the complete absense of synchronization between the goroutines. You're using "sleeping" as a sort of band-aid in an attempt to perform implicit synchronization between goroutines but while this does appear to be okay judged by so-called "common sense", it's not going to work in practice for two reasons:
There exist various ways to synchronize execution between goroutines. Basically they amount to sends and receives over channels and using the types provided by the sync
package. In your particular case the simplest approach is probably using the sync.WaitGroup
type.
Here is what we would have after fixing the problems explained above: - Initialize the map variable right at the point of its definition and not mess with it in main
. - Use sync.WaitGroup
to make main
properly wait for all the goroutines it spawned to singal they're done:
package main
import (
"fmt"
"sync"
)
type TestStruct struct {
Test string
}
var channelsMap = make(map[string](chan *TestStruct))
func main() {
stringsSlice := []string{"value1"}
var wg sync.WaitGroup
wg.Add(len(stringsSlice))
for _, value := range stringsSlice {
channelsMap[value] = make(chan *TestStruct, 1)
go watchChannel(value, &wg)
}
testStruct := new(TestStruct)
testStruct.Test = "Hello!"
channelsMap["value1"] <- testStruct
wg.Wait()
fmt.Println("Program ended")
}
func watchChannel(channelMapKey string, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Println("Watching channel: " + channelMapKey)
for channelValue := range channelsMap[channelMapKey] {
fmt.Printf("Channel '%s' used. Passed value: '%s'
", channelMapKey, channelValue.Test)
}
}
The next two problems with your code become apparent once we will have fixed the former two—after you make the "watcher" goroutines use the same map variable as the goroutine running main
, and make the latter properly wait for the watchers:
There is a data race over the map variable between the code which updates the map after the for
loop spawning the watcher goroutines ended and the code which accesses this variable in all the watcher goroutines.
There is a deadlock between the watcher goroutines and the main goroutine which waits for them to complete.
The reason for the deadlock is that the watcher goroutines never receive any signal they have to quit processing and hence are stuck forever trying to read from their respective channels.
The ways to fix these two new problems are simple but they might actually "break" your original idea of structuring your code.
First, I'd remove the data race by simply making the watchers not access the map variable. As you can see, each call to watchChannel
receives a single value to use as the key to read a value off the shared map, and hence each watcher always reads a single value exactly once during its run time. The code would become much clearer if we remove this extra map access altogether and instead pass the appropriate channel value directly to each watcher. A nice byproduct of this is that we do not need a global map variable anymore.
Here's what we'll get:
package main
import (
"fmt"
"sync"
)
type TestStruct struct {
Test string
}
func main() {
stringsSlice := []string{"value1"}
channelsMap := make(map[string](chan *TestStruct))
var wg sync.WaitGroup
wg.Add(len(stringsSlice))
for _, value := range stringsSlice {
channelsMap[value] = make(chan *TestStruct, 1)
go watchChannel(value, channelsMap[value], &wg)
}
testStruct := new(TestStruct)
testStruct.Test = "Hello!"
channelsMap["value1"] <- testStruct
wg.Wait()
fmt.Println("Program ended")
}
func watchChannel(channelMapKey string, ch <-chan *TestStruct, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Println("Watching channel: " + channelMapKey)
for channelValue := range ch {
fmt.Printf("Channel '%s' used. Passed value: '%s'
", channelMapKey, channelValue.Test)
}
}
Okay, we still have the deadlock.
There are multiple approaches to solving this but they depend on the actual circumstances, and with this toy example, any attempt to iterate over at least a subset of them would just muddle the waters. Instead, let's employ the simplest one for this case: closing a channel makes any pending receive operation on it immediately unblock and produce the zero value for the channel's type. For a channel being iterated over using the range
statement it simply means the stamement terminates without producing any value from the channel.
In other words, let's just close all the channels to unblock the range
statements being run by the watcher goroutines and then wait for these goroutines to report their completion via the wait group.
To not make the answer overly long, I also added programmatic initialization of the string slice to make the example more interesting by having multiple watchers—not just a single one—actually do useful work:
package main
import (
"fmt"
"sync"
)
type TestStruct struct {
Test string
}
func main() {
var stringsSlice []string
channelsMap := make(map[string](chan *TestStruct))
for i := 1; i <= 10; i++ {
stringsSlice = append(stringsSlice, fmt.Sprintf("value%d", i))
}
var wg sync.WaitGroup
wg.Add(len(stringsSlice))
for _, value := range stringsSlice {
channelsMap[value] = make(chan *TestStruct, 1)
go watchChannel(value, channelsMap[value], &wg)
}
for _, value := range stringsSlice {
testStruct := new(TestStruct)
testStruct.Test = fmt.Sprint("Hello! ", value)
channelsMap[value] <- testStruct
}
for _, ch := range channelsMap {
close(ch)
}
wg.Wait()
fmt.Println("Program ended")
}
func watchChannel(channelMapKey string, ch <-chan *TestStruct, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Println("Watching channel: " + channelMapKey)
for channelValue := range ch {
fmt.Printf("Channel '%s' used. Passed value: '%s'
", channelMapKey, channelValue.Test)
}
}
As you can see, there are things you should actually learn about in way more greater detail before embarking on working with concurrency.
I'd recommend to proceed in the following order:
sync
package.