Recursive concurrency in Golang

I'd like to distribute some load across some goroutines. If the number of tasks is known beforehand then it is easy to organize. For example, I could do fan out with a wait group.

nTasks := 100
nGoroutines := 10

// it is important that this channel is not buffered
ch := make(chan *Task)
done := make(chan bool)
var w sync.WaitGroup
// Feed the channel until done
go func() {
    for i := 0; i < nTasks; i++ {
        task := getTaskI(i)
        ch <- task
    }
    // as ch is not buffered, once everything is read we know we have delivered all of them
    for i := 0; i < nGoroutines; i++ {
        done <- false
    }
}()
for i := 0; i < nGoroutines; i++ {
    w.Add(1)
    go func() {
        defer w.Done()
        // keep taking tasks until the feeder signals we are done
        for {
            select {
            case task := <-ch:
                doSomethingWithTask(task)
            case <-done:
                return
            }
        }
    }()
}
w.Wait()
// All tasks done, all goroutines closed

However, in my case each task returns more tasks to be done. Say for example a crawler where we receive all the links from the crawled web. My initial hunch was to have a main loop where I track the number of tasks done and tasks pending. When I'm done I send a finish signal to all goroutines:

nGoroutines := 10
ch := make(chan *Task, nGoroutines)
feedBackChannel := make(chan *Task, nGoroutines)
done := make(chan bool)

for i := 0; i < nGoroutines; i++ {
    go func() {
        // keep taking tasks until the main loop signals we are done
        for {
            select {
            case task := <-ch:
                task.NextTasks = doSomethingWithTask(task)
                feedBackChannel <- task
            case <-done:
                return
            }
        }
    }()
}

// seed first task
ch <- firstTask
nTasksRemaining := 1

for nTasksRemaining > 0 {
    task := <-feedBackChannel
    nTasksRemaining--
    for _, t := range task.NextTasks {
        ch <- t
        nTasksRemaining++
    }
}
for i := 0; i < nGoroutines; i++ {
    done <- false
}

However, this produces a deadlock. For example, if NextTasks is bigger than the number of goroutines, the main loop will stall while pushing them into ch when the first tasks finish. But those first tasks can't finish, because feedBackChannel is blocked: the main loop is busy waiting to write to ch and isn't reading from it.

One "easy" way out of this is to post to the channel asynchronously: Instead of doing feedBackChannel <- task do go func () {feedBackChannel <- task}(). Now, this feels like an awful hack. Specially since there might be hundred of thousands of tasks.

What would be a nice way to avoid this deadlock? I've searched for concurrency patterns, but they are mostly simpler things like fan-out or pipelines, where a later stage does not affect the earlier stages.

If I understand your problem correctly, your solution is pretty complex. Here are some points. Hope it helps.

  • As people mentioned in the comments, launching a goroutine is cheap (both in memory and in switching cost it is much cheaper than an OS-level thread) and you could have hundreds of thousands of them. Let's assume that for some reason you still want worker goroutines.
  • Instead of a done channel you could just close ch, and instead of select you can simply range over the channel to receive tasks.
  • I don't see the point of separating ch and feedBackChannel; just push every task you have into ch and increase its capacity.
  • As mentioned, you may get a deadlock when you try to enqueue a new task. My solution is pretty naive: just increase the channel's capacity until you are sure it won't overflow (you could also log a warning if cap(ch) - len(ch) < threshold). If you create a channel (of pointers) with a capacity of 1 million it will take about 8 * 1e6 ~= 8MB of RAM. A sketch combining these points is below.
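
Putting these points together, a rough sketch could look like the following (Task, doSomethingWithTask and the capacity of 1 million are placeholders for your actual types and numbers): one big buffered channel, workers range over it and push new tasks straight back into it, and a sync.WaitGroup counts the tasks still in flight so we know when it is safe to close the channel.

package main

import (
    "fmt"
    "sync"
)

// Task and doSomethingWithTask are placeholders for your actual types.
type Task struct {
    ID        int
    NextTasks []*Task
}

func doSomethingWithTask(t *Task) []*Task {
    fmt.Println("processing", t.ID)
    return t.NextTasks
}

func main() {
    nGoroutines := 10
    // one big buffered channel; make the capacity large enough that it never fills up
    ch := make(chan *Task, 1000000)

    var pending sync.WaitGroup // tasks pushed but not yet processed
    var workers sync.WaitGroup // running worker goroutines

    firstTask := &Task{ID: 1, NextTasks: []*Task{{ID: 2}, {ID: 3}}}
    pending.Add(1)
    ch <- firstTask

    for i := 0; i < nGoroutines; i++ {
        workers.Add(1)
        go func() {
            defer workers.Done()
            for task := range ch { // ends when ch is closed
                for _, t := range doSomethingWithTask(task) {
                    pending.Add(1)
                    ch <- t // relies on the buffer being big enough
                }
                pending.Done()
            }
        }()
    }

    pending.Wait() // every pushed task has been processed
    close(ch)      // lets the range loops, and therefore the workers, exit
    workers.Wait()
}

Closing ch is what ends the workers' range loops, and it is only safe once the pending count has dropped to zero, i.e. the channel is empty and nobody will push into it again.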