I'm using this as a concurrent map, with buffered channels as the map values so that each entry works as a thread-safe queue. When I test with 10 goroutines, the value received from a channel is not always the same as the one that was just sent in. Any suggestions?
package main

import (
	"fmt"
	"math/rand"
	"strconv"
	"sync"
	"time"

	"github.com/streamrail/concurrent-map"
)

func main() {
	testmap := cmap.New()
	fmt.Println("SyncMapNew: ", TestInParallel(&testmap, 10))
}

// TestInParallel runs TheTest in n goroutines against the same map.
func TestInParallel(g *cmap.ConcurrentMap, n int) time.Duration {
	start := time.Now()
	var wait sync.WaitGroup
	for i := 0; i < n; i++ {
		wait.Add(1)
		// Pass the seed as an argument so each goroutine gets its own value of i.
		go func(seed int64) {
			TheTest(g, rand.New(rand.NewSource(seed)))
			wait.Done()
		}(int64(i * 500))
	}
	wait.Wait()
	return time.Now().Sub(start)
}

// TheTest sends a timestamp into the channel stored under a random key and
// expects to read the same timestamp back.
func TheTest(g *cmap.ConcurrentMap, rnd *rand.Rand) time.Duration {
	start := time.Now()
	var key string
	var value time.Time
	for i := 0; i < 10000; i++ {
		key = strconv.Itoa(int(rnd.Int31n(50000)))
		if g.Has(key) == false {
			g.Set(key, make(chan time.Time, 100))
		}
		tchan, _ := g.Get(key)
		castchan := tchan.(chan time.Time)
		value = time.Now()
		castchan <- value
		got := <-castchan
		g.Set(key, castchan)
		if value != got {
			panic(fmt.Sprintf("ERROR: expected %v, got %v", value, got))
		}
	}
	return time.Now().Sub(start)
}
Update: I misunderstood the business logic. The code should look like this:
key = strconv.Itoa(int(rnd.Int31n(500)))
tchan, _ := g.GetSet(key, make(chan time.Time, 100))
castchan := tchan.(chan time.Time)
value = time.Now()
if len(castchan) >= 99 {
	<-castchan // do something here
}
castchan <- value
g.Set(key, castchan)
You are using random keys, so more than one goroutine may get the same random number.
If two goroutines get the same number at (roughly) the same time, then this is a race condition:
if g.Has(key) == false {
	g.Set(key, make(chan time.Time, 100))
}
It is possible for g.Has to return false for two goroutines at the same time, after which both call Set: goroutine1 sets chan A, goroutine2 overwrites it with chan B, and then both end up using chan B.
To solve this, you need something like SetIfAbsent, which locks, checks whether the key exists, sets it if it does not, and then unlocks.
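To make the lock / check / set / unlock idea concrete, here is a minimal sketch. It does not use the linked library at all: lockedMap, newLockedMap and distinctChannels are hypothetical names made up for this example, and SetIfAbsent here is my own method on that type, assumed to return whichever channel ends up stored.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// lockedMap is a hypothetical stand-in for the concurrent map:
// one mutex guarding a plain Go map.
type lockedMap struct {
	mu sync.Mutex
	m  map[string]chan time.Time
}

func newLockedMap() *lockedMap {
	return &lockedMap{m: make(map[string]chan time.Time)}
}

// SetIfAbsent stores ch under key only if the key is missing and returns
// whichever channel is in the map afterwards. The check and the store happen
// under the same lock, so no other goroutine can slip in between them.
func (l *lockedMap) SetIfAbsent(key string, ch chan time.Time) chan time.Time {
	l.mu.Lock()
	defer l.mu.Unlock()
	if existing, ok := l.m[key]; ok {
		return existing
	}
	l.m[key] = ch
	return ch
}

// distinctChannels starts n goroutines that all race to create the channel
// for the same key and reports how many different channels they ended up with.
func distinctChannels(n int) int {
	lm := newLockedMap()
	got := make([]chan time.Time, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			got[i] = lm.SetIfAbsent("42", make(chan time.Time, 100))
		}(i)
	}
	wg.Wait()

	seen := make(map[chan time.Time]bool)
	for _, ch := range got {
		seen[ch] = true
	}
	return len(seen)
}

func main() {
	fmt.Println("distinct channels:", distinctChannels(10))
}
```

Because the losers of the race get the winner's channel back instead of overwriting it, every goroutine ends up holding the same channel for a given key. A single mutex is the simplest way to show the idea; a sharded map would do the same thing per shard.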
The map library you linked isn't very useful as a cache, in that it provides no atomic SetIfAbsent-type function.
Even if the g.Has / g.Set race does not occur, two goroutines can still happen to get the same key, and therefore the same channel. In that case there is no guarantee whose value is first in the queue or who reads first,
so goroutine1 might read the value goroutine2 put in, or the other way around.
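Here is a small sketch of that second problem. To keep it deterministic it replays one possible ordering in a single goroutine instead of actually running two, and it uses strings instead of time.Time so the values are easy to tell apart. The function name sharedQueueMixup is made up for this example.

```go
package main

import "fmt"

// sharedQueueMixup replays, in a single goroutine, one possible ordering of
// two workers that share the same buffered channel.
func sharedQueueMixup() (gotBy1, gotBy2 string) {
	ch := make(chan string, 100)

	ch <- "from goroutine1" // goroutine1 sends its value
	ch <- "from goroutine2" // goroutine2 sends before goroutine1 has received

	gotBy2 = <-ch // goroutine2 receives first and gets the head of the queue
	gotBy1 = <-ch // goroutine1 receives what is left
	return gotBy1, gotBy2
}

func main() {
	gotBy1, gotBy2 := sharedQueueMixup()
	fmt.Println("goroutine1 got:", gotBy1)
	fmt.Println("goroutine2 got:", gotBy2)
}
```

A buffered channel is a FIFO queue, so in this ordering each worker receives the other worker's value, and a check like value != got fails even though nothing was lost.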
When thinking about shared state in a concurrently executing system, you have to assume that any other operation may happen between any two statements or lines of code.
I often like to think of it this way: assume that each line of your code is run on each core, one at a time.
So, in the Has/Set example, it would be:
if g.Has(key) == false { // goroutine1
if g.Has(key) == false { // goroutine2
g.Set(key, make(chan time.Time, 100)) // goroutine1
g.Set(key, make(chan time.Time, 100)) // goroutine2
} // goroutine1
} // goroutine2
tchan, _ := g.Get(key) //goroutine1
tchan, _ := g.Get(key) //goroutine2
See where the bug is? The second goroutine put its channel in the map, replacing the first one, and both goroutines then retrieved that same channel on the tchan line.
Make sense?
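If it helps, the interleaving above can be replayed step by step on a plain Go map in a single goroutine. This is only an illustration of the ordering, not the library's code: replayHasSetRace, chanA and chanB are names made up for this sketch, and the channels carry int just to keep it short.

```go
package main

import "fmt"

// replayHasSetRace performs the interleaved Has / Set / Get steps one after
// another on a plain map. It reports whether both workers ended up with the
// same channel, and whether that channel is the one goroutine2 created.
func replayHasSetRace() (shared bool, isSecond bool) {
	m := map[string]chan int{}
	key := "42"

	_, has1 := m[key] // goroutine1: Has(key) is false
	_, has2 := m[key] // goroutine2: Has(key) is false as well

	chanA := make(chan int, 100)
	chanB := make(chan int, 100)
	if !has1 {
		m[key] = chanA // goroutine1: Set
	}
	if !has2 {
		m[key] = chanB // goroutine2: Set, overwriting chanA
	}

	got1 := m[key] // goroutine1: Get
	got2 := m[key] // goroutine2: Get

	return got1 == got2, got1 == chanB
}

func main() {
	shared, isSecond := replayHasSetRace()
	fmt.Println("both goroutines share a channel:", shared)
	fmt.Println("it is goroutine2's channel:", isSecond)
}
```

Both checks passed before either Set ran, so the second Set silently replaced the first channel and chanA is never used by anyone.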