The following code snippets are taken from a program that reads documents from MongoDB and writes them to a Postgres database. The program is implemented using the producer/consumer pattern: a producer goroutine reads from Mongo and sends the fetched documents to a channel.
The consumer goroutine reads from the channel, constructs an `INSERT INTO` SQL statement, and inserts the data into the Postgres database.
Unfortunately, the consumer seems to block nondeterministically. I believe it happens when a `map` data structure that is passed over a `chan` from producer to consumer is accessed, but I can't tell for sure.
The simplified code of the producer:
func producer(ops chan BatchOp, ...) {
// Iterate over all tables that we want to fetch documents from
for _, table := range schema.Schema[0].Tables {
// Run some aggregation to fetch the documents and create an iterator
pipe := mongoDb.C(table.Collection).Pipe(table.Pipeline)
iter := pipe.Batch(200).AllowDiskUse().Iter()
const RESULTS_BUFFER_SIZE = 200
result := bson.M{}
results := make([]bson.M, 0, RESULTS_BUFFER_SIZE)
// The flush function sends a batch of data into the channel
flush := func() {
fmt.Printf("Flushing %d items
", len(results))
ops <- BatchOp{
Insert,
table.Collection,
results,
}
results = make([]bson.M, 0, RESULTS_BUFFER_SIZE)
}
// Iterate over the results from MongoDB and append each document
// to the results slice
for iter.Next(&result) {
results = append(results, bson.M{
"_id": result["_id"],
"name": result["Name"],
"i": result["i"],
})
// ... flush the results slice as soon as its big enough
if len(results) == RESULTS_BUFFER_SIZE {
flush()
}
}
// Flush the last contents of the results slice
if len(results) > 0 {
flush()
}
if err := iter.Close(); err != nil {
panic(err)
}
}
}
// BatchOp is the message sent over the ops channel from the producer to the
// writer. Only the data field is shown in this excerpt; the producer's
// literal additionally sets an operation kind and a collection name.
type BatchOp struct {
data []bson.M // bson.M is defined as: type M map[string]interface{}
}
// ops carries batches from the producer to the writer. No capacity is given,
// so the channel is unbuffered: every send blocks until a receiver takes it.
ops := make(chan BatchOp)
Unfortunately, the program blocks nondeterministically when the consumer accesses the `data` field:
func writer(schema *db.SchemaSchema, ops chan BatchOp, psql *sql.DB) {
// ...
for {
op := <-ops
fmt.Printf("Writing %d rows
", len(op.data))
_, err := psql.Exec(db.GenerateInsert(table, op.data))
// ...
fmt.Printf("Wrote %d rows
", len(op.data))
}
}
// GenerateInsert builds a single multi-row INSERT INTO statement for a slice
// of documents from MongoDB.
//
// The column list is taken from table.Columns (each SqlName in double
// quotes). For every document one value tuple is emitted, reading each
// column's value from the document under column.Name: missing or nil values
// become NULL, int values are written bare, and everything else is formatted
// with %v and written as a single-quoted SQL string literal in which
// embedded single quotes are doubled.
//
// The statement is accumulated in one byte slice rather than by repeated
// string concatenation, which copied the whole statement built so far on
// every append and produced a large amount of garbage for big batches.
//
// NOTE(review): for empty data the result ends after "VALUES " and is not a
// complete statement; callers are expected to pass at least one document.
func GenerateInsert(table Table, data []bson.M) (sql string) {
	var buf []byte

	buf = append(buf, fmt.Sprintf(`INSERT INTO "%s" (`, table.Table)...)
	for i, column := range table.Columns {
		if i > 0 {
			buf = append(buf, ", "...)
		}
		buf = append(buf, '"')
		buf = append(buf, column.SqlName...)
		buf = append(buf, '"')
	}
	buf = append(buf, ") VALUES "...)

	for i, row := range data {
		if i > 0 {
			buf = append(buf, ", "...)
		}
		buf = append(buf, '(')
		for j, column := range table.Columns {
			if j > 0 {
				buf = append(buf, ", "...)
			}
			switch v := row[column.Name].(type) {
			case nil:
				buf = append(buf, "NULL"...)
			case int:
				buf = append(buf, fmt.Sprint(v)...)
			default:
				buf = appendSQLString(buf, fmt.Sprint(v))
			}
		}
		buf = append(buf, ')')
	}

	return string(buf)
}

// appendSQLString appends s to dst as a single-quoted SQL string literal,
// doubling every embedded single quote so the value cannot terminate the
// literal early.
func appendSQLString(dst []byte, s string) []byte {
	dst = append(dst, '\'')
	for i := 0; i < len(s); i++ {
		if s[i] == '\'' {
			dst = append(dst, '\'')
		}
		dst = append(dst, s[i])
	}
	return append(dst, '\'')
}
What could cause the access to `row[column.Name]` to block? I believe it has to do with the fact that a deeply hierarchical `map` is sent over a `chan`.

**Edit July 24**
* The reference to the full code, since commenters asked for it: https://github.com/erikmuttersbach/mongo2psql
* Running `go run insert.go` inserts 1111 documents into the `foos` collection.
* When I then run `go run main.go`,
I get the following output: Creating table foos Creating table bars Seeding table foos from collection foos Flushing 200 items Writing 200 rows Flushing 200 items Wrote 200 rows Writing 200 rows Flushing 200 items Wrote 200 rows Writing 200 rows Flushing 200 items Wrote 200 rows Writing 200 rows Flushing 200 items Wrote 200 rows Writing 200 rows Flushing 111 items Wrote 200 rows Writing 111 rows Seeding table bars from collection bars Seeding done
After the line `Writing 111 rows`, I would expect another line, `Wrote 111 rows`, but it does not appear (this also only happens sometimes).
`Wrote 111 rows` does show when you replace the function body of `GenerateInsert` with a simple `return "SELECT 1"`. As access to the channel messages is omitted in this case, I assumed that the problem has to do with the access to the `data` of the channel messages.

**Edit July 24 (2)** When I kill the program with SIGQUIT, I get the following output:
^\SIGQUIT: quit
PC=0x10566a3 m=0 sigcode=0
goroutine 0 [idle]:
runtime.mach_semaphore_timedwait(0xf03, 0x186a0, 0x7fff5fbff3ec, 0x0, 0x186a05fbff428, 0x1455b60, 0x7fff5fbff420, 0x1050d63, 0x186a0, 0xffffffff, ...)
/usr/local/Cellar/go/1.8.3/libexec/src/runtime/sys_darwin_amd64.s:425 +0x13
runtime.semasleep1(0x186a0, 0xffffffff)
/usr/local/Cellar/go/1.8.3/libexec/src/runtime/os_darwin.go:402 +0xe1
runtime.semasleep.func1()
/usr/local/Cellar/go/1.8.3/libexec/src/runtime/os_darwin.go:432 +0x33
runtime.systemstack(0x7fff5fbff448)
/usr/local/Cellar/go/1.8.3/libexec/src/runtime/asm_amd64.s:343 +0xab
runtime.semasleep(0x186a0, 0xffffffff)
/usr/local/Cellar/go/1.8.3/libexec/src/runtime/os_darwin.go:433 +0x4d
runtime.notetsleep_internal(0x1455808, 0x186a0, 0x1455b60, 0x14d44e02e4b89c75, 0x1455b00)
/usr/local/Cellar/go/1.8.3/libexec/src/runtime/lock_sema.go:198 +0x79
runtime.notetsleep(0x1455808, 0x186a0, 0x0)
/usr/local/Cellar/go/1.8.3/libexec/src/runtime/lock_sema.go:246 +0x75
runtime.stopTheWorldWithSema()
/usr/local/Cellar/go/1.8.3/libexec/src/runtime/proc.go:1029 +0x274
runtime.systemstack(0x1455b00)
/usr/local/Cellar/go/1.8.3/libexec/src/runtime/asm_amd64.s:327 +0x79
runtime.mstart()
/usr/local/Cellar/go/1.8.3/libexec/src/runtime/proc.go:1132
goroutine 26 [running]:
runtime.systemstack_switch()
/usr/local/Cellar/go/1.8.3/libexec/src/runtime/asm_amd64.s:281 fp=0xc42004d608 sp=0xc42004d600
runtime.gcStart(0x0, 0x12ec200)
/usr/local/Cellar/go/1.8.3/libexec/src/runtime/mgc.go:1010 +0x208 fp=0xc42004d640 sp=0xc42004d608
runtime.mallocgc(0x2000, 0x0, 0xc420114300, 0xc420248000)
/usr/local/Cellar/go/1.8.3/libexec/src/runtime/malloc.go:785 +0x491 fp=0xc42004d6e0 sp=0xc42004d640
runtime.rawstring(0x1f60, 0x0, 0x0, 0x0, 0x0, 0x0)
/usr/local/Cellar/go/1.8.3/libexec/src/runtime/string.go:237 +0x85 fp=0xc42004d710 sp=0xc42004d6e0
runtime.rawstringtmp(0x0, 0x1f60, 0x13, 0x1f5e, 0xc420248000, 0x1f5e, 0x1f5e)
/usr/local/Cellar/go/1.8.3/libexec/src/runtime/string.go:107 +0x78 fp=0xc42004d750 sp=0xc42004d710
runtime.concatstrings(0x0, 0xc42004d830, 0x2, 0x2, 0xc420248000, 0x1f5e)
/usr/local/Cellar/go/1.8.3/libexec/src/runtime/string.go:46 +0xf9 fp=0xc42004d7e8 sp=0xc42004d750
runtime.concatstring2(0x0, 0xc420248000, 0x1f5e, 0x130caaa, 0x2, 0xc420248000, 0x1f5e)
/usr/local/Cellar/go/1.8.3/libexec/src/runtime/string.go:55 +0x47 fp=0xc42004d828 sp=0xc42004d7e8
_/Users/erikmuttersbach/Projects/mongo2psql-go/db.GenerateInsert(0xc420103870, 0x4, 0xc420103790, 0x4, 0xc4201098c0, 0x1, 0x4, 0xc420124100, 0x3, 0x4, ...)
/Users/erikmuttersbach/Projects/mongo2psql-go/db/sql.go:31 +0x46f fp=0xc42004da10 sp=0xc42004d828
main.writer(0xc420108200, 0xc4200728a0, 0xc4200c0280)
/Users/erikmuttersbach/Projects/mongo2psql-go/main.go:111 +0x3b4 fp=0xc42004dfc8 sp=0xc42004da10
runtime.goexit()
/usr/local/Cellar/go/1.8.3/libexec/src/runtime/asm_amd64.s:2197 +0x1 fp=0xc42004dfd0 sp=0xc42004dfc8
created by main.main
/Users/erikmuttersbach/Projects/mongo2psql-go/main.go:181 +0x135
goroutine 1 [running]:
goroutine running on other thread; stack unavailable
goroutine 17 [syscall, 10 minutes, locked to thread]:
runtime.goexit()
/usr/local/Cellar/go/1.8.3/libexec/src/runtime/asm_amd64.s:2197 +0x1
goroutine 20 [chan receive, 10 minutes]:
database/sql.(*DB).connectionOpener(0xc4200c0280)
/usr/local/Cellar/go/1.8.3/libexec/src/database/sql/sql.go:837 +0x4a
created by database/sql.Open
/usr/local/Cellar/go/1.8.3/libexec/src/database/sql/sql.go:582 +0x212
goroutine 22 [sleep, 10 minutes]:
time.Sleep(0x1dcd6500)
/usr/local/Cellar/go/1.8.3/libexec/src/runtime/time.go:59 +0xf9
gopkg.in/mgo%2ev2.(*mongoCluster).syncServersLoop(0xc42006c500)
/Users/erikmuttersbach/go/src/gopkg.in/mgo.v2/cluster.go:368 +0x471
created by gopkg.in/mgo%2ev2.newCluster
/Users/erikmuttersbach/go/src/gopkg.in/mgo.v2/cluster.go:78 +0x188
goroutine 50 [sleep, 10 minutes]:
time.Sleep(0x37e11d600)
/usr/local/Cellar/go/1.8.3/libexec/src/runtime/time.go:59 +0xf9
gopkg.in/mgo%2ev2.(*mongoServer).pinger(0xc42010e000, 0x1)
/Users/erikmuttersbach/go/src/gopkg.in/mgo.v2/server.go:301 +0x293
created by gopkg.in/mgo%2ev2.newServer
/Users/erikmuttersbach/go/src/gopkg.in/mgo.v2/server.go:89 +0x166
goroutine 25 [select, 10 minutes]:
main.tailer(0xc4200728a0, 0xc4200fe0a0)
/Users/erikmuttersbach/Projects/mongo2psql-go/main.go:67 +0x4d0
created by main.main
/Users/erikmuttersbach/Projects/mongo2psql-go/main.go:180 +0xff
goroutine 38 [IO wait]:
net.runtime_pollWait(0x1664df0, 0x72, 0xa)
/usr/local/Cellar/go/1.8.3/libexec/src/runtime/netpoll.go:164 +0x59
net.(*pollDesc).wait(0xc42006f5d8, 0x72, 0x1438e40, 0x14363d0)
/usr/local/Cellar/go/1.8.3/libexec/src/net/fd_poll_runtime.go:75 +0x38
net.(*pollDesc).waitRead(0xc42006f5d8, 0xc42007d6b0, 0x24)
/usr/local/Cellar/go/1.8.3/libexec/src/net/fd_poll_runtime.go:80 +0x34
net.(*netFD).Read(0xc42006f570, 0xc42007d6b0, 0x24, 0x24, 0x0, 0x1438e40, 0x14363d0)
/usr/local/Cellar/go/1.8.3/libexec/src/net/fd_unix.go:250 +0x1b7
net.(*conn).Read(0xc4200e0010, 0xc42007d6b0, 0x24, 0x24, 0x0, 0x0, 0x0)
/usr/local/Cellar/go/1.8.3/libexec/src/net/net.go:181 +0x70
gopkg.in/mgo%2ev2.fill(0x143cce0, 0xc4200e0010, 0xc42007d6b0, 0x24, 0x24, 0x0, 0x53)
/Users/erikmuttersbach/go/src/gopkg.in/mgo.v2/socket.go:535 +0x53
gopkg.in/mgo%2ev2.(*mongoSocket).readLoop(0xc420164000)
/Users/erikmuttersbach/go/src/gopkg.in/mgo.v2/socket.go:551 +0x107
created by gopkg.in/mgo%2ev2.newSocket
/Users/erikmuttersbach/go/src/gopkg.in/mgo.v2/socket.go:194 +0x259
goroutine 3 [runnable]:
net.runtime_pollWait(0x1664eb0, 0x72, 0x9)
/usr/local/Cellar/go/1.8.3/libexec/src/runtime/netpoll.go:164 +0x59
net.(*pollDesc).wait(0xc4201060d8, 0x72, 0x1438e40, 0x14363d0)
/usr/local/Cellar/go/1.8.3/libexec/src/net/fd_poll_runtime.go:75 +0x38
net.(*pollDesc).waitRead(0xc4201060d8, 0xc4201000c0, 0x24)
/usr/local/Cellar/go/1.8.3/libexec/src/net/fd_poll_runtime.go:80 +0x34
net.(*netFD).Read(0xc420106070, 0xc4201000c0, 0x24, 0x24, 0x0, 0x1438e40, 0x14363d0)
/usr/local/Cellar/go/1.8.3/libexec/src/net/fd_unix.go:250 +0x1b7
net.(*conn).Read(0xc42000e010, 0xc4201000c0, 0x24, 0x24, 0x0, 0x0, 0x0)
/usr/local/Cellar/go/1.8.3/libexec/src/net/net.go:181 +0x70
gopkg.in/mgo%2ev2.fill(0x143cce0, 0xc42000e010, 0xc4201000c0, 0x24, 0x24, 0x0, 0x0)
/Users/erikmuttersbach/go/src/gopkg.in/mgo.v2/socket.go:535 +0x53
gopkg.in/mgo%2ev2.(*mongoSocket).readLoop(0xc420118000)
/Users/erikmuttersbach/go/src/gopkg.in/mgo.v2/socket.go:551 +0x107
created by gopkg.in/mgo%2ev2.newSocket
/Users/erikmuttersbach/go/src/gopkg.in/mgo.v2/socket.go:194 +0x259
goroutine 52 [select, 10 minutes]:
github.com/rwynn/gtm.FetchDocuments(0xc4200fe0a0, 0xc420063860, 0x131fa40, 0xc420100120, 0xc4201122a0, 0x0, 0x0)
/Users/erikmuttersbach/go/src/github.com/rwynn/gtm/gtm.go:498 +0x3a7
created by github.com/rwynn/gtm.Start
/Users/erikmuttersbach/go/src/github.com/rwynn/gtm/gtm.go:673 +0x5fc
goroutine 53 [semacquire, 10 minutes]:
sync.runtime_notifyListWait(0xc420150108, 0xc400000001)
/usr/local/Cellar/go/1.8.3/libexec/src/runtime/sema.go:298 +0x10b
sync.(*Cond).Wait(0xc4201500f8)
/usr/local/Cellar/go/1.8.3/libexec/src/sync/cond.go:57 +0x89
gopkg.in/mgo%2ev2.(*Iter).Next(0xc4201500f0, 0x12c45e0, 0xc42007d590, 0x0)
/Users/erikmuttersbach/go/src/gopkg.in/mgo.v2/session.go:3704 +0x9f
github.com/rwynn/gtm.TailOps(0xc4200fe0a0, 0xc420063860, 0xc42010a008, 0x1, 0x1, 0xc420110090, 0x0, 0x0)
/Users/erikmuttersbach/go/src/github.com/rwynn/gtm/gtm.go:365 +0x1ea
created by github.com/rwynn/gtm.Start
/Users/erikmuttersbach/go/src/github.com/rwynn/gtm/gtm.go:685 +0x8ea
rax 0xe
rbx 0x0
rcx 0x7fff5fbff3c0
rdx 0x186a0
rdi 0xf03
rsi 0x0
rbp 0x7fff5fbff3f8
rsp 0x7fff5fbff3c0
r8 0x43a260
r9 0x11
r10 0x653ef1e26c7c
r11 0x202
r12 0x1ad50d58ee7b
r13 0x1cbe784ba3b8
r14 0x14d44e02db750c00
r15 0x1
rip 0x10566a3
rflags 0x202
cs 0x7
fs 0x0
gs 0x0
exit status 2
The problem is the garbage collector, which can be seen from the stack trace (after sending SIGQUIT to the program):
goroutine 37 [running]:
runtime.systemstack_switch()
/usr/local/Cellar/go/1.8.3/libexec/src/runtime/asm_amd64.s:281 fp=0xc420145608 sp=0xc420145600
runtime.gcStart(0x0, 0xc420366000)
/usr/local/Cellar/go/1.8.3/libexec/src/runtime/mgc.go:1010 +0x208 fp=0xc420145640 sp=0xc420145608
runtime.mallocgc(0x2500, 0x0, 0xc420106100, 0xc420145728)
/usr/local/Cellar/go/1.8.3/libexec/src/runtime/malloc.go:785 +0x491 fp=0xc4201456e0 sp=0xc420145640
runtime.rawstring(0x20e9, 0x0, 0x0, 0x0, 0x0, 0x0)
/usr/local/Cellar/go/1.8.3/libexec/src/runtime/string.go:237 +0x85 fp=0xc420145710 sp=0xc4201456e0
runtime.rawstringtmp(0x0, 0x20e9, 0x29, 0xc4201457c0, 0xc4200bdb80, 0xc420106ff0, 0xc420106120)
/usr/local/Cellar/go/1.8.3/libexec/src/runtime/string.go:107 +0x78 fp=0xc420145750 sp=0xc420145710
runtime.concatstrings(0x0, 0xc420145830, 0x2, 0x2, 0xc420106ff0, 0xc4201b8000)
/usr/local/Cellar/go/1.8.3/libexec/src/runtime/string.go:46 +0xf9 fp=0xc4201457e8 sp=0xc420145750
runtime.concatstring2(0x0, 0xc42051d900, 0x20c0, 0xc420106ff0, 0x29, 0xc420106ff0, 0x29)
/usr/local/Cellar/go/1.8.3/libexec/src/runtime/string.go:55 +0x47 fp=0xc420145828 sp=0xc4201457e8
_/Users/erikmuttersbach/Projects/mongo2psql-go/db.GenerateInsert(0xc4200c9730, 0x4, 0xc4200c9650, 0x4, 0xc4200e1860, 0x1, 0x4, 0xc42006c400, 0x3, 0x4, ...)
/Users/erikmuttersbach/Projects/mongo2psql-go/db/sql.go:27 +0x407 fp=0xc420145a10 sp=0xc420145828
main.writer(0xc4200e01a0, 0xc42015ccc0, 0xc420014320)
/Users/erikmuttersbach/Projects/mongo2psql-go/main.go:112 +0x3b4 fp=0xc420145fc8 sp=0xc420145a10
runtime.goexit()
/usr/local/Cellar/go/1.8.3/libexec/src/runtime/asm_amd64.s:2197 +0x1 fp=0xc420145fd0 sp=0xc420145fc8
created by main.main
/Users/erikmuttersbach/Projects/mongo2psql-go/main.go:202 +0x135
More specifically, the line `runtime.gcStart(0x0, 0xc420366000)` hints at the fact that the GC wants to kick in but is blocked. The GC was in fact blocked because I started my "never-ending" goroutines like this:
func main() {
...
go writer()
go producer()
for {}
}
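The infinite `for` loop keeps its processor busy and never gives the garbage collector a chance to kick in. Blocking the main goroutine with `select {}` instead of spinning in `for {}` avoids the problem.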