Efficient way to process file lines

I'm trying to learn some Go, and I'm writing a script that reads a CSV file, does some processing, and produces some results.

I'm following a pipeline pattern: one goroutine reads the file line by line with a scanner and sends the lines to a channel, and several other goroutines consume the channel's contents.

An example of what I am trying to do: https://gist.github.com/pkulak/93336af9bb9c7207d592
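For reference, the pipeline described above can be sketched roughly as follows: one goroutine scans the input line by line and sends each line on an unbuffered channel, and a fixed number of worker goroutines range over that channel until it is closed. This is a minimal sketch, not the gist's code. It assumes the input comes from a `strings.Reader` instead of a file, `countLines` and `numWorkers` are hypothetical names, and the workers only count the lines they receive.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
	"sync"
	"sync/atomic"
)

// countLines runs a small reader -> channel -> workers pipeline over input
// and returns how many lines the workers received in total.
func countLines(input string, numWorkers int) int64 {
	lines := make(chan string)

	// Producer: scan the input line by line and send each line downstream.
	go func() {
		defer close(lines) // tells the workers there is nothing more to read
		scanner := bufio.NewScanner(strings.NewReader(input))
		for scanner.Scan() {
			lines <- scanner.Text()
		}
	}()

	// Consumers: a fixed number of workers drain the channel concurrently.
	var processed int64
	var wg sync.WaitGroup
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for range lines {
				// Placeholder for the real per-line work.
				atomic.AddInt64(&processed, 1)
			}
		}()
	}

	wg.Wait() // every worker has returned, so the channel is closed and drained
	return processed
}

func main() {
	fmt.Println(countLines("r1\nr2\nr3\nr4\n", 3)) // prints 4
}
```

Here `sync.WaitGroup` is used to wait for the workers; counting "done" messages on a separate channel works too, the WaitGroup just avoids the extra channel.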

My problem is this: the CSV file contains lots of records, and I want to do some math between consecutive lines. Let's call record 1 r1, record 2 r2, and so on.

When I read the file I get r1, and on the next scanner iteration I get r2. I want to check whether r1-r2 is a valid number for me. If it is, I do some math with the two records and then check r3 and r4 the same way. If r1-r2 is not valid, I ignore r2 and try r1-r3, and so on.
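The pairing rule above (keep r1 until some later record forms a valid difference with it, then start the next pair from the record after the match) can be checked on its own before adding any concurrency. This is a minimal sketch over an in-memory slice of numbers; `findPairs`, `Pair` and the `valid` predicate are hypothetical names, and the example rule (a difference between 1 and 10) is only an assumption standing in for whatever "valid" means for your data.

```go
package main

import "fmt"

// Pair holds two records whose difference was judged valid.
type Pair struct {
	First, Second float64
}

// findPairs walks the records in order. The first record of a pair is kept
// until a later record forms a valid pair with it; records in between are
// skipped. After a match, the search restarts with the next record.
// If a first record never finds a partner, the remaining input is consumed.
func findPairs(records []float64, valid func(a, b float64) bool) []Pair {
	var pairs []Pair
	i := 0
	for i < len(records) {
		first := records[i]
		j := i + 1
		for j < len(records) && !valid(first, records[j]) {
			j++ // r_i - r_j is not valid: ignore r_j and try the next record
		}
		if j == len(records) {
			break // no partner left for first
		}
		pairs = append(pairs, Pair{first, records[j]})
		i = j + 1
	}
	return pairs
}

func main() {
	// Hypothetical rule: the difference must be between 1 and 10.
	valid := func(a, b float64) bool {
		d := a - b
		return d >= 1 && d <= 10
	}
	records := []float64{20, 15, 50, 12, 45, 8, 3}
	fmt.Println(findPairs(records, valid)) // prints [{20 15} {50 45} {8 3}]
}
```

In the example, 50-12 fails the rule, so 12 is skipped and 50 is paired with 45, which mirrors the "don't care about r2, do r1-r3" case.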

Should I handle this while reading the file, or after putting the lines into the channel, when I process the channel's contents?

Any suggestions that don't break concurrency?

I think you should decide whether r1-r2 is a valid pair for you inside the "Read the lines into the work queue" goroutine.

That is, read the current line, then read the following lines one by one until you have a valid pair. When you find one, send the pair to the workQueue channel and start searching for the next pair.

Here is your code with these changes:

package main

import (
    "bufio"
    "errors"
    "log"
    "os"
)

var concurrency = 100

type Pair struct {
    line1 string
    line2 string
}

func main() {

    // It would be better to take the file path from somewhere else (for example, a command-line argument).
    filePath := "/path/to/file.csv"

    // This channel has no buffer, so a send only completes when a worker is ready
    // to receive. This keeps the reader from getting ahead of the workers.
    workQueue := make(chan Pair)

    // We need to know when everyone is done so we can exit.
    complete := make(chan bool)

    // Read the lines into the work queue.
    go func() {

        file, e := os.Open(filePath)
        if e != nil {
            log.Fatal(e)
        }
        // Close when the function returns
        defer file.Close()

        scanner := bufio.NewScanner(file)

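        // Find valid pairs and send them into the "workQueue" channel.
        // line1 is kept until some following line forms a valid pair with it:
        // if r1-r2 is not valid, r2 is skipped and r1-r3 is tried, and so on.
        // After a valid pair, the search starts again from the next line.
        for scanner.Scan() {
            line1 := scanner.Text()
            line2, e := getNextValidPartner(scanner, line1)
            if e != nil {
                break
            }
            workQueue <- Pair{line1, line2}
        }
        if e := scanner.Err(); e != nil {
            log.Println("reading input:", e)
        }

        // Close the channel so everyone reading from it knows we're done.
        close(workQueue)
    }()

    // Now read them all off, concurrently.
    for i := 0; i < concurrency; i++ {
        go startWorking(workQueue, complete)
    }

    // Wait for everyone to finish.
    for i := 0; i < concurrency; i++ {
        <-complete
    }
}

// errNoMoreLines is returned when the input ends before a valid pair is found.
var errNoMoreLines = errors.New("no more lines")

// getNextValidPartner reads forward from the scanner's current position and
// returns the first line that forms a valid pair with first.
// Lines that do not form a valid pair are skipped.
func getNextValidPartner(scanner *bufio.Scanner, first string) (string, error) {
    for scanner.Scan() {
        line := scanner.Text()
        if isValidPair(first, line) {
            return line, nil
        }
    }
    return "", errNoMoreLines
}

// isValidPair decides whether first - second is a usable pair.
func isValidPair(first, second string) bool {
    // Make your validation here, e.g. parse both records and check
    // whether their difference is a number you accept.
    return true
}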

func startWorking(pairs <-chan Pair, complete chan<- bool) {
    for pair := range pairs {
        doTheWork(pair)
    }

    // Let the main process know we're done.
    complete <- true
}

func doTheWork(pair Pair) {
    // Do the work with the pair
}
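The validation function in the code above is only a stub. One hypothetical way to fill in a check on the difference between two records, assuming the number to compare sits in the first comma-separated column and that "valid" means the difference lies in a range you choose: `parseFirstField`, `diffIsValid`, `minDiff` and `maxDiff` are illustrative names, not part of the original code. Plain `strings.Split` is enough for simple numeric rows; records with quoted fields that contain commas would need `encoding/csv` instead.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// Hypothetical bounds for an acceptable difference between two records.
const (
	minDiff = 1.0
	maxDiff = 10.0
)

// parseFirstField extracts the first comma-separated column as a float64.
// Assumption: the value to compare lives in column 0.
func parseFirstField(line string) (float64, error) {
	field := strings.TrimSpace(strings.Split(line, ",")[0])
	return strconv.ParseFloat(field, 64)
}

// diffIsValid reports whether line1 - line2 falls within [minDiff, maxDiff].
// Lines that cannot be parsed never form a valid pair.
func diffIsValid(line1, line2 string) bool {
	a, err := parseFirstField(line1)
	if err != nil {
		return false
	}
	b, err := parseFirstField(line2)
	if err != nil {
		return false
	}
	d := a - b
	return d >= minDiff && d <= maxDiff
}

func main() {
	fmt.Println(diffIsValid("20,foo", "15,bar"))  // difference 5 is in range: prints true
	fmt.Println(diffIsValid("50,foo", "12,bar"))  // difference 38 is out of range: prints false
	fmt.Println(diffIsValid("abc,foo", "12,bar")) // unparsable first field: prints false
}
```

Treating unparsable lines as "not valid" means a malformed row is simply skipped by the pairing loop rather than stopping the whole run.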