Goroutine blocks when accessing a map

The following code snippets are taken from a program that reads documents from MongoDB and writes them to a Postgres database. The program uses the producer/consumer pattern: a producer goroutine reads from Mongo and sends the fetched documents to a channel.
The consumer goroutine reads from the channel, constructs an INSERT INTO SQL statement and inserts the data into the Postgres database.

Unfortunately the consumer seems to block nondeterministically. I believe it happens when it accesses a map that is passed over a chan from the producer to the consumer, but I can't tell for sure.

The simplified code of the producer:

func producer(ops chan BatchOp, ...) {
    // Iterate over all tables that we want to fetch documents from
    for _, table := range schema.Schema[0].Tables {

        // Run some aggregation to fetch the documents and create an iterator
        pipe := mongoDb.C(table.Collection).Pipe(table.Pipeline)
        iter := pipe.Batch(200).AllowDiskUse().Iter()

        const RESULTS_BUFFER_SIZE = 200


        result := bson.M{}
        results := make([]bson.M, 0, RESULTS_BUFFER_SIZE)

        // The flush function sends a batch of data into the channel
        flush := func() {
            fmt.Printf("Flushing %d items\n", len(results))
            ops <- BatchOp{
                Insert,
                table.Collection,
                results,
            }
            results = make([]bson.M, 0, RESULTS_BUFFER_SIZE)
        }

        // Iterate over the results from MongoDB and append each document 
        // to the results slice
        for iter.Next(&result) {
            results = append(results, bson.M{
                "_id": result["_id"],
                "name": result["Name"],
                "i": result["i"],
            })

            // ... flush the results slice as soon as its big enough
            if len(results) == RESULTS_BUFFER_SIZE {
                flush()
            }
        }

        // Flush the last contents of the results slice
        if len(results) > 0 {
            flush()
        }

        if err := iter.Close(); err != nil {
            panic(err)
        }
    }
}


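type BatchOp struct {
    // ... operation and collection fields omitted (see the BatchOp literal in flush)
    data       []bson.M // bson.M is defined as: type M map[string]interface{}
}
ops := make(chan BatchOp) // unbuffered: each send blocks until the writer receives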

The consumer, which sometimes blocks when it accesses the data:

func writer(schema *db.SchemaSchema, ops chan BatchOp, psql *sql.DB) {
    // ...

    for {
        op := <-ops

        fmt.Printf("Writing %d rows\n", len(op.data))
        _, err := psql.Exec(db.GenerateInsert(table, op.data))
        // ...
        fmt.Printf("Wrote %d rows\n", len(op.data))
    }
}

// Generate the INSERT INTO statement for a slice of documents from MongoDB
func GenerateInsert(table Table, data []bson.M) (sql string) {
    sql = fmt.Sprintf(`INSERT INTO "%s" (`, table.Table)
    for i, column := range table.Columns {
        sql += `"` + column.SqlName + `"`
        if i < len(table.Columns)-1 {
            sql += ", "
        }
    }

    sql += ") VALUES "
    for i, row := range data {
        sql += "("
        for j, column := range table.Columns {

            // !!!!!! THIS !!!!!!! is the line that blocks; sometimes execution never gets past it
            switch v := row[column.Name].(type) {
            case nil:
                sql += "NULL"
            case int:
                sql += fmt.Sprintf(`%v`, v)
            default:
                sql += fmt.Sprintf(`'%v'`, v)
            }

            if j < len(table.Columns)-1 {
                sql += ", "
            }
        }
        sql += ")"

        if i < len(data)-1 {
            sql += ", "
        }
    }

    return sql
}
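GenerateInsert builds the statement by repeated string concatenation and interpolates values with '%v', which breaks on values that contain quotes. A common alternative, sketched here under stated assumptions, is to emit $1, $2, ... placeholders (the numbering Postgres drivers such as lib/pq expect) and pass the values to Exec separately, writing into a strings.Builder so the statement is not copied on every +=. The names generateInsertParams, Table and Column are hypothetical, minimal stand-ins for the program's own types, and strings.Builder needs Go 1.10 or later (on Go 1.8, bytes.Buffer works the same way).

```go
package main

import (
	"fmt"
	"strings"
)

// Column and Table are hypothetical, minimal stand-ins for the program's
// schema types; only the fields GenerateInsert reads are included.
type Column struct {
	Name    string // key in the MongoDB document
	SqlName string // column name in Postgres
}

type Table struct {
	Table   string
	Columns []Column
}

// generateInsertParams is a hypothetical variant of GenerateInsert. It emits
// $1, $2, ... placeholders and returns the values separately, so the driver
// handles quoting and NULLs, and it appends to a strings.Builder instead of
// allocating a new string for every +=.
func generateInsertParams(table Table, data []map[string]interface{}) (string, []interface{}) {
	var b strings.Builder
	args := make([]interface{}, 0, len(data)*len(table.Columns))

	fmt.Fprintf(&b, `INSERT INTO "%s" (`, table.Table)
	for i, column := range table.Columns {
		if i > 0 {
			b.WriteString(", ")
		}
		b.WriteString(`"` + column.SqlName + `"`)
	}
	b.WriteString(") VALUES ")

	for i, row := range data {
		if i > 0 {
			b.WriteString(", ")
		}
		b.WriteString("(")
		for j, column := range table.Columns {
			if j > 0 {
				b.WriteString(", ")
			}
			// A missing key yields nil, which database/sql sends as NULL.
			args = append(args, row[column.Name])
			fmt.Fprintf(&b, "$%d", len(args))
		}
		b.WriteString(")")
	}
	return b.String(), args
}

func main() {
	table := Table{Table: "foos", Columns: []Column{{Name: "_id", SqlName: "id"}, {Name: "name", SqlName: "name"}}}
	rows := []map[string]interface{}{{"_id": 1, "name": "a"}, {"_id": 2}}
	query, args := generateInsertParams(table, rows)
	fmt.Println(query) // INSERT INTO "foos" ("id", "name") VALUES ($1, $2), ($3, $4)
	fmt.Println(args...)
}
```

In the writer it would be used as query, args := generateInsertParams(table, op.data) followed by psql.Exec(query, args...), with the data parameter typed as []bson.M in the real program. Values such as bson.ObjectId may need converting to a driver-supported type first, and PostgreSQL caps a single statement at 65535 bind parameters, which 200-row batches with a few columns stay well below.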
  • What could cause the expression row[column.Name] to block? I suspect it has to do with a deeply nested map being sent over a chan.
  • How could I have debugged this? GDB?
  • Is it best practice to implement producer-consumer like this?
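On the first question: a channel send copies only the value being sent, and for a []bson.M that value is a slice header pointing at shared maps. However deeply the maps are nested, nothing is deep-copied, and reading a map afterwards does not involve the channel at all, so a lookup such as row[column.Name] does not wait on the channel or on any lock. (Unsynchronized concurrent reads and writes would be a data race, which the runtime may report as a fatal error, not a hang.) The sketch below uses a local type M that mirrors bson.M, so it needs no mgo import.

```go
package main

import "fmt"

// M mirrors bson.M (map[string]interface{}) so the sketch needs no mgo import.
type M map[string]interface{}

func main() {
	ch := make(chan []M, 1)

	nested := M{"a": M{"b": M{"c": 1}}}
	ch <- []M{nested} // copies only the slice header; the maps themselves are shared

	received := <-ch
	// Same map: a change made through one reference is visible through the other.
	nested["added"] = true
	fmt.Println(received[0]["added"]) // true
}
```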

Edit July 24
* Since commenters asked for it, here is the full code: https://github.com/erikmuttersbach/mongo2psql

  • Sample output that led me to conclude that the code blocks: when I run go run insert.go, 1111 documents are inserted into the foos collection.
    When I subsequently run go run main.go, I get the following output:

      Creating table foos
      Creating table bars
      Seeding table foos from collection foos
      Flushing 200 items
      Writing 200 rows
      Flushing 200 items
      Wrote 200 rows
      Writing 200 rows
      Flushing 200 items
      Wrote 200 rows
      Writing 200 rows
      Flushing 200 items
      Wrote 200 rows
      Writing 200 rows
      Flushing 200 items
      Wrote 200 rows
      Writing 200 rows
      Flushing 111 items
      Wrote 200 rows
      Writing 111 rows
      Seeding table bars from collection bars
      Seeding done

    After the line "Writing 111 rows" I would expect a final line "Wrote 111 rows", which does not appear (and this only happens sometimes).
    I CAN get "Wrote 111 rows" to show up by replacing the body of GenerateInsert with a simple return "SELECT 1". Since that version never accesses the data in the channel messages, I assumed the problem is related to accessing that data.

Edit July 24 (2)
When I kill the program with SIGQUIT, I get the following output:

^\SIGQUIT: quit
PC=0x10566a3 m=0 sigcode=0

goroutine 0 [idle]:
runtime.mach_semaphore_timedwait(0xf03, 0x186a0, 0x7fff5fbff3ec, 0x0, 0x186a05fbff428, 0x1455b60, 0x7fff5fbff420, 0x1050d63, 0x186a0, 0xffffffff, ...)
    /usr/local/Cellar/go/1.8.3/libexec/src/runtime/sys_darwin_amd64.s:425 +0x13
runtime.semasleep1(0x186a0, 0xffffffff)
    /usr/local/Cellar/go/1.8.3/libexec/src/runtime/os_darwin.go:402 +0xe1
runtime.semasleep.func1()
    /usr/local/Cellar/go/1.8.3/libexec/src/runtime/os_darwin.go:432 +0x33
runtime.systemstack(0x7fff5fbff448)
    /usr/local/Cellar/go/1.8.3/libexec/src/runtime/asm_amd64.s:343 +0xab
runtime.semasleep(0x186a0, 0xffffffff)
    /usr/local/Cellar/go/1.8.3/libexec/src/runtime/os_darwin.go:433 +0x4d
runtime.notetsleep_internal(0x1455808, 0x186a0, 0x1455b60, 0x14d44e02e4b89c75, 0x1455b00)
    /usr/local/Cellar/go/1.8.3/libexec/src/runtime/lock_sema.go:198 +0x79
runtime.notetsleep(0x1455808, 0x186a0, 0x0)
    /usr/local/Cellar/go/1.8.3/libexec/src/runtime/lock_sema.go:246 +0x75
runtime.stopTheWorldWithSema()
    /usr/local/Cellar/go/1.8.3/libexec/src/runtime/proc.go:1029 +0x274
runtime.systemstack(0x1455b00)
    /usr/local/Cellar/go/1.8.3/libexec/src/runtime/asm_amd64.s:327 +0x79
runtime.mstart()
    /usr/local/Cellar/go/1.8.3/libexec/src/runtime/proc.go:1132

goroutine 26 [running]:
runtime.systemstack_switch()
    /usr/local/Cellar/go/1.8.3/libexec/src/runtime/asm_amd64.s:281 fp=0xc42004d608 sp=0xc42004d600
runtime.gcStart(0x0, 0x12ec200)
    /usr/local/Cellar/go/1.8.3/libexec/src/runtime/mgc.go:1010 +0x208 fp=0xc42004d640 sp=0xc42004d608
runtime.mallocgc(0x2000, 0x0, 0xc420114300, 0xc420248000)
    /usr/local/Cellar/go/1.8.3/libexec/src/runtime/malloc.go:785 +0x491 fp=0xc42004d6e0 sp=0xc42004d640
runtime.rawstring(0x1f60, 0x0, 0x0, 0x0, 0x0, 0x0)
    /usr/local/Cellar/go/1.8.3/libexec/src/runtime/string.go:237 +0x85 fp=0xc42004d710 sp=0xc42004d6e0
runtime.rawstringtmp(0x0, 0x1f60, 0x13, 0x1f5e, 0xc420248000, 0x1f5e, 0x1f5e)
    /usr/local/Cellar/go/1.8.3/libexec/src/runtime/string.go:107 +0x78 fp=0xc42004d750 sp=0xc42004d710
runtime.concatstrings(0x0, 0xc42004d830, 0x2, 0x2, 0xc420248000, 0x1f5e)
    /usr/local/Cellar/go/1.8.3/libexec/src/runtime/string.go:46 +0xf9 fp=0xc42004d7e8 sp=0xc42004d750
runtime.concatstring2(0x0, 0xc420248000, 0x1f5e, 0x130caaa, 0x2, 0xc420248000, 0x1f5e)
    /usr/local/Cellar/go/1.8.3/libexec/src/runtime/string.go:55 +0x47 fp=0xc42004d828 sp=0xc42004d7e8
_/Users/erikmuttersbach/Projects/mongo2psql-go/db.GenerateInsert(0xc420103870, 0x4, 0xc420103790, 0x4, 0xc4201098c0, 0x1, 0x4, 0xc420124100, 0x3, 0x4, ...)
    /Users/erikmuttersbach/Projects/mongo2psql-go/db/sql.go:31 +0x46f fp=0xc42004da10 sp=0xc42004d828
main.writer(0xc420108200, 0xc4200728a0, 0xc4200c0280)
    /Users/erikmuttersbach/Projects/mongo2psql-go/main.go:111 +0x3b4 fp=0xc42004dfc8 sp=0xc42004da10
runtime.goexit()
    /usr/local/Cellar/go/1.8.3/libexec/src/runtime/asm_amd64.s:2197 +0x1 fp=0xc42004dfd0 sp=0xc42004dfc8
created by main.main
    /Users/erikmuttersbach/Projects/mongo2psql-go/main.go:181 +0x135

goroutine 1 [running]:
    goroutine running on other thread; stack unavailable

goroutine 17 [syscall, 10 minutes, locked to thread]:
runtime.goexit()
    /usr/local/Cellar/go/1.8.3/libexec/src/runtime/asm_amd64.s:2197 +0x1

goroutine 20 [chan receive, 10 minutes]:
database/sql.(*DB).connectionOpener(0xc4200c0280)
    /usr/local/Cellar/go/1.8.3/libexec/src/database/sql/sql.go:837 +0x4a
created by database/sql.Open
    /usr/local/Cellar/go/1.8.3/libexec/src/database/sql/sql.go:582 +0x212

goroutine 22 [sleep, 10 minutes]:
time.Sleep(0x1dcd6500)
    /usr/local/Cellar/go/1.8.3/libexec/src/runtime/time.go:59 +0xf9
gopkg.in/mgo%2ev2.(*mongoCluster).syncServersLoop(0xc42006c500)
    /Users/erikmuttersbach/go/src/gopkg.in/mgo.v2/cluster.go:368 +0x471
created by gopkg.in/mgo%2ev2.newCluster
    /Users/erikmuttersbach/go/src/gopkg.in/mgo.v2/cluster.go:78 +0x188

goroutine 50 [sleep, 10 minutes]:
time.Sleep(0x37e11d600)
    /usr/local/Cellar/go/1.8.3/libexec/src/runtime/time.go:59 +0xf9
gopkg.in/mgo%2ev2.(*mongoServer).pinger(0xc42010e000, 0x1)
    /Users/erikmuttersbach/go/src/gopkg.in/mgo.v2/server.go:301 +0x293
created by gopkg.in/mgo%2ev2.newServer
    /Users/erikmuttersbach/go/src/gopkg.in/mgo.v2/server.go:89 +0x166

goroutine 25 [select, 10 minutes]:
main.tailer(0xc4200728a0, 0xc4200fe0a0)
    /Users/erikmuttersbach/Projects/mongo2psql-go/main.go:67 +0x4d0
created by main.main
    /Users/erikmuttersbach/Projects/mongo2psql-go/main.go:180 +0xff

goroutine 38 [IO wait]:
net.runtime_pollWait(0x1664df0, 0x72, 0xa)
    /usr/local/Cellar/go/1.8.3/libexec/src/runtime/netpoll.go:164 +0x59
net.(*pollDesc).wait(0xc42006f5d8, 0x72, 0x1438e40, 0x14363d0)
    /usr/local/Cellar/go/1.8.3/libexec/src/net/fd_poll_runtime.go:75 +0x38
net.(*pollDesc).waitRead(0xc42006f5d8, 0xc42007d6b0, 0x24)
    /usr/local/Cellar/go/1.8.3/libexec/src/net/fd_poll_runtime.go:80 +0x34
net.(*netFD).Read(0xc42006f570, 0xc42007d6b0, 0x24, 0x24, 0x0, 0x1438e40, 0x14363d0)
    /usr/local/Cellar/go/1.8.3/libexec/src/net/fd_unix.go:250 +0x1b7
net.(*conn).Read(0xc4200e0010, 0xc42007d6b0, 0x24, 0x24, 0x0, 0x0, 0x0)
    /usr/local/Cellar/go/1.8.3/libexec/src/net/net.go:181 +0x70
gopkg.in/mgo%2ev2.fill(0x143cce0, 0xc4200e0010, 0xc42007d6b0, 0x24, 0x24, 0x0, 0x53)
    /Users/erikmuttersbach/go/src/gopkg.in/mgo.v2/socket.go:535 +0x53
gopkg.in/mgo%2ev2.(*mongoSocket).readLoop(0xc420164000)
    /Users/erikmuttersbach/go/src/gopkg.in/mgo.v2/socket.go:551 +0x107
created by gopkg.in/mgo%2ev2.newSocket
    /Users/erikmuttersbach/go/src/gopkg.in/mgo.v2/socket.go:194 +0x259

goroutine 3 [runnable]:
net.runtime_pollWait(0x1664eb0, 0x72, 0x9)
    /usr/local/Cellar/go/1.8.3/libexec/src/runtime/netpoll.go:164 +0x59
net.(*pollDesc).wait(0xc4201060d8, 0x72, 0x1438e40, 0x14363d0)
    /usr/local/Cellar/go/1.8.3/libexec/src/net/fd_poll_runtime.go:75 +0x38
net.(*pollDesc).waitRead(0xc4201060d8, 0xc4201000c0, 0x24)
    /usr/local/Cellar/go/1.8.3/libexec/src/net/fd_poll_runtime.go:80 +0x34
net.(*netFD).Read(0xc420106070, 0xc4201000c0, 0x24, 0x24, 0x0, 0x1438e40, 0x14363d0)
    /usr/local/Cellar/go/1.8.3/libexec/src/net/fd_unix.go:250 +0x1b7
net.(*conn).Read(0xc42000e010, 0xc4201000c0, 0x24, 0x24, 0x0, 0x0, 0x0)
    /usr/local/Cellar/go/1.8.3/libexec/src/net/net.go:181 +0x70
gopkg.in/mgo%2ev2.fill(0x143cce0, 0xc42000e010, 0xc4201000c0, 0x24, 0x24, 0x0, 0x0)
    /Users/erikmuttersbach/go/src/gopkg.in/mgo.v2/socket.go:535 +0x53
gopkg.in/mgo%2ev2.(*mongoSocket).readLoop(0xc420118000)
    /Users/erikmuttersbach/go/src/gopkg.in/mgo.v2/socket.go:551 +0x107
created by gopkg.in/mgo%2ev2.newSocket
    /Users/erikmuttersbach/go/src/gopkg.in/mgo.v2/socket.go:194 +0x259

goroutine 52 [select, 10 minutes]:
github.com/rwynn/gtm.FetchDocuments(0xc4200fe0a0, 0xc420063860, 0x131fa40, 0xc420100120, 0xc4201122a0, 0x0, 0x0)
    /Users/erikmuttersbach/go/src/github.com/rwynn/gtm/gtm.go:498 +0x3a7
created by github.com/rwynn/gtm.Start
    /Users/erikmuttersbach/go/src/github.com/rwynn/gtm/gtm.go:673 +0x5fc

goroutine 53 [semacquire, 10 minutes]:
sync.runtime_notifyListWait(0xc420150108, 0xc400000001)
    /usr/local/Cellar/go/1.8.3/libexec/src/runtime/sema.go:298 +0x10b
sync.(*Cond).Wait(0xc4201500f8)
    /usr/local/Cellar/go/1.8.3/libexec/src/sync/cond.go:57 +0x89
gopkg.in/mgo%2ev2.(*Iter).Next(0xc4201500f0, 0x12c45e0, 0xc42007d590, 0x0)
    /Users/erikmuttersbach/go/src/gopkg.in/mgo.v2/session.go:3704 +0x9f
github.com/rwynn/gtm.TailOps(0xc4200fe0a0, 0xc420063860, 0xc42010a008, 0x1, 0x1, 0xc420110090, 0x0, 0x0)
    /Users/erikmuttersbach/go/src/github.com/rwynn/gtm/gtm.go:365 +0x1ea
created by github.com/rwynn/gtm.Start
    /Users/erikmuttersbach/go/src/github.com/rwynn/gtm/gtm.go:685 +0x8ea

rax    0xe
rbx    0x0
rcx    0x7fff5fbff3c0
rdx    0x186a0
rdi    0xf03
rsi    0x0
rbp    0x7fff5fbff3f8
rsp    0x7fff5fbff3c0
r8     0x43a260
r9     0x11
r10    0x653ef1e26c7c
r11    0x202
r12    0x1ad50d58ee7b
r13    0x1cbe784ba3b8
r14    0x14d44e02db750c00
r15    0x1
rip    0x10566a3
rflags 0x202
cs     0x7
fs     0x0
gs     0x0
exit status 2

The problem turned out to be the garbage collector: a collection was triggered but could never start, as this stack trace (captured by sending SIGQUIT to the program) shows:

goroutine 37 [running]:
runtime.systemstack_switch()
    /usr/local/Cellar/go/1.8.3/libexec/src/runtime/asm_amd64.s:281 fp=0xc420145608 sp=0xc420145600
runtime.gcStart(0x0, 0xc420366000)
    /usr/local/Cellar/go/1.8.3/libexec/src/runtime/mgc.go:1010 +0x208 fp=0xc420145640 sp=0xc420145608
runtime.mallocgc(0x2500, 0x0, 0xc420106100, 0xc420145728)
    /usr/local/Cellar/go/1.8.3/libexec/src/runtime/malloc.go:785 +0x491 fp=0xc4201456e0 sp=0xc420145640
runtime.rawstring(0x20e9, 0x0, 0x0, 0x0, 0x0, 0x0)
    /usr/local/Cellar/go/1.8.3/libexec/src/runtime/string.go:237 +0x85 fp=0xc420145710 sp=0xc4201456e0
runtime.rawstringtmp(0x0, 0x20e9, 0x29, 0xc4201457c0, 0xc4200bdb80, 0xc420106ff0, 0xc420106120)
    /usr/local/Cellar/go/1.8.3/libexec/src/runtime/string.go:107 +0x78 fp=0xc420145750 sp=0xc420145710
runtime.concatstrings(0x0, 0xc420145830, 0x2, 0x2, 0xc420106ff0, 0xc4201b8000)
    /usr/local/Cellar/go/1.8.3/libexec/src/runtime/string.go:46 +0xf9 fp=0xc4201457e8 sp=0xc420145750
runtime.concatstring2(0x0, 0xc42051d900, 0x20c0, 0xc420106ff0, 0x29, 0xc420106ff0, 0x29)
    /usr/local/Cellar/go/1.8.3/libexec/src/runtime/string.go:55 +0x47 fp=0xc420145828 sp=0xc4201457e8
_/Users/erikmuttersbach/Projects/mongo2psql-go/db.GenerateInsert(0xc4200c9730, 0x4, 0xc4200c9650, 0x4, 0xc4200e1860, 0x1, 0x4, 0xc42006c400, 0x3, 0x4, ...)
    /Users/erikmuttersbach/Projects/mongo2psql-go/db/sql.go:27 +0x407 fp=0xc420145a10 sp=0xc420145828
main.writer(0xc4200e01a0, 0xc42015ccc0, 0xc420014320)
    /Users/erikmuttersbach/Projects/mongo2psql-go/main.go:112 +0x3b4 fp=0xc420145fc8 sp=0xc420145a10
runtime.goexit()
    /usr/local/Cellar/go/1.8.3/libexec/src/runtime/asm_amd64.s:2197 +0x1 fp=0xc420145fd0 sp=0xc420145fc8
created by main.main
    /Users/erikmuttersbach/Projects/mongo2psql-go/main.go:202 +0x135

The frame runtime.gcStart(0x0, 0xc420366000) shows that an allocation in the writer triggered a garbage collection, which is now waiting to stop the world (in the first SIGQUIT dump, goroutine 0 is sitting in runtime.stopTheWorldWithSema). The GC was blocked because I started my "never-ending" goroutines like this:

func main() {
    ...
    go writer()
    go producer()

    for {} // busy loop: keeps main alive, but spins on a CPU forever
}

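The empty infinite for loop never makes a function call, so in Go 1.8 the scheduler cannot preempt it (goroutine 1 in the first dump is "running on other thread"). Before collecting, the GC must stop the world, i.e. bring every goroutine to a safe point. It waits forever for the spinning main goroutine, and the whole program, including the writer that triggered the collection in GenerateInsert, stays frozen. Go 1.14 added asynchronous preemption, which makes such loops preemptible, but blocking properly (on a sync.WaitGroup, a channel, or select {}) instead of spinning is still the right fix.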