I'm a beginner gopher, and I wrote an event listener worker queue for a project I'm working on.
I've deployed it on a staging server. After around 100 events have been triggered the listeners stop getting called when events are published. The server hasn't crashed either.
Here is my implementation:
// Event struct
type Event struct {
Name string
Data interface{}
}
// Stream to publish events to
var stream = make(chan *Event, 100)
// Publish sends new event data to the stream by the event name
func Publish(name string, data interface{}) {
ev := &Event{name, data}
stream <- ev
}
// Handler provides the interface for all event handlers.
// The Work will be called with the event that it should process
type Handler interface {
Work(*Event)
}
type worker struct {
Handler
Listen chan *Event
Quit chan bool
}
// Stop shuts down the worker
func (w *worker) Stop() {
go func() {
w.Quit <- true
}()
}
// Queue of worker Listen channels
type workerQueue chan chan *Event
// registry of workers
var registry = make(map[string][]workerQueue)
// Register creates 20 workers, assigns them to a queue, and
// appends the resulting worker queue to an event on the handler registry
func Register(name string, handlers ...Handler) {
if _, ok := registry[name]; !ok {
registry[name] = make([]workerQueue, 0)
}
// Create workerQueues for each handler
for _, h := range handlers {
queue := make(workerQueue, numListeners)
// Create 20 workers
for i := 0; i < 20; i++ {
newWorker := worker{
Handler: h,
Listen: make(chan *Event),
Quit: make(chan bool),
}
go func() {
for {
select {
case ev := <-newWorker.Listen:
nl.Work(ev)
case <-newWorker.Quit:
return
}
}
}()
queue <- newWorker.Listen
}
registry[name] = append(registry[name], queue)
}
}
// Start begins listening for events on stream
func Start() {
go func() {
for {
select {
// listen for events
case ev := <-stream:
go func() {
// get registered queues for the event
queues, ok := registry[ev.Name]
if !ok {
return
}
// Get worker channel from queue and send the event
for _, queue := range queues {
worker := <-queue
worker <- ev
}
}()
}
}
}()
}
Here is an example usage.
// Usage
Start()
type demoHandler struct {
db *sql.DB
}
type eventData struct {}
func (h *demoHandler) Work(ev *Event) {
// Do something
return
}
// Register handler
Register('some-event', &demoHandler{r})
Publish('some-event', &eventData{})
I'm passing a pointer to a demoHandler as the event handler because they need access to the underlying sql instance. Is it a problem that each worker queue uses the same demoHandler?
I can't for the life of me figure out where I went wrong! Short of an error in the handler code, is there a mistake in my code which causes all of my workers to go down?
"In a go worker/event system, should workers access the same struct (via pointer) to do work?" No, it's not a problem. It would be a problem if the code inside your handler access a critical section, but I think that's not causing your program to block.
Your server doesn't crash or block either because no panic is being triggered, and your program is listening and executing on separate goroutines, which are lightweight threads of execution.
It probably has to be with the channels you are using to send and receive events.
Sends and receives to a channel are blocking by default. This means that when you send or receive from a channel it will block its goroutine until the other side is ready.
In the case of buffered channels, sends block when the buffer is full, and receives block when the buffer is empty, as in your stream channel:
var stream = make(chan *Event, 100)
You said: "After around 100 events have been triggered the listeners stop getting called when events are published".
So if you call the Publish function and do stream <- ev
when the "stream" channel buffer is full, it will block until the channel has place to receive another element.
I'd suggest reading a bit about non-blocking channel operations.
Maybe the block is occurring in some part of your real usage code.