I'm using this (both symbols and filteredSymbols are []string):
concurrency := 5
sem := make(chan bool, concurrency)
for i := range symbols {
    sem <- true
    go func(i int) {
        defer func() { <-sem }()
        rows, err := stmt.Query(symbols[i])
        if <some condition is true> {
            filteredSymbols = append(filteredSymbols, symbols[i])
        }
    }(i)
}
for i := 0; i < cap(sem); i++ {
    sem <- true
}
to limit the number of goroutines running concurrently. I need the limit because every goroutine interacts with a Postgres database, and I sometimes have more than 3000 symbols to evaluate. The code is for analysing big financial data, stocks and other securities, and I use the same pattern to get OHLC and pre-calculated data from the db. Is this a modern approach for this? I'm asking because WaitGroups already exist, and I'm looking for a way to use those instead.
Also, I observed that my method above sometimes yields different results. In one run the resulting number of filteredSymbols was 1409; without changing any parameters, the same code would then yield 1407, then 1408 at times. In some runs there was a big deficit in the results.
The code below was very inconsistent, so I removed the concurrency (note that in the code below I don't even have to limit concurrent goroutines, since they only use in-memory resources). Removing the concurrency fixed it:
func getCommonSymbols(symbols1 []string, symbols2 []string) (symbols []string) {
    defer timeTrack(time.Now(), "Get common symbols")
    // concurrency := len(symbols1)
    // sem := make(chan bool, concurrency)
    // for _, s := range symbols1 {
    for _, sym := range symbols1 {
        // sym := s
        // sem <- true
        // go func(string) {
        //     defer func() { <-sem }()
        for k := range symbols2 {
            if sym == symbols2[k] {
                symbols = append(symbols, sym)
                break
            }
        }
        // }(sym)
    }
    // for i := 0; i < cap(sem); i++ {
    //     sem <- true
    // }
    return
}
You have a data race: multiple goroutines are updating filteredSymbols at the same time. The smallest change you can make to fix it is to add a mutex lock around the append call, e.g.
concurrency := 5
sem := make(chan bool, concurrency)
var l sync.Mutex
for i := range symbols {
    sem <- true
    go func(i int) {
        defer func() { <-sem }()
        rows, err := stmt.Query(symbols[i])
        if <some condition is true> {
            l.Lock()
            filteredSymbols = append(filteredSymbols, symbols[i])
            l.Unlock()
        }
    }(i)
}
for i := 0; i < cap(sem); i++ {
    sem <- true
}
The Race Detector could have helped you spot this as well. One common alternative is to use one channel to feed work into the goroutines and another channel to collect the results, something like:
concurrency := 5
workCh := make(chan string, concurrency)
resCh := make(chan string, concurrency)
workersWg := sync.WaitGroup{}
// start the required number of workers, use the WaitGroup to see when they're done
for i := 0; i < concurrency; i++ {
    workersWg.Add(1)
    go func() {
        defer workersWg.Done()
        for symbol := range workCh {
            // do some work
            if cond {
                resCh <- symbol
            }
        }
    }()
}
go func() {
    // when all the workers are done, close resCh
    workersWg.Wait()
    close(resCh)
}()
// submit the work from its own goroutine, so a full resCh can't
// block submission and deadlock the workers
go func() {
    for _, s := range symbols {
        workCh <- s
    }
    close(workCh)
}()
// collect up the results; only this goroutine appends, so no lock is needed
for r := range resCh {
    filteredSymbols = append(filteredSymbols, r)
}