I am a new-grad SWE learning Go (and loving it).
I am building a parser for Wikipedia dump files - each one is a huge bzip2-compressed XML file (~50GB uncompressed).
I want to do both streaming decompression and parsing, which sounds simple enough. For decompression, I do:
inputFilePath := flag.Arg(0)
inputFile, _ := os.Open(inputFilePath) // error handling elided
inputReader := bzip2.NewReader(inputFile)
And then pass the reader to the XML parser:
decoder := xml.NewDecoder(inputReader)
However, since both decompressing and parsing are expensive operations, I would like to have them run in separate goroutines to make use of additional cores. How would I go about doing this in Go?
The only thing I can think of is wrapping the file in a chan []byte and implementing the io.Reader interface (rough sketch below), but I presume there might be a built-in (and cleaner) way of doing it.
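Here is roughly what I mean (chanReader is just a name I made up, and I have left out how the channel gets filled):

// chanReader adapts a chan []byte to io.Reader so it can be handed
// to xml.NewDecoder while another goroutine sends decompressed chunks.
type chanReader struct {
    c   chan []byte
    buf []byte
}

func (r *chanReader) Read(p []byte) (int, error) {
    if len(r.buf) == 0 {
        b, ok := <-r.c
        if !ok {
            return 0, io.EOF // channel closed: no more data
        }
        r.buf = b
    }
    n := copy(p, r.buf)
    r.buf = r.buf[n:]
    return n, nil
}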
Has anyone ever done something like this?
Thanks! Manuel
You can use io.Pipe, then use io.Copy to push the decompressed data into the pipe, and read it in another goroutine:
package main
import (
    "bytes"
    "encoding/json"
    "fmt"
    "io"
    "sync"
)

func main() {
    rawJson := []byte(`{
        "Foo": {
            "Bar": "Baz"
        }
    }`)
    bzip2Reader := bytes.NewReader(rawJson) // this stands in for the bzip2.NewReader

    var wg sync.WaitGroup
    wg.Add(2)

    r, w := io.Pipe()
    go func() {
        // write everything into the pipe. Decompression happens in this goroutine.
        io.Copy(w, bzip2Reader)
        w.Close()
        wg.Done()
    }()

    decoder := json.NewDecoder(r)
    go func() {
        for {
            t, err := decoder.Token()
            if err != nil {
                break
            }
            fmt.Println(t)
        }
        wg.Done()
    }()
    wg.Wait()
}
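Adapted to your bzip2 + XML case, the same pattern would look roughly like this (untested sketch; it assumes the dump path is passed as the first argument):

package main

import (
    "compress/bzip2"
    "encoding/xml"
    "fmt"
    "io"
    "log"
    "os"
)

func main() {
    inputFile, err := os.Open(os.Args[1])
    if err != nil {
        log.Fatal(err)
    }
    defer inputFile.Close()

    r, w := io.Pipe()
    go func() {
        // Decompression runs in this goroutine; io.Copy pushes the
        // decompressed bytes into the pipe as the other side reads them.
        _, err := io.Copy(w, bzip2.NewReader(inputFile))
        w.CloseWithError(err) // a nil error closes the pipe normally (reader gets io.EOF)
    }()

    // Parsing runs here, reading from the pipe, so decompression and
    // parsing can proceed on separate cores.
    decoder := xml.NewDecoder(r)
    for {
        t, err := decoder.Token()
        if err == io.EOF {
            break
        }
        if err != nil {
            log.Fatal(err)
        }
        _ = t // do something useful with the token here, e.g. look for <page> elements
    }
    fmt.Println("done")
}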
An easy solution is to use a readahead package I created some time back: https://github.com/klauspost/readahead
inputReader := bzip2.NewReader(inputFile)
ra := readahead.NewReader(inputReader)
defer ra.Close()
And then pass the reader to the XML parser:
decoder := xml.NewDecoder(ra)
With default settings it will decode up to 4MB ahead of time in 4 buffers.
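If you want different buffering, there is (if I recall the API correctly) a NewReaderSize constructor that lets you pick the number of buffers and their size; please double-check the package docs for the exact signature. Something like:

// Assumed API: readahead.NewReaderSize(rd, buffers, size) - verify against the package docs.
ra, err := readahead.NewReaderSize(inputReader, 8, 1<<20) // 8 buffers of 1MB each
if err != nil {
    log.Fatal(err)
}
defer ra.Close()
decoder := xml.NewDecoder(ra)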