Goroutines sharing slices :: trying to understand data races

I am trying to write a program in Go that finds some genes in very large files of DNA sequences. I already have a Perl program that does this, but I would like to take advantage of goroutines to perform the search in parallel ;)

Because the files are huge, my idea was to read 100 sequences at a time, send them to a goroutine for analysis, then read the next 100 sequences, and so on.

I would like to thank the members of this site for their really helpful explanations concerning slices and goroutines.

I have made the suggested change to use a copy of the slice processed by the goroutines, but the -race execution still detects one data race, at the level of the copy() function:

Thank you very much for your comments!

==================
WARNING: DATA RACE
Read by goroutine 6:
  runtime.slicecopy()
      /usr/lib/go-1.6/src/runtime/slice.go:113 +0x0
  main.main.func1()
      test_chan006.go:71 +0xd8

Previous write by main goroutine:
  main.main()
      test_chan006.go:63 +0x3b7

Goroutine 6 (running) created at:
  main.main()
      test_chan006.go:73 +0x4c9
==================
[>5HSAA098909 BA098909 ...]
Found 1 data race(s)
exit status 66

    line 71 is: copy(bufCopy, buf_Seq)
    line 63 is: buf_Seq = append(buf_Seq, line)
    line 73 is: }(genes, buf_Seq)

package main

import (
    "bufio"
    "fmt"
    "os"
    "github.com/mathpl/golang-pkg-pcre/src/pkg/pcre"
    "sync"
)

// read_genes reads a list of genes and returns a slice of gene names
func read_genes(filename string) []string {
    var genes []string // slice of genes names
    // Open the file.
    f, _ := os.Open(filename)
    // Create a new Scanner for the file.
    scanner := bufio.NewScanner(f)
    // Loop over all lines in the file and print them.
    for scanner.Scan() {
        line := scanner.Text()
        genes = append(genes, line)
    }
    return genes
}

// function find the sequences with a gene matching gene[] slice
func search_gene2( genes []string, seqs []string) ([]string) {
  var res []string

  for r := 0 ; r <= len(seqs) - 1; r++ {
    for i := 0 ; i <= len(genes) - 1; i++ {

      match := pcre.MustCompile(genes[i], 0).MatcherString(seqs[r], 0)

      if (match.Matches() == true) {
          res = append( res, seqs[r])           // is the gene matches the gene name is append to res
          break
      }
    }
  }

  return res
}
//###########################################

func main() {
    var slice []string
    var buf_Seq []string
    read_buff := 100    // the number of sequences analysed by one goroutine

    var wg sync.WaitGroup
    queue := make(chan []string, 100)

    filename := "fasta/sequences.tsv"
    f, _ := os.Open(filename)
    scanner := bufio.NewScanner(f)
    n := 0
    genes := read_genes("lists/genes.csv")

    for scanner.Scan() {
        line := scanner.Text()
        n += 1
        buf_Seq = append(buf_Seq, line) // store the sequences into buf_Seq
        if n == read_buff {             // when the read buffer contains 100 sequences, one goroutine analyses them

            wg.Add(1)

            go func(genes, buf_Seq []string) {
                defer wg.Done()
                bufCopy := make([]string, len(buf_Seq))
                copy(bufCopy, buf_Seq)
                queue <- search_gene2(genes, bufCopy)
            }(genes, buf_Seq)
            buf_Seq = buf_Seq[:0] // reset buf_Seq
            n = 0                 // reset the sequences counter

        }
    }
    go func() {
        wg.Wait()
        close(queue)
    }()

    for t := range queue {
        slice = append(slice, t...)
    }

    fmt.Println(slice)
}

The data race exists because slices are reference types in Go. They are generally passed by value, but being reference types, any change made through one value is reflected in the others. Consider:

func f(xs []string) {
    xs[0] = "changed_in_f"
}

func main() {
    xs := []string{"set_in_main", "asd"}
    fmt.Println("Before call:", xs)
    f(xs)
    fmt.Println("After call:", xs)

    var ys []string
    ys = xs
    ys[0] = "changed_through_ys"
    fmt.Println("After ys:", xs)

}

This prints:

Before call: [set_in_main asd]
After call: [changed_in_f asd]
After ys: [changed_through_ys asd]

This happens because all three slices share the same underlying array in memory.

This is what might be happening when you pass buf_Seq to search_gene2. A new slice value is passed to each call; however, each of those slice values may refer to the same underlying array, causing a potential race condition (a call to append may write to the underlying array that another goroutine is reading).
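This sharing can be shown in miniature (illustrative code, not the original program): when append still has spare capacity it writes into the existing backing array, and every slice referring to that array observes the write. Done from another goroutine, that is exactly the write/read pair the race detector flags:

```go
package main

import "fmt"

// sharedAppend shows that append with spare capacity writes into the
// backing array that another slice still refers to.
func sharedAppend() (string, string) {
    buf := make([]string, 0, 4) // capacity 4: the appends below never reallocate
    buf = append(buf, "seq1")

    alias := buf[:1] // alias shares buf's backing array

    buf = append(buf, "seq2") // writes "seq2" into the shared array
    alias = alias[:2]         // re-extend alias over the same array

    return alias[0], alias[1] // alias sees the element appended via buf
}

func main() {
    a, b := sharedAppend()
    fmt.Println(a, b) // seq1 seq2
}
```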

To solve the problem, try this in your main:

bufCopy := make([]string, len(buf_Seq))
// make a copy of buf_Seq in an entirely separate slice
copy(bufCopy, buf_Seq)
go func(genes, bufCopy []string) {
        defer wg.Done()
        queue <- search_gene2(genes, bufCopy)
    }(genes, bufCopy)

The goroutines are only working on copies of the slice headers; the underlying arrays are the same. To make a copy of a slice, you need to use copy (or append to a different slice):

buf_Seq = append(buf_Seq, line)
bufCopy := make([]string, len(buf_Seq))
copy(bufCopy, buf_Seq)

You can then safely pass bufCopy to the goroutines, or simply use it directly in the closure.
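A minimal sketch of why the copy is safe (the helper name is mine, not from the thread): once copy has run, or equivalently append([]string(nil), seqs...), mutating the original buffer no longer affects the snapshot handed to the goroutine:

```go
package main

import "fmt"

// snapshot returns an independent copy of seqs; mutating seqs afterwards
// does not affect the returned slice.
func snapshot(seqs []string) []string {
    cp := make([]string, len(seqs))
    copy(cp, seqs)
    return cp
    // equivalent one-liner: return append([]string(nil), seqs...)
}

func main() {
    buf := []string{"geneA", "geneB"}
    cp := snapshot(buf)

    buf[0] = "overwritten" // reuse the buffer, as the reading loop does

    fmt.Println(cp[0], buf[0]) // geneA overwritten
}
```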

The slices are indeed copies, but slices themselves are reference types. A slice, fundamentally, is a 3-word structure. It contains a pointer to the start of an underlying array, an integer denoting the current number of elements in the slice, and another integer denoting the capacity of the underlying array. When you pass a slice into a function, a copy is made of this slice "header" structure, but the header still refers to the same underlying array as the header that was passed in.

This means any changes you make to the slice header itself (like sub-slicing it, or appending to it enough to trigger a resize, and thus a reallocation to a new location with a new start pointer) will only be reflected in the slice header inside that function. Any changes to the underlying data itself, however, will be reflected even in the slice outside the function (unless you triggered a reallocation by growing the slice past its capacity).

Example: https://play.golang.org/p/a2y5eGulXW
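Both halves of that rule can also be demonstrated in a few lines (an illustrative sketch, not the thread's program): the callee's element write is visible to the caller, while its append and reslice only change the local header:

```go
package main

import "fmt"

// mutate writes through the shared backing array, then changes only
// its local copy of the slice header.
func mutate(xs []string) {
    xs[0] = "mutated"        // visible to the caller: same backing array
    xs = append(xs, "grown") // reallocates here (len == cap); header change is local
    xs = xs[:1]              // reslicing only changes the local header
    _ = xs
}

func main() {
    xs := []string{"a", "b"}
    mutate(xs)
    fmt.Println(len(xs), xs[0]) // 2 mutated
}
```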

I think this is the idiomatic Go for this job. Code is worth a thousand comments:

genes = readGenes("lists/genes.csv") // read the gene list
n := runtime.NumCPU()                // the number of goroutines
wg.Add(n + 1)
go scan() // read the "fasta/sequences.tsv"
for i := 0; i < n; i++ {
    go search()
}
go WaitClose()
slice := []string{}
for t := range queue {
    slice = append(slice, t)
}
fmt.Println(slice)

scan() reads "fasta/sequences.tsv" into the channel var ch = make(chan string, 100) concurrently, and search() is a CPU-intensive goroutine, so for performance reasons the number of goroutines is limited to NumCPU.
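The same NumCPU bound can also be expressed with a buffered channel used as a semaphore; here is a small, self-contained sketch (the helper name and toy workload are mine, not the answer's):

```go
package main

import (
    "fmt"
    "runtime"
    "sync"
)

// sumWithBoundedWorkers adds 1..n using at most `limit` goroutines at a time.
// The buffered channel acts as a counting semaphore: a send acquires a slot,
// a receive releases it.
func sumWithBoundedWorkers(n, limit int) int {
    sem := make(chan struct{}, limit)
    var wg sync.WaitGroup
    var mu sync.Mutex
    total := 0

    for i := 1; i <= n; i++ {
        wg.Add(1)
        sem <- struct{}{} // blocks while `limit` goroutines are running
        go func(v int) {
            defer wg.Done()
            defer func() { <-sem }() // release the slot
            mu.Lock()
            total += v
            mu.Unlock()
        }(i)
    }
    wg.Wait()
    return total
}

func main() {
    fmt.Println(sumWithBoundedWorkers(100, runtime.NumCPU())) // 5050
}
```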

Try this working sample code (simulated and tested):

package main

import (
    "bufio"
    "fmt"
    //"os"
    "runtime"
    "strings"
    "sync"
    //"github.com/mathpl/golang-pkg-pcre/src/pkg/pcre"
)

func main() {
    genes = readGenes("lists/genes.csv") // read the gene list
    n := runtime.NumCPU()                // the number of goroutines
    wg.Add(n + 1)
    go scan() // read the "fasta/sequences.tsv"
    for i := 0; i < n; i++ {
        go search()
    }
    go WaitClose()
    slice := []string{}
    for t := range queue {
        slice = append(slice, t)
    }
    fmt.Println(slice)
}

var wg sync.WaitGroup
var genes []string
var ch = make(chan string, 100)
var queue = make(chan string, 100)

func scan() {
    defer wg.Done()
    defer close(ch)
    scanner := bufio.NewScanner(strings.NewReader(strings.Join([]string{"A2", "B2", "C2", "D2", "E2", "F2", "G2", "H2", "I2"}, "\n")))
    /*f, err := os.Open("fasta/sequences.tsv")
    if err != nil {
        panic(err)
    }
    defer f.Close()
     scanner := bufio.NewScanner(f)*/
    for scanner.Scan() {
        ch <- scanner.Text()
    }
}

func match(pattern, seq string) bool {
    //return pcre.MustCompile(pattern, 0).MatcherString(seq, 0).Matches()
    return pattern[0] == seq[0]
}

func search() {
    defer wg.Done()
    for seq := range ch {
        for _, gene := range genes {
            if match(gene, seq) {
                queue <- seq
                break
            }
        }
    }
}

func WaitClose() {
    wg.Wait()
    close(queue)
}

// readGenes reads a list of genes and returns a slice of gene names.
func readGenes(filename string) []string {
    return []string{"A1", "B1", "C1", "D1", "E1", "F1", "G1", "H1", "I1"}
    /*var genes []string // slice of genes names
    f, err := os.Open(filename)
    if err != nil {
        panic(err)
    }
    defer f.Close()
    scanner := bufio.NewScanner(f)
    for scanner.Scan() {
        line := scanner.Text()
        genes = append(genes, line)
    }
    return genes*/
}

Output:

[A2 B2 C2 D2 E2 F2 G2 H2 I2]

I hope this helps with your real case (the comments are switched in this code; not tested):

package main

import (
    "bufio"
    "fmt"
    "os"
    "runtime"
    //"strings"
    "sync"

    "github.com/mathpl/golang-pkg-pcre/src/pkg/pcre"
    //pcre "regexp"
)

func main() {
    genes = readGenes("lists/genes.csv") // read the gene list
    n := runtime.NumCPU()                // the number of goroutines
    wg.Add(n + 1)
    go scan() // read the "fasta/sequences.tsv"
    for i := 0; i < n; i++ {
        go search()
    }
    go WaitClose()
    slice := []string{}
    for t := range queue {
        slice = append(slice, t)
    }
    fmt.Println(slice)
}

var wg sync.WaitGroup
var genes []string
var ch = make(chan string, 100)
var queue = make(chan string, 100)

func scan() {
    defer wg.Done()
    defer close(ch)
    //scanner := bufio.NewScanner(strings.NewReader(strings.Join([]string{"A2", "B2", "C2", "D2", "E2", "F2", "G2", "H2", "I2"}, "\n")))
    f, err := os.Open("fasta/sequences.tsv")
    if err != nil {
        panic(err)
    }
    defer f.Close()
    scanner := bufio.NewScanner(f)
    for scanner.Scan() {
        ch <- scanner.Text()
    }
}

func match(pattern, seq string) bool {
    return pcre.MustCompile(pattern, 0).MatcherString(seq, 0).Matches()
    //return pattern[0] == seq[0]
    //return pcre.MustCompile(pattern).Match([]byte(seq))
}

func search() {
    defer wg.Done()
    for seq := range ch {
        for _, gene := range genes {
            if match(gene, seq) {
                queue <- seq
                break
            }
        }
    }
}

func WaitClose() {
    wg.Wait()
    close(queue)
}

// readGenes reads a list of genes and returns a slice of gene names.
func readGenes(filename string) []string {
    //return []string{"A1", "B1", "C1", "D1", "E1", "F1", "G1", "H1", "I1"}
    var genes []string // slice of genes names
    f, err := os.Open(filename)
    if err != nil {
        panic(err)
    }
    defer f.Close()
    scanner := bufio.NewScanner(f)
    for scanner.Scan() {
        line := scanner.Text()
        genes = append(genes, line)
    }
    return genes
}

Problems with your code:
1- In read_genes(filename string) []string you should check for errors:

f, err := os.Open(filename)
if err != nil {
    panic(err)
}

2- In read_genes(filename string) []string, close the opened file:

defer f.Close()

3- After filename := "fasta/sequences.tsv" you should check for errors:

f, err := os.Open(filename)
if err != nil {
    panic(err)
}

4- After filename := "fasta/sequences.tsv", close the opened file:

defer f.Close()

5- Inside for scanner.Scan() {: if the file fasta/sequences.tsv does not contain a multiple of 100 lines, the test if n == read_buff { will not succeed for the last batch and you will miss it.

6- How many CPU cores do you have? You should limit the number of goroutines.
7- Your main question:
I made a Minimal, Complete, and Verifiable example (problem 5 still exists):

package main

import (
    "bufio"
    "fmt"
    "strings"
    "sync"
)

func match(pattern, str string) bool {
    return pattern[0] == str[0]
}
func search_gene2(genes, seqs []string) (res []string) {
    for _, r := range seqs {
        for _, i := range genes {
            if match(i, r) {
                res = append(res, r) // if the gene matches, the sequence is appended to res
                break
            }
        }
    }
    return
}

func main() {
    read_buff := 2 // the number of sequences analysed by one goroutine
    var wg sync.WaitGroup
    queue := make(chan []string, read_buff)
    genes := []string{"A1", "B1", "C1", "D1", "E1", "F1", "G1", "H1", "I1"}
    sequences := strings.Join([]string{"A2", "B2", "C2", "D2", "E2", "F2", "G2", "H2", "I2"}, "\n")
    scanner := bufio.NewScanner(strings.NewReader(sequences))
    buf_Seq := make([]string, 0, read_buff)
    for n := 1; scanner.Scan(); n++ {
        line := scanner.Text()
        buf_Seq = append(buf_Seq, line) // store the sequences into buf_Seq
        if n == read_buff {             // when the read buffer is full, one goroutine analyses the batch
            wg.Add(1)
            temp := make([]string, n)
            copy(temp, buf_Seq)
            buf_Seq = buf_Seq[:0] // reset buf_Seq
            n = 0                 // reset the sequences counter
            go func(genes, Seq []string) {
                defer wg.Done()
                fmt.Println(Seq)
                queue <- search_gene2(genes, Seq)
            }(genes, temp)
        }
    }
    go func() {
        wg.Wait()
        close(queue)
    }()
    slice := []string{}
    for t := range queue {
        slice = append(slice, t...)
    }
    fmt.Println(slice)
}

output (note problem 5: I2 is missing):

[A2 B2]
[C2 D2]
[E2 F2]
[G2 H2]
[A2 B2 C2 D2 E2 F2 G2 H2]
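Problem 5 shows up in that output: I2 is read but never dispatched. A sketch of the missing step, using a hypothetical chunking helper rather than the original scanner loop, is to flush the final, partial buffer after the loop ends:

```go
package main

import "fmt"

// chunks groups lines into batches of readBuff, flushing the last,
// possibly partial, batch — the step the MCVE is missing.
func chunks(lines []string, readBuff int) [][]string {
    var out [][]string
    var buf []string
    for _, line := range lines {
        buf = append(buf, line)
        if len(buf) == readBuff {
            out = append(out, append([]string(nil), buf...)) // independent copy
            buf = buf[:0]
        }
    }
    if len(buf) > 0 { // flush the leftover batch after the loop
        out = append(out, append([]string(nil), buf...))
    }
    return out
}

func main() {
    got := chunks([]string{"A2", "B2", "C2", "D2", "E2"}, 2)
    fmt.Println(got) // [[A2 B2] [C2 D2] [E2]]
}
```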

Here is the solution to your main question (make a new slice and copy all the data):

temp := make([]string, n)
copy(temp, buf_Seq)
buf_Seq = buf_Seq[:0] // reset buf_Seq
n = 0                 // reset the sequences counter
go func(genes, Seq []string) {
    defer wg.Done()
    fmt.Println(Seq)
    queue <- search_gene2(genes, Seq)
}(genes, temp)

This was the cause of:

Found 1 data race(s)
exit status 66

    line 71 is: copy(bufCopy, buf_Seq)
    line 63 is: buf_Seq = append(buf_Seq, line)
    line 73 is: }(genes, buf_Seq)

As other answers said, you shared the same underlying array of the slice with all goroutines.

I hope this helps.