Processing a large CSV file and limiting goroutines

I'm trying to find the most efficient way to read a CSV file (~1M rows). Each row contains an HTTP link to an image which I need to download.

This is my current code using worker pools:

func worker(queue chan []string, worknumber int, done, ks chan bool) {
    for {
        select {
        case url := <-queue:
            fmt.Println("doing work!", url, "worknumber", worknumber)
            processData(url) // HTTP download
            done <- true
        case <-ks:
            fmt.Println("worker halted, number", worknumber)
            return
        }
    }
}

func main() {
    start := time.Now()
    flag.Parse()
    fmt.Print(strings.Join(flag.Args(), "\n"))
    if *filename == "REQUIRED" {
        return
    }

    csvfile, err := os.Open(*filename)
    if err != nil {
        fmt.Println(err)
        return
    }
    count, _ := lineCounter(csvfile)
    fmt.Printf("Total count: %d
", count)
    csvfile.Seek(0, 0)

    defer csvfile.Close()

    //bar := pb.StartNew(count)
    bar := progressbar.NewOptions(count)
    bar.RenderBlank()

    reader := csv.NewReader(csvfile)

    //channel for terminating the workers
    killsignal := make(chan bool)

    //queue of jobs
    q := make(chan []string)
    // done channel takes the result of the job
    done := make(chan bool)

    numberOfWorkers := *numChannels
    for i := 0; i < numberOfWorkers; i++ {
        go worker(q, i, done, killsignal)
    }

    i := 0
    for {
        record, err := reader.Read()
        if err == io.EOF {
            break
        } else if err != nil {
            fmt.Println(err)
            return
        }
        i++

        go func(r []string, i int) {
            q <- r
            bar.Add(1)
        }(record, i)
    }

    // a deadlock occurs if c >= numberOfJobs
    for c := 0; c < count; c++ {
        <-done
    }

    fmt.Println("finished")

    // cleaning workers
    close(killsignal)
    time.Sleep(2 * time.Second)

    fmt.Printf("
%2fs", time.Since(start).Seconds())
}

My issue here is that it spawns a lot of goroutines, uses all the memory, and crashes.

What would be the best way to limit it?

I stripped out the progress bar, as I did not want to bother with it, but overall this is closer to what you are looking for.

It does not genuinely handle errors; it simply fails fatally when one occurs.

I have added context and cancellation support.

You might want to check out https://godoc.org/golang.org/x/sync/errgroup#Group.Go (there is an errgroup sketch after the draft below).

As a general recommendation, you need to learn the Go concurrency patterns and their usage.

It is obvious you have not worked with them enough, or that you are still in the process of learning.

It's not the fastest program by any means, but it does the job.

This is only a draft to point you in a better direction.

package main

import (
    "context"
    "encoding/csv"
    "flag"
    "fmt"
    "io"
    "log"
    "os"
    "os/signal"
    "sync"
    "time"
)

func worker(ctx context.Context, dst chan string, src chan []string) {
    for {
        select {
        case url, ok := <-src: // you must check whether the channel is still open.
            if !ok {
                return
            }
            dst <- fmt.Sprintf("out of %v", url) // do something useful.
        case <-ctx.Done(): // if the context is cancelled, quit.
            return
        }
    }
}

func main() {

    // create a context
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    // that cancels at ctrl+C
    go onSignal(os.Interrupt, cancel)

    // parse command line arguments
    var filename string
    var numberOfWorkers int
    flag.StringVar(&filename, "filename", "", "src file")
    flag.IntVar(&numberOfWorkers, "c", 2, "concurrent workers")
    flag.Parse()

    // check arguments
    if filename == "" {
        log.Fatal("filename required")
    }

    start := time.Now()

    csvfile, err := os.Open(filename)
    if err != nil {
        log.Fatal(err)
    }
    defer csvfile.Close()

    reader := csv.NewReader(csvfile)

    // create the pair of input/output channels for the controller=>workers com.
    src := make(chan []string)
    out := make(chan string)

    // use a waitgroup to manage synchronization
    var wg sync.WaitGroup

    // declare the workers
    for i := 0; i < numberOfWorkers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            worker(ctx, out, src)
        }()
    }

    // read the csv and write it to src
    go func() {
        for {
            record, err := reader.Read()
            if err == io.EOF {
                break
            } else if err != nil {
                log.Fatal(err)
            }
            src <- record // you might select on ctx.Done().
        }
        close(src) // close src to signal workers that no more jobs are incoming.
    }()

    // wait for worker group to finish and close out
    go func() {
        wg.Wait()  // wait for the workers to quit.
        close(out) // closing out ends the range loop below.
    }()

    // drain the output
    for res := range out {
        fmt.Println(res)
    }

    fmt.Printf("\n%.2fs", time.Since(start).Seconds())
}

func onSignal(s os.Signal, h func()) {
    c := make(chan os.Signal, 1)
    signal.Notify(c, s)
    <-c
    h()
}
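
For reference, here is a minimal sketch of the same fan-out built on errgroup. The processRecord helper and the images.csv filename are stand-ins, not part of the draft above; the point is that the first error returned by any goroutine cancels the shared context for all the others:

package main

import (
    "context"
    "encoding/csv"
    "io"
    "log"
    "os"

    "golang.org/x/sync/errgroup"
)

func main() {
    f, err := os.Open("images.csv") // assumed input file name
    if err != nil {
        log.Fatal(err)
    }
    defer f.Close()

    g, ctx := errgroup.WithContext(context.Background())
    src := make(chan []string)

    // producer: reads the csv and feeds records until EOF, an error, or cancellation.
    g.Go(func() error {
        defer close(src)
        r := csv.NewReader(f)
        for {
            record, err := r.Read()
            if err == io.EOF {
                return nil
            }
            if err != nil {
                return err
            }
            select {
            case src <- record:
            case <-ctx.Done():
                return ctx.Err()
            }
        }
    })

    // a fixed number of workers drain the channel.
    for i := 0; i < 4; i++ {
        g.Go(func() error {
            for record := range src {
                if err := processRecord(ctx, record); err != nil {
                    return err // the first error cancels ctx for everyone
                }
            }
            return nil
        })
    }

    // Wait returns the first non-nil error after all goroutines have exited.
    if err := g.Wait(); err != nil {
        log.Fatal(err)
    }
}

// processRecord is a hypothetical stand-in for the HTTP download of one row.
func processRecord(ctx context.Context, record []string) error {
    log.Println("processing", record)
    return nil
}

The main design difference from the draft above is that error handling and cancellation come for free: g.Wait() returns the first non-nil error once everything has stopped.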

You're creating a new goroutine for every line in the file; that's why you run out of memory. There's no reason to do that if you already have the workers you need.

So in short, change this:

    go func(r []string, i int) {
        q <- r
        bar.Add(1)
    }(record, i)

to this:

    q <- record
    bar.Add(1)
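
One caveat, assuming you keep the unbuffered done channel and only drain it after the read loop: with the producer feeding q directly, every worker can end up blocked on done <- true before the counting loop starts, which deadlocks the producer. Giving done enough buffer is one simple way out:

    done := make(chan bool, count) // workers never block when reporting completion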

A buffered channel can help you limit the number of goroutines:

package main

import (
    "sync"
    "time"
)

var taskPipe = make(chan interface{}, 5) // capacity 5 caps how many tasks run at once

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 20; i++ {
        taskPipe <- nil // acquire a slot; blocks once 5 tasks are in flight
        wg.Add(1)
        go func() {
            defer wg.Done()
            sleep()
        }()
    }
    wg.Wait()
}

func sleep() {
    time.Sleep(time.Second * 5)
    <-taskPipe // release the slot so the next task can start
}