Golang data race even with a mutex in a custom concurrent map

Here is a simple concurrent map that I wrote for learning purposes

    package concurrent_hashmap

    import (
        "hash/fnv"
        "sync"
    )

    type ConcurrentMap struct {
        buckets []ThreadSafeMap
        bucketCount uint32
    }

    type ThreadSafeMap struct {
        mapLock sync.RWMutex
        hashMap map[string]interface{}
    }

    func NewConcurrentMap(bucketSize uint32) *ConcurrentMap {
        var threadSafeMapInstance ThreadSafeMap
        var bucketOfThreadSafeMap []ThreadSafeMap

        for i := 0; i <= int(bucketSize); i++ {
            threadSafeMapInstance = ThreadSafeMap{sync.RWMutex{}, make(map[string]interface{})}
            bucketOfThreadSafeMap = append(bucketOfThreadSafeMap, threadSafeMapInstance)
        }

        return &ConcurrentMap{bucketOfThreadSafeMap, bucketSize}
    }

    func (cMap *ConcurrentMap) Put(key string, val interface{}) {
        bucketIndex := hash(key) % cMap.bucketCount
        bucket := cMap.buckets[bucketIndex]
        bucket.mapLock.Lock()
        bucket.hashMap[key] = val
        bucket.mapLock.Unlock()
    }

    // Helper
    func hash(s string) uint32 {
        h := fnv.New32a()
        h.Write([]byte(s))
        return h.Sum32()
    }

I am trying to write a simple benchmark, and I find that synchronized access works correctly but concurrent access fails with

fatal error: concurrent map writes

Here is my benchmark, run with go test -bench=. -race

package concurrent_hashmap

import (
    "testing"
    "runtime"
    "math/rand"
    "strconv"
    "sync"
)
// Concurrent does not work
func BenchmarkMyFunc(b *testing.B) {
    var wg sync.WaitGroup

    runtime.GOMAXPROCS(runtime.NumCPU())

    my_map := NewConcurrentMap(uint32(4))
    for n := 0; n < b.N; n++ {
        go insert(my_map, wg)
    }
    wg.Wait()
}

func insert(my_map *ConcurrentMap, wg sync.WaitGroup) {
    wg.Add(1)
    var rand_int int
    for element_num := 0; element_num < 1000; element_num++ {
        rand_int = rand.Intn(100)
        my_map.Put(strconv.Itoa(rand_int), rand_int)
    }
    defer wg.Done()
}

// This works
func BenchmarkMyFuncSynchronize(b *testing.B) {
    my_map := NewConcurrentMap(uint32(4))
    for n := 0; n < b.N; n++ {
        my_map.Put(strconv.Itoa(123), 123)
    }
}

The WARNING: DATA RACE output says that bucket.hashMap[key] = val is causing the problem, but I am confused about how that is possible, since I lock that logic whenever a write happens.

I think I am missing something basic. Can someone point out my mistake?

Thanks

Edit1:

Not sure if this helps, but here is what my mutex looks like if I don't lock anything

{{0 0} 0 0 0 0}

Here is what it looks like if I lock the write

{{1 0} 0 0 -1073741824 0}

I'm not sure why my readerCount is such a large negative number.
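
(One guess: -1073741824 is exactly -(1 << 30), which I believe is the constant a held write lock subtracts from readerCount, so this may just be what a held write lock normally looks like rather than a corrupted one. A quick check of the arithmetic:)

    package main

    import "fmt"

    func main() {
        // The -1073741824 in the locked dump is -(1 << 30).
        fmt.Println(-(1 << 30)) // -1073741824
    }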

Edit:2

I think I have found where the issue is, but I'm not sure why I have to code it that way

The issue is

type ThreadSafeMap struct {
    mapLock sync.RWMutex // This is causing problem
    hashMap map[string]interface{}
}

it should be

type ThreadSafeMap struct {
    mapLock *sync.RWMutex
    hashMap map[string]interface{}
}

Another weird thing is that in Put, if I put print statements inside the lock

bucket.mapLock.Lock()
fmt.Println("start")
fmt.Println(bucket)
fmt.Println(bucketIndex)
fmt.Println(bucket.mapLock)
fmt.Println(&bucket.mapLock)
bucket.hashMap[key] = val
defer bucket.mapLock.Unlock()

The following output is possible

start
start
{0x4212861c0 map[123:123]}
{0x4212241c0 map[123:123]}

It's weird, because each start printout should be followed by 4 lines of bucket info; you shouldn't see start back to back, since that would indicate that multiple goroutines are executing the lines inside the lock.

Also, for some reason each bucket.mapLock has a different address, even if I make bucketIndex static, which indicates that I am not even accessing the same lock.

But despite the above weirdness, changing the mutex to a pointer solves my problem.

I would love to find out why I need a pointer for the mutex, why the prints seem to indicate that multiple goroutines are inside the locked section, and why each lock has a different address.

The problem is with the statement

bucket := cMap.buckets[bucketIndex]

bucket now contains a copy of the ThreadSafeMap at that index. Since sync.RWMutex is stored as a value, a copy of it is made during the assignment. A Go map, however, is a reference to an underlying data structure, so the copied map header still points at the same backing store. The code therefore locks a private copy of the lock while writing to a single shared map, which causes the problem.
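
Here is a small standalone sketch (my own illustration, not code from the question) that makes the copy semantics visible: indexing the slice copies the struct, and with it the lock, while taking the address of the slice element shares the original lock; the map header inside every copy still refers to the same backing store.

    package main

    import (
        "fmt"
        "sync"
    )

    type ThreadSafeMap struct {
        mapLock sync.RWMutex
        hashMap map[string]interface{}
    }

    func main() {
        buckets := []ThreadSafeMap{{hashMap: make(map[string]interface{})}}

        // Indexing the slice copies the whole struct, including the RWMutex.
        copy1 := buckets[0]
        copy2 := buckets[0]
        fmt.Printf("copied locks: %p %p\n", &copy1.mapLock, &copy2.mapLock) // two different addresses

        // Taking the address of the element shares the lock stored in the slice.
        elem1 := &buckets[0]
        elem2 := &buckets[0]
        fmt.Printf("shared lock:  %p %p\n", &elem1.mapLock, &elem2.mapLock) // same address twice

        // The copied map header still refers to the same backing store, so
        // writes through different copies hit one map without a shared lock.
        copy1.hashMap["k"] = 1
        fmt.Println(copy2.hashMap["k"]) // prints 1
    }

That first pair of differing addresses is exactly what happens inside Put: every call locks its own throwaway copy of the RWMutex. If I remember correctly, go vet's copylocks check also flags assignments like copy1 := buckets[0] with an "assignment copies lock value" warning for this reason.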

That's why you don't face any problem when you change sync.RWMutex to *sync.RWMutex. It's better to store pointers to the ThreadSafeMap structs in the buckets slice, as shown below.

package concurrent_hashmap

import (
    "hash/fnv"
    "sync"
)

type ConcurrentMap struct {
    buckets     []*ThreadSafeMap
    bucketCount uint32
}

type ThreadSafeMap struct {
    mapLock sync.RWMutex
    hashMap map[string]interface{}
}

func NewConcurrentMap(bucketSize uint32) *ConcurrentMap {
    var threadSafeMapInstance *ThreadSafeMap
    var bucketOfThreadSafeMap []*ThreadSafeMap

    for i := 0; i < int(bucketSize); i++ { // allocate exactly bucketSize buckets
        threadSafeMapInstance = &ThreadSafeMap{sync.RWMutex{}, make(map[string]interface{})}
        bucketOfThreadSafeMap = append(bucketOfThreadSafeMap, threadSafeMapInstance)
    }

    return &ConcurrentMap{bucketOfThreadSafeMap, bucketSize}
}

func (cMap *ConcurrentMap) Put(key string, val interface{}) {
    bucketIndex := hash(key) % cMap.bucketCount
    bucket := cMap.buckets[bucketIndex]
    bucket.mapLock.Lock()
    bucket.hashMap[key] = val
    bucket.mapLock.Unlock()
}

// Helper
func hash(s string) uint32 {
    h := fnv.New32a()
    h.Write([]byte(s))
    return h.Sum32()
}

It's possible to validate the scenario by modifying the original (value-based) Put as follows (fmt needs to be added to the imports); with a fixed bucketIndex, the address of the copied lock will generally differ from call to call, while the map pointer stays the same.

func (cMap *ConcurrentMap) Put(key string, val interface{}) {
    //fmt.Println("index", key)
    bucketIndex := 1
    bucket := cMap.buckets[bucketIndex]
    fmt.Printf("%p %p
", &(bucket.mapLock), bucket.hashMap)
}
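
One last side note (my own sketch, not something from the original post): the benchmark in the question also copies a sync type. insert receives the sync.WaitGroup by value and calls Add inside the goroutine, so the Add never reaches the WaitGroup that Wait is called on. Assuming the corrected ConcurrentMap above, a benchmark that exercises it concurrently without that problem could look like this; BenchmarkConcurrentPut is just a name I picked for the sketch.

    package concurrent_hashmap

    import (
        "math/rand"
        "strconv"
        "sync"
        "testing"
    )

    // Hammers the fixed ConcurrentMap from many goroutines. The WaitGroup is
    // shared through the closure instead of being copied, and Add is called
    // before each goroutine starts so Wait cannot return early.
    func BenchmarkConcurrentPut(b *testing.B) {
        myMap := NewConcurrentMap(uint32(4))
        var wg sync.WaitGroup
        for n := 0; n < b.N; n++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                for i := 0; i < 1000; i++ {
                    v := rand.Intn(100) // top-level math/rand functions are safe for concurrent use
                    myMap.Put(strconv.Itoa(v), v)
                }
            }()
        }
        wg.Wait()
    }

Running it with go test -bench=. -race against the pointer-based buckets should come back clean.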