On app engine I have a large number of entities of a particular kind. I want to run a function on each entity (e.g. edit the entity or copy it) I would do this in a taskqueue but a taskqueue is limited to 10 minutes runtime and each function call is prone to many kinds of errors. What is the best way to do this?
Here's my solution although I'm hoping someone out there has a better solution. I also wonder if this is prone to fork bombs e.g. if the task runs twice, it will set off two chains of iteration.. ! I'm only using it to iterate a few hundred thousand entities, although the operation on each entity is expensive.
First I create a taskqueue for running each individual function call on an entity one at a time:
queue:
- name: entity-iter
rate: 100/s
max_concurrent_requests: 1
retry_parameters:
task_retry_limit: 3
task_age_limit: 30m
min_backoff_seconds: 200
and then I have an iterate entity method which, given the kind, will call your delay func on each entity with the key.
package sysadmin
import (
"google.golang.org/appengine/datastore"
"golang.org/x/net/context"
"google.golang.org/appengine/log"
"google.golang.org/appengine/delay"
"google.golang.org/appengine/taskqueue"
)
func ForEachEntity(kind string, f *delay.Function) *delay.Function {
var callWithNextKey *delay.Function // func(c context.Context, depth int, cursorString string) error
callWithNextKey = delay.Func("something", func(c context.Context, depth int, cursorString string) error {
q := datastore.NewQuery(kind).KeysOnly()
if cursorString != "" {
if curs, err := datastore.DecodeCursor(cursorString); err != nil {
log.Errorf(c, "error decoding cursor %v", err)
return err
} else {
q = q.Start(curs)
}
}
it := q.Run(c)
if key, err := it.Next(nil); err != nil {
if err == datastore.Done {
log.Infof(c, "Done %v", err)
return nil
}
log.Errorf(c, "datastore error %v", err)
return err
} else {
curs, _ := it.Cursor()
if t, err := f.Task(key); err != nil {
return err
} else if _, err = taskqueue.Add(c, t, "entity-iter"); err != nil {
log.Errorf(c, "error %v", err)
return err
}
if depth - 1 > 0 {
if err := callWithNextKey.Call(c, depth - 1, curs.String()); err != nil {
log.Errorf(c, "error2 %v", err)
return err
}
}
}
return nil
})
return callWithNextKey
}
example usage:
var DoCopyCourse = delay.Func("something2", CopyCourse)
var DoCopyCourses = ForEachEntity("Course", DoCopyCourse)
func CopyCourses(c context.Context) {
//sharedmodels.MakeMockCourses(c)
DoCopyCourses.Call(c, 9999999, "")
}