From a2a089b5425ad9126c40918011ee7e545db468d5 Mon Sep 17 00:00:00 2001 From: Copilot <198982749+Copilot@users.noreply.github.com> Date: Wed, 20 Aug 2025 13:53:44 -0400 Subject: [PATCH] Persist High Water Mark across CDC Service restarts --- CHANGELOG.md | 1 + cdc/service.go | 81 ++++++++++++++++++++++++++++++++++++++++++--- cdc/service_test.go | 62 ++++++++++++++++++++++++++++++++++ 3 files changed, 139 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 35170ba6..fd3b2069 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ - [PR #2233](https://github.com/rqlite/rqlite/pull/2233): Add Voters() and NonVoters() convenience methods to Store Servers. - [PR #2238](https://github.com/rqlite/rqlite/pull/2238): Improve CDC and FIFO unit testing. - [PR #2240](https://github.com/rqlite/rqlite/pull/2240): Add Len() method to CDC FIFO. +- [PR #2242](https://github.com/rqlite/rqlite/pull/2242): Persist High Water Mark across CDC Service restarts. ## v8.43.3 (August 14th 2025) ### Implementation changes and bug fixes diff --git a/cdc/service.go b/cdc/service.go index 4f44fbd0..ccba000c 100644 --- a/cdc/service.go +++ b/cdc/service.go @@ -3,6 +3,7 @@ package cdc import ( "bytes" "crypto/tls" + "encoding/json" "expvar" "fmt" "log" @@ -21,6 +22,7 @@ import ( const ( cdcDB = "fifo.db" + hwmFile = "hwm.json" leaderChanLen = 5 // Support any fast back-to-back leadership changes. ) @@ -31,6 +33,12 @@ const ( numSent = "sent_events" ) +// hwmData represents the structure of the high watermark file. +type hwmData struct { + HWM uint64 `json:"hwm"` + Timestamp string `json:"timestamp"` +} + // stats captures stats for the CDC Service. var stats *expvar.Map @@ -64,10 +72,11 @@ type Cluster interface { // Service is a CDC service that reads events from a channel and processes them. // It is used to stream changes to a HTTP endpoint. 
type Service struct { - serviceID string - nodeID string - dir string - clstr Cluster + serviceID string + nodeID string + dir string + hwmFilePath string + clstr Cluster // in is the channel from which the CDC events are read. in <-chan *proto.CDCIndexedEventGroup @@ -166,6 +175,7 @@ func NewService(nodeID, dir string, clstr Cluster, in <-chan *proto.CDCIndexedEv serviceID: cfg.ServiceID, nodeID: nodeID, dir: dir, + hwmFilePath: filepath.Join(dir, hwmFile), clstr: clstr, in: in, logOnly: cfg.LogOnly, @@ -193,7 +203,9 @@ func NewService(nodeID, dir string, clstr Cluster, in <-chan *proto.CDCIndexedEv } srv.fifo = fifo - srv.highWatermark.Store(0) + // Read initial HWM from file if it exists + initialHWM := readHWMFromFile(srv.hwmFilePath) + srv.highWatermark.Store(initialHWM) srv.highWatermarkingDisabled.SetBool(cfg.HighWatermarkingDisabled) return srv, nil } @@ -309,6 +321,9 @@ func (s *Service) mainLoop() { case hwm := <-s.hwmObCh: if hwm > s.highWatermark.Load() { s.highWatermark.Store(hwm) + if err := writeHWMToFile(s.hwmFilePath, hwm); err != nil { + s.logger.Printf("error writing high watermark to file: %v", err) + } // This means all events up to this high watermark have been // successfully sent to the webhook by the cluster. We can // delete all events up and including that point from our FIFO. @@ -413,3 +428,59 @@ func (s *Service) initBatcher() { } s.batcher = queue.New[*proto.CDCIndexedEventGroup](s.maxBatchSz, s.maxBatchSz, s.maxBatchDelay) } + +// readHWMFromFile reads the high watermark from the specified file. +// Returns 0 if the file doesn't exist or if there's an error reading it. +func readHWMFromFile(path string) uint64 { + data, err := os.ReadFile(path) + if err != nil { + return 0 // File doesn't exist or can't be read + } + + var hwm hwmData + if err := json.Unmarshal(data, &hwm); err != nil { + return 0 // Invalid JSON + } + + return hwm.HWM +} + +// writeHWMToFile writes the high watermark to the specified file with an fsync. 
+func writeHWMToFile(path string, hwm uint64) error {
+	jsonData, err := json.Marshal(hwmData{
+		HWM:       hwm,
+		Timestamp: time.Now().UTC().Format(time.RFC3339),
+	})
+	if err != nil {
+		return fmt.Errorf("marshaling high watermark: %w", err)
+	}
+
+	// Write to a temporary file first, then rename to ensure atomicity
+	tmpPath := path + ".tmp"
+	file, err := os.Create(tmpPath)
+	if err != nil {
+		return fmt.Errorf("creating temporary high watermark file: %w", err)
+	}
+	// Best-effort cleanup on any failure below; after a successful rename the temporary name is gone.
+	defer os.Remove(tmpPath)
+
+	if _, err := file.Write(jsonData); err != nil {
+		file.Close()
+		return fmt.Errorf("writing temporary high watermark file: %w", err)
+	}
+
+	// Sync to disk before renaming, so the rename only ever publishes fully written data
+	if err := file.Sync(); err != nil {
+		file.Close()
+		return fmt.Errorf("syncing temporary high watermark file: %w", err)
+	}
+	if err := file.Close(); err != nil {
+		return fmt.Errorf("closing temporary high watermark file: %w", err)
+	}
+
+	// Atomically replace the old file
+	if err := os.Rename(tmpPath, path); err != nil {
+		return fmt.Errorf("replacing high watermark file: %w", err)
+	}
+	return nil
+}
diff --git a/cdc/service_test.go b/cdc/service_test.go
index fc835957..26d1d497 100644
--- a/cdc/service_test.go
+++ b/cdc/service_test.go
@@ -821,3 +821,65 @@ func testPoll(t *testing.T, condition func() bool, timeout time.Duration) {
 		}
 	}
 }
+
+// Test_ServiceHWMPersistence tests that the high watermark persists across service restarts.
+func Test_ServiceHWMPersistence(t *testing.T) {
+	ResetStats()
+
+	// Use a temp directory for this test
+	dir := t.TempDir()
+
+	// Channel for the service to receive events
+	eventsCh := make(chan *proto.CDCIndexedEventGroup, 1)
+
+	cl := &mockCluster{}
+
+	cfg := DefaultConfig()
+	cfg.LogOnly = true // Use log-only mode to avoid HTTP server setup
+	cfg.MaxBatchSz = 1
+	cfg.MaxBatchDelay = 50 * time.Millisecond
+
+	// Create the first service
+	svc1, err := NewService("node1", dir, cl, eventsCh, cfg)
+	if err != nil {
+		t.Fatalf("failed to create first service: %v", err)
+	}
+	if err := svc1.Start(); err != nil {
+		t.Fatalf("failed to start first service: %v", err)
+	}
+
+	// Make it the leader
+	cl.SignalLeaderChange(true)
+
+	// Snapshot the update counter, then send an HWM update; reading it after signaling would race with the service.
+	initialCount := svc1.hwmUpdated.Load()
+	testHWM := uint64(12345)
+	cl.SignalHWMUpdate(testHWM)
+
+	// Wait for HWM update to be processed
+	testPoll(t, func() bool {
+		return svc1.hwmUpdated.Load() > initialCount
+	}, 2*time.Second)
+
+	// Verify that the HWM was updated
+	if svc1.HighWatermark() != testHWM {
+		t.Fatalf("expected high watermark to be %d, got %d", testHWM, svc1.HighWatermark())
+	}
+
+	// Stop the first service
+	svc1.Stop()
+
+	// Create a new service using the same directory
+	svc2, err := NewService("node1", dir, cl, eventsCh, cfg)
+	if err != nil {
+		t.Fatalf("failed to create second service: %v", err)
+	}
+
+	// Verify that the new service has the correct HWM value from the file
+	if svc2.HighWatermark() != testHWM {
+		t.Fatalf("expected new service to have high watermark %d, got %d", testHWM, svc2.HighWatermark())
+	}
+
+	// Clean up
+	svc2.Stop()
+}