I'm working on using Postgres logical replication in Go using the pgx library to get database changes via a logical replication slot using wal2json as the output plugin. The Postgres version being used is v10.1
.
When in the WaitForReplicationMessage
loop, I receive a message, and if that message is a ServerHeartbeat
, I send a standby status message to update the server of my consumed position in the WAL. This StandbyStatus
message has a field called ReplyRequested
which, if equals 1
tells the server to send a ServerHeartbeat
; and if the value is 0
it's not supposed to do anything.
Now I'm sending a StandbyStatus
message with the ReplyRequested
value to 0
(which is the default value when the object is created). On sending this, the server sends a heartbeat message despite me telling it not to. I'm unable to see the cause of this issue.
Here's my code:
for {
log.Info("Waiting for message")
message, err := session.ReplConn.WaitForReplicationMessage(context.TODO())
if err != nil {
log.WithError(err).Errorf("%s", reflect.TypeOf(err))
continue
}
if message.WalMessage != nil {
log.Info(string(message.WalMessage.WalData))
} else if message.ServerHeartbeat != nil {
log.Info("Heartbeat requested")
// set the flushed LSN (and other LSN values) in the standby status and send to PG
log.Info(message.ServerHeartbeat)
// send Standby Status with the LSN position
err = session.sendStandbyStatus()
if err != nil {
log.WithError(err).Error("Unable to send standby status")
}
}
}
The sendStandbyStatus
function above is:
func (session *Session) sendStandbyStatus() error {
standbyStatus, err := pgx.NewStandbyStatus(session.RestartLSN)
if err != nil {
return err
}
log.Info(standbyStatus) // the output of this confirms ReplyRequested is indeed 0
standbyStatus.ReplyRequested = 0 // still set it
err = session.ReplConn.SendStandbyStatus(standbyStatus)
if err != nil {
return err
}
return nil
}