From 976f2779d326d9c175c2312b6c388a84fb3fd896 Mon Sep 17 00:00:00 2001
From: Philip O'Toole
Date: Fri, 9 Jan 2026 02:01:30 -0500
Subject: [PATCH] Implement a force-checkpoint after multiple failures policy

---
 CHANGELOG.md   |  1 +
 store/store.go | 87 +++++++++++++++++++++++++++++++-------------------
 2 files changed, 56 insertions(+), 32 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index abe8d36d..74939c02 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,7 @@
 ## v9.3.14 (unreleased)
 ### Implementation changes and bug fixes
 - [PR #2444](https://github.com/rqlite/rqlite/pull/2444): Increase WAL Checkpoint busy timeout to 250ms.
+- [PR #2445](https://github.com/rqlite/rqlite/pull/2445): Force a WAL checkpoint after multiple consecutive checkpoint failures.
 
 ## v9.3.13 (January 8th 2026)
 ### Implementation changes and bug fixes
diff --git a/store/store.go b/store/store.go
index 3293c8c4..4828f9d1 100644
--- a/store/store.go
+++ b/store/store.go
@@ -138,6 +138,7 @@ const (
 	raftLogCacheSize        = 512
 	trailingScale           = 1.25
 	observerChanLen         = 50
+	maxFailedSnapshotsInRow = 5
 
 	baseVacuumTimeKey = "rqlite_base_vacuum"
 	lastVacuumTimeKey = "rqlite_last_vacuum"
@@ -344,9 +345,10 @@ type Store struct {
 	snapshotWClose chan struct{}
 	snapshotWDone  chan struct{}
 
-	// Snapshotting synchronization
-	snapshotSync *rsync.SyncChannels
-	snapshotCAS  *rsync.CheckAndSet
+	// Snapshotting synchronization and management
+	snapshotSync            *rsync.SyncChannels
+	snapshotCAS             *rsync.CheckAndSet
+	numFailedSnapshotsInRow int
 
 	// Latest log entry index actually reflected by the FSM. Due to Raft code
 	// these values are not updated automatically after a Snapshot-restore.
@@ -2533,22 +2535,11 @@ func (s *Store) fsmSnapshot() (fSnap raft.FSMSnapshot, retErr error) {
 	var walTmpFD *os.File
 	if fullNeeded {
 		chkStartTime := time.Now()
-		meta, err := s.db.Checkpoint(sql.CheckpointTruncate)
-		if err != nil {
+		if err := s.checkpointWAL(); err != nil {
 			stats.Add(numFullCheckpointFailed, 1)
-			return nil, fmt.Errorf("snapshot can't complete due to FULL Snapshot checkpoint error (will retry): %s",
+			return nil, fmt.Errorf("full snapshot can't complete due to WAL checkpoint error (will retry): %s",
 				err.Error())
 		}
-		if !meta.Success() {
-			if meta.Moved < meta.Pages {
-				stats.Add(numWALCheckpointIncomplete, 1)
-				return nil, fmt.Errorf("snapshot can't complete due to FULL Snapshot checkpoint incomplete (will retry): %s)",
-					meta.String())
-			}
-			s.logger.Printf("full Snapshot checkpoint moved %d/%d pages, but did not truncate WAL, forcing truncate",
-				meta.Moved, meta.Pages)
-			s.mustTruncateCheckpoint()
-		}
 		stats.Get(snapshotCreateChkTruncateDuration).(*expvar.Int).Set(time.Since(chkStartTime).Milliseconds())
 		dbFD, err := os.Open(s.db.Path())
 		if err != nil {
@@ -2605,22 +2596,11 @@ func (s *Store) fsmSnapshot() (fSnap raft.FSMSnapshot, retErr error) {
 			return nil, err
 		}
 		chkTStartTime := time.Now()
-		meta, err := s.db.Checkpoint(sql.CheckpointTruncate)
-		if err != nil {
+		if err := s.checkpointWAL(); err != nil {
 			stats.Add(numWALCheckpointTruncateFailed, 1)
-			return nil, fmt.Errorf("snapshot can't complete due to WAL checkpoint error (will retry): %s",
+			return nil, fmt.Errorf("incremental snapshot can't complete due to WAL checkpoint error (will retry): %s",
 				err.Error())
 		}
-		if !meta.Success() {
-			if meta.Moved < meta.Pages {
-				stats.Add(numWALCheckpointIncomplete, 1)
-				return nil, fmt.Errorf("snapshot can't complete due to Snapshot checkpoint incomplete (will retry %s)",
-					meta.String())
-			}
-			s.logger.Printf("incremental Snapshot checkpoint moved %d/%d pages, but did not truncate WAL, forcing truncate",
-				meta.Moved, meta.Pages)
-			s.mustTruncateCheckpoint()
-		}
 		stats.Get(snapshotCreateChkTruncateDuration).(*expvar.Int).Set(time.Since(chkTStartTime).Milliseconds())
 		stats.Get(snapshotPrecompactWALSize).(*expvar.Int).Set(walSzPre)
 		stats.Get(snapshotWALSize).(*expvar.Int).Set(walSzPost)
@@ -2923,6 +2903,46 @@ func (s *Store) runWALSnapshotting() (closeCh, doneCh chan struct{}) {
 	return closeCh, doneCh
 }
 
+// checkpointWAL performs a checkpoint of the WAL, truncating it. If it returns an error
+// the checkpoint operation can be retried at the caller's discretion.
+//
+// This function also implements the policy that if a certain number of checkpoint attempts
+// fail in a row, the checkpoint is forced via mustTruncateCheckpoint, which blocks readers
+// and retries until it succeeds or a timeout fires. When a checkpoint is forced this way,
+// the failure count is reset and no error is returned.
+func (s *Store) checkpointWAL() (retErr error) {
+	defer func() {
+		if retErr != nil {
+			s.numFailedSnapshotsInRow++
+			if s.numFailedSnapshotsInRow == maxFailedSnapshotsInRow {
+				s.logger.Printf("too many failed snapshots in a row (%d), forcing WAL checkpoint truncate",
+					s.numFailedSnapshotsInRow)
+				s.mustTruncateCheckpoint()
+				s.numFailedSnapshotsInRow = 0
+				retErr = nil
+			}
+		} else {
+			s.numFailedSnapshotsInRow = 0
+		}
+	}()
+
+	meta, err := s.db.Checkpoint(sql.CheckpointTruncate)
+	if err != nil {
+		return err
+	}
+	if !meta.Success() {
+		if meta.Pages == meta.Moved {
+			s.logger.Printf("checkpoint moved %d/%d pages, but did not truncate WAL, forcing truncate",
+				meta.Moved, meta.Pages)
+			s.mustTruncateCheckpoint()
+			return nil
+		}
+		stats.Add(numWALCheckpointIncomplete, 1)
+		return fmt.Errorf("checkpoint incomplete: %s", meta.String())
+	}
+	return nil
+}
+
 // mustTruncateCheckpoint truncates the checkpointed WAL, retrying until successful or
 // timing out.
 //
@@ -2932,8 +2952,8 @@ func (s *Store) runWALSnapshotting() (closeCh, doneCh chan struct{}) {
 // We do this by blocking all readers (writes are already blocked). This handling is due to
 // research into SQLite and not seen as of yet.
 //
-// Finally, we could still panic here if we timeout trying to truncate. This could happen if
-// a reader external to rqlite just won't let go.
+// Finally, we could still time out here while trying to truncate. This could happen if a
+// reader external to rqlite just won't let go.
 func (s *Store) mustTruncateCheckpoint() {
 	startT := time.Now()
 	defer func() {
@@ -2954,7 +2974,10 @@ func (s *Store) mustTruncateCheckpoint() {
 				return
 			}
 		case <-time.After(mustWALCheckpointTimeout):
-			s.logger.Fatal("timed out trying to truncate checkpointed WAL - aborting")
+			msg := fmt.Sprintf("timed out trying to truncate checkpointed WAL after %s,"+
+				" probably due to external long-running read - aborting",
+				mustWALCheckpointTimeout)
+			s.logger.Fatal(msg)
 		}
 	}
 }
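
The retry-then-force policy that the new checkpointWAL implements can be summarized by the standalone Go sketch below. The failureTolerantCheckpointer type and its tryCheckpoint and forceCheckpoint hooks are hypothetical names used only for this illustration, not rqlite APIs; in the patch itself those roles are played by db.Checkpoint(sql.CheckpointTruncate) and Store.mustTruncateCheckpoint, with the consecutive-failure count held in the Store.numFailedSnapshotsInRow field.

// A minimal, standalone sketch of the retry-then-force checkpoint policy added by this
// patch. All names below are illustrative stand-ins, not rqlite's implementation.
package main

import (
	"errors"
	"fmt"
)

// maxFailuresInRow mirrors maxFailedSnapshotsInRow in store/store.go.
const maxFailuresInRow = 5

// failureTolerantCheckpointer wraps an ordinary, fallible checkpoint operation and a
// blocking "force" fallback, tracking how many attempts have failed consecutively.
type failureTolerantCheckpointer struct {
	numFailedInRow  int
	tryCheckpoint   func() error // ordinary checkpoint; may fail while readers hold the WAL
	forceCheckpoint func()       // blocks readers and retries until the WAL is truncated
}

// checkpoint attempts the ordinary checkpoint. A failure is returned to the caller so the
// whole operation can be retried later, but once maxFailuresInRow consecutive attempts
// have failed the blocking fallback runs, the counter resets, and no error is reported.
func (c *failureTolerantCheckpointer) checkpoint() error {
	if err := c.tryCheckpoint(); err != nil {
		c.numFailedInRow++
		if c.numFailedInRow == maxFailuresInRow {
			c.forceCheckpoint()
			c.numFailedInRow = 0
			return nil
		}
		return err
	}
	c.numFailedInRow = 0
	return nil
}

func main() {
	attempts := 0
	c := &failureTolerantCheckpointer{
		// Simulate a checkpoint that keeps failing because a long-running read holds the WAL.
		tryCheckpoint: func() error {
			attempts++
			return errors.New("checkpoint blocked by long-running read")
		},
		forceCheckpoint: func() {
			fmt.Printf("forcing checkpoint after %d failed attempts\n", attempts)
		},
	}
	for i := 0; i < maxFailuresInRow; i++ {
		if err := c.checkpoint(); err != nil {
			fmt.Println("will retry:", err)
		}
	}
}

Running the sketch prints four "will retry" messages and then a single forced checkpoint, which is the behaviour the patch aims for: Raft keeps retrying failed snapshots, and only the fifth consecutive checkpoint failure triggers the blocking truncate. Keeping the failure count on the Store, rather than in a local variable, matters because each attempt arrives as a separate fsmSnapshot call.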