并发模式帮助-扇入并返回结果?

I am writing up a quick concurrent integration testing package. I've already written the POC and now I'm trying to come up with a new pattern for it. I wish to adhere to the following rules for it:

  • A test suite may have many tests
  • A test suite must have n workers for running TestPreppers
  • A test suite must have n workers for running TestValidators
  • A test must pass prep before running validation
  • A test may have many children
  • A test must pass validation before running children tests

Here is the structure:

package conctest

// New returns a pointer to a TestSuite that has no tests, has
// ConcurrentPreppers and ConcurrentValidators both set to 1, and owns newly
// made unbuffered prepper and validator channels.
func New() *TestSuite {
    suite := &TestSuite{
        ConcurrentPreppers:   1,
        ConcurrentValidators: 1,
        prepperChan:          make(chan TestPrepper),
        validatorChan:        make(chan TestValidator),
    }
    return suite
}

// TestSuite groups a set of tests together with the concurrency settings and
// the channels used to hand prep and validation work to workers.
type TestSuite struct {
    // Tests holds the suite's tests.
    Tests                []*Test
    // ConcurrentPreppers is the number of workers meant to run TestPreppers.
    ConcurrentPreppers   int
    // ConcurrentValidators is the number of workers meant to run
    // TestValidators.
    ConcurrentValidators int

    // prepperChan carries TestPreppers; New creates it unbuffered.
    prepperChan   chan TestPrepper
    // validatorChan carries TestValidators; New creates it unbuffered.
    validatorChan chan TestValidator
}

// TestPrepper is the function type for a test's preparation step; it reports
// its outcome as an error.
type TestPrepper func() error
// TestValidator is the function type for a test's validation step; it
// reports its outcome as a ValidatorResult.
type TestValidator func() ValidatorResult

// ValidatorResult is the value returned by a TestValidator.
type ValidatorResult struct {
    // Pass is the pass/fail flag of the validation run.
    Pass  bool
    // Error is the error attached to the validation run, if any.
    Error error
}

// Test describes one test: its prep and validation functions plus any child
// tests. Per the stated design rules, a test must pass prep before
// validation, and must pass validation before its children run.
type Test struct {
    // Convey and Details are free-form strings.
    // NOTE(review): presumably a short label and a longer description —
    // nothing visible here reads them; confirm.
    Convey    string
    Details   string
    // Prepper is the test's preparation function.
    Prepper   TestPrepper
    // Validator is the test's validation function.
    Validator TestValidator
    // MaxRuns is presumably the maximum number of validation attempts —
    // TODO confirm against the runner.
    MaxRuns   int
    // Children are the tests nested under this one.
    Children  []*Test

    // runs and errors are unexported bookkeeping fields.
    // NOTE(review): presumably the attempt counter and collected errors —
    // confirm.
    runs   int
    errors []error
}

I'm having trouble coming up with a concurrency design that meets the requirements. I need to expose a method from TestSuite that is available to Test and that will allow it to send its work to the TestSuite's workers as well as return the result back to the Test.

Here is the solution I came up with. I welcome any criticism or a better approach, and will accept that answer. I created a private transport struct which contains my function and a channel on which to return the result:

package conctest

import (
    "sync"
    "time"
)

// New returns a pointer to a ConcTest that has no tests, has
// ConcurrentPreppers and ConcurrentValidators both set to 1, owns newly made
// unbuffered prepper and validator channels, and has a nil testSync.
func New() *ConcTest {
    ct := &ConcTest{
        ConcurrentPreppers:   1,
        ConcurrentValidators: 1,
        prepperChan:          make(chan *prepperTransport),
        validatorChan:        make(chan *validatorTransport),
    }
    return ct
}

// ConcTest is a suite of tests whose prep and validation functions are
// executed by pools of worker goroutines started by Run.
type ConcTest struct {
    // Tests are the top-level tests; Run starts one goroutine per entry.
    Tests                []*Test
    // ConcurrentPreppers sets how many prepper worker goroutines Run starts.
    ConcurrentPreppers   int
    // ConcurrentValidators sets how many validator worker goroutines Run
    // starts.
    ConcurrentValidators int

    // prepperChan hands prep jobs from runTest to the prepper workers.
    prepperChan   chan *prepperTransport
    // validatorChan hands validation jobs from runTest to the validator
    // workers.
    validatorChan chan *validatorTransport

    // testSync counts tests that are still running; Run replaces it with a
    // fresh WaitGroup on each call and waits on it.
    testSync *sync.WaitGroup
}

// Run executes every test in ct.Tests, plus the children of each test that
// passes, and blocks until all of them have finished.
//
// It starts ConcurrentPreppers goroutines servicing prepperChan and
// ConcurrentValidators goroutines servicing validatorChan. A setting below
// one is treated as one, because with no worker every test would block
// forever on its send. The workers are stopped and waited for before Run
// returns, so repeated calls do not accumulate goroutines.
func (ct *ConcTest) Run() {
    // done is closed once all tests have finished; it tells workers to exit.
    done := make(chan struct{})
    var workers sync.WaitGroup

    // start up prepper workers
    preppers := ct.ConcurrentPreppers
    if preppers < 1 {
        preppers = 1
    }
    for i := 0; i < preppers; i++ {
        workers.Add(1)
        go func() {
            defer workers.Done()
            for {
                select {
                case p := <-ct.prepperChan:
                    // NOTE(review): fixed one-second delay before each job
                    // is kept from the original; it looks like a
                    // proof-of-concept artifact — confirm it is wanted.
                    time.Sleep(time.Second)
                    p.Result <- p.Prepper()
                case <-done:
                    return
                }
            }
        }()
    }

    // start up validator workers
    validators := ct.ConcurrentValidators
    if validators < 1 {
        validators = 1
    }
    for i := 0; i < validators; i++ {
        workers.Add(1)
        go func() {
            defer workers.Done()
            for {
                select {
                case v := <-ct.validatorChan:
                    // NOTE(review): same fixed delay as the prepper workers.
                    time.Sleep(time.Second)
                    v.Result <- v.Validator()
                case <-done:
                    return
                }
            }
        }()
    }

    // start parent tests, child tests will be called recursively
    ct.testSync = &sync.WaitGroup{}
    for _, t := range ct.Tests {
        ct.testSync.Add(1)
        go ct.runTest(t)
    }

    // wait for all tests to complete
    ct.testSync.Wait()

    // No test is running any more, so every worker is idle in its select;
    // release them and wait so none outlives this call.
    close(done)
    workers.Wait()
}

// runTest runs a single test to completion and, if it passes, starts each of
// its children in its own goroutine.
//
// The caller must have called ct.testSync.Add(1) for this test; runTest
// releases that count on every exit path. The steps are:
//
//  1. Send t.Prepper to the prepper workers and wait for its result. A
//     non-nil result is recorded in t.Errors and fails the test.
//  2. Repeatedly sleep for t.Frequency, send t.Validator to the validator
//     workers and wait for its result, recording any error in t.Errors. The
//     loop ends when a result passes, or fails the test once t.Runs has
//     reached t.MaxRuns.
//  3. On a pass, register and start every child test.
//
// t.Pass, t.Runs and t.Errors are updated in place.
func (ct *ConcTest) runTest(t *Test) {
    // Deferred so it runs after any children have been added below; the
    // wait group therefore never reaches zero while work is still pending.
    defer ct.testSync.Done()

    // test is a pass until failure encountered
    t.Pass = true

    // run and wait for prep to finish
    pt := &prepperTransport{Prepper: t.Prepper, Result: make(chan PrepperResult)}
    ct.prepperChan <- pt

    // return on prep failure
    if pr := <-pt.Result; pr != nil {
        t.Pass = false
        t.Errors = append(t.Errors, pr)
        return
    }

    // run the validator until pass or max runs reached
    for {
        // sleep for given frequency
        time.Sleep(t.Frequency)

        // send the validator to the queue and wait for its response
        t.Runs++
        vt := &validatorTransport{Validator: t.Validator, Result: make(chan ValidatorResult)}
        ct.validatorChan <- vt
        vr := <-vt.Result

        // append error to the test
        if vr.Error != nil {
            t.Errors = append(t.Errors, vr.Error)
        }

        // stop retrying on pass
        if vr.Pass {
            break
        }

        // Give up once the attempt budget is spent. ">=" rather than "=="
        // so that a zero or negative MaxRuns, or a Runs counter already past
        // MaxRuns, cannot make a failing validator loop forever.
        if t.Runs >= t.MaxRuns {
            t.Pass = false
            return
        }
    }

    // run all children tests
    for _, c := range t.Children {
        ct.testSync.Add(1)
        go ct.runTest(c)
    }
}

// Prepper is a test's preparation function.
type Prepper func() PrepperResult
// PrepperResult is the outcome of a Prepper; runTest treats any non-nil
// value as a prep failure.
type PrepperResult error
// prepperTransport carries a Prepper to a prepper worker together with the
// channel on which the worker sends back the result.
type prepperTransport struct {
    // Prepper is the function the worker calls.
    Prepper Prepper
    // Result receives the Prepper's return value from the worker.
    Result  chan PrepperResult
}

// Validator is a test's validation function.
type Validator func() ValidatorResult
// ValidatorResult is the outcome of one Validator call.
type ValidatorResult struct {
    // Pass ends runTest's retry loop when true.
    Pass  bool
    // Error, when non-nil, is appended to the test's Errors by runTest.
    Error error
}
// validatorTransport carries a Validator to a validator worker together with
// the channel on which the worker sends back the result.
type validatorTransport struct {
    // Validator is the function the worker calls.
    Validator Validator
    // Result receives the Validator's return value from the worker.
    Result    chan ValidatorResult
}

// Test is one test case: a prep function, a validation function that may be
// retried, and child tests that are started only after this test passes.
type Test struct {
    // Convey and Details are free-form strings.
    // NOTE(review): presumably a short label and a longer description —
    // nothing visible here reads them; confirm.
    Convey    string
    Details   string
    // Frequency is how long runTest sleeps before each validator attempt.
    Frequency time.Duration
    // MaxRuns is the validator attempt count at which runTest gives up and
    // fails the test.
    MaxRuns   int

    // Prepper runs first; a non-nil result fails the test.
    Prepper   Prepper
    // Validator runs after a successful prep, possibly several times.
    Validator Validator
    // Children are started by runTest once this test has passed.
    Children  []*Test

    // Runs is incremented by runTest for each validator attempt.
    Runs   int
    // Errors collects the prep error and any validator errors.
    Errors []error
    // Pass is set to true when runTest starts and to false on failure.
    Pass   bool
}