HTTP分块流式传输到WebSocket

I want to listen to multiple HTTP streams that are transfer encoded responses, then fetch the messages from them line by line and then push the messages to one channel. I want to then read from the channel and push through a websocket later.

func subscribe(ws chan<- string, group string) (scanner *bufio.Scanner, err error){
    res, _ := req(STREAM_URL, channelTemplate(group))
    reader := bufio.NewScanner(res.Body)
    return reader, reader.Err()
}

func main() {
    ws := make(chan string)
    request, _ := http.NewRequest("GET", URL, nil)
    request.Header.Add("Content-Type", "application/json")
    client := &http.Client{}
    resp, _ := client.Do(request)
    ac := ACResponse{}
    json.NewDecoder(resp.Body).Decode(&ac)
    resp.Body.Close()
    var scanners = make([]*bufio.Scanner, 0)
    for _, group := range ac.Groups {
        fmt.Println("Started worker for", group)
        //listen to all stream URLs
        scanner, err := subscribe(ws, group)
        if err != nil {
            panic(err)
        }
        // keep track of Scanner to read later
        scanners = append(scanners, scanner)
    }
    for {
        select {
        case msg := <-ws:
            fmt.Println("[events] ", msg)
        default:
            randScanner := rand.Intn(len(ac.Groups)-1)
            fmt.Println("Reading from", randScanner)
            reader := scanners[randScanner]
            reader.Scan()
            if err := reader.Err(); err != nil {
                panic(err)
            }
            text := reader.Text()
            ws <- text
        }
    }
}

The program is blocking at reader.Scan(). The output is Reading from 1 and nothing else. I looked at wireshark, and the messages are coming through.

How can I design this problem better with Go?

Main blocks sending to the unbuffered channel ws. To fix this issue, change ws to a buffered channel:

ws := make(chan string, 1)

A second issue is that main() continues to read scanners after they reach EOF. The problem is on these lines:

        reader.Scan()
        if err := reader.Err(); err != nil {
            panic(err)
        }
        text := reader.Text()

Scan() returns false at EOF, but the return from scan is ignored. Err() returns nil on EOF. Modify the application to use the return value from Scan().

Yet another issue is that main blocks on read of any one scanner. To avoid blocking on a single connection, start a goroutine to read each connection:

func subscribe(wg *sync.WaitGroup, ws chan<- string, group string) {
    defer wg.Done()
    res, err := req(STREAM_URL, channelTemplate(group))
    if err ! nil {
         // handle error
    }
    defer resp.Body.Close()
    reader := bufio.NewScanner(res.Body)
    for reader.Scan() {
       ws <- reader.Text()
    }
    if err := reader.Err(); err != nil {
        // handle error
    }
}

func main() {
    ws := make(chan string)
    request, _ := http.NewRequest("GET", URL, nil)
    request.Header.Add("Content-Type", "application/json")
    resp, err := http.DefaultClient.Do(request)
    if err != nil {
         // handle error
    }
    var ac ACResponse
    if err := json.NewDecoder(resp.Body).Decode(&ac); err != nil {
       // handle error
    }
    resp.Body.Close()
    var wg sync.WaitGroup
    for _, group := range ac.Groups {
         wg.Add(1)
         go subscribe(&wg, ws, group)
    }

    go func() {
       wg.Wait()
       close(ws)
    }()

    for msg := range ws {
        fmt.Println("[events] ", msg)
    }
}

The above code is uncompiled and untested. I've marked where error handling is required. I wrote the code to exit main after all connections reach EOF. That may or may not be what you want in your application.