I'm working on a data import job in Go. I want to write each step as a closure and use channels for communication between the steps, so that each step runs concurrently. The problem can be defined by the following structure.
For the purposes of this question, I'm only dealing with the first three steps, which must be taken on a new Widget. On that basis, I assume step four could be implemented as a pipeline step that is itself implemented as a three-step sub-pipeline to control the *WidgetRevision*s.
To that end, I've been writing a small bit of code to give me the following API:
```go
// A Pipeline is just a list of closures, and a smart
// function to set them all off, keeping channels of
// communication between them.
p, e, d := NewPipeline()

// Add the three steps of the process
p.Add(whizWidgets)
p.Add(popWidgets)
p.Add(bangWidgets)

// Start putting things on the channel, kick off
// the pipeline, and drain the output channel
// (probably to disk, or a database somewhere)
go emit(e)
p.Execute()
drain(d)
```
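The Gist code itself isn't reproduced here, so for reference, here is one hypothetical way the API above could be implemented without deadlocking. It is a sketch under assumptions, not the author's code: the `Step` signature, the `mark` helper, and the `Widget` fields are all invented for illustration. The two details that matter are that `Execute` only starts goroutines and returns, and that the last step writes to the same channel that `drain` reads from.

```go
package main

import "fmt"

// Widget is a hypothetical stand-in; the question doesn't show its fields.
type Widget struct {
	ID              int
	Whiz, Pop, Bang bool
}

// Step is an assumed signature for one closure: read Widgets from in, write
// them to out, and close out once in has been closed and drained.
type Step func(in <-chan *Widget, out chan<- *Widget)

// Pipeline keeps the steps plus the two channels handed back to the caller.
type Pipeline struct {
	steps  []Step
	input  chan *Widget
	output chan *Widget
}

// NewPipeline returns the pipeline, the channel to emit on, and the channel to drain.
func NewPipeline() (*Pipeline, chan<- *Widget, <-chan *Widget) {
	p := &Pipeline{input: make(chan *Widget), output: make(chan *Widget)}
	return p, p.input, p.output
}

// Add appends a step; steps run in the order they were added.
func (p *Pipeline) Add(s Step) { p.steps = append(p.steps, s) }

// Execute starts each step in its own goroutine and returns immediately, so
// the caller can go on to drain. The last step writes to p.output, the very
// channel NewPipeline handed out for draining. Assumes at least one step.
func (p *Pipeline) Execute() {
	in := p.input
	for i, s := range p.steps {
		out := p.output
		if i < len(p.steps)-1 {
			out = make(chan *Widget) // intermediate channel between two steps
		}
		go s(in, out)
		in = out
	}
}

// mark builds a Step from a per-Widget mutation (hypothetical helper).
func mark(f func(*Widget)) Step {
	return func(in <-chan *Widget, out chan<- *Widget) {
		for w := range in {
			f(w)
			out <- w
		}
		close(out)
	}
}

var (
	whizWidgets = mark(func(w *Widget) { w.Whiz = true })
	popWidgets  = mark(func(w *Widget) { w.Pop = true })
	bangWidgets = mark(func(w *Widget) { w.Bang = true })
)

// emit sends a few Widgets, then closes the channel so the closes cascade.
func emit(e chan<- *Widget) {
	for i := 0; i < 4; i++ {
		e <- &Widget{ID: i}
	}
	close(e)
}

// drain collects everything until the last step closes the output channel.
func drain(d <-chan *Widget) []*Widget {
	var got []*Widget
	for w := range d {
		got = append(got, w)
	}
	return got
}

func main() {
	p, e, d := NewPipeline()
	p.Add(whizWidgets)
	p.Add(popWidgets)
	p.Add(bangWidgets)
	go emit(e)
	p.Execute()
	for _, w := range drain(d) {
		fmt.Println(w.ID, w.Whiz, w.Pop, w.Bang)
	}
}
```

With unbuffered channels and one goroutine per step, Widgets come out in the order they were emitted, so this prints IDs 0 to 3, each followed by `true true true`.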
I've implemented it already (code at Gist or at the Go Playground), but it's deadlocking with a 100% failure rate.
The deadlock comes when calling `p.Execute()`, presumably because one of the channels ends up with nothing to do: nothing is being sent on any of them, and there's no work to do...
Adding a couple of lines of debug output to `emit()` and `drain()`, I see the following output. I believe the pipelining between the closure calls is correct, and I'm seeing some Widgets being emitted.
```
Emitting A Widget
Input Will Be Emitted On 0x420fdc80
Emitting A Widget
Emitting A Widget
Emitting A Widget
Output Will Drain From 0x420fdcd0
Pipeline reading from 0x420fdc80 writing to 0x420fdd20
Pipeline reading from 0x420fdd20 writing to 0x420fddc0
Pipeline reading from 0x420fddc0 writing to 0x42157000
```
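One detail in that debug output stands out: the last stage writes to 0x42157000, but `drain()` reads from 0x420fdcd0. If the real code does that, the final stage blocks forever sending on a channel nobody receives from, and `drain()` blocks forever on a channel nobody sends to, which is the classic "all goroutines are asleep - deadlock!" situation. Since the Gist isn't shown here, that is a hypothesis. The program below reproduces that failure mode in isolation using plain `int`s; `passThrough`, `build`, `countWithin`, and `run` are illustrative names, and a timeout is used so the program reports the problem instead of letting the runtime abort.

```go
package main

import (
	"fmt"
	"time"
)

// passThrough forwards values from in to out, closing out when in is drained.
func passThrough(in <-chan int, out chan<- int) {
	for v := range in {
		out <- v
	}
	close(out)
}

// build chains n pass-through stages starting at src. If sink is non-nil the
// last stage writes to it; if sink is nil every stage, the last one included,
// gets a brand-new channel that nobody reads (the suspected bug).
func build(n int, src, sink chan int) {
	in := src
	for i := 0; i < n; i++ {
		out := make(chan int)
		if i == n-1 && sink != nil {
			out = sink
		}
		go passThrough(in, out)
		in = out
	}
}

// countWithin reads from ch until it is closed or the timeout expires.
func countWithin(ch <-chan int, timeout time.Duration) (n int, closed bool) {
	deadline := time.After(timeout)
	for {
		select {
		case _, ok := <-ch:
			if !ok {
				return n, true
			}
			n++
		case <-deadline:
			return n, false
		}
	}
}

// run emits 4 values into a 3-stage pipeline and counts what reaches drainCh.
func run(wired bool) (int, bool) {
	src, drainCh := make(chan int), make(chan int)
	var sink chan int // stays nil unless wired
	if wired {
		sink = drainCh
	}
	build(3, src, sink)
	go func() {
		for i := 0; i < 4; i++ {
			src <- i
		}
		close(src)
	}()
	return countWithin(drainCh, time.Second)
}

func main() {
	fmt.Println(run(false)) // prints "0 false": nothing ever arrives on drainCh
	fmt.Println(run(true))  // prints "4 true"
}
```

In the unwired case the stage goroutines are simply left blocked; without the timeout, `main` would block on `drainCh` too and the runtime would report the deadlock.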
Here are a few things I know about this approach:
`Pipeline.Process(*Widget)`
`interface{}` type is the solution; I don't know Go well enough yet to determine whether that'd be sensible.

In summary: How can I fix this code, should I fix this code, and if you were a more experienced Go programmer than I am, how would you solve this "sequential units of work" problem?
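On the `interface{}` idea: passing `interface{}` values lets one pipeline carry any payload, at the cost of a type assertion in every stage and of type mistakes surfacing only as runtime panics. A minimal sketch, where `AnyStage`, `runStages`, and `exclaim` are hypothetical names:

```go
package main

import "fmt"

// AnyStage transforms one value. It accepts any payload type, but each stage
// must assert the concrete type it expects.
type AnyStage func(v interface{}) interface{}

// runStages chains the stages with goroutines and unbuffered channels and
// returns the final output channel, which is closed once src is drained.
func runStages(src <-chan interface{}, stages ...AnyStage) <-chan interface{} {
	in := src
	for _, s := range stages {
		out := make(chan interface{})
		go func(s AnyStage, in <-chan interface{}, out chan<- interface{}) {
			for v := range in {
				out <- s(v)
			}
			close(out)
		}(s, in, out)
		in = out
	}
	return in
}

// exclaim assumes a string payload; any other type panics at run time.
func exclaim(v interface{}) interface{} { return v.(string) + "!" }

func main() {
	src := make(chan interface{})
	go func() {
		for _, s := range []string{"a", "b"} {
			src <- s
		}
		close(src)
	}()
	for v := range runStages(src, exclaim, exclaim) {
		fmt.Println(v) // prints "a!!" then "b!!"
	}
}
```

The generality works, but the compiler can no longer check that adjacent stages agree on the payload type.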
I just don't think I would've built the abstractions that far away from the channels. Pipe explicitly.
You can pretty easily make a single function for all of the actual pipe manipulation, looking something like this:
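```go
// StageMangler mutates one Widget in place.
type StageMangler func(*Widget)

// stage applies f to every Widget read from chi and forwards it on cho.
// Once chi is closed and drained, it closes cho so the next stage's
// range loop ends too.
func stage(f StageMangler, chi <-chan *Widget, cho chan<- *Widget) {
	for widget := range chi {
		f(widget)
		cho <- widget
	}
	close(cho)
}
```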
Then you can pass in `func(w *Widget) { w.Whiz = true }` or similar to the stage builder.
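Piped explicitly, the question's three steps might look like this with `stage`. The `Widget` fields and the `runThree` wrapper are assumptions for illustration; each closure passed to `stage` is a `StageMangler`, and the only wiring is the channels themselves.

```go
package main

import "fmt"

// Widget is hypothetical; the answer only implies a Whiz field.
type Widget struct {
	ID              int
	Whiz, Pop, Bang bool
}

type StageMangler func(*Widget)

// stage as given in the answer: apply f, forward, close cho when chi is done.
func stage(f StageMangler, chi <-chan *Widget, cho chan<- *Widget) {
	for widget := range chi {
		f(widget)
		cho <- widget
	}
	close(cho)
}

// runThree pipes n new Widgets through three explicitly wired stages.
func runThree(n int) []*Widget {
	src := make(chan *Widget)
	c1, c2, out := make(chan *Widget), make(chan *Widget), make(chan *Widget)

	go stage(func(w *Widget) { w.Whiz = true }, src, c1)
	go stage(func(w *Widget) { w.Pop = true }, c1, c2)
	go stage(func(w *Widget) { w.Bang = true }, c2, out)

	// Emit in its own goroutine; closing src makes every stage close in turn.
	go func() {
		for i := 0; i < n; i++ {
			src <- &Widget{ID: i}
		}
		close(src)
	}()

	// Drain on this goroutine; the loop ends when the last stage closes out.
	var got []*Widget
	for w := range out {
		got = append(got, w)
	}
	return got
}

func main() {
	for _, w := range runThree(3) {
		fmt.Println(w.ID, w.Whiz, w.Pop, w.Bang) // e.g. "0 true true true"
	}
}
```

Because the drain loop runs on the caller's goroutine and everything else runs in its own goroutine, there is no point at which every goroutine is waiting on a channel nobody else touches.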
Your `Add` at that point could keep a collection of these, along with their worker counts, so that a particular stage could have n workers a lot more easily.
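A hedged sketch of the n-workers idea: `stageN` (a hypothetical name) starts n copies of the same loop reading one input channel and uses a `sync.WaitGroup` so the output channel is closed exactly once, after the last worker finishes. Closing it from every worker would panic on the second close. With more than one worker, Widgets can come out in a different order than they went in.

```go
package main

import (
	"fmt"
	"sync"
)

// Widget is hypothetical, as in the question.
type Widget struct {
	ID   int
	Whiz bool
}

type StageMangler func(*Widget)

// stageN runs n workers over the same input channel. Unlike stage, it returns
// immediately (no `go` needed at the call site) and closes cho only after all
// workers are done. Output order is not preserved.
func stageN(n int, f StageMangler, chi <-chan *Widget, cho chan<- *Widget) {
	var wg sync.WaitGroup
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			for w := range chi {
				f(w)
				cho <- w
			}
		}()
	}
	go func() {
		wg.Wait()
		close(cho)
	}()
}

// whizAll sends count Widgets through a 4-worker stage and collects them.
func whizAll(count int) []*Widget {
	src, out := make(chan *Widget), make(chan *Widget)
	stageN(4, func(w *Widget) { w.Whiz = true }, src, out)
	go func() {
		for i := 0; i < count; i++ {
			src <- &Widget{ID: i}
		}
		close(src)
	}()
	var got []*Widget
	for w := range out {
		got = append(got, w)
	}
	return got
}

func main() {
	got := whizAll(10)
	fmt.Println(len(got)) // prints 10; the order of IDs may vary between runs
}
```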
I'm just not sure this is easier than piecing channels together directly unless you're building these pipelines at runtime.