Implement a force-checkpoint after multiple failures policy

Philip O'Toole
2026-01-09 02:01:30 -05:00
committed by GitHub
parent e6e6e8859b
commit 976f2779d3
2 changed files with 56 additions and 32 deletions


@@ -1,6 +1,7 @@
## v9.3.14 (unreleased)
### Implementation changes and bug fixes
- [PR #2444](https://github.com/rqlite/rqlite/pull/2444): Increase WAL Checkpoint busy timeout to 250ms.
- [PR #2445](https://github.com/rqlite/rqlite/pull/2445): Implement a must-checkpoint-WAL after multiple failures policy.
## v9.3.13 (January 8th 2026)
### Implementation changes and bug fixes
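For context on the busy-timeout entry above: SQLite's busy timeout bounds how long a truncating WAL checkpoint waits for readers that block it before reporting failure, which is what makes the failure-counting policy in this commit necessary. Below is a minimal, standalone sketch of that interaction, using database/sql and the mattn/go-sqlite3 driver purely for illustration; rqlite drives SQLite through its own sql package, as the store diff that follows shows.

```go
// Sketch only: what a WAL-checkpoint busy timeout governs in plain SQLite.
// The driver and database/sql usage here are illustrative, not rqlite's code.
package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/mattn/go-sqlite3"
)

func main() {
	db, err := sql.Open("sqlite3", "example.db")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
	db.SetMaxOpenConns(1) // keep the PRAGMAs and the checkpoint on one connection

	// WAL mode, so checkpointing is meaningful.
	if _, err := db.Exec("PRAGMA journal_mode=WAL"); err != nil {
		log.Fatal(err)
	}
	// The busy timeout (in milliseconds) is how long SQLite's busy handler
	// waits for readers that block the checkpoint before giving up; 250 is
	// the value PR #2444 moves to.
	if _, err := db.Exec("PRAGMA busy_timeout = 250"); err != nil {
		log.Fatal(err)
	}

	// A TRUNCATE checkpoint copies WAL frames into the main database file and,
	// if no reader still has the WAL open, resets the WAL to zero length.
	// busy=1 means the checkpoint could not complete within the timeout.
	var busy, walFrames, checkpointed int
	row := db.QueryRow("PRAGMA wal_checkpoint(TRUNCATE)")
	if err := row.Scan(&busy, &walFrames, &checkpointed); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("busy=%d walFrames=%d checkpointed=%d\n", busy, walFrames, checkpointed)
}
```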


@@ -138,6 +138,7 @@ const (
raftLogCacheSize = 512
trailingScale = 1.25
observerChanLen = 50
maxFailedSnapshotsInRow = 5
baseVacuumTimeKey = "rqlite_base_vacuum"
lastVacuumTimeKey = "rqlite_last_vacuum"
@@ -344,9 +345,10 @@ type Store struct {
snapshotWClose chan struct{}
snapshotWDone chan struct{}
// Snapshotting synchronization
snapshotSync *rsync.SyncChannels
snapshotCAS *rsync.CheckAndSet
// Snapshotting synchronization and management
snapshotSync *rsync.SyncChannels
snapshotCAS *rsync.CheckAndSet
numFailedSnapshotsInRow int
// Latest log entry index actually reflected by the FSM. Due to the Raft code,
// these values are not updated automatically after a Snapshot-restore.
@@ -2533,22 +2535,11 @@ func (s *Store) fsmSnapshot() (fSnap raft.FSMSnapshot, retErr error) {
var walTmpFD *os.File
if fullNeeded {
chkStartTime := time.Now()
meta, err := s.db.Checkpoint(sql.CheckpointTruncate)
if err != nil {
if err := s.checkpointWAL(); err != nil {
stats.Add(numFullCheckpointFailed, 1)
return nil, fmt.Errorf("snapshot can't complete due to FULL Snapshot checkpoint error (will retry): %s",
return nil, fmt.Errorf("full snapshot can't complete due to WAL checkpoint error (will retry): %s",
err.Error())
}
if !meta.Success() {
if meta.Moved < meta.Pages {
stats.Add(numWALCheckpointIncomplete, 1)
return nil, fmt.Errorf("snapshot can't complete due to FULL Snapshot checkpoint incomplete (will retry): %s)",
meta.String())
}
s.logger.Printf("full Snapshot checkpoint moved %d/%d pages, but did not truncate WAL, forcing truncate",
meta.Moved, meta.Pages)
s.mustTruncateCheckpoint()
}
stats.Get(snapshotCreateChkTruncateDuration).(*expvar.Int).Set(time.Since(chkStartTime).Milliseconds())
dbFD, err := os.Open(s.db.Path())
if err != nil {
@@ -2605,22 +2596,11 @@ func (s *Store) fsmSnapshot() (fSnap raft.FSMSnapshot, retErr error) {
return nil, err
}
chkTStartTime := time.Now()
meta, err := s.db.Checkpoint(sql.CheckpointTruncate)
if err != nil {
if err := s.checkpointWAL(); err != nil {
stats.Add(numWALCheckpointTruncateFailed, 1)
return nil, fmt.Errorf("snapshot can't complete due to WAL checkpoint error (will retry): %s",
return nil, fmt.Errorf("incremental snapshot can't complete due to WAL checkpoint error (will retry): %s",
err.Error())
}
if !meta.Success() {
if meta.Moved < meta.Pages {
stats.Add(numWALCheckpointIncomplete, 1)
return nil, fmt.Errorf("snapshot can't complete due to Snapshot checkpoint incomplete (will retry %s)",
meta.String())
}
s.logger.Printf("incremental Snapshot checkpoint moved %d/%d pages, but did not truncate WAL, forcing truncate",
meta.Moved, meta.Pages)
s.mustTruncateCheckpoint()
}
stats.Get(snapshotCreateChkTruncateDuration).(*expvar.Int).Set(time.Since(chkTStartTime).Milliseconds())
stats.Get(snapshotPrecompactWALSize).(*expvar.Int).Set(walSzPre)
stats.Get(snapshotWALSize).(*expvar.Int).Set(walSzPost)
@@ -2923,6 +2903,46 @@ func (s *Store) runWALSnapshotting() (closeCh, doneCh chan struct{}) {
return closeCh, doneCh
}
// checkpointWAL performs a checkpoint of the WAL, truncating it. If it returns an error
// the checkpoint operation can be retried at the caller's discretion. If the checkpoint
// moves every page but fails to truncate the WAL, this function forces the truncation
// itself, retrying until it succeeds or a timeout fires.
//
// This function also implements the policy that if a certain number of checkpoint attempts
// fail in a row, the truncation is forced in the same way and the error is cleared.
func (s *Store) checkpointWAL() (retErr error) {
defer func() {
if retErr != nil {
s.numFailedSnapshotsInRow++
if s.numFailedSnapshotsInRow == maxFailedSnapshotsInRow {
s.logger.Printf("too many failed snapshots in a row (%d), forcing WAL checkpoint truncate",
s.numFailedSnapshotsInRow)
s.mustTruncateCheckpoint()
s.numFailedSnapshotsInRow = 0
retErr = nil
}
} else {
s.numFailedSnapshotsInRow = 0
}
}()
meta, err := s.db.Checkpoint(sql.CheckpointTruncate)
if err != nil {
return err
}
if !meta.Success() {
if meta.Pages == meta.Moved {
s.logger.Printf("checkpoint moved %d/%d pages, but did not truncate WAL, forcing truncate",
meta.Moved, meta.Pages)
s.mustTruncateCheckpoint()
return nil
}
stats.Add(numWALCheckpointIncomplete, 1)
return fmt.Errorf("checkpoint incomplete: %s", meta.String())
}
return nil
}
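The deferred block above is the whole of the new policy: consecutive checkpoint failures are counted, and only the Nth failure in a row escalates to the blocking mustTruncateCheckpoint path. A self-contained sketch of the same pattern follows, with every name (checkpointer, tryCheckpoint, forceCheckpoint) invented for illustration rather than taken from rqlite.

```go
// Sketch of the force-after-N-consecutive-failures pattern used above.
package main

import (
	"errors"
	"fmt"
)

const maxFailuresInRow = 5 // mirrors maxFailedSnapshotsInRow

type checkpointer struct {
	failedInRow int
	attempts    int
}

// tryCheckpoint stands in for the ordinary checkpoint call; here it simply
// fails for the first few attempts.
func (c *checkpointer) tryCheckpoint() error {
	c.attempts++
	if c.attempts < 7 {
		return errors.New("WAL busy")
	}
	return nil
}

// forceCheckpoint stands in for the blocking, must-succeed path.
func (c *checkpointer) forceCheckpoint() {
	fmt.Println("forcing checkpoint, blocking readers until it succeeds")
}

// checkpoint leaves retries to the caller, except that the Nth consecutive
// failure escalates to the forced path, resets the counter, and reports success.
func (c *checkpointer) checkpoint() error {
	if err := c.tryCheckpoint(); err != nil {
		c.failedInRow++
		if c.failedInRow == maxFailuresInRow {
			c.forceCheckpoint()
			c.failedInRow = 0
			return nil
		}
		return err
	}
	c.failedInRow = 0
	return nil
}

func main() {
	c := &checkpointer{}
	for i := 1; i <= 6; i++ {
		fmt.Printf("attempt %d: err=%v\n", i, c.checkpoint())
	}
}
```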
// mustTruncateCheckpoint truncates the checkpointed WAL, retrying until successful or
// timing out.
//
@@ -2932,8 +2952,8 @@ func (s *Store) runWALSnapshotting() (closeCh, doneCh chan struct{}) {
// We do this by blocking all readers (writes are already blocked). This handling exists
// because research into SQLite suggests it can happen, though it has not been observed yet.
//
// Finally, we could still panic here if we timeout trying to truncate. This could happen if
// a reader external to rqlite just won't let go.
// Finally, we could still timeout here while trying to truncate. This could happen if a
// reader external to rqlite just won't let go.
func (s *Store) mustTruncateCheckpoint() {
startT := time.Now()
defer func() {
@@ -2954,7 +2974,10 @@ func (s *Store) mustTruncateCheckpoint() {
return
}
case <-time.After(mustWALCheckpointTimeout):
s.logger.Fatal("timed out trying to truncate checkpointed WAL - aborting")
msg := fmt.Sprintf("timed out trying to truncate checkpoint WAL after %s,"+
" probably due to external long-running read - aborting",
mustWALCheckpointTimeout)
s.logger.Fatal(msg)
}
}
}
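The timeout case above is the tail of a retry loop: mustTruncateCheckpoint keeps attempting the truncating checkpoint and only gives up, fatally, once mustWALCheckpointTimeout fires. A standalone sketch of that loop shape, with the helper name, interval, and timeout values chosen here for illustration only:

```go
// Sketch of a retry-until-deadline loop in the shape mustTruncateCheckpoint
// describes: keep attempting the truncating checkpoint on a ticker, and give
// up once an overall timeout fires.
package main

import (
	"errors"
	"log"
	"time"
)

func forceTruncate(attempt func() error, interval, timeout time.Duration) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	deadline := time.After(timeout)
	for {
		select {
		case <-ticker.C:
			if err := attempt(); err == nil {
				return nil // the checkpoint truncated the WAL
			}
			// A reader still holds the WAL open; try again on the next tick.
		case <-deadline:
			return errors.New("timed out waiting for readers to release the WAL")
		}
	}
}

func main() {
	tries := 0
	attempt := func() error {
		tries++
		if tries < 3 {
			return errors.New("WAL busy")
		}
		return nil
	}
	if err := forceTruncate(attempt, 50*time.Millisecond, time.Second); err != nil {
		log.Fatal(err) // the real code calls s.logger.Fatal at this point
	}
	log.Printf("truncated after %d attempts", tries)
}
```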