去lang关闭管道死锁

I'm working on a data import job using the Go language, I want to write each step as a closure, and use channels for communication, that is, each step is concurrent. The problem can be defined by the following structure.

  1. Get Widgets from data source
    1. Add translations from source 1 to Widgets.
    2. Add translations from source 2 to Widgets.
    3. Add pricing from source 1 to Widgets.
    4. Add WidgetRevisions to Widgets.
      1. Add translations from source 1 to WidgetRevisions
      2. Add translations from source 2 to WidgetRevisions

For the purposes of this question, I'm only dealing with the first three steps which must be taken on a new Widget. I assume on that basis that step four could be implemented as a pipeline step, which in itself is implemented in terms of a sub-three-step pipeline to control the *WidgetRevision*s

To that end I've been writing a small bit of code to give me the following API:

// A Pipeline is just a list of closures, and a smart 
// function to set them all off, keeping channels of
// communication between them.
p, e, d := NewPipeline()

// Add the three steps of the process
p.Add(whizWidgets)
p.Add(popWidgets)
p.Add(bangWidgets)

// Start putting things on the channel, kick off
// the pipeline, and drain the output channel
// (probably to disk, or a database somewhere)
go emit(e)
p.Execute()
drain(d)

I've implemented it already ( code at Gist or at the Go Playground) but it's deadlocking with a 100% success failure rate

The deadlock comes when calling p.Execute(), because presumably one of the channels is ending up with nothing to do, nothing being sent on any of them, and no work to do...

Adding a couple of lines of debug output to emit() and drain(), I see the following output, I believe the pipelining between the closure calls is correct, and I'm seeing some Widgets being omitted.

Emitting A Widget
Input Will Be Emitted On 0x420fdc80
Emitting A Widget
Emitting A Widget
Emitting A Widget
Output Will Drain From 0x420fdcd0
Pipeline reading from 0x420fdc80 writing to 0x420fdd20
Pipeline reading from 0x420fdd20 writing to 0x420fddc0
Pipeline reading from 0x420fddc0 writing to 0x42157000

Here's a few things I know about this approach:

  • I believe it's not uncommon for this design to "starve" one coroutine or another, I believe that's why this is deadlocking
  • I would prefer if the pipeline had things fed into it in the first place (API would implement Pipeline.Process(*Widget)
    • If I could make that work, the drain could be a "step" which just didn't pass anything on to the next function, that might be a cleaner API
  • I know I haven't implemented any kind of rung buffers, so it's entirely possible that I'll just overload the available memory of the machine
  • I don't really believe this is good Go style... but it seems to make use of a lot of Go features, but that isn't really a benefit
  • Because of the WidgetRevisions also needing a pipeline, I'd like to make my Pipeline more generic, maybe an interface{} type is the solution, I don't know Go well enough to determine if that'd be sensible or not yet.
  • I've been advised to consider implementing a mutex to guard against race conditions, but I believe I'm save as the closures will each operate on one particular unit of the Widget struct, however I'd be happy to be educated on that topic.

In Summary: How can I fix this code, should I fix this code, and if you were a more experienced go programmer than I, how would you solve this "sequential units of work" problem?

I just don't think I would've built the abstractions that far away from the channels. Pipe explicitly.

You can pretty easily make a single function for all of the actual pipe manipulation, looking something like this:

type StageMangler func(*Widget)

func stage(f StageMangler, chi <-chan *Widget, cho chan<- *Widget) {
    for widget := range chi {
                f(widget)
                cho <- widget
    }
    close(cho)
}

Then you can pass in func(w *Widget) { w.Whiz = true} or similar to the stage builder.

Your add at that point could have a collection of these and their worker counts so a particular stage could have n workers a lot more easily.

I'm just not sure this is easier than piecing channels together directly unless you're building these pipelines at runtime.