I am implementing a server to stream many arrays of floats. I need some help to design my system to achieve the following:
Any reply is appreciated. This is my current thoughts:
package main
import (
"fmt"
"io"
"net/http"
"strconv"
"time"
)
func main() {
c := AudioProcess()
handleHello := makeHello(c)
http.HandleFunc("/", handleHello)
http.ListenAndServe(":8000", nil)
}
func makeHello(c chan string) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
for item := range c { // this loop runs when channel c is closed
io.WriteString(w, item)
}
}
}
func AudioProcess() chan string {
c := make(chan string)
go func() {
for i := 0; i <= 10; i++ { // Iterate the audio file
c <- strconv.Itoa(i) // have my frame of samples, send to channel c
time.Sleep(time.Second)
fmt.Println("send ", i) // logging
}
close(c) // done processing, close channel c
}()
return c
}
I'm not entirely sure if this addresses your problem as I'm not fully aware of your use case, but nevertheless, I've come up with a solution below.
I've used Gin for the HTTP router because it was more comfortable to me, but I'm pretty sure you can adapt the code to fit yours. I did this in a hurry (sorry), so there may be problems I'm not aware of, but do let me know if there are any.
In short:
Manager
that takes care of several Client
. It also contains a sync.Mutex
to ensure only one thread is modifying the clients
at any given time;InitBackgroundTask()
that will generate a random float64
number, and pass it to ALL clients
in a Manager
(if there are any). If there aren't any clients
, we just sleep, and carry on...<-c.Writer.CloseNotify()
channel (because the method returns thereby calling the defer
). We can also receive the random float64
number in the next background task tick. Finally, we can also terminate if we have not received anything in 20s.I made several assumptions about your needs here (e.g. that the background task will return X every Y minutes). If you are looking for more fine grain streaming, I'd recommend using websockets instead (and the pattern below can still be used).
Let me know if you have any questions.
Code:
package main
import (
"github.com/gin-gonic/gin"
"github.com/satori/go.uuid"
"log"
"math/rand"
"net/http"
"sync"
"time"
)
type Client struct {
uuid string
out chan float64
}
type Manager struct {
clients map[string]*Client
mutex sync.Mutex
}
func NewManager() *Manager {
m := new(Manager)
m.clients = make(map[string]*Client)
return m
}
func (m *Manager) AddClient(c *Client) {
m.mutex.Lock()
defer m.mutex.Unlock()
log.Printf("add client: %s
", c.uuid)
m.clients[c.uuid] = c
}
func (m *Manager) DeleteClient(id string) {
m.mutex.Lock()
defer m.mutex.Unlock()
// log.Println("delete client: %s", c.uuid)
delete(m.clients, id)
}
func (m *Manager) InitBackgroundTask() {
for {
f64 := rand.Float64()
log.Printf("active clients: %d
", len(m.clients))
for _, c := range m.clients {
c.out <- f64
}
log.Printf("sent output (%+v), sleeping for 10s...
", f64)
time.Sleep(time.Second * 10)
}
}
func main() {
r := gin.Default()
m := NewManager()
go m.InitBackgroundTask()
r.GET("/", func(c *gin.Context) {
cl := new(Client)
cl.uuid = uuid.NewV4().String()
cl.out = make(chan float64)
defer m.DeleteClient(cl.uuid)
m.AddClient(cl)
select {
case <-c.Writer.CloseNotify():
log.Printf("%s : disconnected
", cl.uuid)
case out := <-cl.out:
log.Printf("%s : received %+v
", out)
c.JSON(http.StatusOK, gin.H{
"output": out,
})
case <-time.After(time.Second * 20):
log.Println("timed out")
}
})
r.Run()
}
Note: if you're testing this on Chrome, you might have to append a random parameter at the end of the URL so that the request will actually be made, e.g. ?rand=001
, ?rand=002
and so on.