前往:学习渠道和排队,致命错误

I'm trying to learn how to use channels to make a queue in Go for one of my other projects. My other project basically queues up database rows, and then does number crunching on the database using the details in the rows.

I don't want the same row to be processing in a worker at the same time, so it needs to check whether a worker is currently working on that specific row ID, and if so, wait for it to finish. If it's not the same row ID, it can run asynchronously, but I also want to limit the number of asynchronous workers that can run at the same time. In my code below, I'm trying to limit it to three workers at the moment.

Here's what I have:

package main

import (
    "log"
    "strconv"
    "time"
)

// RowInfo holds the job info
type RowInfo struct {
    id int
}

// WorkerCount holds how many workers are currently running
var WorkerCount int

// WorkerLocked specifies whether a row ID is currently processing by a worker
var WorkerLocked map[string]bool

// Process the RowInfo
func worker(row RowInfo) {
    rowID := strconv.Itoa(row.id)

    WorkerCount++
    WorkerLocked[rowID] = true

    time.Sleep(1 * time.Second)
    log.Printf("ID rcvd: %d", row.id)

    WorkerLocked[rowID] = false
    WorkerCount--
}

// waiter will check if the row is already processing in a worker
// Block until it finishes completion, then dispatch
func waiter(row RowInfo) {
    rowID := strconv.Itoa(row.id)
    for WorkerLocked[rowID] == true {
        time.Sleep(1 * time.Second)
    }

    go worker(row)
}

func main() {
    jobsQueue := make(chan RowInfo, 10)
    WorkerLocked = make(map[string]bool)

    // Dispatcher waits for jobs on the channel and dispatches to waiter
    go func() {
        // Wait for a job
        for {
            // Only have a max of 3 workers running asynch at a time
            for WorkerCount > 3 {
                time.Sleep(1 * time.Second)
            }

            job := <-jobsQueue
            go waiter(job)
        }
    }()

    // Test the queue, send some data
    for i := 0; i < 12; i++ {
        r := RowInfo{
            id: i,
        }
        jobsQueue <- r
    }

    // Prevent exit!
    for {
        time.Sleep(1 * time.Second)
    }
}

And I'm getting this error, but it's an intermittent issue because sometimes when I run it it appears to work. Is there a race condition?:

go run main.go
panic: runtime error: invalid memory address or nil pointer dereference
[signal 0xb code=0x1 addr=0x8 pc=0x4565e7]

goroutine 37 [running]:
main.worker(0x5)
    /home/piiz/go/src/github.com/zzz/asynch/main.go:25 +0x94
created by main.waiter
    /home/piiz/go/src/github.com/zzz/asynch/main.go:42 +0xbb

goroutine 1 [sleep]:
time.Sleep(0x3b9aca00)
    /usr/local/go/src/runtime/time.go:59 +0xf9
main.main()
    /home/piiz/go/src/github.com/zzz/asynch/main.go:73 +0xf8

goroutine 5 [sleep]:
time.Sleep(0x3b9aca00)
    /usr/local/go/src/runtime/time.go:59 +0xf9
main.main.func1(0xc82008c000)
    /home/piiz/go/src/github.com/zzz/asynch/main.go:55 +0x2d
created by main.main
    /home/piiz/go/src/github.com/zzz/asynch/main.go:61 +0xa0

goroutine 35 [sleep]:
time.Sleep(0x3b9aca00)
    /usr/local/go/src/runtime/time.go:59 +0xf9
main.worker(0x2)
    /home/piiz/go/src/github.com/zzz/asynch/main.go:27 +0xa1
created by main.waiter
    /home/piiz/go/src/github.com/zzz/asynch/main.go:42 +0xbb

goroutine 36 [sleep]:
time.Sleep(0x3b9aca00)
    /usr/local/go/src/runtime/time.go:59 +0xf9
main.worker(0x4)
    /home/piiz/go/src/github.com/zzz/asynch/main.go:27 +0xa1
created by main.waiter
    /home/piiz/go/src/github.com/zzz/asynch/main.go:42 +0xbb

goroutine 34 [sleep]:
time.Sleep(0x3b9aca00)
    /usr/local/go/src/runtime/time.go:59 +0xf9
main.worker(0x1)
    /home/piiz/go/src/github.com/zzz/asynch/main.go:27 +0xa1
created by main.waiter
    /home/piiz/go/src/github.com/zzz/asynch/main.go:42 +0xbb

goroutine 12 [runnable]:
runtime.goexit1()
    /usr/local/go/src/runtime/proc1.go:1732
runtime.goexit()
    /usr/local/go/src/runtime/asm_amd64.s:1697 +0x6
created by main.main.func1
    /home/piiz/go/src/github.com/zzz/asynch/main.go:59 +0x8c

goroutine 19 [sleep]:
time.Sleep(0x3b9aca00)
    /usr/local/go/src/runtime/time.go:59 +0xf9
main.worker(0x8)
    /home/piiz/go/src/github.com/zzz/asynch/main.go:27 +0xa1
created by main.waiter
    /home/piiz/go/src/github.com/zzz/asynch/main.go:42 +0xbb

goroutine 20 [sleep]:
time.Sleep(0x3b9aca00)
    /usr/local/go/src/runtime/time.go:59 +0xf9
main.worker(0x0)
    /home/piiz/go/src/github.com/zzz/asynch/main.go:27 +0xa1
created by main.waiter
    /home/piiz/go/src/github.com/zzz/asynch/main.go:42 +0xbb

goroutine 16 [sleep]:
time.Sleep(0x3b9aca00)
    /usr/local/go/src/runtime/time.go:59 +0xf9
main.worker(0x9)
    /home/piiz/go/src/github.com/zzz/asynch/main.go:27 +0xa1
created by main.waiter
    /home/piiz/go/src/github.com/zzz/asynch/main.go:42 +0xbb

goroutine 33 [sleep]:
time.Sleep(0x3b9aca00)
    /usr/local/go/src/runtime/time.go:59 +0xf9
main.worker(0x3)
    /home/piiz/go/src/github.com/zzz/asynch/main.go:27 +0xa1
created by main.waiter
    /home/piiz/go/src/github.com/zzz/asynch/main.go:42 +0xbb

goroutine 18 [sleep]:
time.Sleep(0x3b9aca00)
    /usr/local/go/src/runtime/time.go:59 +0xf9
main.worker(0x7)
    /home/piiz/go/src/github.com/zzz/asynch/main.go:27 +0xa1
created by main.waiter
    /home/piiz/go/src/github.com/zzz/asynch/main.go:42 +0xbb

goroutine 22 [sleep]:
time.Sleep(0x3b9aca00)
    /usr/local/go/src/runtime/time.go:59 +0xf9
main.worker(0xa)
    /home/piiz/go/src/github.com/zzz/asynch/main.go:27 +0xa1
created by main.waiter
    /home/piiz/go/src/github.com/zzz/asynch/main.go:42 +0xbb

goroutine 49 [runnable]:
main.worker(0x6)
    /home/piiz/go/src/github.com/zzz/asynch/main.go:21
created by main.waiter
    /home/piiz/go/src/github.com/zzz/asynch/main.go:42 +0xbb
exit status 2

Anyway, I am still learning, so if you look at my code and go "what the hell", well, I won't be surprised :) Maybe I'm approaching this problem entirely wrong. Thanks.

If you're going to use the WorkerLocked map, you need to protect access to it using the sync package. You also need to protect WorkerCount in the same way (or using atomic operations). Doing something like that would also make sleeping unnecessary (using condition variables).

Better yet, have 3 (or however many) workers waiting for rows to work on using channels. You would then distribute the rows to the various workers such that a particular row is always worked on by a particular worker (e.g., using row.id % 3 to determine which worker/channel to send the row to).

I highly recommend not to use any locking in this situation where you have workers that handles reads from database. Locks and semaphores in general can cause a lot of issue and in the end leave you with bunch of corrupted data. Trust me. Been there, done that. You need to be careful and avoid using them in cases like this. Locking is good if you wish to preserve data and maintain maps for example but not for actual processing.

By locking go routine you're slowing down your go program unnecessary. Go is designed to process things fast as possible. Don't hold him down.

Here's some theory of my own that might help you understand what I'm trying to say a little bit better:

  • In order to handle workers limit to 3. Just spawn 3 different go routines that selects from queue. Worker will never take same job from channel so you're safe here.

  • make() already have internal channel limit done that can be used in this case nicely. That channel limit is actual 2nd parameter. So if you write

    queue := make(chan RowInfo, 10)
    it means that this queue can take up to 10 of RowInfo. In case that loop that aggregates into this queue reaches 10 of them, it will lock down and wait for worker to release one item from channel. So once queue goes to 9, database aggregator will than write 10th and worker will take that 10th out.

On this way you can have natural workflow that golang is all about :) This as well is called spawning pre-workers

package main

import (
    "fmt"
    "os"
    "os/signal"
    "syscall"
    "time"
)

// RowInfo holds the job info
type RowInfo struct {
    ID int
}

func worker(queue chan RowInfo, done chan bool) {
    fmt.Println("Starting worker...")

    for {
        select {
        case row := <-queue:
            fmt.Printf("Got row info: %v 
", row)
            // Keep it for second so we can see actual queue lock working
            time.Sleep(1 * time.Second)

        case <-time.After(10 * time.Second):
            fmt.Printf("This job is taking way too long. Let's clean it up now by lets say write write in database that job has failed so it can be restarted again when time is right.")
        case <-done:
            fmt.Printf("Got quit signal... Killing'em all")
            break
        }
    }
}

func handleSigterm(kill chan os.Signal, done chan bool) {
    select {
    case _ = <-kill:
        close(done)
    }
}

func main() {

    // Do not allow more than 10 records to be in the channel.
    queue := make(chan RowInfo, 10)
    done := make(chan bool)

    kill := make(chan os.Signal, 1)

    signal.Notify(kill, os.Interrupt)
    signal.Notify(kill, syscall.SIGTERM)

    go handleSigterm(kill, done)

    for i := 0; i < 3; i++ {
        go worker(queue, done)
    }

    // Should be infinite loop in the end...
    go func() {
        for i := 0; i < 100; i++ {
            fmt.Printf("Queueing: %v 
", i)
            queue <- RowInfo{ID: i}
        }
    }()

    <-done
    // Give it some time to process things before shutting down. This is bad way of doing things
    // but is efficient for this example
    time.Sleep(5 * time.Second)
}

As for managing database states, You can have status in database saying "in-progress" so every time you select you as well do an update on that row that it's in progress. This is of course one way of doing it. By preserving some sort of mapping in golang I would say that you'd torture your service more than it needs to be.

Hope this helps!