When a user hits a route, I make a NATS request and wait for the reply:
ctx := r.Context()
reply, err := natsConnection.RequestWithContext(ctx, "Subject", payload)
The subscriber performs a resource-intensive task:
natsConnection.Subscribe("Subject", func(m *nats.Msg) {
	// do the resource-intensive task here, and cancel it if the request was cancelled
	natsConnection.Publish(m.Reply, []byte("Reply"))
})
How can I tell the subscriber to stop working if the request was cancelled?
I thought of creating a context
for each message received by the worker, then giving the sender a new inbox
on which it can either wait for the reply or send a cancel notice:
// contextMap is touched by several subscription callbacks, which can run on
// different goroutines, so every access is guarded by mu
var mu sync.Mutex
contextMap := map[string]func(){}
natsConnection.Subscribe("Subject", func(m *nats.Msg) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	// create a new inbox for this task
	inbox := nats.NewInbox()
	// subscribe to the new inbox; cancel and clean up the current task if anything is received
	newSub, err := natsConnection.Subscribe(inbox, func(_ *nats.Msg) {
		mu.Lock()
		stop := contextMap[inbox]
		mu.Unlock()
		if stop != nil { // nil means the task was already cleaned up
			stop()
		}
	})
	if err != nil {
		cancel()
		return
	}
	mu.Lock()
	contextMap[inbox] = func() {
		fmt.Println("received")
		cancel()
		mu.Lock()
		delete(contextMap, inbox)
		mu.Unlock()
		newSub.Unsubscribe()
	}
	mu.Unlock()
	// tell the requester to wait for the reply in this inbox
	natsConnection.Publish(m.Reply, []byte(inbox))
	// do the work and send the reply when finished
	tick(ctx)
	natsConnection.Publish(inbox, []byte("done"))
})
On the requester end, I send the first request, then wait on the specified inbox
for either the reply or the context
to be canceled, in which case I send a message to the inbox
to let the receiver know:
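ctx := r.Context()
inbox, err := natsConnection.RequestWithContext(ctx, "Subject", []byte("Payload"))
if err != nil {
	// the request failed or was canceled before the worker answered
	return
}
reply := make(chan *nats.Msg, 1)
sub, err := natsConnection.ChanSubscribe(string(inbox.Data), reply)
if err != nil {
	return
}
defer sub.Unsubscribe()
select {
case <-ctx.Done():
	fmt.Println("Canceled")
	// any message on the inbox tells the worker to stop
	natsConnection.Publish(string(inbox.Data), []byte(""))
	return
case msg := <-reply:
	fmt.Println(string(msg.Data))
}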
tick
is just a function that ticks every 500 ms until its context is canceled:
func tick(ctx context.Context) {
	ticker := time.NewTicker(time.Millisecond * 500)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			fmt.Println("context was canceled", ctx.Err())
			return
		case <-ticker.C:
			fmt.Println("Tick")
		}
	}
}
Right now this works, but I was wondering whether there is a simpler way to do it or, if not, how I can make this code better.
If the response is intensive and takes multiple seconds to produce, then yes, you can send a separate message that kills the request. Most architectures, however, let the responder continue and throw away the result.
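For the separate kill message, the worker needs a way to find and cancel the right task that is safe when callbacks for different subscriptions run concurrently. The following is only a minimal sketch using the standard library; `cancelRegistry`, `Register`, and `Cancel` are hypothetical names, not part of nats.go, and the task ID is assumed to be the per-task inbox subject.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// cancelRegistry maps a task ID (assumed here to be the per-task inbox
// subject) to the cancel function of that task's context.
// Hypothetical helper type; nothing in it is nats.go API.
type cancelRegistry struct {
	mu      sync.Mutex
	cancels map[string]context.CancelFunc
}

func newCancelRegistry() *cancelRegistry {
	return &cancelRegistry{cancels: make(map[string]context.CancelFunc)}
}

// Register stores cancel under id.
func (r *cancelRegistry) Register(id string, cancel context.CancelFunc) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.cancels[id] = cancel
}

// Cancel cancels and forgets the task stored under id. It reports whether
// a task was found, so a duplicate or late cancel message is a no-op.
func (r *cancelRegistry) Cancel(id string) bool {
	r.mu.Lock()
	cancel, ok := r.cancels[id]
	delete(r.cancels, id)
	r.mu.Unlock()
	if ok {
		cancel()
	}
	return ok
}

func main() {
	reg := newCancelRegistry()
	ctx, cancel := context.WithCancel(context.Background())
	reg.Register("task-1", cancel)

	fmt.Println(reg.Cancel("task-1")) // true: the first cancel message stops the task
	<-ctx.Done()
	fmt.Println(ctx.Err())            // context canceled
	fmt.Println(reg.Cancel("task-1")) // false: a duplicate message is ignored
}
```

In the worker, `Register` would be called before the inbox is sent to the requester, and the inbox callback would only need to call `Cancel(inbox)`.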
What are you trying to save by cancelling the request: the work on the request, or the transmission of a reply that you know will not be used?