Is there a buffered version of io.Pipe (either in the standard library or a third-party library) before I roll my own?
Context: I'm trying to use this solution for parsing of JSON data compressed with bzip2, so that the decompression and parsing happen in parallel, but finding that the speed-up is very small. Parsing uncompressed data takes ~22 sec per million records. Decompressing that much data takes about the same time. Doing them on a single thread takes ~44 seconds, as expected. Using the solution above takes ~41 seconds.
The documentation for io.Pipe says:
Reads and Writes on the pipe are matched one to one except when multiple Reads are needed to consume a single Write. That is, each Write to the PipeWriter blocks until it has satisfied one or more Reads from the PipeReader that fully consume the written data. The data is copied directly from the Write to the corresponding Read (or Reads); there is no internal buffering.
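The one-to-one matching in that quote can be seen with a small standard-library program. A single 11-byte Write is consumed by several Reads of at most 4 bytes, and the writer goroutine stays blocked until the last byte has been read. The helper name `readChunks` is just for illustration.

```go
package main

import (
	"fmt"
	"io"
)

// readChunks sends msg through an io.Pipe in a single Write and reads it back
// with a bufSize-byte buffer, returning what each individual Read delivered.
func readChunks(msg string, bufSize int) []string {
	pr, pw := io.Pipe()
	go func() {
		// This Write does not return until Reads have consumed all of msg.
		pw.Write([]byte(msg))
		pw.Close()
	}()

	buf := make([]byte, bufSize)
	var chunks []string
	for {
		n, err := pr.Read(buf)
		if n > 0 {
			chunks = append(chunks, string(buf[:n]))
		}
		if err != nil { // io.EOF once the writer has closed
			return chunks
		}
	}
}

func main() {
	fmt.Printf("%q\n", readChunks("hello world", 4)) // ["hell" "o wo" "rld"]
}
```

Every Read therefore costs a hand-off between the two goroutines. If the producer issues many small Writes, that synchronization overhead can eat into the benefit of running the stages in parallel.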
I suspect this could be a problem, depending on the way the bzip2 decompressor writes data and the way the JSON parser reads it, so I'd like to try a buffered version.
That's what the bufio package is for. It lets you turn any io.Reader into a buffered reader with NewReader, or any io.Writer into a buffered writer with NewWriter.
(Whether buffered IO will actually help with your specific problem, I have no idea...)
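Here is one way to apply bufio to the pipe, as a sketch that hasn't been benchmarked against the bzip2 case. The producer side wraps the io.PipeWriter in a bufio.Writer, so many small writes reach the pipe as fewer, larger Writes. The consumer uses json.Decoder, which already buffers its input internally, so wrapping the reader side in bufio.NewReader would likely add little. The `record` type and the 64 KiB buffer size are hypothetical stand-ins for the real data and tuning.

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"io"
)

// record is a hypothetical stand-in for one JSON record in the real data.
type record struct {
	ID int `json:"id"`
}

// produce writes n small JSON records into w through a bufio.Writer, so the
// pipe sees a few large Writes instead of n tiny ones.
func produce(w *io.PipeWriter, n int) {
	bw := bufio.NewWriterSize(w, 64*1024)
	for i := 0; i < n; i++ {
		fmt.Fprintf(bw, "{\"id\":%d}\n", i)
	}
	// Flush the remainder; CloseWithError(nil) behaves like Close.
	w.CloseWithError(bw.Flush())
}

// consume decodes records from r until EOF and returns how many it saw.
func consume(r io.Reader) (int, error) {
	dec := json.NewDecoder(r)
	count := 0
	for {
		var rec record
		err := dec.Decode(&rec)
		if err == io.EOF {
			return count, nil
		}
		if err != nil {
			return count, err
		}
		count++
	}
}

// countThroughPipe runs producer and consumer concurrently over an io.Pipe.
func countThroughPipe(n int) (int, error) {
	pr, pw := io.Pipe()
	go produce(pw, n)
	count, err := consume(pr)
	pr.CloseWithError(err) // unblocks the producer if decoding stopped early
	return count, err
}

func main() {
	fmt.Println(countThroughPipe(100000)) // 100000 <nil>
}
```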
You can use a buffered pipe from https://github.com/djherbis/nio like this:
import (
	"github.com/djherbis/buffer"
	"github.com/djherbis/nio"
)
...
b := buffer.New(32 * 1024) // in-memory buffer holding up to 32 KiB
pr, pw := nio.Pipe(b)      // pw writes into b, pr reads from it
...
If you want asynchronous operation, where reads are done on a separate goroutine, you can also use a readahead package I made a few years ago:
https://github.com/klauspost/readahead
Example:
ra := readahead.NewReader(input) // reads from input on a separate goroutine
defer ra.Close()
dec := json.NewDecoder(ra)       // use ra like any other io.Reader
With default settings, it reads ahead up to four 1 MB buffers, which are handed over as the consumer is ready for them.
My package https://github.com/acomagu/bufpipe has a simple interface and works well:
r, w := bufpipe.New(nil)
io.WriteString(w, "abc") // No blocking.
io.WriteString(w, "def") // No blocking, too.
w.Close()
io.Copy(os.Stdout, r)
// Output: abcdef