Minor improvements to CDC Service

This commit is contained in:
Philip O'Toole
2025-09-10 08:17:09 -04:00
parent 65929b552b
commit 7ac40f511e

View File

@@ -29,14 +29,14 @@ const (
)
const (
numDroppedNotLeader = "dropped_not_leader"
numDroppedFailedToSend = "dropped_failed_to_send"
numRetries = "retries"
numSent = "sent_events"
numSnapshotSync = "snapshot_sync"
numFIFOIgnored = "fifo_ignored"
cdcNumFIFOWriteIgnored = "cdc_fifo_write_ignored"
cdcNumFIFOReadIgnored = "cdc_fifo_read_ignored"
numDroppedNotLeader = "dropped_not_leader"
numDroppedFailedToSend = "dropped_failed_to_send"
numRetries = "retries"
numSent = "sent_events"
numSnapshotSync = "snapshot_sync"
numFIFOIgnored = "fifo_ignored"
cdcNumBatcherWriteIgnored = "cdc_batcher_write_ignored"
cdcNumFIFOReadIgnored = "cdc_fifo_read_ignored"
)
// stats captures stats for the CDC Service.
@@ -56,7 +56,7 @@ func ResetStats() {
stats.Add(numSent, 0)
stats.Add(numSnapshotSync, 0)
stats.Add(numFIFOIgnored, 0)
stats.Add(cdcNumFIFOWriteIgnored, 0)
stats.Add(cdcNumBatcherWriteIgnored, 0)
stats.Add(cdcNumFIFOReadIgnored, 0)
}
@@ -136,7 +136,7 @@ type Service struct {
batcher *queue.Queue[*proto.CDCIndexedEventGroup]
// highWatermark is the index of the last event that was successfully sent to the webhook
// by the cluster (which is not necessarily the same thing as this node).
// by the cluster.
highWatermark atomic.Uint64
// highWatermarkInterval is the interval at which the high watermark is written to the store.
@@ -221,7 +221,7 @@ func NewService(nodeID, dir string, clstr Cluster, cfg *Config) (*Service, error
// Whatever is the first key in the FIFO is our initial high watermark. We assume
// that anything sitting in the queue has not been sent to the webhook. If that is
// not the case then an HWM update from other nodes in the cluster will correct it
// not the case then an HWM update from other nodes in the cluster may update it
// (and prune the FIFO).
higHWM, err := fifo.FirstKey()
if err != nil {
@@ -275,7 +275,8 @@ func (s *Service) Stop() {
}
// HighWatermark returns the high watermark of the CDC service. This
// is the index of the last event that was successfully sent to the webhook.
// is the index of the last event that this node is sure has been
// sent to the webhook by the cluster.
func (s *Service) HighWatermark() uint64 {
return s.highWatermark.Load()
}
@@ -410,11 +411,11 @@ func (s *Service) writeToBatcher() {
return
}
if o.Index != 0 && o.Index <= s.highWatermark.Load() {
// High watermark has advanced since we processed queueing requests.
// High watermark has advanced since we processed these CDC events.
// This could happen on followers if the Leader has advanced the HWM
// and this node hasn't even had the event generated by its underlying
// but this node hasn't even had the event generated by its underlying
// database yet.
stats.Add(cdcNumFIFOWriteIgnored, 1)
stats.Add(cdcNumBatcherWriteIgnored, 1)
continue
}
if _, err := s.batcher.WriteOne(o, nil); err != nil {
@@ -471,7 +472,7 @@ func (s *Service) leaderLoop() (chan struct{}, chan struct{}) {
if ev.Index <= s.highWatermark.Load() {
// High watermark has advanced since we read this event from the FIFO.
// This could happen on followers if the Leader has advanced the HWM
// and this node hasn't even had the event generated by its underlying
// but this node hasn't even had the event generated by its underlying
// database yet.
stats.Add(cdcNumFIFOReadIgnored, 1)
continue
@@ -550,12 +551,16 @@ func (s *Service) leaderHWMLoop() (chan struct{}, chan struct{}) {
if hwm == 0 {
continue
}
// Continually broadcast the high watermark, even if it
// hasn't advanced since the last time. This ensures that
// followers get the update even if there are no new events,
// or nodes that join the cluster get the current HWM.
if err := s.clstr.BroadcastHighWatermark(hwm); err != nil {
s.logger.Printf("error writing high watermark to store: %v", err)
s.logger.Printf("error broadcasting high watermark to Cluster: %v", err)
}
// While we always broadcast the high watermark, we only
// write it to disk and prune the FIFO if it has advanced
// since the last time we did so.
// While we always broadcast the high watermark, we only prune the
// FIFO if it has advanced since the last time we did so. There
// is no need to prune it multiple times for the same HWM.
if hwm <= hwmPersisted {
continue
}