Suppose that I have the following code, which reads lines, multiplies each line by 2, and prints the results one by one.
I'd like to use N workers. Each worker takes M lines each time and processes them. More importantly, I'd like the output to be printed in the same order as the input. But the example here does not guarantee the output is printed in the same order as the input.
https://gobyexample.com/worker-pools
The following URL also shows some examples, but I don't think they fit my requirement. The problem is that the input can be arbitrarily long, so there is no way to hold everything in memory before it is printed. There must be a way to collect output from the workers, determine whether a worker's output is ready to be printed, and then print it. It sounds like there should be a master goroutine to do this, but I am not sure how to implement it most efficiently, as this master goroutine can easily become a bottleneck when N is big.
How to collect values from N goroutines executed in a specific order?
Could anybody show an example program that collects the results from the workers in order and prints each result as early as it can be printed?
$ cat main.go
#!/usr/bin/env gorun
// vim: set noexpandtab tabstop=2:
package main
import (
"bufio"
"fmt"
"strconv"
"io"
"os"
"log"
)
func main() {
stdin := bufio.NewReader(os.Stdin)
for {
line, err := stdin.ReadString('\n')
if err == io.EOF {
if len(line) != 0 {
i, _ := strconv.Atoi(line)
fmt.Println(i*2)
}
break
} else if err != nil {
log.Fatal(err)
}
i, _ := strconv.Atoi(line[:(len(line)-1)])
fmt.Println(i*2)
}
}
If the workers know the initial order, for example because they are told the line numbers, let them preserve that information (just the line numbers) and feed it back through your results channel. The code that aggregates results from the results channel then orders them by that sequence information before further processing (e.g. printing).
Below is a quick modification of one of the examples you linked.
package main
import "fmt"
import "time"
type Result struct {
Data, Seq int
}
type Job struct {
Data string
Seq int
}
func worker(id int, jobs <-chan Job, results chan<- Result) {
for j := range jobs {
fmt.Println("worker", id, "started job", j)
time.Sleep(time.Second)
fmt.Println("worker", id, "finished job", j)
results <- Result{len(j.Data), j.Seq}
}
}
func main() {
workload := 5
jobs := make(chan Job, 100)
results := make(chan Result, 100)
output := make([]Result, workload)
for w := 1; w <= 3; w++ {
go worker(w, jobs, results)
}
for j := 0; j < workload; j++ {
jobs <- Job{ // explicit to make it clear
Data: fmt.Sprintf("blah blah blah %d", j),
Seq: j,
}
}
close(jobs)
// receive results
for a := 1; a <= workload; a++ {
res := <-results
output[res.Seq] = res
// uncomment to see unordered
// fmt.Printf("received: %#v", res)
}
for _, out := range output {
fmt.Printf("output %#v\n", out)
}
}
BTW: this does not work well if you do not know your workload in advance. In that case the code that receives results needs to be a little smarter and process the part that has already been received and is in order (homework) :). Essentially, wait for line 0, then keep printing whatever has already been received in sequence and wait for the next missing line.
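For what it's worth, the idea in the last paragraph could be sketched roughly like this. It is only a minimal sketch under a few assumptions: emitInOrder, the Job type with a Value field, and the doubling worker are hypothetical names made up for illustration, and the producer loop stands in for reading lines from stdin. Results that arrive early are parked in a map until their turn comes, so nothing needs to know the total workload in advance.

```go
package main

import (
	"fmt"
	"sync"
)

// Result carries a computed value plus the position of its input.
type Result struct {
	Data, Seq int
}

// Job is a hypothetical input item: a parsed number and its line index.
type Job struct {
	Value, Seq int
}

// emitInOrder (hypothetical helper) receives results in whatever order the
// workers finish and calls emit strictly in Seq order (0, 1, 2, ...),
// as soon as the next expected result is available.
func emitInOrder(results <-chan Result, emit func(Result)) {
	pending := make(map[int]Result) // results that arrived ahead of their turn
	next := 0                       // Seq of the next result to emit
	for res := range results {
		pending[res.Seq] = res
		// Flush every consecutive result that is now ready.
		for {
			r, ok := pending[next]
			if !ok {
				break
			}
			delete(pending, next)
			emit(r)
			next++
		}
	}
}

// worker doubles each value, mirroring the "multiply by 2" task.
func worker(jobs <-chan Job, results chan<- Result) {
	for j := range jobs {
		results <- Result{Data: j.Value * 2, Seq: j.Seq}
	}
}

func main() {
	jobs := make(chan Job)
	results := make(chan Result)

	var wg sync.WaitGroup
	for w := 0; w < 3; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			worker(jobs, results)
		}()
	}

	// Producer: the number of jobs does not have to be known up front.
	go func() {
		for i := 0; i < 10; i++ {
			jobs <- Job{Value: i, Seq: i}
		}
		close(jobs)
	}()

	// Close results once all workers have returned so emitInOrder terminates.
	go func() {
		wg.Wait()
		close(results)
	}()

	emitInOrder(results, func(r Result) {
		fmt.Println(r.Data)
	})
}
```

Note that the pending map can still grow if one job is much slower than the ones after it, so for strictly bounded memory you would probably also need a cap on the number of in-flight jobs. The receiver does very little work per result (one map insert and a short flush loop), so it is unlikely to be the bottleneck unless the per-line work is trivial.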
Have fun!