I'm going through the go tour and I feel like I have a pretty good understanding of the language except for concurrency.
On slide 72 there is an exercise that asks the reader to parallelize a web crawler (and to make it not cover repeats but I haven't gotten there yet.)
Here is what I have so far:
func Crawl(url string, depth int, fetcher Fetcher, ch chan string) {
if depth <= 0 {
return
}
body, urls, err := fetcher.Fetch(url)
if err != nil {
ch <- fmt.Sprintln(err)
return
}
ch <- fmt.Sprintf("found: %s %q
", url, body)
for _, u := range urls {
go Crawl(u, depth-1, fetcher, ch)
}
}
// main starts the crawl in the background and prints every message
// that arrives on the (buffered) channel.
func main() {
	messages := make(chan string, 100)
	go Crawl("http://golang.org/", 4, fetcher, messages)
	for message := range messages {
		fmt.Println(message)
	}
}
The issue I have is where to put the close(ch) call. If I put a defer close(ch) somewhere in the Crawl function, I end up writing to a closed channel from one of the spawned goroutines, since the function finishes executing before the goroutines it spawned do. If I omit the call to close(ch), as shown in my example code, the program deadlocks: all the goroutines finish, but the main goroutine is still waiting on the channel in the for loop, because the channel was never closed.
A look at the Parallelization section of Effective Go leads to ideas for the solution. Essentially you have to close the channel on each return route of the function, which is a nice use case for the defer statement:
// Crawl fetches url and recursively crawls the pages it links to, down
// to the given depth. Every result line is sent on ret, and the defer
// closes ret on every return path, so the caller's range loop always
// terminates.
func Crawl(url string, depth int, fetcher Fetcher, ret chan string) {
	defer close(ret)
	if depth <= 0 {
		return
	}
	body, urls, err := fetcher.Fetch(url)
	if err != nil {
		ret <- err.Error()
		return
	}
	ret <- fmt.Sprintf("found: %s %q", url, body)
	// Give every child crawl its own channel; each child closes its own
	// channel when it is done, so draining it below is safe.
	result := make([]chan string, len(urls))
	for i, u := range urls {
		result[i] = make(chan string)
		go Crawl(u, depth-1, fetcher, result[i])
	}
	// Forward every child's output into this call's channel, in order.
	for i := range result {
		for s := range result[i] {
			ret <- s
		}
	}
	return
}
// main launches the root crawl and drains its result channel; the loop
// ends when Crawl closes the channel on return.
func main() {
	lines := make(chan string)
	go Crawl("http://golang.org/", 4, fetcher, lines)
	for line := range lines {
		fmt.Println(line)
	}
}
The essential difference to your code is that every instance of Crawl gets its own return channel and the caller function collects the results in its return channel.
Here's my solution. I have a "master" routine that listens to a channel of urls and starts a new crawling goroutine (one that puts the urls it finds into the channel) whenever it encounters urls that still need to be crawled.
Instead of explicitly closing the channel, I have a counter for unfinished crawling goroutines, and when the counter is 0, the program exits because it has nothing to wait for.
func doCrawl(url string, fetcher Fetcher, results chan []string) {
body, urls, err := fetcher.Fetch(url)
results <- urls
if err != nil {
fmt.Println(err)
} else {
fmt.Printf("found: %s %q
", url, body)
}
}
// Crawl is the master loop: it seeds one doCrawl goroutine, then keeps
// reading url batches from results, spawning a new goroutine for every
// url it has not seen before. It returns when the count of outstanding
// goroutines reaches zero.
//
// NOTE(review): depth is accepted but never used — doCrawl carries no
// depth, so the crawl is bounded only by the visited set.
func Crawl(url string, depth int, fetcher Fetcher) {
	results := make(chan []string)
	crawled := make(map[string]bool)
	// Mark the seed url as crawled so a page linking back to it cannot
	// trigger a second fetch (the original omitted this).
	crawled[url] = true
	go doCrawl(url, fetcher, results)
	// counter for unfinished crawling goroutines
	toWait := 1
	for urls := range results {
		toWait--
		for _, u := range urls {
			if !crawled[u] {
				crawled[u] = true
				go doCrawl(u, fetcher, results)
				toWait++
			}
		}
		if toWait == 0 {
			break
		}
	}
}
I went in a completely different direction with this one. I might have been misled by the tip about using a map.
// SafeUrlMap is a mutex-guarded map from url to page body, safe for
// concurrent use.
type SafeUrlMap struct {
	v   map[string]string
	mux sync.Mutex
}

// Set records body as the value for key; the lock ensures only one
// goroutine at a time mutates the underlying map.
func (c *SafeUrlMap) Set(key string, body string) {
	c.mux.Lock()
	defer c.mux.Unlock()
	c.v[key] = body
}

// Value returns the mapped value for the given key and whether the key
// was present.
func (c *SafeUrlMap) Value(key string) (string, bool) {
	c.mux.Lock()
	defer c.mux.Unlock()
	body, present := c.v[key]
	return body, present
}
// Crawl uses fetcher to recursively crawl
// pages starting with url, to a maximum of depth. Fetched pages are
// recorded in urlMap, which doubles as the visited set; the global wg
// lets main wait for the whole crawl.
func Crawl(url string, depth int, fetcher Fetcher, urlMap SafeUrlMap) {
	defer wg.Done()
	if depth <= 0 {
		return
	}
	body, urls, err := fetcher.Fetch(url)
	if err != nil {
		fmt.Println(err)
		return
	}
	// Record the page only after a successful fetch. (The original
	// called urlMap.Set(url, body) before body was declared, which does
	// not compile.)
	urlMap.Set(url, body)
	for _, u := range urls {
		// Skip urls already fetched. Two goroutines can still race past
		// this check; the map itself stays consistent because Set is
		// mutex-guarded.
		if _, ok := urlMap.Value(u); !ok {
			wg.Add(1)
			go Crawl(u, depth-1, fetcher, urlMap)
		}
	}
	return
}
var wg sync.WaitGroup
func main() {
urlMap := SafeUrlMap{v: make(map[string]string)}
wg.Add(1)
go Crawl("http://golang.org/", 4, fetcher, urlMap)
wg.Wait()
for url := range urlMap.v {
body, _ := urlMap.Value(url)
fmt.Printf("found: %s %q
", url, body)
}
}
I have implemented it with a simple channel to which all the goroutines send their messages. To ensure that the channel is closed when there are no more goroutines, I use a safe counter that closes the channel when the counter reaches 0.
// Msg pairs a crawled url with the page body fetched for it.
type Msg struct {
	url  string
	body string
}

// SafeCounter is a mutex-guarded counter of live crawler goroutines.
type SafeCounter struct {
	v   int
	mux sync.Mutex
}

// inc bumps the counter by one.
func (sc *SafeCounter) inc() {
	sc.mux.Lock()
	defer sc.mux.Unlock()
	sc.v++
}

// dec drops the counter by one and, when it reaches zero, closes ch to
// signal the consumer that no more messages will arrive.
func (sc *SafeCounter) dec(ch chan Msg) {
	sc.mux.Lock()
	defer sc.mux.Unlock()
	sc.v--
	if sc.v == 0 {
		close(ch)
	}
}
// goes counts crawler goroutines that are still running; the final
// decrement closes the results channel (see SafeCounter.dec).
var goes SafeCounter = SafeCounter{v: 0}

// Crawl uses fetcher to recursively crawl
// pages starting with url, to a maximum of depth.
func Crawl(url string, depth int, fetcher Fetcher, ch chan Msg) {
	// Always decrement the goroutine counter on exit; when it reaches
	// zero, dec closes ch and main's range loop ends.
	defer goes.dec(ch)
	// TODO: Fetch URLs in parallel.
	// TODO: Don't fetch the same URL twice.
	// This implementation doesn't do either:
	if depth <= 0 {
		return
	}
	// NOTE(review): `cache` is not defined in this snippet; presumably
	// existsAndRegister atomically checks and marks the url — confirm
	// against the full program.
	if !cache.existsAndRegister(url) {
		body, urls, err := fetcher.Fetch(url)
		if err != nil {
			fmt.Println(err)
			return
		}
		ch <- Msg{url, body}
		for _, u := range urls {
			// Increment before spawning so the counter can never hit
			// zero while a child is still pending (see note after the
			// code).
			goes.inc()
			go Crawl(u, depth-1, fetcher, ch)
		}
	}
	return
}
func main() {
ch := make(chan Msg, 100)
goes.inc()
go Crawl("http://golang.org/", 4, fetcher, ch)
for m := range ch {
fmt.Printf("found: %s %q
", m.url, m.body)
}
}
Note that the safe counter must be incremented outside of the goroutine.
I use a slice to avoid crawling the same url twice. The recursive version without concurrency is fine, but I'm not sure about this concurrent version.
func Crawl(url string, depth int, fetcher Fetcher) {
var str_arrs []string
var mux sync.Mutex
var crawl func(string, int)
crawl = func(url string, depth int) {
if depth <= 0 {
return
}
mux.Lock()
for _, v := range str_arrs {
if url == v {
mux.Unlock()
return
}
}
str_arrs = append(str_arrs, url)
mux.Unlock()
body, urls, err := fetcher.Fetch(url)
if err != nil {
fmt.Println(err)
return
}
fmt.Printf("found: %s %q
", url, body)
for _, u := range urls {
go crawl(u, depth-1) // could delete “go” then it is recursive
}
}
crawl(url, depth)
return
}
// main is the entry point: crawl golang.org to depth 4.
func main() {
	Crawl("http://golang.org/", 4, fetcher)
}
O(1) time lookup of url on map instead of O(n) lookup on slice of all urls visited should help minimize time spent inside of the critical section, which is a trivial amount of time for this example but would become relevant with scale.
WaitGroup used to prevent top level Crawl() function from returning until all child go routines are complete.
func Crawl(url string, depth int, fetcher Fetcher) {
var str_map = make(map[string]bool)
var mux sync.Mutex
var wg sync.WaitGroup
var crawler func(string,int)
crawler = func(url string, depth int) {
defer wg.Done()
if depth <= 0 {
return
}
mux.Lock()
if _, ok := str_map[url]; ok {
mux.Unlock()
return;
}else{
str_map[url] = true
mux.Unlock()
}
body, urls, err := fetcher.Fetch(url)
if err != nil {
fmt.Println(err)
return
}
fmt.Printf("found: %s %q %q
", url, body, urls)
for _, u := range urls {
wg.Add(1)
go crawler(u, depth-1)
}
}
wg.Add(1)
crawler(url,depth)
wg.Wait()
}
func main() {
	// Crawl blocks until the whole crawl finishes (it waits on an
	// internal WaitGroup), so no extra synchronization is needed here.
	Crawl("http://golang.org/", 4, fetcher)
}
Here's my solution, using sync.WaitGroup and a SafeCache of fetched urls:
package main
import (
"fmt"
"sync"
)
// Fetcher abstracts page retrieval so the crawler can run against a
// fake implementation (the tour supplies one named fetcher).
type Fetcher interface {
	// Fetch returns the body of URL and
	// a slice of URLs found on that page.
	Fetch(url string) (body string, urls []string, err error)
}
// SafeCache is a mutex-guarded map of fetched urls to their bodies,
// safe to use concurrently.
type SafeCache struct {
	fetched map[string]string
	mux     sync.Mutex
}

// Add stores body for url unless url is already cached — the first
// write wins.
func (c *SafeCache) Add(url, body string) {
	c.mux.Lock()
	defer c.mux.Unlock()
	_, exists := c.fetched[url]
	if !exists {
		c.fetched[url] = body
	}
}

// Contains reports whether url has already been cached.
func (c *SafeCache) Contains(url string) bool {
	c.mux.Lock()
	defer c.mux.Unlock()
	_, exists := c.fetched[url]
	return exists
}
// Crawl uses fetcher to recursively crawl
// pages starting with url, to a maximum of depth.
func Crawl(url string, depth int, fetcher Fetcher, cache SafeCache,
wg *sync.WaitGroup) {
defer wg.Done()
if depth <= 0 {
return
}
body, urls, err := fetcher.Fetch(url)
if err != nil {
fmt.Println(err)
return
}
fmt.Printf("found: %s %q
", url, body)
cache.Add(url, body)
for _, u := range urls {
if !cache.Contains(u) {
wg.Add(1)
go Crawl(u, depth-1, fetcher, cache, wg)
}
}
return
}
// main builds the shared cache and WaitGroup, runs the root crawl
// synchronously, then waits for every goroutine it spawned.
func main() {
	var wg sync.WaitGroup
	cache := SafeCache{fetched: make(map[string]string)}
	wg.Add(1)
	Crawl("http://golang.org/", 4, fetcher, cache, &wg)
	wg.Wait()
}
Below is a simple solution for parallelization using only sync waitGroup.
var fetchedUrlMap = make(map[string]bool)
var mutex sync.Mutex
func Crawl(url string, depth int, fetcher Fetcher) {
//fmt.Println("In Crawl2 with url" , url)
if _, ok := fetchedUrlMap[url]; ok {
return
}
if depth <= 0 {
return
}
body, urls, err := fetcher.Fetch(url)
mutex.Lock()
fetchedUrlMap[url] = true
mutex.Unlock()
if err != nil {
fmt.Println(err)
return
}
fmt.Printf("found: %s %q
", url, body)
var wg sync.WaitGroup
for _, u := range urls {
// fmt.Println("Solving for ", u)
wg.Add(1)
go func(uv string) {
Crawl(uv, depth-1, fetcher)
wg.Done()
}(u)
}
wg.Wait()
}
Similar idea to the accepted answer, but with no duplicate URLs fetched, and printing directly to console. defer() is not used either. We use channels to signal when goroutines complete. The SafeMap idea is lifted off the SafeCounter given previously in the tour.
For the child goroutines, we create an array of channels, and wait until every child returns, by waiting on the channel.
package main
import (
"fmt"
"sync"
)
// SafeMap is a mutex-guarded string→bool map, safe to use
// concurrently.
type SafeMap struct {
	v   map[string]bool
	mux sync.Mutex
}

// SetVal sets the value for the given key, holding the lock so only
// one goroutine at a time mutates the map.
func (m *SafeMap) SetVal(key string, val bool) {
	m.mux.Lock()
	defer m.mux.Unlock()
	m.v[key] = val
}

// GetVal returns the current value for the given key (false when the
// key is absent).
func (m *SafeMap) GetVal(key string) bool {
	m.mux.Lock()
	defer m.mux.Unlock()
	return m.v[key]
}
// Fetcher abstracts page retrieval so the crawler can run against a
// fake implementation (the tour supplies one named fetcher).
type Fetcher interface {
	// Fetch returns the body of URL and
	// a slice of URLs found on that page.
	Fetch(url string) (body string, urls []string, err error)
}
// Crawl uses fetcher to recursively crawl
// pages starting with url, to a maximum of depth.
func Crawl(url string, depth int, fetcher Fetcher, status chan bool, urlMap SafeMap) {
// Check if we fetched this url previously.
if ok := urlMap.GetVal(url); ok {
//fmt.Println("Already fetched url!")
status <- true
return
}
// Marking this url as fetched already.
urlMap.SetVal(url, true)
if depth <= 0 {
status <- false
return
}
body, urls, err := fetcher.Fetch(url)
if err != nil {
fmt.Println(err)
status <- false
return
}
fmt.Printf("found: %s %q
", url, body)
statuses := make ([]chan bool, len(urls))
for index, u := range urls {
statuses[index] = make (chan bool)
go Crawl(u, depth-1, fetcher, statuses[index], urlMap)
}
// Wait for child goroutines.
for _, childstatus := range(statuses) {
<- childstatus
}
// And now this goroutine can finish.
status <- true
return
}
// main starts the root crawl and blocks on its status channel until
// the entire crawl tree has completed.
func main() {
	seen := SafeMap{v: make(map[string]bool)}
	done := make(chan bool)
	go Crawl("https://golang.org/", 4, fetcher, done, seen)
	<-done
}
Below is my solution. Except the global map, I only had to change the contents of Crawl
. Like other solutions, I used sync.Map
and sync.WaitGroup
. I've blocked out the important parts.
var m sync.Map
// Crawl uses fetcher to recursively crawl
// pages starting with url, to a maximum of depth.
func Crawl(url string, depth int, fetcher Fetcher) {
// This implementation doesn't do either:
if depth <= 0 {
return
}
// Don't fetch the same URL twice.
/////////////////////////////////////
_, ok := m.LoadOrStore(url, url) //
if ok { //
return //
} //
/////////////////////////////////////
body, urls, err := fetcher.Fetch(url)
if err != nil {
fmt.Println(err)
return
}
fmt.Printf("found: %s %q
", url, body)
// Fetch URLs in parallel.
/////////////////////////////////////
var wg sync.WaitGroup //
defer wg.Wait() //
for _, u := range urls { //
wg.Add(1) //
go func(u string) { //
defer wg.Done() //
Crawl(u, depth-1, fetcher) //
}(u) //
} //
/////////////////////////////////////
return
}
I think using a map (the same way we could use a set in other languages) and a mutex is the easiest approach:
func Crawl(url string, depth int, fetcher Fetcher) {
mux.Lock()
defer mux.Unlock()
if depth <= 0 || IsVisited(url) {
return
}
visit[url] = true
body, urls, err := fetcher.Fetch(url)
if err != nil {
fmt.Println(err)
return
}
fmt.Printf("found: %s %q
", url, body)
for _, u := range urls {
//
go Crawl(u, depth-1, fetcher)
}
return
}
// IsVisited reports whether s has already been recorded in visit.
// Callers must hold mux — the map itself is not synchronized here.
func IsVisited(s string) bool {
	if _, seen := visit[s]; seen {
		return true
	}
	return false
}

// mux guards visit, the set of urls already crawled.
var mux sync.Mutex
var visit = make(map[string]bool)
func main() {
	Crawl("https://golang.org/", 4, fetcher)
	// NOTE(review): the one-second sleep is a guess at how long the
	// spawned goroutines need; a WaitGroup would be deterministic.
	time.Sleep(time.Second)
}