I've got a use case where I have 15k rows to insert into two tables. This can't follow the previous answers I've seen on here, where you just build up one big INSERT string for a bulk upload, because I need to know which individual rows failed and which ones succeeded.
The setup is that there's a writer database and multiple readers, so we can kind of 'hog' the writer database connections temporarily to get this going since reads are on a different node.
The code I've come up with is as follows:
// Req holds the six string column values for one row to insert.
// NOTE(review): field meanings aren't visible here — presumably they map
// onto columns a..f of the target table; confirm against the schema.
type Req struct {
FieldA, FieldB, FieldC, FieldD, FieldE, FieldF string
}
// Result wraps a row ID.
// NOTE(review): this type is not referenced by the code shown here —
// InsertRequests reports via InsertResult instead. Verify it is used
// elsewhere before keeping it.
type Result struct {
ID int
}
// InsertResult is the per-row outcome of an insert attempt: exactly one of
// LastInsertedID (on success) or Error (on failure) is meaningful.
// NOTE(review): there is no field tying a result back to the Req that
// produced it, so callers cannot tell WHICH row failed — consider adding
// the originating Req (or an index) if that correlation is required.
type InsertResult struct {
LastInsertedID int64
Error error
}
// InsertRequests inserts each request in its own transaction, concurrently
// but with bounded parallelism, and streams one InsertResult per request on
// the returned channel. The channel is closed once every request has been
// attempted. Results arrive in completion order, not input order.
func (d *DB) InsertRequests(reqs []Req) <-chan InsertResult {
	outChan := make(chan InsertResult, 500)
	// Bound in-flight inserts: launching one goroutine per request (e.g. 15k)
	// would mostly pile up waiting for pool connections. Size this near the
	// writer pool's max-open-connections setting.
	sem := make(chan struct{}, 32)
	go func() {
		defer close(outChan)
		var wg sync.WaitGroup
		for _, req := range reqs {
			wg.Add(1)
			sem <- struct{}{} // acquire a slot before spawning
			go func(req Req) {
				defer wg.Done()
				defer func() { <-sem }() // release the slot
				outChan <- d.insertOne(req)
			}(req)
		}
		wg.Wait()
	}()
	return outChan
}

// insertOne performs a single INSERT inside its own transaction and returns
// the outcome. The transaction is always terminated (commit or rollback) so
// the connection goes back to the pool.
func (d *DB) insertOne(req Req) InsertResult {
	tx, err := d.db.Begin()
	if err != nil {
		return InsertResult{Error: err}
	}
	res, err := tx.Exec("INSERT INTO table (a, b, c, d, e, f) VALUES (?, ?, ?, ?, ?, ?)",
		req.FieldA, req.FieldB, req.FieldC, req.FieldD, req.FieldE, req.FieldF)
	if err != nil {
		tx.Rollback() // best-effort; the Exec error is the one worth reporting
		return InsertResult{Error: err}
	}
	id, err := res.LastInsertId()
	if err != nil {
		// BUG FIX: the original returned here without ending the transaction,
		// leaking its connection. Roll back so the pool gets it back.
		tx.Rollback()
		return InsertResult{Error: err}
	}
	// BUG FIX: the original ignored Commit's error — a failed commit means the
	// row was NOT inserted, yet the caller would have been told it succeeded.
	if err := tx.Commit(); err != nil {
		return InsertResult{Error: err}
	}
	return InsertResult{LastInsertedID: id}
}
Basically, write all the inserts to the database concurrently and send each result down a buffered channel (buffer of 500) so the caller can see per-row success or failure.
Does anyone have anything better than this?