Handle possible WAL checkpoint failure

Philip O'Toole authored 2026-01-03 22:54:51 -05:00 (committed by GitHub)
parent 9712e19b69
commit a325a9b9c7
11 changed files with 362 additions and 45 deletions


@@ -1,6 +1,6 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.36.10
// protoc-gen-go v1.36.11
// protoc v6.33.0
// source: message.proto


@@ -1,6 +1,6 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.36.10
// protoc-gen-go v1.36.11
// protoc v6.33.0
// source: command.proto
@@ -439,6 +439,7 @@ type Statement struct {
Sql string `protobuf:"bytes,1,opt,name=sql,proto3" json:"sql,omitempty"`
Parameters []*Parameter `protobuf:"bytes,2,rep,name=parameters,proto3" json:"parameters,omitempty"`
ForceQuery bool `protobuf:"varint,3,opt,name=forceQuery,proto3" json:"forceQuery,omitempty"`
ForceStall bool `protobuf:"varint,4,opt,name=forceStall,proto3" json:"forceStall,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
@@ -494,6 +495,13 @@ func (x *Statement) GetForceQuery() bool {
return false
}
func (x *Statement) GetForceStall() bool {
if x != nil {
return x.ForceStall
}
return false
}
type Request struct {
state protoimpl.MessageState `protogen:"open.v1"`
Transaction bool `protobuf:"varint,1,opt,name=transaction,proto3" json:"transaction,omitempty"`
@@ -2086,7 +2094,7 @@ const file_command_proto_rawDesc = "" +
"\x01y\x18\x04 \x01(\fH\x00R\x01y\x12\x0e\n" +
"\x01s\x18\x05 \x01(\tH\x00R\x01s\x12\x12\n" +
"\x04name\x18\x06 \x01(\tR\x04nameB\a\n" +
"\x05value\"q\n" +
"\x05value\"\x91\x01\n" +
"\tStatement\x12\x10\n" +
"\x03sql\x18\x01 \x01(\tR\x03sql\x122\n" +
"\n" +
@@ -2094,7 +2102,10 @@ const file_command_proto_rawDesc = "" +
"parameters\x12\x1e\n" +
"\n" +
"forceQuery\x18\x03 \x01(\bR\n" +
"forceQuery\"\xa7\x01\n" +
"forceQuery\x12\x1e\n" +
"\n" +
"forceStall\x18\x04 \x01(\bR\n" +
"forceStall\"\xa7\x01\n" +
"\aRequest\x12 \n" +
"\vtransaction\x18\x01 \x01(\bR\vtransaction\x122\n" +
"\n" +


@@ -19,6 +19,7 @@ message Statement {
string sql = 1;
repeated Parameter parameters = 2;
bool forceQuery = 3;
bool forceStall = 4;
}
message Request {


@@ -166,6 +166,23 @@ type PoolStats struct {
MaxLifetimeClosed int64 `json:"max_lifetime_closed"`
}
// CheckpointMeta contains metadata about a WAL checkpoint operation.
type CheckpointMeta struct {
Code int
Pages int
Moved int
}
// String returns a string representation of the CheckpointMeta.
func (cm *CheckpointMeta) String() string {
return fmt.Sprintf("Code=%d, Pages=%d, Moved=%d", cm.Code, cm.Pages, cm.Moved)
}
// Success returns true if the checkpoint completed successfully.
func (cm *CheckpointMeta) Success() bool {
return cm.Code == 0
}
// Open opens a file-based database using the default driver.
func Open(dbPath string, fkEnabled, wal bool) (retDB *DB, retErr error) {
return OpenWithDriver(DefaultDriver(), dbPath, fkEnabled, wal)
@@ -641,7 +658,7 @@ func (db *DB) BusyTimeout() (rwMs, roMs int, err error) {
// Checkpoint checkpoints the WAL file. If the WAL file is not enabled, this
// function is a no-op.
func (db *DB) Checkpoint(mode CheckpointMode) error {
func (db *DB) Checkpoint(mode CheckpointMode) (*CheckpointMeta, error) {
return db.CheckpointWithTimeout(mode, 0)
}
@@ -649,7 +666,7 @@ func (db *DB) Checkpoint(mode CheckpointMode) error {
// run to completion within the given duration, an error is returned. If the
// duration is 0, the busy timeout is not modified before executing the
// checkpoint.
func (db *DB) CheckpointWithTimeout(mode CheckpointMode, dur time.Duration) (err error) {
func (db *DB) CheckpointWithTimeout(mode CheckpointMode, dur time.Duration) (meta *CheckpointMeta, err error) {
start := time.Now()
defer func() {
if err != nil {
@@ -663,10 +680,10 @@ func (db *DB) CheckpointWithTimeout(mode CheckpointMode, dur time.Duration) (err
if dur > 0 {
rwBt, _, err := db.BusyTimeout()
if err != nil {
return fmt.Errorf("failed to get busy_timeout on checkpointing connection: %s", err.Error())
return nil, fmt.Errorf("failed to get busy_timeout on checkpointing connection: %s", err.Error())
}
if err := db.SetBusyTimeout(int(dur.Milliseconds()), -1); err != nil {
return fmt.Errorf("failed to set busy_timeout on checkpointing connection: %s", err.Error())
return nil, fmt.Errorf("failed to set busy_timeout on checkpointing connection: %s", err.Error())
}
defer func() {
// Reset back to default
@@ -678,15 +695,15 @@ func (db *DB) CheckpointWithTimeout(mode CheckpointMode, dur time.Duration) (err
ok, nPages, nMoved, err := checkpointDB(db.rwDB, mode)
if err != nil {
return fmt.Errorf("error checkpointing WAL: %s", err.Error())
return nil, fmt.Errorf("error checkpointing WAL: %s", err.Error())
}
stats.Add(numCheckpointedPages, int64(nPages))
stats.Add(numCheckpointedMoves, int64(nMoved))
if ok != 0 {
return fmt.Errorf("failed to completely checkpoint WAL (%d ok, %d pages, %d moved)",
ok, nPages, nMoved)
}
return nil
return &CheckpointMeta{
Code: ok,
Pages: nPages,
Moved: nMoved,
}, nil
}
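
For orientation, here is a minimal caller-side sketch (not part of this diff) of how the new return value can be consumed. The wrapper function, package name, import path, and logging are illustrative assumptions; CheckpointTruncate, CheckpointMeta, Success, and the new Checkpoint signature come from the changes above.

package sketch

import (
	"log"

	"github.com/rqlite/rqlite/v9/db" // assumed import path for the package changed above
)

// checkpointTruncate runs a TRUNCATE checkpoint and reports whether it fully
// completed, using the CheckpointMeta returned by the new Checkpoint signature.
func checkpointTruncate(d *db.DB) (bool, error) {
	meta, err := d.Checkpoint(db.CheckpointTruncate)
	if err != nil {
		// Hard failure: the checkpoint could not run at all.
		return false, err
	}
	if !meta.Success() {
		// The checkpoint ran but did not complete, typically because a reader
		// still had the WAL open. The caller can simply retry later.
		log.Printf("incomplete checkpoint: %s", meta)
		return false, nil
	}
	return true, nil
}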
// DisableCheckpointing disables the automatic checkpointing that occurs when
@@ -1233,6 +1250,13 @@ func (db *DB) queryStmtWithConn(ctx context.Context, stmt *command.Statement, xT
Parameters: params,
})
// Check for slow query, blocked query, etc. testing. This field
// should never be set by production code and is only for fault-injection
// testing purposes.
if stmt.ForceStall {
<-make(chan struct{})
}
// One-time population of any empty types. Best effort, ignore
// error.
if needsQueryTypes {


@@ -7,7 +7,9 @@ import (
"testing"
"time"
command "github.com/rqlite/rqlite/v9/command/proto"
"github.com/rqlite/rqlite/v9/db/wal"
"github.com/rqlite/rqlite/v9/internal/rsum"
)
// Test_WALDatabaseCheckpointOKNoWrites tests that a checkpoint succeeds
@@ -22,9 +24,140 @@ func Test_WALDatabaseCheckpointOKNoWrites(t *testing.T) {
t.Fatalf("failed to open database in WAL mode: %s", err.Error())
}
defer db.Close()
if err := db.Checkpoint(CheckpointTruncate); err != nil {
meta, err := db.Checkpoint(CheckpointTruncate)
if err != nil {
t.Fatalf("failed to checkpoint database in WAL mode with nonexistent WAL: %s", err.Error())
}
if !meta.Success() {
t.Fatalf("expected checkpoint to complete successfully")
}
if meta.Moved != 0 {
t.Fatalf("expected MOVED to be 0, got %d", meta.Moved)
}
if meta.Pages != 0 {
t.Fatalf("expected PAGES to be 0, got %d", meta.Pages)
}
}
// Test_WALDatabaseCheckpointOK tests that a checkpoint succeeds
// with a write.
func Test_WALDatabaseCheckpointOK(t *testing.T) {
path := mustTempFile()
defer os.Remove(path)
db, err := Open(path, false, true)
if err != nil {
t.Fatalf("failed to open database in WAL mode: %s", err.Error())
}
defer db.Close()
_, err = db.ExecuteStringStmt(`CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)`)
if err != nil {
t.Fatalf("failed to execute on single node: %s", err.Error())
}
meta, err := db.Checkpoint(CheckpointTruncate)
if err != nil {
t.Fatalf("failed to checkpoint database: %s", err.Error())
}
if !meta.Success() {
t.Fatalf("expected checkpoint to complete successfully")
}
if meta.Moved != 0 {
t.Fatalf("expected MOVED to be 0 since WAL was truncated, got %d", meta.Moved)
}
if meta.Pages != 0 {
t.Fatalf("expected PAGES to be 0 since WAL was truncated, got %d", meta.Pages)
}
// Ensure idempotency by checkpointing again.
meta, err = db.Checkpoint(CheckpointTruncate)
if err != nil {
t.Fatalf("failed to checkpoint database: %s", err.Error())
}
if !meta.Success() {
t.Fatalf("expected checkpoint to complete successfully")
}
}
// Test_WALDatabaseCheckpointOK_NoWALChange tests that a checkpoint
// that is blocked by a long-running read does not result in a
// change to the WAL file. This is to show that we can safely retry
// the truncate checkpoint later.
func Test_WALDatabaseCheckpointOK_NoWALChange(t *testing.T) {
path := mustTempFile()
defer os.Remove(path)
db, err := Open(path, false, true)
if err != nil {
t.Fatalf("failed to open database in WAL mode: %s", err.Error())
}
defer db.Close()
_, err = db.ExecuteStringStmt(`CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)`)
if err != nil {
t.Fatalf("failed to execute on single node: %s", err.Error())
}
_, err = db.ExecuteStringStmt(`INSERT INTO foo(name) VALUES("alice")`)
if err != nil {
t.Fatalf("failed to execute INSERT on single node: %s", err.Error())
}
// Issue a long-running read that should block the checkpoint.
qr := &command.Request{
Statements: []*command.Statement{
{
Sql: "SELECT * FROM foo",
ForceStall: true,
},
},
}
go func() {
db.Query(qr, false)
}()
time.Sleep(2 * time.Second)
_, err = db.ExecuteStringStmt(`INSERT INTO foo(name) VALUES("alice")`)
if err != nil {
t.Fatalf("failed to execute INSERT on single node: %s", err.Error())
}
// Get the hash of the WAL file before the checkpoint.
h1, err := rsum.MD5(db.WALPath())
if err != nil {
t.Fatalf("failed to hash WAL file: %s", err.Error())
}
_, err = db.ExecuteStringStmt(`PRAGMA BUSY_TIMEOUT = 1`)
if err != nil {
t.Fatalf("failed to execute on single node: %s", err.Error())
}
meta, err := db.Checkpoint(CheckpointTruncate)
if err != nil {
t.Fatalf("failed to checkpoint database: %s", err.Error())
}
if meta.Success() {
t.Fatalf("expected checkpoint to be unsuccessful due to blocking read")
}
if meta.Moved == 0 {
t.Fatalf("expected MOVED to be > 0 since some pages should have been moved")
}
if meta.Pages == 0 {
t.Fatalf("expected PAGES to be > 0 since WAL should have pages")
}
if meta.Moved >= meta.Pages {
t.Fatalf("expected MOVED to be < PAGES since checkpoint incomplete")
}
// Check hash again.
h2, err := rsum.MD5(db.WALPath())
if err != nil {
t.Fatalf("failed to hash WAL file: %s", err.Error())
}
if h1 != h2 {
t.Fatalf("expected WAL file to be unchanged after incomplete checkpoint")
}
}
// Test_WALDatabaseCheckpointOKDelete tests that a checkpoint returns no error
@@ -41,7 +174,7 @@ func Test_WALDatabaseCheckpointOKDelete(t *testing.T) {
t.Fatalf("WAL mode enabled")
}
defer db.Close()
if err := db.Checkpoint(CheckpointTruncate); err != nil {
if _, err := db.Checkpoint(CheckpointTruncate); err != nil {
t.Fatalf("failed to checkpoint database in DELETE mode: %s", err.Error())
}
}
@@ -71,8 +204,14 @@ func Test_WALDatabaseCheckpoint_RestartTruncate(t *testing.T) {
}
walPreBytes := mustReadBytes(db.WALPath())
if err := db.Checkpoint(CheckpointRestart); err != nil {
if meta, err := db.Checkpoint(CheckpointRestart); err != nil {
t.Fatalf("failed to checkpoint database: %s", err.Error())
} else if !meta.Success() {
t.Fatalf("expected checkpoint to complete successfully")
} else if meta.Moved == 0 {
t.Fatalf("expected some pages to be moved during RESTART checkpoint")
} else if meta.Pages == 0 {
t.Fatalf("expected some pages to be in the WAL during RESTART checkpoint")
}
walPostBytes := mustReadBytes(db.WALPath())
if !bytes.Equal(walPreBytes, walPostBytes) {
@@ -88,8 +227,12 @@ func Test_WALDatabaseCheckpoint_RestartTruncate(t *testing.T) {
t.Fatalf("expected %s, got %s", exp, got)
}
if err := db.Checkpoint(CheckpointTruncate); err != nil {
if meta, err := db.Checkpoint(CheckpointTruncate); err != nil {
t.Fatalf("failed to checkpoint database: %s", err.Error())
} else if !meta.Success() {
t.Fatalf("expected checkpoint to complete successfully")
} else if meta.Moved != 0 {
t.Fatalf("expected 0 pages to be moved during checkpoint truncate since nowrite since restart checkpoint")
}
sz, err := fileSize(db.WALPath())
if err != nil {
@@ -148,8 +291,12 @@ func Test_WALDatabaseCheckpoint_RestartTimeout(t *testing.T) {
t.Fatalf("expected %s, got %s", exp, got)
}
if err := db.CheckpointWithTimeout(CheckpointRestart, 250*time.Millisecond); err == nil {
t.Fatal("expected error due to failure to checkpoint")
meta, err := db.CheckpointWithTimeout(CheckpointRestart, 250*time.Millisecond)
if err != nil {
t.Fatal("expected no error when checkpoint times out due to a blocking read transaction")
}
if meta.Success() {
t.Fatal("expected checkpoint to be unsuccessful")
}
// Get some information on the WAL file before the checkpoint. The goal here is
@@ -174,7 +321,7 @@ func Test_WALDatabaseCheckpoint_RestartTimeout(t *testing.T) {
}
blockingDB.Close()
if err := db.CheckpointWithTimeout(CheckpointRestart, 250*time.Millisecond); err != nil {
if _, err := db.CheckpointWithTimeout(CheckpointRestart, 250*time.Millisecond); err != nil {
t.Fatalf("failed to checkpoint database: %s", err.Error())
}
}
@@ -220,9 +367,14 @@ func Test_WALDatabaseCheckpoint_TruncateTimeout(t *testing.T) {
t.Fatalf("expected %s, got %s", exp, got)
}
if err := db.CheckpointWithTimeout(CheckpointTruncate, 250*time.Millisecond); err == nil {
t.Fatal("expected error due to failure to checkpoint")
meta, err := db.CheckpointWithTimeout(CheckpointTruncate, 250*time.Millisecond)
if err != nil {
t.Fatal("expected no error even though the checkpoint cannot complete due to COMMIT")
}
if meta.Success() {
t.Fatal("expected checkpoint to be unsuccessful")
}
postWALBytes := mustReadBytes(db.WALPath())
if !bytes.Equal(preWALBytes, postWALBytes) {
t.Fatalf("wal file should be unchanged after checkpoint failure")
@@ -248,7 +400,7 @@ func Test_WALDatabaseCheckpoint_TruncateTimeout(t *testing.T) {
}
blockingDB.Close()
if err := db.CheckpointWithTimeout(CheckpointTruncate, 250*time.Millisecond); err != nil {
if _, err := db.CheckpointWithTimeout(CheckpointTruncate, 250*time.Millisecond); err != nil {
t.Fatalf("failed to checkpoint database: %s", err.Error())
}
if mustFileSize(db.WALPath()) != 0 {


@@ -166,7 +166,7 @@ func Test_WALNotCheckpointedOnClose(t *testing.T) {
}
// Checkpoint so the table exists in the main database file.
if err := db.Checkpoint(CheckpointTruncate); err != nil {
if _, err := db.Checkpoint(CheckpointTruncate); err != nil {
t.Fatalf("failed to checkpoint database: %s", err.Error())
}
@@ -299,7 +299,7 @@ func Test_TableCreation(t *testing.T) {
testQ()
// Confirm checkpoint works without error.
if err := db.Checkpoint(CheckpointRestart); err != nil {
if _, err := db.Checkpoint(CheckpointRestart); err != nil {
t.Fatalf("failed to checkpoint database: %s", err.Error())
}
testQ()
@@ -381,7 +381,7 @@ func Test_DBSums(t *testing.T) {
t.Fatalf("WAL sum did not change after insertion")
}
if err := db.Checkpoint(CheckpointRestart); err != nil {
if _, err := db.Checkpoint(CheckpointRestart); err != nil {
t.Fatalf("failed to checkpoint database: %s", err.Error())
}
@@ -439,7 +439,7 @@ func Test_DBLastModified(t *testing.T) {
// Checkpoint, and check time is later. On some platforms the time resolution isn't that
// high, so we sleep so the test won't suffer a false failure.
time.Sleep(1 * time.Second)
if err := db.Checkpoint(CheckpointRestart); err != nil {
if _, err := db.Checkpoint(CheckpointRestart); err != nil {
t.Fatalf("failed to checkpoint database: %s", err.Error())
}
lm3, err := db.LastModified()
@@ -944,14 +944,14 @@ func Test_WALDatabaseCreatedOK(t *testing.T) {
t.Fatalf("WAL file exists but is empty")
}
if err := db.Checkpoint(CheckpointTruncate); err != nil {
if _, err := db.Checkpoint(CheckpointTruncate); err != nil {
t.Fatalf("failed to checkpoint database in WAL mode: %s", err.Error())
}
if mustFileSize(walPath) != 0 {
t.Fatalf("WAL file exists but is non-empty")
}
// Checkpoint a second time, to ensure it's idempotent.
if err := db.Checkpoint(CheckpointTruncate); err != nil {
if _, err := db.Checkpoint(CheckpointTruncate); err != nil {
t.Fatalf("failed to checkpoint database in WAL mode: %s", err.Error())
}
}
@@ -1081,7 +1081,7 @@ func test_FileCreationOnDisk(t *testing.T, db *DB) {
// Confirm checkpoint works on all types of on-disk databases. Worst case, this
// should be ignored.
if err := db.Checkpoint(CheckpointRestart); err != nil {
if _, err := db.Checkpoint(CheckpointRestart); err != nil {
t.Fatalf("failed to checkpoint database in DELETE mode: %s", err.Error())
}
}


@@ -606,7 +606,7 @@ func Test_WALReplayOK(t *testing.T) {
}
mustCopyFile(replayDBPath, dbPath)
mustCopyFile(filepath.Join(replayDir, walFile+"_001"), walPath)
if err := db.Checkpoint(CheckpointTruncate); err != nil {
if _, err := db.Checkpoint(CheckpointTruncate); err != nil {
t.Fatalf("failed to checkpoint database in WAL mode: %s", err.Error())
}
@@ -619,7 +619,7 @@ func Test_WALReplayOK(t *testing.T) {
t.Fatalf("WAL file at %s does not exist", walPath)
}
mustCopyFile(filepath.Join(replayDir, walFile+"_002"), walPath)
if err := db.Checkpoint(CheckpointTruncate); err != nil {
if _, err := db.Checkpoint(CheckpointTruncate); err != nil {
t.Fatalf("failed to checkpoint database in WAL mode: %s", err.Error())
}
@@ -705,7 +705,7 @@ func Test_WALReplayOK_Complex(t *testing.T) {
if _, err := srcDB.ExecuteStringStmt("CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)"); err != nil {
t.Fatalf("failed to create table: %s", err.Error())
}
if err := srcDB.Checkpoint(CheckpointTruncate); err != nil {
if _, err := srcDB.Checkpoint(CheckpointTruncate); err != nil {
t.Fatalf("failed to checkpoint database in WAL mode: %s", err.Error())
}
mustCopyFile(dstPath, srcPath)
@@ -737,7 +737,7 @@ func Test_WALReplayOK_Complex(t *testing.T) {
dstWALPath := fmt.Sprintf("%s-%d", dstPath, i)
mustCopyFile(dstWALPath, srcWALPath)
dstWALs = append(dstWALs, dstWALPath)
if err := srcDB.Checkpoint(CheckpointTruncate); err != nil {
if _, err := srcDB.Checkpoint(CheckpointTruncate); err != nil {
t.Fatalf("failed to checkpoint database in WAL mode: %s", err.Error())
}
}
@@ -753,7 +753,7 @@ func Test_WALReplayOK_Complex(t *testing.T) {
dstWALPath := fmt.Sprintf("%s-postdelete", dstPath)
mustCopyFile(dstWALPath, srcWALPath)
dstWALs = append(dstWALs, dstWALPath)
if err := srcDB.Checkpoint(CheckpointTruncate); err != nil {
if _, err := srcDB.Checkpoint(CheckpointTruncate); err != nil {
t.Fatalf("failed to checkpoint database in WAL mode: %s", err.Error())
}
@@ -763,7 +763,7 @@ func Test_WALReplayOK_Complex(t *testing.T) {
dstWALPath = fmt.Sprintf("%s-postupdate", dstPath)
mustCopyFile(dstWALPath, srcWALPath)
dstWALs = append(dstWALs, dstWALPath)
if err := srcDB.Checkpoint(CheckpointTruncate); err != nil {
if _, err := srcDB.Checkpoint(CheckpointTruncate); err != nil {
t.Fatalf("failed to checkpoint database in WAL mode: %s", err.Error())
}
@@ -778,7 +778,7 @@ func Test_WALReplayOK_Complex(t *testing.T) {
dstWALPath = fmt.Sprintf("%s-create-tables", dstPath)
mustCopyFile(dstWALPath, srcWALPath)
dstWALs = append(dstWALs, dstWALPath)
if err := srcDB.Checkpoint(CheckpointTruncate); err != nil {
if _, err := srcDB.Checkpoint(CheckpointTruncate); err != nil {
t.Fatalf("failed to checkpoint database in WAL mode: %s", err.Error())
}
@@ -791,7 +791,7 @@ func Test_WALReplayOK_Complex(t *testing.T) {
dstWALPath = fmt.Sprintf("%s-post-create-tables", dstPath)
mustCopyFile(dstWALPath, srcWALPath)
dstWALs = append(dstWALs, dstWALPath)
if err := srcDB.Checkpoint(CheckpointTruncate); err != nil {
if _, err := srcDB.Checkpoint(CheckpointTruncate); err != nil {
t.Fatalf("failed to checkpoint database in WAL mode: %s", err.Error())
}


@@ -135,7 +135,7 @@ func (s *SwappableDB) StmtReadOnly(sql string) (bool, error) {
}
// Checkpoint calls Checkpoint on the underlying database.
func (s *SwappableDB) Checkpoint(mode CheckpointMode) error {
func (s *SwappableDB) Checkpoint(mode CheckpointMode) (*CheckpointMeta, error) {
s.dbMu.RLock()
defer s.dbMu.RUnlock()
return s.db.Checkpoint(mode)


@@ -210,9 +210,13 @@ func RecoverNode(dataDir string, extensions []string, logger *log.Logger, logs r
// Create a new snapshot, placing the configuration in as if it was
// committed at index 1.
if err := db.Checkpoint(sql.CheckpointTruncate); err != nil {
meta, err := db.Checkpoint(sql.CheckpointTruncate)
if err != nil {
return fmt.Errorf("failed to checkpoint database: %s", err)
}
if !meta.Success() {
return fmt.Errorf("database checkpoint was not successful: %s", meta.String())
}
tmpDBFD, err := os.Open(tmpDBPath)
if err != nil {
return fmt.Errorf("failed to open temporary database file: %s", err)


@@ -133,6 +133,8 @@ const (
backupCASRetryDelay = 100 * time.Millisecond
connectionPoolCount = 5
connectionTimeout = 10 * time.Second
mustWALCheckpointDelay = 50 * time.Millisecond
mustWALCheckpointTimeout = 5 * time.Minute
raftLogCacheSize = 512
trailingScale = 1.25
observerChanLen = 50
@@ -154,6 +156,8 @@ const (
numSnapshotsIncremental = "num_snapshots_incremental"
numFullCheckpointFailed = "num_full_checkpoint_failed"
numWALCheckpointTruncateFailed = "num_wal_checkpoint_truncate_failed"
numWALCheckpointIncomplete = "num_wal_checkpoint_incomplete"
numWALMustCheckpoint = "num_wal_must_checkpoint"
numAutoVacuums = "num_auto_vacuums"
numAutoVacuumsFailed = "num_auto_vacuums_failed"
autoVacuumDuration = "auto_vacuum_duration"
@@ -221,6 +225,8 @@ func ResetStats() {
stats.Add(numSnapshotsIncremental, 0)
stats.Add(numFullCheckpointFailed, 0)
stats.Add(numWALCheckpointTruncateFailed, 0)
stats.Add(numWALCheckpointIncomplete, 0)
stats.Add(numWALMustCheckpoint, 0)
stats.Add(numAutoVacuums, 0)
stats.Add(numAutoVacuumsFailed, 0)
stats.Add(autoVacuumDuration, 0)
@@ -349,6 +355,10 @@ type Store struct {
fsmTerm atomic.Uint64
fsmUpdateTime *rsync.AtomicTime // This is node-local time.
// readerMu allows blocking of all reads. This is used to handle
// specific, very rare edge cases around WAL checkpointing.
readerMu sync.RWMutex
// appendedAtTime is the Leader's clock time when that Leader appended the log entry.
// The Leader that actually appended the log entry is not necessarily the current Leader.
appendedAtTime *rsync.AtomicTime
@@ -1419,6 +1429,9 @@ func (s *Store) execute(ex *proto.ExecuteRequest) ([]*proto.ExecuteQueryResponse
// upgraded to STRONG if the Store determines that is necessary to guarantee
// a linearizable read.
func (s *Store) Query(qr *proto.QueryRequest) (rows []*proto.QueryRows, level proto.ConsistencyLevel, raftIndex uint64, retErr error) {
s.readerMu.RLock()
defer s.readerMu.RUnlock()
p := (*PragmaCheckRequest)(qr.Request)
if err := p.Check(); err != nil {
return nil, 0, 0, err
@@ -1530,6 +1543,9 @@ func (s *Store) VerifyLeader() (retErr error) {
// Request processes a request that may contain both Executes and Queries.
func (s *Store) Request(eqr *proto.ExecuteQueryRequest) ([]*proto.ExecuteQueryResponse, uint64, uint64, error) {
s.readerMu.RLock()
defer s.readerMu.RUnlock()
p := (*PragmaCheckRequest)(eqr.Request)
if err := p.Check(); err != nil {
return nil, 0, 0, err
@@ -1633,6 +1649,9 @@ func (s *Store) Request(eqr *proto.ExecuteQueryRequest) ([]*proto.ExecuteQueryRe
// will be written directly to that file. Otherwise a temporary file will be created,
// and that temporary file copied to dst.
func (s *Store) Backup(br *proto.BackupRequest, dst io.Writer) (retErr error) {
s.readerMu.RLock()
defer s.readerMu.RUnlock()
if !s.open.Is() {
return ErrNotOpen
}
@@ -1906,6 +1925,9 @@ func (s *Store) Vacuum() error {
// http://sqlite.org/howtocorrupt.html states it is safe to do this
// as long as the database is not written to during the call.
func (s *Store) Database(leader bool) ([]byte, error) {
s.readerMu.RLock()
defer s.readerMu.RUnlock()
if leader && s.raft.State() != raft.Leader {
return nil, ErrNotLeader
}
@@ -2506,9 +2528,19 @@ func (s *Store) fsmSnapshot() (fSnap raft.FSMSnapshot, retErr error) {
var fsmSnapshot raft.FSMSnapshot
if fullNeeded {
chkStartTime := time.Now()
if err := s.db.Checkpoint(sql.CheckpointTruncate); err != nil {
meta, err := s.db.Checkpoint(sql.CheckpointTruncate)
if err != nil {
stats.Add(numFullCheckpointFailed, 1)
return nil, err
return nil, fmt.Errorf("snapshot can't complete due to FULL Snapshot checkpoint error (will retry): %s",
err.Error())
}
if !meta.Success() {
if meta.Moved < meta.Pages {
stats.Add(numWALCheckpointIncomplete, 1)
return nil, fmt.Errorf("snapshot can't complete due to FULL Snapshot checkpoint incomplete (will retry): %s)",
meta.String())
}
s.mustTruncateCheckpoint()
}
stats.Get(snapshotCreateChkTruncateDuration).(*expvar.Int).Set(time.Since(chkStartTime).Milliseconds())
dbFD, err := os.Open(s.db.Path())
@@ -2567,11 +2599,20 @@ func (s *Store) fsmSnapshot() (fSnap raft.FSMSnapshot, retErr error) {
return nil, err
}
chkTStartTime := time.Now()
if err := s.db.Checkpoint(sql.CheckpointTruncate); err != nil {
meta, err := s.db.Checkpoint(sql.CheckpointTruncate)
if err != nil {
stats.Add(numWALCheckpointTruncateFailed, 1)
return nil, fmt.Errorf("snapshot can't complete due to WAL checkpoint failure (will retry): %s",
return nil, fmt.Errorf("snapshot can't complete due to WAL checkpoint error (will retry): %s",
err.Error())
}
if !meta.Success() {
if meta.Moved < meta.Pages {
stats.Add(numWALCheckpointIncomplete, 1)
return nil, fmt.Errorf("snapshot can't complete due to Snapshot checkpoint incomplete (will retry %s)",
meta.String())
}
s.mustTruncateCheckpoint()
}
stats.Get(snapshotCreateChkTruncateDuration).(*expvar.Int).Set(time.Since(chkTStartTime).Milliseconds())
stats.Get(snapshotPrecompactWALSize).(*expvar.Int).Set(walSzPre)
stats.Get(snapshotWALSize).(*expvar.Int).Set(walSzPost)
@@ -2850,6 +2891,42 @@ func (s *Store) runWALSnapshotting() (closeCh, doneCh chan struct{}) {
return closeCh, doneCh
}
// mustTruncateCheckpoint truncates the checkpointed WAL, retrying until successful or
// timing out.
//
// This should be called if we hit a specific edge case where all pages were moved but some
// reader blocked truncation. The next write could start overwriting WAL frames at the start
// of the WAL, which would mean we would lose WAL data, so we need to forcibly truncate here.
// We do this by blocking all readers (writes are already blocked). This handling is based on
// research into SQLite behavior; the case has not actually been observed in practice.
//
// Finally, we could still panic here if we time out trying to truncate. This could happen if
// a reader external to rqlite just won't let go.
func (s *Store) mustTruncateCheckpoint() {
startT := time.Now()
defer func() {
s.logger.Printf("forced WAL truncate checkpoint took %s", time.Since(startT))
}()
stats.Add(numWALMustCheckpoint, 1)
s.readerMu.Lock()
defer s.readerMu.Unlock()
ticker := time.NewTicker(mustWALCheckpointDelay)
defer ticker.Stop()
// Create the timeout channel once, before the loop, so the overall timeout
// applies across all retries rather than being reset on every tick.
timeout := time.After(mustWALCheckpointTimeout)
for {
select {
case <-ticker.C:
meta, err := s.db.Checkpoint(sql.CheckpointTruncate)
if err == nil && meta.Success() {
return
}
case <-timeout:
panic("timed out trying to truncate checkpointed WAL")
}
}
}
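
A note on the readerMu introduced above: read paths such as Query, Request, Backup, and Database take the read lock, while mustTruncateCheckpoint takes the write lock, so a forced truncate waits for in-flight reads and blocks new ones until the WAL has been truncated. The sketch below isolates that pattern; the gate type and method names are illustrative, not from the commit.

package sketch

import "sync"

// gate isolates the readerMu pattern used by the Store: many concurrent
// readers, and one writer that must exclude them all while it forces a
// TRUNCATE checkpoint of the WAL.
type gate struct {
	readerMu sync.RWMutex
}

// read models Query/Request/Backup/Database: readers run concurrently.
func (g *gate) read(fn func()) {
	g.readerMu.RLock()
	defer g.readerMu.RUnlock()
	fn()
}

// forceTruncate models mustTruncateCheckpoint: it waits for in-flight readers
// to finish, blocks new ones, then runs the forced checkpoint.
func (g *gate) forceTruncate(fn func()) {
	g.readerMu.Lock()
	defer g.readerMu.Unlock()
	fn()
}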
// selfLeaderChange is called when this node detects that its leadership
// status has changed.
func (s *Store) selfLeaderChange(leader bool) {


@@ -2652,6 +2652,54 @@ func Test_SingleNode_WALTriggeredSnapshot(t *testing.T) {
}
}
func Test_SingleNode_SnapshotFailRetry(t *testing.T) {
s, ln := mustNewStore(t)
defer ln.Close()
s.SnapshotThreshold = 8192
s.SnapshotInterval = time.Hour
s.NoSnapshotOnClose = true
if err := s.Open(); err != nil {
t.Fatalf("failed to open single-node store: %s", err.Error())
}
defer s.Close(true)
if err := s.Bootstrap(NewServer(s.ID(), s.Addr(), true)); err != nil {
t.Fatalf("failed to bootstrap single-node store: %s", err.Error())
}
if _, err := s.WaitForLeader(10 * time.Second); err != nil {
t.Fatalf("Error waiting for leader: %s", err)
}
er := executeRequestFromString(`CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)`,
false, false)
_, _, err := s.Execute(er)
if err != nil {
t.Fatalf("failed to execute on single node: %s", err.Error())
}
er = executeRequestFromString(`INSERT INTO foo(name) VALUES("fiona")`, false, false)
_, _, err = s.Execute(er)
if err != nil {
t.Fatalf("failed to execute on single node: %s", err.Error())
}
go func() {
qr := queryRequestFromString("SELECT * FROM foo", false, false)
qr.GetRequest().Statements[0].ForceStall = true
s.Query(qr)
}()
time.Sleep(2 * time.Second)
er = executeRequestFromString(`INSERT INTO foo(name) VALUES("bob")`, false, false)
_, _, err = s.Execute(er)
if err != nil {
t.Fatalf("failed to execute on single node: %s", err.Error())
}
if err := s.Snapshot(0); err == nil {
t.Fatalf("expected error snapshotting single-node store with stalled query")
}
}
func Test_OpenStoreSingleNode_OptimizeTimes(t *testing.T) {
s0, ln0 := mustNewStore(t)
defer s0.Close(true)