GoLang: Decompress bz2 in one goroutine, consume it in another goroutine

I am a new-grad SWE learning Go (and loving it).

I am building a parser for Wikipedia dump files - basically a huge bzip2-compressed XML file (~50GB uncompressed).

I want to do both streaming decompression and parsing, which sounds simple enough. For decompression, I do:

inputFilePath := flag.Arg(0)
inputFile, err := os.Open(inputFilePath) // error check omitted here
inputReader := bzip2.NewReader(inputFile)

And then pass the reader to the XML parser:

decoder := xml.NewDecoder(inputReader)

However, since both decompressing and parsing are expensive operations, I would like to run them on separate goroutines to make use of additional cores. How would I go about doing this in Go?

The only thing I can think of is wrapping the file in a chan []byte and implementing the io.Reader interface on top of it, but I presume there might be a built-in (and cleaner) way of doing it.

Has anyone ever done something like this?

Thanks! Manuel

You can use io.Pipe, then use io.Copy to push the decompressed data into the pipe, and read it in another goroutine:

package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "io"
    "sync"
)

func main() {

    rawJson := []byte(`{
            "Foo": {
                "Bar": "Baz"
            }
        }`)

    bzip2Reader := bytes.NewReader(rawJson) // this stands in for the bzip2.NewReader

    var wg sync.WaitGroup
    wg.Add(2)

    r, w := io.Pipe()

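    go func() {
        defer wg.Done()
        // Write everything into the pipe. Decompression happens in this goroutine.
        _, err := io.Copy(w, bzip2Reader)
        // Pass any copy error on to the reader; with a nil error the reader sees io.EOF.
        w.CloseWithError(err)
    }()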

    decoder := json.NewDecoder(r)

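    go func() {
        defer wg.Done()
        for {
            t, err := decoder.Token()
            if err != nil {
                // io.EOF marks a normal end of input. Closing the read side
                // unblocks the writer if parsing stopped early on a real error.
                r.CloseWithError(err)
                break
            }
            fmt.Println(t)
        }
    }()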

    wg.Wait()
}

http://play.golang.org/p/fXLnfnaWYA

An easy solution is to use a readahead package I created some time back: https://github.com/klauspost/readahead

inputReader := bzip2.NewReader(inputFile)
ra := readahead.NewReader(inputReader)
defer ra.Close()

And then pass the reader to the XML parser:

decoder := xml.NewDecoder(ra)

With the default settings it reads (and therefore decompresses) up to 4 MB ahead of the consumer, using 4 buffers of 1 MB each.