在Go中以跨平台方式并发读取/关闭

I recently realized that I don't know how to properly Read and Close in Go concurrently. In my particular case, I need to do that with a serial port, but the problem is more generic.

If we do that without any extra effort to synchronize things, it leads to a race condition. Simple example:

package main

import (
    "fmt"
    "os"
    "time"
)

func main() {
    f, err := os.Open("/dev/ttyUSB0")
    if err != nil {
        panic(err)
    }

    // Start a goroutine which keeps reading from a serial port
    go reader(f)

    time.Sleep(1000 * time.Millisecond)
    fmt.Println("closing")
    f.Close()
    time.Sleep(1000 * time.Millisecond)
}

func reader(f *os.File) {
    b := make([]byte, 100)
    for {
        f.Read(b)
    }
}

If we save the above as main.go, and run go run --race main.go, the output will look as follows:

closing
==================
WARNING: DATA RACE
Write at 0x00c4200143c0 by main goroutine:
  os.(*file).close()
      /usr/local/go/src/os/file_unix.go:143 +0x124
  os.(*File).Close()
      /usr/local/go/src/os/file_unix.go:132 +0x55
  main.main()
      /home/dimon/mydata/projects/go/src/dmitryfrank.com/testfiles/main.go:20 +0x13f

Previous read at 0x00c4200143c0 by goroutine 6:
  os.(*File).read()
      /usr/local/go/src/os/file_unix.go:228 +0x50
  os.(*File).Read()
      /usr/local/go/src/os/file.go:101 +0x6f
  main.reader()
      /home/dimon/mydata/projects/go/src/dmitryfrank.com/testfiles/main.go:27 +0x8b

Goroutine 6 (running) created at:
  main.main()
      /home/dimon/mydata/projects/go/src/dmitryfrank.com/testfiles/main.go:16 +0x81
==================
Found 1 data race(s)
exit status 66

Ok, but how to handle that properly? Of course, we can't just lock some mutex before calling f.Read(), because the mutex will end up locked basically all the time. To make it work properly, we'd need some sort of cooperation between reading and locking, like conditional variables do: the mutex gets unlocked before putting the goroutine to wait, and it's locked back when the goroutine wakes up.

I would implement something like this manually, but then I need some way to select things while reading. Like this: (pseudocode)

select {
case b := <-f.NextByte():
  // process the byte somehow
default:
}

I examined docs of the packages os and sync, and so far I don't see any way to do that.

I belive you need 2 signals:

  1. main -> reader, to tell it to stop reading
  2. reader -> main, to tell that reader has been terminated

of course you can select go signaling primitive (channel, waitgroup, context etc) that you prefer.

Example below, I use waitgroup and context. The reason is that you can spin multiple reader and only need to close the context to tell all the reader go-routine to stop.

I created multiple go routine just as an example that you can even coordinate multiple go routine with it.

package main

import (
    "context"
    "fmt"
    "os"
    "sync"
    "time"
)

func main() {

    ctx, cancelFn := context.WithCancel(context.Background())

    f, err := os.Open("/dev/ttyUSB0")
    if err != nil {
        panic(err)
    }

    var wg sync.WaitGroup
    for i := 0; i < 3; i++ {
        wg.Add(1)

        // Start a goroutine which keeps reading from a serial port
        go func(i int) {
            defer wg.Done()
            reader(ctx, f)
            fmt.Printf("reader %d closed
", i)
        }(i)
    }

    time.Sleep(1000 * time.Millisecond)
    fmt.Println("closing")
    cancelFn() // signal all reader to stop
    wg.Wait()  // wait until all reader finished
    f.Close()
    fmt.Println("file closed")
    time.Sleep(1000 * time.Millisecond)
}

func reader(ctx context.Context, f *os.File) {
    b := make([]byte, 100)
    for {
        select {
        case <-ctx.Done():
            return
        default:
            f.Read(b)
        }
    }
}