Implement a WAL checkpoint-retry policy

Philip O'Toole
2026-01-09 01:41:26 -05:00
parent b179f5f7d5
commit 3b1ac61302


@@ -138,6 +138,7 @@ const (
raftLogCacheSize = 512
trailingScale = 1.25
observerChanLen = 50
maxFailedSnapshotsInRow = 5
baseVacuumTimeKey = "rqlite_base_vacuum"
lastVacuumTimeKey = "rqlite_last_vacuum"
@@ -344,9 +345,10 @@ type Store struct {
snapshotWClose chan struct{}
snapshotWDone chan struct{}
// Snapshotting synchronization
snapshotSync *rsync.SyncChannels
snapshotCAS *rsync.CheckAndSet
// Snapshotting synchronization and management
snapshotSync *rsync.SyncChannels
snapshotCAS *rsync.CheckAndSet
numFailedSnapshotsInRow int
// Latest log entry index actually reflected by the FSM. Due to Raft code
// these values are not updated automatically after a Snapshot-restore.
@@ -2533,22 +2535,11 @@ func (s *Store) fsmSnapshot() (fSnap raft.FSMSnapshot, retErr error) {
var walTmpFD *os.File
if fullNeeded {
chkStartTime := time.Now()
meta, err := s.db.Checkpoint(sql.CheckpointTruncate)
if err != nil {
if err := s.checkpointWAL(); err != nil {
stats.Add(numFullCheckpointFailed, 1)
return nil, fmt.Errorf("snapshot can't complete due to FULL Snapshot checkpoint error (will retry): %s",
return nil, fmt.Errorf("full snapshot can't complete due to WAL checkpoint error (will retry): %s",
err.Error())
}
if !meta.Success() {
if meta.Moved < meta.Pages {
stats.Add(numWALCheckpointIncomplete, 1)
return nil, fmt.Errorf("snapshot can't complete due to FULL Snapshot checkpoint incomplete (will retry): %s)",
meta.String())
}
s.logger.Printf("full Snapshot checkpoint moved %d/%d pages, but did not truncate WAL, forcing truncate",
meta.Moved, meta.Pages)
s.mustTruncateCheckpoint()
}
stats.Get(snapshotCreateChkTruncateDuration).(*expvar.Int).Set(time.Since(chkStartTime).Milliseconds())
dbFD, err := os.Open(s.db.Path())
if err != nil {
@@ -2605,22 +2596,11 @@ func (s *Store) fsmSnapshot() (fSnap raft.FSMSnapshot, retErr error) {
return nil, err
}
chkTStartTime := time.Now()
meta, err := s.db.Checkpoint(sql.CheckpointTruncate)
if err != nil {
if err := s.checkpointWAL(); err != nil {
stats.Add(numWALCheckpointTruncateFailed, 1)
return nil, fmt.Errorf("snapshot can't complete due to WAL checkpoint error (will retry): %s",
return nil, fmt.Errorf("incremental snapshot can't complete due to WAL checkpoint error (will retry): %s",
err.Error())
}
if !meta.Success() {
if meta.Moved < meta.Pages {
stats.Add(numWALCheckpointIncomplete, 1)
return nil, fmt.Errorf("snapshot can't complete due to Snapshot checkpoint incomplete (will retry %s)",
meta.String())
}
s.logger.Printf("incremental Snapshot checkpoint moved %d/%d pages, but did not truncate WAL, forcing truncate",
meta.Moved, meta.Pages)
s.mustTruncateCheckpoint()
}
stats.Get(snapshotCreateChkTruncateDuration).(*expvar.Int).Set(time.Since(chkTStartTime).Milliseconds())
stats.Get(snapshotPrecompactWALSize).(*expvar.Int).Set(walSzPre)
stats.Get(snapshotWALSize).(*expvar.Int).Set(walSzPost)
@@ -2923,6 +2903,43 @@ func (s *Store) runWALSnapshotting() (closeCh, doneCh chan struct{}) {
return closeCh, doneCh
}
// checkpointWAL performs a checkpoint of the WAL, truncating it. If it returns an error,
// the checkpoint operation can be retried at the caller's discretion.
//
// This function also implements the policy that if a certain number of checkpoint attempts
// fail in a row, it forces a truncation of the WAL, retrying until successful or timing out.
func (s *Store) checkpointWAL() (retErr error) {
defer func() {
if retErr != nil {
s.numFailedSnapshotsInRow++
if s.numFailedSnapshotsInRow == maxFailedSnapshotsInRow {
s.logger.Printf("too many failed snapshots in a row (%d), forcing WAL checkpoint truncate",
s.numFailedSnapshotsInRow)
s.mustTruncateCheckpoint()
s.numFailedSnapshotsInRow = 0
}
} else {
s.numFailedSnapshotsInRow = 0
}
}()
meta, err := s.db.Checkpoint(sql.CheckpointTruncate)
if err != nil {
return err
}
if !meta.Success() {
if meta.Pages == meta.Moved {
s.logger.Printf("checkpoint moved %d/%d pages, but did not truncate WAL, forcing truncate",
meta.Moved, meta.Pages)
s.mustTruncateCheckpoint()
return nil
}
stats.Add(numWALCheckpointIncomplete, 1)
return fmt.Errorf("checkpoint incomplete: %s)", meta.String())
}
return nil
}
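
For readers skimming the diff, the sketch below isolates the failure-counting behavior of checkpointWAL: each failed attempt simply returns the error (so the next snapshot attempt retries the checkpoint), and only the fifth consecutive failure triggers the forced truncate. The fakeDB type, checkpointWithPolicy helper, and main driver are hypothetical illustrations only, not part of this commit or of rqlite's API.

// Illustrative sketch only - not part of this commit. It mimics the
// consecutive-failure policy in Store.checkpointWAL, with a hypothetical
// fakeDB standing in for the real SQLite connection.
package main

import (
	"errors"
	"fmt"
)

const maxFailedInRow = 5 // mirrors maxFailedSnapshotsInRow

type fakeDB struct {
	checkpointErr error // error returned by every Checkpoint call
	truncated     bool  // set when the forced truncate fires
}

func (d *fakeDB) Checkpoint() error { return d.checkpointErr }
func (d *fakeDB) ForceTruncate()    { d.truncated = true }

// checkpointWithPolicy mirrors the deferred failure counting in checkpointWAL:
// failures are counted, the threshold triggers a forced truncate and a counter
// reset, and any success also resets the counter.
func checkpointWithPolicy(d *fakeDB, numFailed *int) error {
	if err := d.Checkpoint(); err != nil {
		*numFailed++
		if *numFailed == maxFailedInRow {
			d.ForceTruncate()
			*numFailed = 0
		}
		return err
	}
	*numFailed = 0
	return nil
}

func main() {
	db := &fakeDB{checkpointErr: errors.New("checkpoint blocked by reader")}
	failed := 0
	for i := 1; i <= maxFailedInRow; i++ {
		err := checkpointWithPolicy(db, &failed)
		fmt.Printf("attempt %d: err=%v, forced truncate=%v\n", i, err, db.truncated)
	}
	// The first four attempts just return the error; the fifth also forces the truncate.
}
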
// mustTruncateCheckpoint truncates the checkpointed WAL, retrying until successful or
// timing out.
//
@@ -2932,8 +2949,8 @@ func (s *Store) runWALSnapshotting() (closeCh, doneCh chan struct{}) {
// We do this by blocking all readers (writes are already blocked). This handling is based on
// research into SQLite, and the condition has not been observed in practice yet.
//
// Finally, we could still panic here if we timeout trying to truncate. This could happen if
// a reader external to rqlite just won't let go.
// Finally, we could still time out here while trying to truncate. This could happen if a
// reader external to rqlite just won't let go.
func (s *Store) mustTruncateCheckpoint() {
startT := time.Now()
defer func() {
@@ -2954,7 +2971,7 @@ func (s *Store) mustTruncateCheckpoint() {
return
}
case <-time.After(mustWALCheckpointTimeout):
s.logger.Fatal("timed out trying to truncate checkpointed WAL - aborting")
s.logger.Fatal("timed out trying to truncate checkpointed WAL probably due to long-running read - aborting")
}
}
}
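
The body of mustTruncateCheckpoint is only partially visible in this diff, but its shape is a standard Go retry-until-deadline loop. The sketch below is a minimal, self-contained version of that pattern under stated assumptions: truncateWithTimeout, attemptTruncate, retryInterval and overallTimeout are hypothetical stand-ins, and the real function also blocks readers and records timing stats, which this sketch omits.

// Illustrative sketch only - not part of this commit. A generic
// retry-until-success-or-deadline loop of the kind mustTruncateCheckpoint uses.
package main

import (
	"errors"
	"log"
	"time"
)

func truncateWithTimeout(attemptTruncate func() error, retryInterval, overallTimeout time.Duration) {
	ticker := time.NewTicker(retryInterval)
	defer ticker.Stop()
	deadline := time.After(overallTimeout) // one deadline for the whole loop
	for {
		select {
		case <-ticker.C:
			if err := attemptTruncate(); err == nil {
				return // truncate succeeded, we're done
			}
		case <-deadline:
			// Mirrors the Fatal in mustTruncateCheckpoint: if a reader never
			// lets go, no further progress is possible.
			log.Fatal("timed out trying to truncate checkpointed WAL - aborting")
		}
	}
}

func main() {
	start := time.Now()
	// Simulate an operation that fails for the first 300ms, then succeeds.
	attempt := func() error {
		if time.Since(start) < 300*time.Millisecond {
			return errors.New("database table is locked")
		}
		return nil
	}
	truncateWithTimeout(attempt, 100*time.Millisecond, 5*time.Second)
	log.Println("checkpointed WAL truncated")
}
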