Persist High Water Mark across CDC Service restarts

This commit is contained in:
Copilot
2025-08-20 13:53:44 -04:00
committed by GitHub
parent abc9db0aff
commit a2a089b542
3 changed files with 139 additions and 5 deletions

View File

@@ -7,6 +7,7 @@
- [PR #2233](https://github.com/rqlite/rqlite/pull/2233): Add Voters() and NonVoters() convenience methods to Store Servers.
- [PR #2238](https://github.com/rqlite/rqlite/pull/2238): Improve CDC and FIFO unit testing.
- [PR #2240](https://github.com/rqlite/rqlite/pull/2240): Add Len() method to CDC FIFO.
- [PR #2242](https://github.com/rqlite/rqlite/pull/2242): Persist High Water Mark across CDC Service restarts.
## v8.43.3 (August 14th 2025)
### Implementation changes and bug fixes

View File

@@ -3,6 +3,7 @@ package cdc
import (
"bytes"
"crypto/tls"
"encoding/json"
"expvar"
"fmt"
"log"
@@ -21,6 +22,7 @@ import (
const (
cdcDB = "fifo.db"
hwmFile = "hwm.json"
leaderChanLen = 5 // Support any fast back-to-back leadership changes.
)
@@ -31,6 +33,12 @@ const (
numSent = "sent_events"
)
// hwmData represents the structure of the high watermark file.
// It is serialized as JSON and written to the service's hwm.json file
// so the high watermark survives service restarts.
type hwmData struct {
	// HWM is the persisted high watermark value. It is the only field
	// read back on startup (see readHWMFromFile).
	HWM uint64 `json:"hwm"`
	// Timestamp records when the value was written, as RFC 3339 UTC
	// (set by writeHWMToFile). Informational only; never read back.
	Timestamp string `json:"timestamp"`
}
// stats captures stats for the CDC Service.
var stats *expvar.Map
@@ -64,10 +72,11 @@ type Cluster interface {
// Service is a CDC service that reads events from a channel and processes them.
// It is used to stream changes to a HTTP endpoint.
type Service struct {
serviceID string
nodeID string
dir string
clstr Cluster
serviceID string
nodeID string
dir string
hwmFilePath string
clstr Cluster
// in is the channel from which the CDC events are read.
in <-chan *proto.CDCIndexedEventGroup
@@ -166,6 +175,7 @@ func NewService(nodeID, dir string, clstr Cluster, in <-chan *proto.CDCIndexedEv
serviceID: cfg.ServiceID,
nodeID: nodeID,
dir: dir,
hwmFilePath: filepath.Join(dir, hwmFile),
clstr: clstr,
in: in,
logOnly: cfg.LogOnly,
@@ -193,7 +203,9 @@ func NewService(nodeID, dir string, clstr Cluster, in <-chan *proto.CDCIndexedEv
}
srv.fifo = fifo
srv.highWatermark.Store(0)
// Read initial HWM from file if it exists
initialHWM := readHWMFromFile(srv.hwmFilePath)
srv.highWatermark.Store(initialHWM)
srv.highWatermarkingDisabled.SetBool(cfg.HighWatermarkingDisabled)
return srv, nil
}
@@ -309,6 +321,9 @@ func (s *Service) mainLoop() {
case hwm := <-s.hwmObCh:
if hwm > s.highWatermark.Load() {
s.highWatermark.Store(hwm)
if err := writeHWMToFile(s.hwmFilePath, hwm); err != nil {
s.logger.Printf("error writing high watermark to file: %v", err)
}
// This means all events up to this high watermark have been
// successfully sent to the webhook by the cluster. We can
// delete all events up and including that point from our FIFO.
@@ -413,3 +428,59 @@ func (s *Service) initBatcher() {
}
s.batcher = queue.New[*proto.CDCIndexedEventGroup](s.maxBatchSz, s.maxBatchSz, s.maxBatchDelay)
}
// readHWMFromFile reads the high watermark from the specified file.
// Any failure — a missing file, an unreadable file, or invalid JSON —
// is treated as "no persisted high watermark" and yields 0.
func readHWMFromFile(path string) uint64 {
	b, err := os.ReadFile(path)
	if err != nil {
		// File doesn't exist or can't be read.
		return 0
	}
	var stored hwmData
	if json.Unmarshal(b, &stored) != nil {
		// Corrupt or unexpected contents.
		return 0
	}
	return stored.HWM
}
// writeHWMToFile persists the high watermark to path with an fsync.
// The JSON payload is first written and synced to a temporary sibling
// file, which is then renamed over the target so readers never observe
// a partially-written file.
func writeHWMToFile(path string, hwm uint64) error {
	payload, err := json.Marshal(hwmData{
		HWM:       hwm,
		Timestamp: time.Now().UTC().Format(time.RFC3339),
	})
	if err != nil {
		return err
	}

	tmpPath := path + ".tmp"
	f, err := os.Create(tmpPath)
	if err != nil {
		return err
	}
	// Best-effort cleanup of the temp file on any failure below. After a
	// successful rename the temp file no longer exists and this is a no-op.
	defer os.Remove(tmpPath)

	if _, err := f.Write(payload); err != nil {
		f.Close()
		return err
	}
	// Ensure the data is durable before the rename makes it visible.
	if err := f.Sync(); err != nil {
		f.Close()
		return err
	}
	if err := f.Close(); err != nil {
		return err
	}
	// Atomically replace any previous high watermark file.
	return os.Rename(tmpPath, path)
}

View File

@@ -821,3 +821,65 @@ func testPoll(t *testing.T, condition func() bool, timeout time.Duration) {
}
}
}
// Test_ServiceHWMPersistence tests that the high watermark persists across
// service restarts: one service instance records an HWM, and a second
// instance pointed at the same directory must read it back on creation.
func Test_ServiceHWMPersistence(t *testing.T) {
	ResetStats()

	// Use a temp directory for this test.
	dir := t.TempDir()

	// Channel for the service to receive events.
	eventsCh := make(chan *proto.CDCIndexedEventGroup, 1)
	cl := &mockCluster{}

	cfg := DefaultConfig()
	cfg.LogOnly = true // Use log-only mode to avoid HTTP server setup
	cfg.MaxBatchSz = 1
	cfg.MaxBatchDelay = 50 * time.Millisecond

	// Create and start the first service.
	svc1, err := NewService("node1", dir, cl, eventsCh, cfg)
	if err != nil {
		t.Fatalf("failed to create first service: %v", err)
	}
	if err := svc1.Start(); err != nil {
		t.Fatalf("failed to start first service: %v", err)
	}

	// Make it the leader.
	cl.SignalLeaderChange(true)

	// Capture the update counter BEFORE signaling the HWM change. Reading it
	// after the signal races with the service's main loop: if the update were
	// processed before the baseline read, the poll below would never observe
	// the counter advancing and the test would time out spuriously.
	initialCount := svc1.hwmUpdated.Load()

	// Send an HWM update.
	testHWM := uint64(12345)
	cl.SignalHWMUpdate(testHWM)

	// Wait for the HWM update to be processed.
	testPoll(t, func() bool {
		return svc1.hwmUpdated.Load() > initialCount
	}, 2*time.Second)

	// Verify that the HWM was updated.
	if svc1.HighWatermark() != testHWM {
		t.Fatalf("expected high watermark to be %d, got %d", testHWM, svc1.HighWatermark())
	}

	// Stop the first service.
	svc1.Stop()

	// Create a new service using the same directory.
	svc2, err := NewService("node1", dir, cl, eventsCh, cfg)
	if err != nil {
		t.Fatalf("failed to create second service: %v", err)
	}

	// Verify that the new service has the correct HWM value from the file.
	if svc2.HighWatermark() != testHWM {
		t.Fatalf("expected new service to have high watermark %d, got %d", testHWM, svc2.HighWatermark())
	}

	// Clean up.
	svc2.Stop()
}