This is a follow up to my earlier post:
http://stackoverflow.com/questions/34736825/goroutine-exit-status-2-what-does-it-mean-why-is-it-happening?noredirect=1#comment57238789_34736825
I'm still having trouble figuring out where the channels should be closed, after reading multiple topics and articles both on and off SO.
This program will open a list of files, create an output file for each input file (with the same name), visit all the URLs in each input file, and get all href links from those pages - which are saved to the corresponding output file. However, I'm getting the following error:
http://play.golang.org/p/8X-1rM3aXC
The linkGetter and getHref functions are mainly for processing. head and tail are run as separate goroutines, and worker does the processing.
package main

import (
	"bufio"
	"bytes"
	"fmt"
	"golang.org/x/net/html"
	"io"
	"io/ioutil"
	"log"
	"net/http"
	"os"
	"path/filepath"
	"regexp"
	"sync"
)

type Work struct {
	Link     string
	Filename string
}

type Output struct {
	Href     string
	Filename string
}

func getHref(t html.Token) (href string, ok bool) {
	// Iterate over all of the Token's attributes until we find an "href"
	for _, a := range t.Attr {
		if a.Key == "href" {
			href = a.Val
			ok = true
		}
	}
	return
}

func linkGetter(out chan<- Output, r io.Reader, filename string) {
	z := html.NewTokenizer(r)
	for {
		tt := z.Next()
		switch {
		case tt == html.ErrorToken:
			return
		case tt == html.StartTagToken:
			t := z.Token()
			isAnchor := t.Data == "a"
			if !isAnchor {
				continue
			}
			// Extract the href value, if there is one
			url, ok := getHref(t)
			if !ok {
				continue
			}
			out <- Output{url, filename}
		}
	}
}

func worker(out chan<- Output, in <-chan Work, wg *sync.WaitGroup) {
	defer wg.Done()
	for work := range in {
		resp, err := http.Get(work.Link)
		if err != nil {
			continue
		}
		body, err := ioutil.ReadAll(resp.Body)
		if err != nil {
			continue
		}
		if err = resp.Body.Close(); err != nil {
			fmt.Println(err)
		}
		linkGetter(out, bytes.NewReader(body), work.Filename)
	}
}

func head(c chan<- Work) {
	r, _ := regexp.Compile("(.*)(?:.json)")
	files, _ := filepath.Glob("*.json")
	for _, elem := range files {
		res := r.FindStringSubmatch(elem)
		for k, v := range res {
			if k == 0 {
				outpath, _ := filepath.Abs(fmt.Sprintf("go_tester/%s", v))
				abspath, _ := filepath.Abs(fmt.Sprintf("url_links/%s", v))
				f, _ := os.Open(abspath)
				scanner := bufio.NewScanner(f)
				for scanner.Scan() {
					c <- Work{outpath, scanner.Text()}
				}
			}
		}
	}
}

func tail(c <-chan Output) {
	currentfile := ""
	var f *os.File
	var err error
	for out := range c {
		if out.Filename != currentfile {
			if err = f.Close(); err != nil { // might cause an error on first run
				fmt.Println(err)
			}
			f, err = os.OpenFile(out.Filename, os.O_APPEND|os.O_WRONLY, 0600)
			if err != nil {
				log.Fatal(err)
			}
			currentfile = out.Filename
		}
		if _, err = f.WriteString(out.Href + "\n"); err != nil {
			fmt.Println(err)
		}
	}
}

const (
	nworkers = 80
)

func main() {
	//fmt.Println("hi")
	in := make(chan Work)
	out := make(chan Output)
	go head(in)
	go tail(out)
	var wg sync.WaitGroup
	for i := 0; i < 85; i++ {
		wg.Add(1)
		go worker(out, in, &wg)
	}
	close(in)
	close(out)
	wg.Wait()
}
What is wrong with the way the channels are closed?
You're not really paying attention to your pipeline design here. You have to ask yourself "When is section X done? What should happen when it is done? What happens after it is done?" for every section of the pipeline.
You start up head, tail, and worker to range over channels. The only way these functions are going to return successfully is if those channels are closed.

Draw it out if you need to:

- head(in) feeds into in
- worker(out, in, &wg) ranges over in, feeds into out, and tells you it is done via wg once in is closed
- tail(out) ranges over out
So what do you need to do? Like so:

1. Close in from head once it is done processing all of the files.
2. That causes worker to actually return once all items it can get from in are processed, which in turn causes wg.Wait() to return.
3. Once the workers are done, close out, since nothing is feeding into it anymore, and this will cause tail to eventually return.

But you'll probably need another sync.WaitGroup associated with tail for this particular design, because the whole program will exit as soon as wg.Wait() returns, thus possibly not finishing all of the work that tail is doing. See the Go language specification on program execution. Specifically:
Program execution begins by initializing the main package and then invoking the function main. When that function invocation returns, the program exits. It does not wait for other (non-main) goroutines to complete.
You'll probably also want to use buffered channels to cut down on how often execution has to switch between goroutines. With your current design you're wasting a lot of time on context switching.