How to wait for all goroutines to finish executing

I have a large log file that I want to analyze in parallel.

I have this code:

package main

import (
    "bufio"
    "fmt"
    "os"
    "time"
)

func main() {
    const logPath = "log.txt"
    const workerCount = 10

    // Open the log file; exit with status 1 if it cannot be opened.
    f, openErr := os.Open(logPath)
    if openErr != nil {
        fmt.Println("Could not open file with the database.")
        os.Exit(1)
    }
    defer f.Close()

    // Unbuffered channel carrying one raw log line per message.
    lines := make(chan string)

    // Consumers: every worker receives lines from the same shared channel.
    for w := workerCount; w > 0; w-- {
        go parseStrings(lines)
    }

    // Producer: sends each line of the file into the channel.
    go getStrings(bufio.NewScanner(f), lines)

    // NOTE(review): placeholder wait. This loop never exits, so the program
    // never terminates on its own: main receives no completion signal from
    // the producer or the workers. A real fix needs the producer to close
    // the channel, the workers to return when it is drained, and main to
    // wait on them (e.g. with sync.WaitGroup).
    for {
        time.Sleep(time.Second)
    }
}

// getStrings sends every line produced by scanner onto tasks, in input
// order, and returns when the scanner stops.
//
// NOTE(review): tasks is never closed and scanner.Err() is never checked,
// so receivers cannot distinguish "no more input" or a read error from
// "next line not sent yet".
func getStrings(scanner *bufio.Scanner, tasks chan<- string) {
    for {
        if !scanner.Scan() {
            return
        }
        tasks <- scanner.Text()
    }
}

// parseStrings is a worker: it receives raw log lines from tasks, parses
// each one with parseLine, and prints the resulting event.
//
// It returns once tasks has been closed and drained. Ranging over the
// channel matters: a receive on a closed channel yields "" immediately, so
// a bare `s := <-tasks` inside an endless loop would spin forever, printing
// empty events, as soon as the producer closed the channel.
func parseStrings(tasks <-chan string) {
    for s := range tasks {
        event := parseLine(s)
        fmt.Println(event)
    }
}

// parseLine converts one raw log line into an event. It is currently a
// pass-through: the event is the line itself, unchanged.
func parseLine(line string) string {
    event := line
    return event
}

So how do I wait until all of the goroutines have finished? I was advised to create a separate goroutine that collects the results, but how do I do that?

Use the pipeline pattern together with the "fan-out / fan-in" pattern:

package main

import (
    "bufio"
    "fmt"
    "strings"
    "sync"
)

func main() {
    file := "here is first line
" +
        "here is second line
" +
        "here is line 3
" +
        "here is line 4
" +
        "here is line 5
" +
        "here is line 6
" +
        "here is line 7
"
    scanner := bufio.NewScanner(strings.NewReader(file))

    // all lines onto one channel
    in := getStrings(scanner)

    // FAN OUT
    // Multiple functions reading from the same channel until that channel is closed
    // Distribute work across multiple functions (ten goroutines) that all read from in.
    xc := fanOut(in, 10)

    // FAN IN
    // multiplex multiple channels onto a single channel
    // merge the channels from c0 through c9 onto a single channel
    for n := range merge(xc) {
        fmt.Println(n)
    }
}

// getStrings starts a producer goroutine that sends every line read by
// scanner, in order, on the returned channel. It closes that channel when
// the scanner stops.
//
// NOTE(review): scanner.Err() is not checked. A read error, or a line
// longer than the scanner's buffer, ends the stream silently, exactly
// like a normal end of input.
func getStrings(scanner *bufio.Scanner) <-chan string {
    lines := make(chan string)
    go func() {
        defer close(lines)
        for scanner.Scan() {
            lines <- scanner.Text()
        }
    }()
    return lines
}

// fanOut starts n parseStrings workers that all receive from the same input
// channel, and returns one result channel per worker. When n <= 0 it
// starts nothing and returns a nil slice.
func fanOut(in <-chan string, n int) []<-chan string {
    var workers []<-chan string
    for remaining := n; remaining > 0; remaining-- {
        workers = append(workers, parseStrings(in))
    }
    return workers
}

// parseStrings starts a worker goroutine that applies parseLine to every
// value received from in and forwards each result on the returned channel.
// The returned channel is closed once in has been closed and drained.
func parseStrings(in <-chan string) <-chan string {
    results := make(chan string)
    go func() {
        defer close(results)
        for line := range in {
            results <- parseLine(line)
        }
    }()
    return results
}

// parseLine converts a raw input line into its parsed form. For now it is
// the identity function.
func parseLine(line string) string {
    parsed := line
    return parsed
}

// merge fans in: it forwards every value from each channel in cs onto a
// single returned channel. That channel is closed after all of cs have been
// closed and drained. Values from different inputs may interleave in any
// order.
func merge(cs []<-chan string) <-chan string {
    merged := make(chan string)

    var pending sync.WaitGroup
    pending.Add(len(cs))

    // forward copies one input onto merged, then marks itself finished.
    forward := func(src <-chan string) {
        defer pending.Done()
        for v := range src {
            merged <- v
        }
    }
    for _, src := range cs {
        go forward(src)
    }

    // Close only after every forwarder has exited, so no send can ever hit
    // a closed channel.
    go func() {
        pending.Wait()
        close(merged)
    }()
    return merged
}

Check it out on the playground.

var wg sync.WaitGroup

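Before starting each goroutine (i.e. right before the `go` statement, not inside the goroutine, otherwise `Wait` may return too early), call: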

wg.Add(1)

When a goroutine has finished its work, decrement the counter (usually via `defer wg.Done()` at the top of the goroutine) with:

wg.Done()

Then, instead of

for {
    time.Sleep(1 * time.Second)
}

do

 wg.Wait()

Just use `sync.WaitGroup`:

package main

import(
    "sync"
)

// stuff is a placeholder for one unit of concurrent work. When it returns,
// it reports completion to group; the deferred Done runs even if the work
// panics.
func stuff(group *sync.WaitGroup) {
    defer group.Done()
    // ... the goroutine's actual work goes here ...
}

func main() {
    const count = 50

    // The zero WaitGroup is ready to use. Pass its address so every
    // goroutine decrements the same counter.
    var wg sync.WaitGroup
    for i := 0; i < count; i++ {
        wg.Add(1) // register each goroutine before starting it
        go stuff(&wg)
    }
    wg.Wait() // block until every goroutine has called Done
}

Play