Am trying to build transaction in mongodb with retries functionality as similar to other drivers like nodejs etc. This is my current implementation
if session, err = client.StartSession(); err != nil {
return err
}
if err = session.StartTransaction(); err != nil {
return err
}
if err = mongo.WithSession(ctx, session, func(sc mongo.SessionContext) error {
if result, err = collection.UpdateOne(sc, bson.M{"_id": id}, update); err != nil {
_ = session.AbortTransaction(sc)
return err
}
if result.MatchedCount != 1 || result.ModifiedCount != 1 {
_ = session.AbortTransaction(sc)
return error.New("no match")
}
if err = session.CommitTransaction(sc); err != nil {
_ = session.AbortTransaction(sc)
}
return nil
}); err != nil {
// what needs to be handled here?
// If its a particular error type can i retry transaction here?
// should i abort transaction here?
return err
}
session.EndSession(ctx)
If its a particular error type how can i go about retry a transaction?
Also do we need to abort transaction everytime a commit fails or returning an error will automatically cancels the transaction?
I am not able to find much examples on how to implement this right
After doing some research, I could put up an example similar to other mongo drivers.
func trans() error {
transactionCodeFunc := func(sctx mongo.SessionContext) error {
err := sctx.StartTransaction(options.Transaction().
SetReadConcern(readconcern.Snapshot()).
SetWriteConcern(writeconcern.New(writeconcern.WMajority())),
)
if err != nil {
return err
}
// Transaction - Attempt an stripe charge
err = empService.Update(sctx, filter, update)
if err != nil {
sctx.AbortTransaction(sctx)
return err
}
// Transaction - Attempt an stripe charge
err = empService.Update(sctx, filter2, update2)
if err != nil {
sctx.AbortTransaction(sctx)
return err
}
return commitWithRetry(sctx)
}
return db.Client().UseSessionWithOptions(
ctx, options.Session().SetDefaultReadPreference(readpref.Primary()),
func(sctx mongo.SessionContext) error {
return runTransactionWithRetry(sctx, transactionCodeFunc)
},
)
}
func runTransactionWithRetry(sctx mongo.SessionContext, txnFn
func(mongo.SessionContext) error) error {
for {
err := txnFn(sctx) // Performs transaction.
if err == nil {
return nil
}
log.Println("Transaction aborted. Caught exception during transaction.")
// If transient error, retry the whole transaction
if cmdErr, ok := err.(mongo.CommandError); ok && cmdErr.HasErrorLabel("TransientTransactionError") {
log.Println("TransientTransactionError, retrying transaction...")
continue
}
return err
}
}
func commitWithRetry(sctx mongo.SessionContext) error {
for {
err := sctx.CommitTransaction(sctx)
switch e := err.(type) {
case nil:
fmt.Println("Transaction comitted")
return nil
case mongo.CommandError:
if e.HasErrorLabel("UnknownTransactionCommitResult") {
log.Println("UnknownTransactionCommitResult, retrying commit operation")
continue
}
log.Println("Error during commit...")
return e
default:
log.Println("Error during commit...")
return e
}
}
}