How to effectively test the pipes and filters pattern

I am using the pipes and filters pattern as described in this blog post.

I am wondering how to test this effectively. My idea was to test each filter independently. For example, I have a filter that looks like this:

func watchTemperature(ctx context.Context, inStream <-chan int) {
    maxTemp = 90

    go func() {
        for {
            select {
            case <-ctx.Done():
                return
            case temp := <-inStream:
                if temp > maxTemp {
                    log.Print("Temperature too high!")
                }
            }
        }
    }()
}

For now, my test only checks whether the log message has been printed. It looks as follows:

func TestWatchTemperature(t *testing.T) {
    maxTemp = 90

    ctx := context.Background()
    inStream := make(chan int)
    defer close(inStream)
    watchTemperature(ctx, inStream)

    var buf bytes.Buffer
    log.SetOutput(&buf)

    inStream <- maxTemp + 1

    logMsg := buf.String()
    assert.True(t, strings.Contains(logMsg, "Temperature too high!"),
        "Expected log message not found")
}

As this filter is the end of my pipeline, I do not have an out channel I can read from to determine if this goroutine/filter has already done something.

The only suggestion I have found online so far is to wait a few seconds after writing to inStream in the test and then check the log. However, this seems like a really poor choice, as it simply introduces a race condition and slows down the test.

What is the best way to test something like this? Or is there simply no good way to test it with this filter design, so that I always need an outStream?

A worker goroutine does not always have a result to deliver. But if you want to know exactly when it is done, you need to synchronize it with your main goroutine using one of the concurrency primitives, such as a signaling channel or a wait group.

Here's an example:

package main

import (
    "bytes"
    "context"
    "fmt"
    "log"
    "strings"
)

const (
    maxTemp = 90
    errMsg  = "Temperature too high!"
)

func watchTemperature(ctx context.Context, inStream <-chan int, finished chan<- bool) {
    go func() {
        defer func() {
            finished <- true
        }()
        for {
            select {
            case <-ctx.Done():
                return
            case temp := <-inStream:
                if temp > maxTemp {
                    log.Print(errMsg)
                }
            }
        }
    }()
}

func main() {
    // quit signals to stop the work
    ctx, quit := context.WithCancel(context.Background())
    var buf bytes.Buffer
    // Redirect the log output before launching the goroutine!
    log.SetOutput(&buf)
    inStream := make(chan int)
    finished := make(chan bool)
    // pass the callback channel to the goroutine
    watchTemperature(ctx, inStream, finished)

    // Send and cancel from a separate goroutine so main can wait on finished.
    go func() {
        inStream <- maxTemp + 1
        quit()
    }()
    // Block until the goroutine returns.
    <-finished

    if !strings.Contains(buf.String(), errMsg) {
        panic("Expected log message not found")
    }

    fmt.Println("Pass!")
}
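
The same idea can be sketched with a sync.WaitGroup instead of a signaling channel. This is only a minimal sketch under a few assumptions that are not in the code above: the filter takes a *log.Logger and a *sync.WaitGroup as extra parameters (so the test does not have to touch the global logger), and loggedWarning is a hypothetical helper introduced just for this illustration.

```go
package main

import (
	"bytes"
	"context"
	"fmt"
	"log"
	"strings"
	"sync"
)

const (
	maxTemp = 90
	errMsg  = "Temperature too high!"
)

// watchTemperature registers its goroutine with wg so that callers can
// wait for it to return. The logger and wg parameters are assumptions
// made for this sketch.
func watchTemperature(ctx context.Context, inStream <-chan int, logger *log.Logger, wg *sync.WaitGroup) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case <-ctx.Done():
				return
			case temp := <-inStream:
				if temp > maxTemp {
					logger.Print(errMsg)
				}
			}
		}
	}()
}

// loggedWarning (hypothetical helper) sends one reading through the
// filter and reports whether the warning was logged.
func loggedWarning(temp int) bool {
	var buf bytes.Buffer
	logger := log.New(&buf, "", 0)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	inStream := make(chan int)
	var wg sync.WaitGroup
	watchTemperature(ctx, inStream, logger, &wg)

	inStream <- temp // unbuffered: returns once the filter has received the value
	cancel()         // ask the filter to stop after handling that value
	wg.Wait()        // the goroutine has returned, so any log write is complete

	return strings.Contains(buf.String(), errMsg)
}

func main() {
	fmt.Println(loggedWarning(maxTemp + 1)) // true
	fmt.Println(loggedWarning(maxTemp))     // false
}
```

A wait group tends to scale better than one signaling channel per goroutine when several filters have to be awaited at once, since a single wg.Wait() covers all of them.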

I think you should change your structure a bit. First, testing whether a function prints something doesn't seem like a good idea to me. Logs shouldn't be part of your business logic; they are just add-ons that make debugging and tracing easier. Second, you are starting a goroutine that doesn't produce any output (except logs), so you can't tell when it has finished its job.

An alternative solution:

Declare a channel to receive outputs from your function, and preferably pass it in as a parameter. I used a string channel to keep it as simple as possible.

var outStream = make(chan string)
watchTemperature(ctx, inStream, outStream)

Instead of using the normal log facilities, write to this channel, and produce exactly one output token for each input token:

if temp > maxTemp {
    outStream <- "Temperature too high!"
} else {
    outStream <- "Normal"
}

Then, in your test, wait for an output after each send:

inStream <- maxTemp + 1
reply := <-outStream
if reply != "Temperature too high!" {
    // test failed
}
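
Put together, the fragments above might look like the following. This is a sketch rather than a definitive implementation: the helper classify is a hypothetical name introduced for illustration, and the inner select that guards the send on outStream is an added assumption so the goroutine can still exit if the context is cancelled while nobody is reading.

```go
package main

import (
	"context"
	"fmt"
)

const maxTemp = 90

// watchTemperature emits exactly one status string per reading.
func watchTemperature(ctx context.Context, inStream <-chan int, outStream chan<- string) {
	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case temp := <-inStream:
				status := "Normal"
				if temp > maxTemp {
					status = "Temperature too high!"
				}
				// Guard the send so cancellation is honoured even if
				// no one is receiving from outStream.
				select {
				case outStream <- status:
				case <-ctx.Done():
					return
				}
			}
		}
	}()
}

// classify (hypothetical helper) feeds the readings through the filter
// one at a time and collects the reply for each.
func classify(temps ...int) []string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // stops the filter goroutine when we are done

	inStream := make(chan int)
	outStream := make(chan string)
	watchTemperature(ctx, inStream, outStream)

	replies := make([]string, 0, len(temps))
	for _, temp := range temps {
		inStream <- temp                       // send one reading
		replies = append(replies, <-outStream) // block until its reply arrives
	}
	return replies
}

func main() {
	for _, reply := range classify(maxTemp+1, maxTemp) {
		fmt.Println(reply)
	}
}
```

Because every send is paired with a receive, the test needs no sleeps: the receive from outStream only returns once the filter has processed the corresponding input.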