I'm currently trying to range over a map to run database requests concurrently rather than synchronously, for the obvious speed boost.
My problem is that I have something like this:
    var mainthreads = make(chan *mainthread)
    var mainthreadsFetched = make(chan struct{})
    for containerid := range containers {
        go func() {
            rows, err := db.Query("SELECT thread_id, belongs_to, thread_name, access_level FROM forum_mainthread WHERE belongs_to = ?", containerid)
            defer rows.Close()
            if err != nil {
                log.Println(err)
            }
            for rows.Next() {
                mainthread := &MainThread{}
                err := rows.Scan(&mainthread.MainThreadID, &mainthread.BelongsTo, &mainthread.ThreadName, &mainthread.AccessLevel)
                if err != nil {
                    log.Println(err)
                }
                mainthreads <- mainthread
            }
        }()
        mainthreadsFetched <- struct{}{}
    }
    // Get all mainthreads
    <-mainthreadsFetched
    // Do other stuff after complete
Obviously mainthreadsFetched <- struct{}{} is being called almost instantly, because the loop finishes faster than you can blink. How can I create a new channel per loop iteration that will not block each new goroutine from starting, but rather lets the loop start all the goroutines and then sends on a channel once every goroutine is done?
Using sync.WaitGroup is a great solution, and the one usually used.
Alternatively, you can receive on mainthreadsFetched len(containers) times, instead of just once. That guarantees that all goroutines have completed. You'll need to move the send line to the end of the goroutine (or, better, into a defer).
Also, since containerid is the loop variable, its value changes while the goroutines are running (before Go 1.22 a single variable is shared by all iterations). You need to pass it as a parameter to the goroutine closure.
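To make the counting approach concrete, here is a minimal sketch. It is not your actual code: fetchThreads is a hypothetical stand-in for the db.Query call, and the results channel is buffered only so the example stays short (with real queries you would normally have a reader running instead). The parts that matter are the deferred send on done, the containerID parameter, and the loop that receives len(containers) times.

```go
package main

import "fmt"

// fetchThreads is a hypothetical stand-in for the database query;
// it returns two fake thread names per container.
func fetchThreads(containerID int64) []string {
	return []string{
		fmt.Sprintf("thread-%d-a", containerID),
		fmt.Sprintf("thread-%d-b", containerID),
	}
}

// countFetched starts one goroutine per container and waits for all of
// them by receiving on done exactly len(containers) times.
func countFetched(containers map[int64]string) int {
	// Buffered (an assumption of this sketch) so that sends never block
	// while nobody is reading yet.
	results := make(chan string, 2*len(containers))
	done := make(chan struct{})

	for containerID := range containers {
		// containerID is passed as a parameter so each goroutine has its own copy.
		go func(containerID int64) {
			// The deferred send runs even if the goroutine returns early.
			defer func() { done <- struct{}{} }()
			for _, t := range fetchThreads(containerID) {
				results <- t
			}
		}(containerID)
	}

	// One receive per started goroutine.
	for i := 0; i < len(containers); i++ {
		<-done
	}
	close(results)

	n := 0
	for range results {
		n++
	}
	return n
}

func main() {
	containers := map[int64]string{1: "general", 2: "news", 3: "help"}
	fmt.Println(countFetched(containers)) // prints 6
}
```

Putting the send in a defer means an early return on a query error still signals completion, so the receiving loop cannot hang waiting for a goroutine that gave up.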
So the best way I could come up with is to use sync.WaitGroup and do something like this:
    var wg sync.WaitGroup
    var mainThreadFetched = make(chan MainThread)
    for containerid := range containers {
        wg.Add(1)
        go func(containerid int64) {
            defer wg.Done() // signal completion even on an early return
            rows, err := db.Query("SELECT thread_id, belongs_to, thread_name, access_level FROM forum_mainthread WHERE belongs_to = ?", containerid)
            if err != nil {
                log.Println(err)
                return // rows is nil on error, so it must not be closed or iterated
            }
            defer rows.Close()
            for rows.Next() {
                mainthread := MainThread{}
                err := rows.Scan(&mainthread.MainThreadID, &mainthread.BelongsTo, &mainthread.ThreadName, &mainthread.AccessLevel)
                if err != nil {
                    log.Println(err)
                    continue // skip rows that failed to scan
                }
                mainThreadFetched <- mainthread
            }
            if err := rows.Err(); err != nil {
                log.Println(err)
            }
        }(containerid)
    }
    // Close the channel once every worker has finished, so the range below ends.
    go func() {
        wg.Wait()
        close(mainThreadFetched)
    }()
    for mainthread := range mainThreadFetched {
        containers[mainthread.BelongsTo].MainThreads = append(containers[mainthread.BelongsTo].MainThreads, mainthread)
    }
    // Do other stuff
Now I can read from the mainThreadFetched channel, and once the WaitGroup is satisfied the channel is closed, which lets the loop end and execution continue.
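For anyone who wants to try the pattern without a database, here is a self-contained sketch of the same idea. The fakeQuery function and the trimmed-down MainThread struct are made up for illustration; only the WaitGroup, the closer goroutine, and the range over the channel mirror the code above.

```go
package main

import (
	"fmt"
	"sync"
)

// MainThread is a trimmed-down version of the struct, for illustration only.
type MainThread struct {
	BelongsTo  int64
	ThreadName string
}

// fakeQuery is a hypothetical stand-in for the database query.
func fakeQuery(containerID int64) []MainThread {
	return []MainThread{
		{BelongsTo: containerID, ThreadName: "first"},
		{BelongsTo: containerID, ThreadName: "second"},
	}
}

// collect fans out one goroutine per id and gathers the results by container.
func collect(ids []int64) map[int64][]MainThread {
	var wg sync.WaitGroup
	fetched := make(chan MainThread)

	for _, id := range ids {
		wg.Add(1)
		go func(id int64) {
			defer wg.Done()
			for _, t := range fakeQuery(id) {
				fetched <- t
			}
		}(id)
	}

	// Close the channel once all workers are done so the range below terminates.
	go func() {
		wg.Wait()
		close(fetched)
	}()

	// Only this goroutine writes to the map, so no extra locking is needed.
	byContainer := make(map[int64][]MainThread)
	for t := range fetched {
		byContainer[t.BelongsTo] = append(byContainer[t.BelongsTo], t)
	}
	return byContainer
}

func main() {
	got := collect([]int64{1, 2, 3})
	fmt.Println(len(got), len(got[2])) // prints "3 2"
}
```

The wg.Wait() call has to live in its own goroutine: if the main goroutine waited before reading, the workers would block forever on the unbuffered channel.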
I don't see where you are reading from mainthreads. If it's not a buffered channel, every send blocks until something receives, so you need to solve that one way or another. I'm going to provide a few solutions. None is more "correct" than the others; it just depends on your needs.
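If the blocking behaviour is not obvious, this small sketch shows it. The trySend helper is something I made up for the demonstration; it uses select with a default case to report whether a send could proceed right away.

```go
package main

import "fmt"

// trySend is a hypothetical helper: it reports whether v could be sent
// on ch without blocking.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	unbuffered := make(chan int)
	buffered := make(chan int, 1)

	fmt.Println(trySend(unbuffered, 1)) // false: no receiver is waiting
	fmt.Println(trySend(buffered, 1))   // true: the buffer has room
	fmt.Println(trySend(buffered, 2))   // false: the buffer is now full
}
```

A plain send (without the select) would simply wait in each of the "false" cases, which is why the worker goroutines never finish unless something is reading.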
Variant A: This is the simplest solution, but it presumes you have some other goroutine reading from mainthreads (which may already be the case).
    var mainthreads = make(chan *mainthread)
    var mainthreadsFetched = make(chan struct{})
    go somethingWhichReadsMainThreads()
    for containerid := range containers {
        go func(containerid int) {
            // build query omitted for brevity
            for rows.Next() {
                // omitted for brevity
                mainthreads <- mainthread
            }
            mainthreadsFetched <- struct{}{}
        }(containerid)
    }
    // Wait for one completion signal per goroutine.
    for i := 0; i < len(containers); i++ {
        <-mainthreadsFetched
    }
    close(mainthreads)
    // Do other stuff after complete
Variant B: This one uses a select statement to read the threads separately from the completion notifications, without needing yet another goroutine.
    var mainthreads = make(chan *mainthread)
    var mainthreadsFetched = make(chan struct{})
    for containerid := range containers {
        go func(containerid int) {
            // build query omitted for brevity
            for rows.Next() {
                // omitted for brevity
                mainthreads <- mainthread
            }
            mainthreadsFetched <- struct{}{}
        }(containerid)
    }
    var threads []*mainthread
    numFetched := 0
    readRunning := true
    for readRunning {
        select {
        case thread := <-mainthreads:
            // do something with thread, for example collect it
            threads = append(threads, thread)
        case <-mainthreadsFetched:
            numFetched++
            if numFetched == len(containers) {
                readRunning = false
            }
        }
    }
    // Do other stuff after complete
Variant C: This one relies on the fact that you never send the zero value (nil) as real data, so you can use nil as a signal value instead of a separate struct{} channel. It has the advantage of being far less code, but it does feel like spooky action at a distance.
    var mainthreads = make(chan *mainthread)
    for containerid := range containers {
        go func(containerid int) {
            // build query omitted for brevity
            for rows.Next() {
                // omitted Scan for brevity
                mainthreads <- mainthread
            }
            mainthreads <- nil // nil signals to us we are done
        }(containerid)
    }
    numFetched := 0
    for thread := range mainthreads {
        if thread != nil {
            // do something with thread, like threads = append(threads, thread)
        } else {
            numFetched++
            if numFetched == len(containers) {
                break
            }
        }
    }
    // Do other stuff after complete