去旅游爬虫运动麻烦

I'm going through the go tour and I feel like I have a pretty good understanding of the language except for concurrency.

On slide 72 there is an exercise that asks the reader to parallelize a web crawler (and to make it not cover repeats but I haven't gotten there yet.)

Here is what I have so far:

func Crawl(url string, depth int, fetcher Fetcher, ch chan string) {
    if depth <= 0 {
        return
    }

    body, urls, err := fetcher.Fetch(url)
    if err != nil {
        ch <- fmt.Sprintln(err)
        return
    }

    ch <- fmt.Sprintf("found: %s %q
", url, body)
    for _, u := range urls {
        go Crawl(u, depth-1, fetcher, ch)
    }
}

func main() {
    ch := make(chan string, 100)
    go Crawl("http://golang.org/", 4, fetcher, ch)

    for i := range ch {
        fmt.Println(i)
    }
}

The issue I have is where to put the close(ch) call. If I put a defer close(ch) somewhere in the Crawl method, then I end up writing to a closed channel in one of the spawned goroutines, since the method will finish execution before the spawned goroutines do.

If I omit the call to close(ch), as is shown in my example code, the program deadlocks after all the goroutines finish executing but the main thread is still waiting on the channel in the for loop since the channel was never closed.

A look at the Parallelization section of Effective Go leads to ideas for the solution. Essentually you have to close the channel on each return route of the function. Actually this is a nice use case of the defer statement:

func Crawl(url string, depth int, fetcher Fetcher, ret chan string) {
    defer close(ret)
    if depth <= 0 {
        return
    }

    body, urls, err := fetcher.Fetch(url)
    if err != nil {
        ret <- err.Error()
        return
    }

    ret <- fmt.Sprintf("found: %s %q", url, body)

    result := make([]chan string, len(urls))
    for i, u := range urls {
        result[i] = make(chan string)
        go Crawl(u, depth-1, fetcher, result[i])
    }

    for i := range result {
        for s := range result[i] {
            ret <- s
        }
    }

    return
}

func main() {
    result := make(chan string)
    go Crawl("http://golang.org/", 4, fetcher, result)

    for s := range result {
        fmt.Println(s)
    }
}

The essential difference to your code is that every instance of Crawl gets its own return channel and the caller function collects the results in its return channel.

Here's my solution. I have a "master" routine that listens to a channel of urls and starts new crawling routine (which puts crawled urls into the channel) if it finds new urls to crawl.

Instead of explicitly closing the channel, I have a counter for unfinished crawling goroutines, and when the counter is 0, the program exits because it has nothing to wait for.

func doCrawl(url string, fetcher Fetcher, results chan []string) {
    body, urls, err := fetcher.Fetch(url)
    results <- urls

    if err != nil {
        fmt.Println(err)
    } else {
        fmt.Printf("found: %s %q
", url, body)
    }
}



func Crawl(url string, depth int, fetcher Fetcher) {
    results := make(chan []string)
    crawled := make(map[string]bool)
    go doCrawl(url, fetcher, results)
    // counter for unfinished crawling goroutines
    toWait := 1

    for urls := range results {
        toWait--

        for _, u := range urls {
            if !crawled[u] {
                crawled[u] = true
                go doCrawl(u, fetcher, results)
                toWait++
            }
        }

        if toWait == 0 {
            break
        }
    }
}

I went with a completely different direction with this one. I might have been mislead by the tip about using a map.

// SafeUrlMap is safe to use concurrently.
type SafeUrlMap struct {
    v   map[string]string
    mux sync.Mutex
}

func (c *SafeUrlMap) Set(key string, body string) {
    c.mux.Lock()
    // Lock so only one goroutine at a time can access the map c.v.
    c.v[key] = body
    c.mux.Unlock()
}

// Value returns mapped value for the given key.
func (c *SafeUrlMap) Value(key string) (string, bool) {
    c.mux.Lock()
    // Lock so only one goroutine at a time can access the map c.v.
    defer c.mux.Unlock()
    val, ok := c.v[key]
    return val, ok
}

// Crawl uses fetcher to recursively crawl
// pages starting with url, to a maximum of depth.
func Crawl(url string, depth int, fetcher Fetcher, urlMap SafeUrlMap) {
    defer wg.Done()
    urlMap.Set(url, body)

    if depth <= 0 {
        return
    }

    body, urls, err := fetcher.Fetch(url)
    if err != nil {
        fmt.Println(err)
        return
    }

    for _, u := range urls {
        if _, ok := urlMap.Value(u); !ok {
            wg.Add(1)
            go Crawl(u, depth-1, fetcher, urlMap)
        }
    }

    return
}

var wg sync.WaitGroup

func main() {
    urlMap := SafeUrlMap{v: make(map[string]string)}

    wg.Add(1)
    go Crawl("http://golang.org/", 4, fetcher, urlMap)
    wg.Wait()

    for url := range urlMap.v {
        body, _ := urlMap.Value(url)
        fmt.Printf("found: %s %q
", url, body)
    }
}

I have implemented it with a simple channel, where all the goroutines send their messages. To ensure that it is closed when there is no more goroutines I use a safe counter, that close the channel when the counter is 0.

type Msg struct {
    url string
    body string
}

type SafeCounter struct {
    v int
    mux sync.Mutex
}

func (c *SafeCounter) inc() {
    c.mux.Lock()
    defer c.mux.Unlock()
    c.v++   
}

func (c *SafeCounter) dec(ch chan Msg) {
    c.mux.Lock()
    defer c.mux.Unlock()
    c.v--
    if c.v == 0 {
        close(ch)
    }
}

var goes SafeCounter = SafeCounter{v: 0}

// Crawl uses fetcher to recursively crawl
// pages starting with url, to a maximum of depth.
func Crawl(url string, depth int, fetcher Fetcher, ch chan Msg) {
    defer goes.dec(ch)
    // TODO: Fetch URLs in parallel.
    // TODO: Don't fetch the same URL twice.
    // This implementation doesn't do either:
    if depth <= 0 {
        return
    }
    if !cache.existsAndRegister(url) {
        body, urls, err :=  fetcher.Fetch(url)
        if err != nil {
            fmt.Println(err)
            return
        }
        ch <- Msg{url, body}
        for _, u := range urls {
            goes.inc()
            go Crawl(u, depth-1, fetcher, ch)
        }
    }
    return
}

func main() {
    ch := make(chan Msg, 100)
    goes.inc()
    go Crawl("http://golang.org/", 4, fetcher, ch)
    for m := range ch {
        fmt.Printf("found: %s %q
", m.url, m.body)
    }
}

Note that the safe counter must be incremented outside of the goroutine.

I use slice to avoid crawl the url twice,the recursive version without the concurrency is ok, but not sure about this concurrency version.

func Crawl(url string, depth int, fetcher Fetcher) {
    var str_arrs []string
    var mux sync.Mutex

    var crawl func(string, int)
    crawl = func(url string, depth int) {
        if depth <= 0 {
            return
        }

        mux.Lock()
        for _, v := range str_arrs {
            if url == v {
                mux.Unlock()
                return
            }
        }
        str_arrs = append(str_arrs, url)
        mux.Unlock()

        body, urls, err := fetcher.Fetch(url)
        if err != nil {
            fmt.Println(err)
            return
        }
        fmt.Printf("found: %s %q
", url, body)
        for _, u := range urls {
            go crawl(u, depth-1) // could delete “go” then it is recursive
        }
    }

    crawl(url, depth)
    return
}

func main() {
    Crawl("http://golang.org/", 4, fetcher)
}

O(1) time lookup of url on map instead of O(n) lookup on slice of all urls visited should help minimize time spent inside of the critical section, which is a trivial amount of time for this example but would become relevant with scale.

WaitGroup used to prevent top level Crawl() function from returning until all child go routines are complete.

func Crawl(url string, depth int, fetcher Fetcher) {
    var str_map = make(map[string]bool)
    var mux sync.Mutex
    var wg sync.WaitGroup

    var crawler func(string,int)
    crawler = func(url string, depth int) {
        defer wg.Done()

        if depth <= 0 {
            return
        }   

        mux.Lock()
        if _, ok := str_map[url]; ok {
            mux.Unlock()
            return;
        }else{
            str_map[url] = true
            mux.Unlock()
        }

        body, urls, err := fetcher.Fetch(url)
        if err != nil {
            fmt.Println(err)
            return
        }
        fmt.Printf("found: %s %q %q
", url, body, urls)

        for _, u := range urls {
            wg.Add(1)
            go crawler(u, depth-1)          
        }       
    }
    wg.Add(1)
    crawler(url,depth)
    wg.Wait()   
}

func main() {
    Crawl("http://golang.org/", 4, fetcher)
}

Here's my solution, using sync.WaitGroup and a SafeCache of fetched urls:

package main

import (
    "fmt"
    "sync"
)

type Fetcher interface {
    // Fetch returns the body of URL and
    // a slice of URLs found on that page.
    Fetch(url string) (body string, urls []string, err error)
}

// Safe to use concurrently
type SafeCache struct {
    fetched map[string]string
    mux     sync.Mutex
}

func (c *SafeCache) Add(url, body string) {
    c.mux.Lock()
    defer c.mux.Unlock()

    if _, ok := c.fetched[url]; !ok {
        c.fetched[url] = body
    }
}

func (c *SafeCache) Contains(url string) bool {
    c.mux.Lock()
    defer c.mux.Unlock()

    _, ok := c.fetched[url]
    return ok
}

// Crawl uses fetcher to recursively crawl
// pages starting with url, to a maximum of depth.
func Crawl(url string, depth int, fetcher Fetcher, cache SafeCache,
    wg *sync.WaitGroup) {

    defer wg.Done()
    if depth <= 0 {
        return
    }
    body, urls, err := fetcher.Fetch(url)
    if err != nil {
        fmt.Println(err)
        return
    }
    fmt.Printf("found: %s %q
", url, body)
    cache.Add(url, body)
    for _, u := range urls {
        if !cache.Contains(u) {
            wg.Add(1)
            go Crawl(u, depth-1, fetcher, cache, wg)
        }
    }
    return
}

func main() {
    cache := SafeCache{fetched: make(map[string]string)}
    var wg sync.WaitGroup

    wg.Add(1)
    Crawl("http://golang.org/", 4, fetcher, cache, &wg)
    wg.Wait()
}

Below is a simple solution for parallelization using only sync waitGroup.

var fetchedUrlMap = make(map[string]bool)
var mutex sync.Mutex

func Crawl(url string, depth int, fetcher Fetcher) {
    //fmt.Println("In Crawl2 with url" , url)
    if _, ok := fetchedUrlMap[url]; ok {
        return
    }

    if depth <= 0 {
        return
    }
    body, urls, err := fetcher.Fetch(url)
    mutex.Lock()
    fetchedUrlMap[url] = true
    mutex.Unlock()
    if err != nil {
        fmt.Println(err)
        return
    }
    fmt.Printf("found: %s %q
", url, body)

    var wg sync.WaitGroup
    for _, u := range urls {
        //  fmt.Println("Solving for ", u)
        wg.Add(1)
        go func(uv string) {
            Crawl(uv, depth-1, fetcher)
            wg.Done()
        }(u)
    }
    wg.Wait()
}

</div>

Similar idea to the accepted answer, but with no duplicate URLs fetched, and printing directly to console. defer() is not used either. We use channels to signal when goroutines complete. The SafeMap idea is lifted off the SafeCounter given previously in the tour.

For the child goroutines, we create an array of channels, and wait until every child returns, by waiting on the channel.

package main

import (
    "fmt"
    "sync"
)

// SafeMap is safe to use concurrently.
type SafeMap struct {
    v   map[string] bool
    mux sync.Mutex
}

// SetVal sets the value for the given key.
func (m *SafeMap) SetVal(key string, val bool) {
    m.mux.Lock()
    // Lock so only one goroutine at a time can access the map c.v.
    m.v[key] = val
    m.mux.Unlock()
}

// Value returns the current value of the counter for the given key.
func (m *SafeMap) GetVal(key string) bool {
    m.mux.Lock()
    // Lock so only one goroutine at a time can access the map c.v.
    defer m.mux.Unlock()
    return m.v[key]
}

type Fetcher interface {
    // Fetch returns the body of URL and
    // a slice of URLs found on that page.
    Fetch(url string) (body string, urls []string, err error)
}

// Crawl uses fetcher to recursively crawl
// pages starting with url, to a maximum of depth.
func Crawl(url string, depth int, fetcher Fetcher, status chan bool, urlMap SafeMap) {

    // Check if we fetched this url previously.
    if ok := urlMap.GetVal(url); ok {
        //fmt.Println("Already fetched url!")
        status <- true
        return 
    }

    // Marking this url as fetched already.
    urlMap.SetVal(url, true)

    if depth <= 0 {
        status <- false
        return
    }

    body, urls, err := fetcher.Fetch(url)
    if err != nil {
        fmt.Println(err)
        status <- false
        return
    }

    fmt.Printf("found: %s %q
", url, body)

    statuses := make ([]chan bool, len(urls)) 
    for index, u := range urls {
        statuses[index] = make (chan bool)
        go Crawl(u, depth-1, fetcher, statuses[index], urlMap)
    }

    // Wait for child goroutines.
    for _, childstatus := range(statuses) {
        <- childstatus
    }

    // And now this goroutine can finish.
    status <- true

    return
}

func main() {
    urlMap := SafeMap{v: make(map[string] bool)}
    status := make(chan bool)
    go Crawl("https://golang.org/", 4, fetcher, status, urlMap)
    <- status
}

Below is my solution. Except the global map, I only had to change the contents of Crawl. Like other solutions, I used sync.Map and sync.WaitGroup. I've blocked out the important parts.

var m sync.Map

// Crawl uses fetcher to recursively crawl
// pages starting with url, to a maximum of depth.
func Crawl(url string, depth int, fetcher Fetcher) {
    // This implementation doesn't do either:
    if depth <= 0 {
        return
    }
    // Don't fetch the same URL twice.
    /////////////////////////////////////
    _, ok := m.LoadOrStore(url, url)   //
    if ok {                            //
        return                         //
    }                                  //
    /////////////////////////////////////
    body, urls, err := fetcher.Fetch(url)
    if err != nil {
        fmt.Println(err)
        return
    }
    fmt.Printf("found: %s %q
", url, body)
    // Fetch URLs in parallel.
    /////////////////////////////////////
    var wg sync.WaitGroup              //
    defer wg.Wait()                    //
    for _, u := range urls {           //
        wg.Add(1)                      //
        go func(u string) {            //
            defer wg.Done()            //
            Crawl(u, depth-1, fetcher) //
        }(u)                           //
    }                                  //
    /////////////////////////////////////
    return
}

I think using a map (the same way we could use a set in other languages) and a mutex is the easiest approach:

func Crawl(url string, depth int, fetcher Fetcher) {
    mux.Lock()
    defer mux.Unlock()
    if depth <= 0 || IsVisited(url) {
        return
    }
    visit[url] = true
    body, urls, err := fetcher.Fetch(url)
    if err != nil {
        fmt.Println(err)
        return
    }
    fmt.Printf("found: %s %q
", url, body)
    for _, u := range urls {
        //
        go Crawl(u, depth-1, fetcher)
    }
    return
}

func IsVisited(s string) bool {
    _, ok := visit[s]
    return ok
}

var mux sync.Mutex

var visit = make(map[string]bool)

func main() {
    Crawl("https://golang.org/", 4, fetcher)
    time.Sleep(time.Second)
}