I have a "chan string" where each entry is a CSV log line that I would like to convert to columns ("[]string"). Currently I am (inefficiently) creating a csv.NewReader(strings.NewReader(i)) for each item, which looks like a lot more work than it really needs to be:
for i := range feederChan {
    r := csv.NewReader(strings.NewReader(i))
    a, err := r.Read()
    if err != nil {
        // log error...
        continue
    }
    // then do stuff with 'a'
    // ...
}
So I'd really appreciate it if someone could share a more efficient way to do this, like creating the csv.Reader once and then feeding it the chan content somehow (streaming the chan content to something that implements the io.Reader interface?).
Use the following to convert a channel of strings to a reader:
type chanReader struct {
    c   chan string
    buf string
}

func (r *chanReader) Read(p []byte) (int, error) {
    // Fill the buffer when we have no data to return to the caller
    if len(r.buf) == 0 {
        var ok bool
        r.buf, ok = <-r.c
        if !ok {
            // Return eof on channel closed
            return 0, io.EOF
        }
    }
    n := copy(p, r.buf)
    r.buf = r.buf[n:]
    return n, nil
}
Use it like this:
r := csv.NewReader(&chanReader{c: feederChan})
for {
    a, err := r.Read()
    if err != nil {
        // handle error, break out of loop
    }
    // do something with a
}
If the application assumes that newlines separate the values received from the channel, then append a newline to each value received:
...
var ok bool
r.buf, ok = <-r.c
if !ok {
    // Return eof on channel closed
    return 0, io.EOF
}
r.buf += "\n"
...
The += "\n" copies the string. If this does not meet the application's efficiency requirements, then introduce a new field to manage line separators.
type chanReader struct {
    c   chan string // source of lines
    buf string      // the current line
    nl  bool        // true if line separator is pending
}

func (r *chanReader) Read(p []byte) (int, error) {
    // Fill the buffer when we have no data to return to the caller
    if len(r.buf) == 0 && !r.nl {
        var ok bool
        r.buf, ok = <-r.c
        if !ok {
            // Return eof on channel closed
            return 0, io.EOF
        }
        r.nl = true
    }
    // Return data if we have it
    if len(r.buf) > 0 {
        n := copy(p, r.buf)
        r.buf = r.buf[n:]
        return n, nil
    }
    // No data, return the line separator
    n := copy(p, "\n")
    r.nl = n == 0
    return n, nil
}
Another approach is to use an io.Pipe and a goroutine to convert the channel to an io.Reader, as suggested in a comment to the question. A first pass at this approach is:
var nl = []byte("\n")

func createChanReader(c chan string) io.Reader {
    r, w := io.Pipe()
    go func() {
        defer w.Close()
        for s := range c {
            io.WriteString(w, s)
            w.Write(nl)
        }
    }()
    return r
}
Use it like this:
r := csv.NewReader(createChanReader(feederChan))
for {
    a, err := r.Read()
    if err != nil {
        // handle error, break out of loop
    }
    // do something with a
}
This first pass at the io.Pipe solution leaks a goroutine when the application exits the loop before reading the pipe to EOF. The application might break out early because the CSV reader detected a syntax error, the application panicked because of a programmer error, or any number of other reasons.
To fix the goroutine leak, exit the writing goroutine on write error and close the pipe reader when done reading.
var nl = []byte("\n")

func createChanReader(c chan string) *io.PipeReader {
    r, w := io.Pipe()
    go func() {
        defer w.Close()
        for s := range c {
            if _, err := io.WriteString(w, s); err != nil {
                return
            }
            if _, err := w.Write(nl); err != nil {
                return
            }
        }
    }()
    return r
}
Use it like this:
cr := createChanReader(feederChan)
defer cr.Close() // Required for goroutine cleanup
r := csv.NewReader(cr)
for {
    a, err := r.Read()
    if err != nil {
        // handle error, break out of loop
    }
    // do something with a
}
Even though ThunderCat's answer was really useful and appreciated, I ended up using io.Pipe() as mh-cbon suggested, which is much simpler and seems at least as efficient (explained below):
rp, wp := io.Pipe()
go func() {
    defer wp.Close()
    for i := range feederChan {
        fmt.Fprintln(wp, i)
    }
}()
r := csv.NewReader(rp)
for { // keep reading
    a, err := r.Read()
    if err == io.EOF {
        break
    }
    // do stuff with 'a'
    // ...
}
The io.Pipe() is synchronous and should be fairly efficient: it pipes data from the writer to the reader. I fed csv.NewReader() the read end, and created a goroutine that drains the chan, writing to the write end.
Thanks a lot.
EDIT: ThunderCat added the io.Pipe approach to his answer (after I posted this, I guess)... his answer is much more comprehensive and was accepted as such.