Why do I have "Undelivered Messages" if the receiver is working fine?

(Screenshot: Stackdriver "Undelivered Messages" graph)

I restarted my Windows VM, but this did not help. The next day I restarted my main.go, and right after that I saw that the old stuck messages began to come through.

My Subscription Type is Pull, and my Acknowledgement Deadline is set to the maximum: 600 seconds.

Background: I want to use Pub/Sub as a load balancer for my managed group of Windows instances (I need the Windows API for that task). Message processing is CPU-intensive (with a few HTTP calls) and can take from a few seconds to a few minutes.
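For reference, the Go client keeps extending the ack deadline of an outstanding message (up to ReceiveSettings.MaxExtension) while the callback is still running, and MaxOutstandingMessages limits how many messages one instance handles at once. A minimal sketch of tuning these for long, CPU-bound handlers, assuming sub is the *pubsub.Subscription being received from (the limits below are illustrative, not my real settings):

// Keep extending the ack deadline while the callback is still running,
// and limit how many messages one VM processes concurrently so the
// CPU-bound work is not oversubscribed. Values are illustrative.
sub.ReceiveSettings.MaxExtension = 10 * time.Minute
sub.ReceiveSettings.MaxOutstandingMessages = 2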

Some other metrics from Stackdriver (screenshots): the Pub/Sub overview, "Oldest Unacknowledged Message", and "Backlog Size" graphs.

I have no idea what else to check. A day ago I did high-load testing and everything looked fine (Undelivered Messages was zero, as we can see on the first screenshot above). Now my CPU consumption is zero, and the managed group has shrunk to one instance (this is not a production environment). I am using Pub/Sub for the first time. Here is the code of my main(), which synthesizes audio from text chunks, encodes it to two formats, and uploads it to S3:

func main() {
    fmt.Printf("Current time: %v
",
        time.Now().Format(timeFormat),
    )

    // https://godoc.org/cloud.google.com/go/pubsub#Subscription.Receive
    err := subscription.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
        timeStart := time.Now()
        if timeStartFirstMessage == nil {
            timeStartFirstMessage = &timeStart
        }

        var data pubsubMessage
        json.Unmarshal(m.Data, &data)
        fmt.Printf("Got message: %#v
", data)

        var wg sync.WaitGroup
        wg.Add(len(data.TextChunks))

        wavs := make([][]byte, len(data.TextChunks))
        for i, chunk := range data.TextChunks {
            /** TODO: without 'go' (sequential) the chunks will be processed in the correct order:
             * the user can listen to the first and second chunks sooner,
             * but the m4a will come later.
             * However, under "high load" of the VM, with ~2x more messages in parallel
             * (each message in its own goroutine),
             * performance may even be better (I think), because
             * hundreds of CPU-intensive goroutines are worse.
             *
             * Also, in sequential code I can append() to wav[]
             * instead of doing it later in another func;
             * maybe this will also improve performance (profile it).
            **/
            go func(i int, chunk string) {
                fmt.Printf("Index of text chunk: %d
", i)
                wav := tts(data.Voice, chunk)
                streamOfOggEncoder := encodeOggVorbis(wav)
                uploadToS3(
                    "intelligentspeaker",
                    data.Sub+"/"+data.HashArticle+"/"+fmt.Sprint(i),
                    "audio/ogg",
                    streamOfOggEncoder,
                )
                wavs[i] = wav

                wg.Done()
            }(i, chunk)
        }
        wg.Wait()
        wavConcated := concat(wavs)

        filename := buildPodcastEpisodeFilename(data.Title)
        err := encodePodcastEpisode(wavConcated, filename)
        if err != nil {
            m.Nack()
            return
        }

        key := data.Sub + "/" + data.HashArticle + "/" + random() + "/" + filename
        readCloser, size := getReadCloserAndSize(filename)
        if readCloser == nil {
            m.Nack()
            return
        }
        uploadToS3("intelligentspeaker--podcasts", key, "audio/x-m4a", readCloser)
        // Next message may be with the same title (filename)
        err = os.Remove(filename)
        if err != nil {
            logger.Log(logging.Entry{Payload: fmt.Sprintf("ERROR on m4a deleting: %v", err)})
        }

        fmt.Printf("Duration: %v
", duration(wavConcated))
        updatePodcastXML(
            key,
            data.Title,
            data.URLArticle,
            data.FavIconURL,
            duration(wavConcated),
            data.Utc,
            size,
        )

        fmt.Printf("DONE pubsub message, current time: %v
", time.Now().Format(timeFormat))
        fmt.Printf("Time of message processing: %v
", time.Since(timeStart).Round(time.Second))
        fmt.Printf(
            "Time of all messages processing (counted from the first message, not from the start of this app): %v
",
            time.Since(*timeStartFirstMessage).Round(time.Second),
        )

        m.Ack()

    })
    if err != nil {
        logger.Log(logging.Entry{Payload: fmt.Sprintf("ERROR on registering receiver: %v", err)})
    }

}
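For completeness: ctx, subscription, logger, timeFormat and timeStartFirstMessage are package-level variables in the same file. Roughly, they are set up along these lines (a sketch only, with placeholder project, subscription and log names; imports omitted):

var (
    ctx                   = context.Background()
    subscription          *pubsub.Subscription
    logger                *logging.Logger
    timeStartFirstMessage *time.Time
)

const timeFormat = time.RFC3339 // placeholder; the exact format is not important here

func init() {
    // Placeholders: replace with the real project, subscription and log IDs.
    pubsubClient, err := pubsub.NewClient(ctx, "my-project")
    if err != nil {
        log.Fatalf("pubsub.NewClient: %v", err)
    }
    subscription = pubsubClient.Subscription("my-subscription")

    loggingClient, err := logging.NewClient(ctx, "my-project")
    if err != nil {
        log.Fatalf("logging.NewClient: %v", err)
    }
    logger = loggingClient.Logger("my-log")
}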

Update: I found a similar question.

I'm assuming that the metric that is concerning is pubsub.googleapis.com/subscription/num_undelivered_messages, which measures the "Number of unacknowledged messages (a.k.a. backlog messages) in a subscription." From the "Acknowledge Requests" graph, we see that your subscribers stopped acknowledging messages at around 9pm on the 28th. That's the same time when the "Oldest Unacknowledged Message" age started growing (linearly). Meanwhile, the "Backlog Size" graph is pretty flat. This means that there is a small number of "problematic" messages in the subscription's backlog that the subscribers are not acknowledging. Most likely, the subscribers are having trouble processing this small set of messages for some reason. Perhaps these messages are malformed in some way, or do not conform to some expectation of the subscriber code.
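For example, in the Receive callback in the question the json.Unmarshal error is ignored, so a malformed message would flow through the rest of the handler with zero-valued fields and could then fail later, get Nacked, and be redelivered repeatedly. A small sketch of guarding against that at the top of the callback (whether to Ack or Nack such a message depends on whether you want to drop it or keep retrying it):

var data pubsubMessage
if err := json.Unmarshal(m.Data, &data); err != nil {
    // Malformed message: log it and Ack it so it does not sit in the
    // backlog forever (use m.Nack() instead if you want redelivery).
    logger.Log(logging.Entry{Payload: fmt.Sprintf("ERROR unmarshalling message %s: %v", m.ID, err)})
    m.Ack()
    return
}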

One thing you can do to try to debug this is to use the gcloud command-line tool to "peek" into the messages in the subscription's backlog: https://cloud.google.com/sdk/gcloud/reference/pubsub/subscriptions/pull. Note that the gcloud tool will not acknowledge any messages unless you pass the --auto-ack flag.
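If it is easier to inspect the backlog from Go than from gcloud, a rough equivalent is to receive a few messages, print them, and Nack them so they remain in the backlog (a sketch, not production code):

// Peek at roughly n backlog messages without acknowledging them.
func peek(ctx context.Context, sub *pubsub.Subscription, n int) error {
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()

    var mu sync.Mutex
    seen := 0
    return sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
        fmt.Printf("ID=%s attributes=%v data=%s\n", m.ID, m.Attributes, m.Data)
        m.Nack() // not acknowledged, so the message stays in the backlog

        mu.Lock()
        seen++
        if seen >= n {
            cancel() // stop receiving once enough messages have been printed
        }
        mu.Unlock()
    })
}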