Timely database commits with Golang

I have successfully "batched" many statements into lots of 500-1000 rows to be inserted at once. However, this used a simple for loop, manually set to 500-1000 iterations. Something like:

for i := 0; i < 500; i++ {
   // Create a string of 500 values to be inserted at once
}
// Insert the 500 values
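For context, here is a runnable sketch of how I assemble such a batch (the table and column names are made up, and real code should use placeholders rather than string formatting to avoid SQL injection):

```go
package main

import (
	"fmt"
	"strings"
)

// buildBatch assembles a single multi-row INSERT statement for n rows.
// Illustrative only: "items", "id", and "name" are invented names.
func buildBatch(n int) string {
	var sb strings.Builder
	sb.WriteString("INSERT INTO items (id, name) VALUES ")
	for i := 0; i < n; i++ {
		if i > 0 {
			sb.WriteString(",")
		}
		fmt.Fprintf(&sb, "(%d, \"item%d\")", i, i)
	}
	return sb.String()
}

func main() {
	// With n=500 this yields one statement inserting 500 rows.
	fmt.Println(buildBatch(3))
	// → INSERT INTO items (id, name) VALUES (0, "item0"),(1, "item1"),(2, "item2")
}
```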

Is there a way I could commit on a timer instead, like "commit every second"?

Conceptually, I'd like to have something like:

// Create connection to DB
// Begin a transaction
// Prepare a statement

go timelyCommits(tx)  // spawn a commit ticker
for {
   // Constantly create string of values to be inserted like:
   // Values (1, "one"),(2,"two"),(3,"three")...(1000,"thousand")...
   // Insert without commit
}

func timelyCommits(tx *sql.Tx) {
   for {
      time.Sleep(1 * time.Second)
      tx.Commit()
   }
}

Optimization is not a trivial task and may also involve database tuning, etc. Without detailed knowledge of the system you're trying to implement, it is difficult to give proper advice. In addition to what is already suggested in the other answer, you may need to implement a kind of buffering, e.g. with a channel that has a fixed capacity. Then, when the buffer is FULL or the timer has EXPIRED, build the query and perform the bulk INSERT in a transaction. Try it at The Go Playground.

package main

import (
    "fmt"
    "time"
)

type DataBuffer struct {
    capacity int
    duration time.Duration

    incomingData chan interface{}
    full         chan bool
    mustExit     chan bool
    done         chan bool

    query string
    args  []interface{}
}

func NewDataBuffer(capacity int, dur time.Duration) *DataBuffer {
    buf := &DataBuffer{
        capacity:     capacity,
        duration:     dur,
        incomingData: make(chan interface{}, capacity),
        full:         make(chan bool),
        mustExit:     make(chan bool, 1),
        done:         make(chan bool, 1),
        args:         make([]interface{}, 0, capacity), // length 0, not capacity
    }
    return buf
}

func (b *DataBuffer) Append(d interface{}) {
    if !b.put(d) {
        //Notify that buffer is full
        //<- will wait until space available
        b.full <- true
        b.incomingData <- d
    }
}

func (b *DataBuffer) put(d interface{}) bool {
    //Try to append the data
    //If channel is full, do nothing, then return false
    select {
    case b.incomingData <- d:
        return true
    default:
        //channel is full
        return false
    }
}

func (b *DataBuffer) execTransaction() error {
    /*
        Begin transaction
        Insert Data Group 
        Commit/rollback
    */

    fmt.Print(time.Now())
    fmt.Println(b.query)
    fmt.Println(b.args)

    return nil
}

func (b *DataBuffer) clear() {
    //clear args
    nOldArg := len(b.args)
    for k := 0; k < nOldArg; k++ {
        b.args[k] = nil
    }
    b.args = b.args[:0]
    b.query = ""
}

func (b *DataBuffer) buildQuery() bool {
    ndata := len(b.incomingData)
    if ndata == 0 {
        return false
    }

    k := 0
    b.clear()

    //Build the query, adjust as needed
    b.query = "QUERY:"
    for data := range b.incomingData {
        b.query += fmt.Sprintf(" q%d", k) //build the query
        b.args = append(b.args, data)

        k++
        if k >= ndata {
            break
        }

    }
    return true
}

func (b *DataBuffer) doInsert() {
    if b.buildQuery() {
        b.execTransaction()
    }
}

func (b *DataBuffer) runAsync() {
    defer func() {
        b.doInsert()
        fmt.Println("Last insert")
        close(b.done)
    }()

    timer := time.NewTimer(b.duration)
    for {
        select {
        case <- timer.C:
            b.doInsert()
            fmt.Println("Timer Expired")
            timer.Reset(b.duration)
        case <- b.full:
            if !timer.Stop() {
                <-timer.C
            }
            b.doInsert()
            fmt.Println("Full")
            timer.Reset(b.duration)
        case <- b.mustExit:
            if !timer.Stop() {
                <-timer.C
            }
            return  
        }
    }
}

func (b *DataBuffer) Run() {
    go b.runAsync()
}
func (b *DataBuffer) Stop() {
    b.mustExit <- true
}

func (b *DataBuffer) WaitDone() {
    <- b.done
}

func main() {
    buf := NewDataBuffer(5, 1*time.Second)
    buf.Run()

    //simulate incoming data
    for k := 0; k < 30; k++ {
        buf.Append(k)
        time.Sleep(time.Duration(10*k)*time.Millisecond)
    }
    buf.Stop()
    buf.WaitDone()  
}

Note:

  • You need to implement proper error handling.
  • The type of incomingData may be adjusted to your needs.
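On the first note: one common way to surface insert failures from the worker goroutine is a buffered error channel that the caller drains. A minimal sketch of that pattern (the `worker`, `errs`, and `doInsert` names here are invented, not part of the code above):

```go
package main

import (
	"errors"
	"fmt"
)

// worker sketches the error-reporting side of a buffer like DataBuffer:
// instead of swallowing failures, it sends them on a buffered channel.
type worker struct {
	errs chan error
}

func (w *worker) doInsert(fail bool) {
	if fail {
		// Non-blocking send so a slow (or absent) consumer
		// cannot stall the insert loop.
		select {
		case w.errs <- errors.New("insert failed"):
		default: // channel full: drop or log locally
		}
	}
}

func main() {
	w := &worker{errs: make(chan error, 10)}
	w.doInsert(false)
	w.doInsert(true)
	close(w.errs)
	for err := range w.errs {
		fmt.Println("insert error:", err) // caller decides: retry, log, abort
	}
}
```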

You can do this using a goroutine that does the following:

  • creates a new transaction and prepares statements
  • continuously reads from an input channel (this receives the data from other parts of your program)
  • times out after 10ms and commits the transaction, then creates a new one to accumulate more data

This function could look something like:

func transactionLoop(input chan Data) {
    tx := CreateNewTransaction() // This creates a new TX and prepares statements
    timeout := time.NewTimer(time.Millisecond * 10)
    for {
        select {
        case newData := <-input:
            tx.Insert(newData)
        case <- timeout.C:
            // Too much time has passed: commit and create new tx.
            tx.Commit()
            // Create new transaction and launch timer.
            tx = CreateNewTransaction()
            timeout.Reset(time.Millisecond * 10)
        }
    }
}

This may do what you want, though there are quite a few details/problems left:

  • there's no mechanism for passing errors back to the callers, which is a terrible idea in real code
  • it shouldn't commit empty transactions
  • if both channels are ready at the same time, the order is random (should be fine)
  • you may want to commit in another goroutine, though you could then run into trouble if your commits take longer than 10ms.
  • I haven't tested this, just jotting things down
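A variant addressing the first two bullets — reporting commit errors on a channel and skipping empty commits — might look like this. Since `CreateNewTransaction` isn't shown, `Tx` here is a toy stand-in with a pending-row counter, just to make the control flow concrete:

```go
package main

import (
	"fmt"
	"time"
)

// Toy stand-ins; a real version would wrap *sql.Tx.
var committed int // total rows committed, for demonstration

type Tx struct{ pending int }

func (t *Tx) Insert(d int)  { t.pending++ }
func (t *Tx) Commit() error { committed += t.pending; return nil }

// transactionLoop with two fixes over the sketch above: it never
// commits an empty transaction, and commit errors go out on errs
// instead of being swallowed. Closing input triggers a final flush.
func transactionLoop(input chan int, errs chan<- error, done chan<- bool) {
	tx := &Tx{}
	timeout := time.NewTimer(10 * time.Millisecond)
	flush := func() {
		if tx.pending == 0 {
			return // nothing buffered: skip the empty commit
		}
		if err := tx.Commit(); err != nil {
			errs <- err
		}
		tx = &Tx{}
	}
	for {
		select {
		case d, ok := <-input:
			if !ok { // input closed: final flush, then exit
				flush()
				timeout.Stop()
				done <- true
				return
			}
			tx.Insert(d)
		case <-timeout.C:
			flush()
			timeout.Reset(10 * time.Millisecond)
		}
	}
}

func main() {
	input := make(chan int)
	errs := make(chan error, 1)
	done := make(chan bool)
	go transactionLoop(input, errs, done)
	for i := 0; i < 5; i++ {
		input <- i
	}
	close(input)
	<-done
	fmt.Println("all data flushed")
}
```

This still commits on the loop goroutine, so the fourth bullet (commits taking longer than the timer interval) remains an open design choice.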