I have a channel used for event processing; the main server goroutine selects on this channel and calls the event handlers for each event received:
evtCh := make(chan Event)

// server loop:
for {
    select {
    case e := <-evtCh:
        handleEvent(e)
    case <-quitCh:
        // finish
        return
    }
}
// sends a new event for processing
func addEvent(e Event) {
    evtCh <- e
}
handleEvent calls the handlers registered for the event's type; I have func registerEventHandler(typ EventType, h func(Event)) to handle the registration. The program supports user-written extensions, which means users can register their own handlers to handle events.
The problem arises when a user's event handler sends a new event back to the server by calling addEvent: this hangs the server, since the handler itself runs in the context of the server's main loop (inside the for loop). How can I handle this situation elegantly? Is a queue modeled by a slice a good idea?
this will cause the server to hang since the event handler itself is called in the context of the server's main loop
The main loop should never block on calling handleEvent, and the most common way to avoid that is to use a pool of worker goroutines. Here's a quick, untested example:
type Worker struct {
    id   int
    ch   chan Event
    quit chan bool
}

func (w *Worker) start() {
    for {
        select {
        case e := <-w.ch:
            fmt.Printf("Worker %d called\n", w.id)
            handleEvent(e) // handle the event
        case <-w.quit:
            return
        }
    }
}
ch := make(chan Event, 100)
quit := make(chan bool)

// Start workers
for i := 0; i < 10; i++ {
    worker := &Worker{i, ch, quit}
    go worker.start()
}

func addEvent(e Event) {
    ch <- e
}
and when you are done, just close(quit) to kill all workers.
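The reason close(quit) stops every worker at once is that a receive on a closed channel returns immediately. A minimal runnable sketch of that (the stopAll helper and the worker count are made up for illustration):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// stopAll starts n workers blocked on a quit channel, closes the
// channel, and reports how many workers observed the close.
func stopAll(n int) int {
	quit := make(chan bool)
	var stopped int64
	var wg sync.WaitGroup

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-quit // a receive on a closed channel returns immediately
			atomic.AddInt64(&stopped, 1)
		}()
	}

	close(quit) // one close unblocks every blocked receiver
	wg.Wait()
	return int(atomic.LoadInt64(&stopped))
}

func main() {
	fmt.Println(stopAll(10)) // prints 10: every worker saw the close
}
```

This is also why quit can stay unbuffered: nothing is ever sent on it, the close itself is the broadcast.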
EDIT: From the comments below:
what does the main loop look like in this case?
Depends. If you have a fixed number of events, you can use a WaitGroup, like this:
type Worker struct {
    id   int
    ch   chan Event
    quit chan bool
    wg   *sync.WaitGroup
}

func (w *Worker) start() {
    for {
        select {
        case e := <-w.ch:
            handleEvent(e) // handle the event
            w.wg.Done()
        case <-w.quit:
            return
        }
    }
}
func main() {
    ch := make(chan Event, 100)
    quit := make(chan bool)
    numberOfEvents := 100
    wg := &sync.WaitGroup{}
    wg.Add(numberOfEvents)

    // start workers
    for i := 0; i < 10; i++ {
        worker := &Worker{i, ch, quit, wg}
        go worker.start()
    }

    // ... addEvent is called numberOfEvents times elsewhere ...

    wg.Wait() // blocks until all events are handled
}
If the number of events is not known beforehand, you can just block on the quit channel:
<- quit
and once another goroutine closes the channel, your program will terminate as well.
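A small sketch of that shutdown pattern (waitForQuit is a made-up helper, and the short sleep just stands in for "the last event was handled"):

```go
package main

import (
	"fmt"
	"time"
)

// waitForQuit blocks until quit is closed, then returns.
func waitForQuit(quit chan bool) bool {
	<-quit // blocks until another goroutine closes the channel
	return true
}

func main() {
	quit := make(chan bool)
	go func() {
		time.Sleep(10 * time.Millisecond) // e.g. after the last event is handled
		close(quit)
	}()
	fmt.Println(waitForQuit(quit)) // prints true once quit is closed
}
```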
To make things more asynchronous you can:

- add capacity to the event channel:

  evtCh := make(chan Event, 10)

- call handleEvent(e) asynchronously:

  go handleEvent(e)

- add events asynchronously in the handlers:

  go addEvent(e)

Or, if you want events to be handled in deterministic order, you can call handleEvent(e) directly in the handlers instead of addEvent(e).