I'm pretty new to Go and looking for a way to handle 30,000 queries using 100 workers while ensuring a connection for every worker (MySQL is already configured with more than 100 connections). This is my attempt:
package main

import (
    "database/sql"

    _ "github.com/go-sql-driver/mysql"
)

var query *sql.Stmt

func worker(jobs <-chan int, results chan<- int) {
    for range jobs {
        _, e := query.Exec("a")
        if e != nil {
            panic(e.Error())
        }
        results <- 1
    }
}

func main() {
    workers := 100

    db, e := sql.Open("mysql", "foo:foo@/foo")
    if e != nil {
        panic(e.Error())
    }
    db.SetMaxOpenConns(workers)
    db.SetMaxIdleConns(workers)
    defer db.Close()

    query, e = db.Prepare("INSERT INTO foo (foo) values(?)")
    if e != nil {
        panic(e.Error())
    }

    total := 30000
    jobs := make(chan int, total)
    results := make(chan int, total)

    for w := 0; w < workers; w++ {
        go worker(jobs, results)
    }
    for j := 0; j < total; j++ {
        jobs <- j
    }
    close(jobs)
    for r := 0; r < total; r++ {
        <-results
    }
}
It's working, but I'm not sure whether it's the best way of doing it.
Please, if you think this is opinion-based or not a good question at all, just mark it to be closed and leave a comment explaining why.
What you've got fundamentally works, but to get rid of the buffering, you need to be writing to jobs and reading from results at the same time. Otherwise, your process ends up stuck: workers can't send results because nothing is receiving them, and you can't insert jobs because the workers are blocked.
Here's a boiled-down example on the Playground of how to do a work queue that pushes jobs in the background as it receives results in main:
package main

import "fmt"

func worker(jobs <-chan int, results chan<- int) {
    for range jobs {
        // ...do work here...
        results <- 1
    }
}

func main() {
    workers := 10
    total := 30
    jobs := make(chan int)
    results := make(chan int)

    // start workers
    for w := 0; w < workers; w++ {
        go worker(jobs, results)
    }

    // insert jobs in background
    go func() {
        for j := 0; j < total; j++ {
            jobs <- j
        }
    }()

    // collect results
    for i := 0; i < total; i++ {
        <-results
        fmt.Printf(".")
    }
    close(jobs)
}
For that particular code to work, you have to know how many results you'll get. If you don't know that (say, each job could produce zero or multiple results), you can use a sync.WaitGroup to wait for the workers to finish, then close the result stream:
package main

import (
    "fmt"
    "sync"
)

func worker(jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    for range jobs {
        // ...do work here...
        results <- 1
    }
}

func main() {
    workers := 10
    total := 30
    jobs := make(chan int)
    results := make(chan int)
    wg := &sync.WaitGroup{}

    // start workers
    for w := 0; w < workers; w++ {
        wg.Add(1)
        go worker(jobs, results, wg)
    }

    // insert jobs in background
    go func() {
        for j := 0; j < total; j++ {
            jobs <- j
        }
        close(jobs)
        wg.Wait()
        // all workers are done, so no more results
        close(results)
    }()

    // collect results
    for range results {
        fmt.Printf(".")
    }
}
There are many other, more complicated tricks one can do to stop all workers after an error happens, put results into the same order as the original jobs, and so on. It sounds as if the basic version works here, though.
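For the stop-on-error case specifically, here's a minimal sketch of one way to do it (my own illustration, not part of the code above) using the standard library's context package. The results channel carries errors, main cancels a shared context after the first failure, and both the job feeder and the workers select on ctx.Done() so they wind down instead of blocking on channels. The "job 13 fails" condition is a made-up stand-in for a real error:

package main

import (
    "context"
    "fmt"
)

func worker(ctx context.Context, jobs <-chan int, results chan<- error) {
    for j := range jobs {
        // ...do work here; pretend job 13 fails (hypothetical error for the sketch)...
        var err error
        if j == 13 {
            err = fmt.Errorf("job %d failed", j)
        }
        select {
        case results <- err:
        case <-ctx.Done():
            // main saw an error and cancelled; stop without blocking on results
            return
        }
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    workers := 10
    total := 30
    jobs := make(chan int)
    results := make(chan error)

    // start workers
    for w := 0; w < workers; w++ {
        go worker(ctx, jobs, results)
    }

    // insert jobs in background, stopping early if the context is cancelled
    go func() {
        defer close(jobs)
        for j := 0; j < total; j++ {
            select {
            case jobs <- j:
            case <-ctx.Done():
                return
            }
        }
    }()

    // collect results; cancel everything on the first error
    for i := 0; i < total; i++ {
        if err := <-results; err != nil {
            fmt.Println("stopping:", err)
            cancel()
            return
        }
        fmt.Printf(".")
    }
}

The same shape works for the database version: have workers send the error from query.Exec instead of panicking, and main decides whether one failed insert should abort the whole batch.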