Synchronizing workers for recursive crawling

I would like to implement a "crawler" with n workers where each worker is able to add additional jobs. The program should stop when there are no jobs left and all workers have finished their work.

I have the following code (you can play with it at https://play.golang.org/p/_j22p_OfYv):

package main

import (
    "fmt"
    "sync"
)

func main() {
    pathChan := make(chan string)
    fileChan := make(chan string)
    workers := 3
    var wg sync.WaitGroup

    paths := map[string][]string{
        "/":     {"/test", "/foo", "a", "b"},
        "/test": {"aa", "bb", "cc"},
        "/foo":  {"/bar", "bbb", "ccc"},
        "/bar":  {"aaaa", "bbbb", "cccc"},
    }

    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func() {
            for {
                path, ok := <-pathChan
                if !ok {
                    break
                }

                for _, f := range paths[path] {
                    if f[0] == '/' {
                        pathChan <- f
                    } else {
                        fileChan <- f
                    }
                }
            }

            wg.Done()
        }()
    }

    pathChan <- "/"

    for {
        filePath, ok := <-fileChan
        if !ok {
            break
        }

        fmt.Println(filePath)
    }

    wg.Wait()
    close(pathChan)
}

Unfortunately, this ends in a deadlock. Where exactly is the problem? And what is the best practice for writing this kind of functionality? Are channels the right tool to use?

EDIT:

I have updated my code to use two wait groups, one for the jobs and one for the workers (see https://play.golang.org/p/bueUJzMhqj):

package main

import (
    "fmt"
    "sync"
)

func main() {
    pathChan := make(chan string)
    fileChan := make(chan string)
    jobs := new(sync.WaitGroup)
    workers := new(sync.WaitGroup)
    nworkers := 2

    paths := map[string][]string{
        "/":     {"/test", "/foo", "a", "b"},
        "/test": {"aa", "bb", "cc"},
        "/foo":  {"/bar", "bbb", "ccc"},
        "/bar":  {"aaaa", "bbbb", "cccc"},
    }

    for i := 0; i < nworkers; i++ {
        workers.Add(1)
        go func() {
            defer workers.Done()
            for {
                path, ok := <-pathChan
                if !ok {
                    break
                }

                for _, f := range paths[path] {
                    if f[0] == '/' {
                        jobs.Add(1)
                        pathChan <- f
                    } else {
                        fileChan <- f
                    }
                }

                jobs.Done()
            }

        }()
    }

    jobs.Add(1)
    pathChan <- "/"

    go func() {
        jobs.Wait()
        close(pathChan)
        workers.Wait()
        close(fileChan)
    }()

    for {
        filePath, ok := <-fileChan
        if !ok {
            break
        }

        fmt.Println(filePath)
    }

}

This indeed seems to work, but obviously a deadlock will still occur if nworkers is set to 1, because the single worker will wait forever when sending to pathChan. To work around this, the channel buffer can be increased (e.g. pathChan := make(chan string, 2)), but that only works as long as the buffer isn't completely full. Of course, the buffer size could be set to a large number, say 10000, but the code could still hit a deadlock. Besides, this doesn't seem like a clean solution to me.

This is where I realized that it would be easier to use some sort of queue instead of a channel, where elements can be added and removed without blocking and where the size of the queue isn't fixed. Do such queues exist in the Go standard library?

If you want to wait for an arbitrary number of workers to finish, the standard library includes sync.WaitGroup for exactly this purpose.

There are other concurrency issues as well:

  • You're using channel closure signalling, but you have multiple goroutines sending on the same channel. This is generally bad practice: since each routine can never know when the other routines are done with the channel, you can never correctly close the channel.
  • Closing one channel waits on the other being closed first: close(pathChan) only runs after the fileChan loop ends, i.e. after fileChan is closed, but fileChan is never closed, so the program deadlocks.
  • The only reason it doesn't deadlock immediately is that your example happens to have more workers than directories under "/". Add two more directories under "/" and it deadlocks immediately.
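The usual escape from the multiple-senders problem is to let exactly one dedicated goroutine close the channel, once a WaitGroup reports that every sender has finished. A minimal sketch of that pattern (the names `results` and `senders` are illustrative, not from the code above):

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	results := make(chan int)
	var senders sync.WaitGroup

	// Several goroutines send on the same channel; none of them
	// may close it, because none knows when the others are done.
	for i := 0; i < 3; i++ {
		senders.Add(1)
		go func(n int) {
			defer senders.Done()
			results <- n * n
		}(i)
	}

	// Exactly one goroutine closes the channel, and only after
	// every sender has called Done.
	go func() {
		senders.Wait()
		close(results)
	}()

	// The receiver can now safely range until closure.
	sum := 0
	for v := range results {
		sum += v
	}
	fmt.Println(sum) // 0 + 1 + 4 = 5
}
```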

There are some solutions:

  • Dump the worker pool and just spin a goroutine for every subdirectory, and let the scheduler worry about the rest: https://play.golang.org/p/ck2DkNFnyF
  • Use one worker per root-level directory, and have each worker process its directory recursively rather than queuing subdirectories it finds to a channel.
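The first option can be sketched roughly as follows; this is a guess at what the linked playground shows, reusing the map and names from the question, with a recursive `walk` closure standing in for the worker loop:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	paths := map[string][]string{
		"/":     {"/test", "/foo", "a", "b"},
		"/test": {"aa", "bb", "cc"},
		"/foo":  {"/bar", "bbb", "ccc"},
		"/bar":  {"aaaa", "bbbb", "cccc"},
	}

	fileChan := make(chan string)
	var wg sync.WaitGroup

	var walk func(path string)
	walk = func(path string) {
		defer wg.Done()
		for _, f := range paths[path] {
			if f[0] == '/' {
				// Spin up a new goroutine per subdirectory instead of
				// queuing it back onto a shared channel; the scheduler
				// handles the rest, so no fixed pool can be exhausted.
				wg.Add(1)
				go walk(f)
			} else {
				fileChan <- f
			}
		}
	}

	wg.Add(1)
	go walk("/")

	// One dedicated goroutine closes fileChan once every walker is done,
	// so the range below terminates cleanly.
	go func() {
		wg.Wait()
		close(fileChan)
	}()

	for f := range fileChan {
		fmt.Println(f)
	}
}
```

Note that wg.Add(1) for a child is called before the parent returns (and hence before its deferred Done), so the counter can never transiently hit zero while work is still outstanding.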