diff --git a/cluster/proto/message.pb.go b/cluster/proto/message.pb.go
index 56197d2a..3342d6fd 100644
--- a/cluster/proto/message.pb.go
+++ b/cluster/proto/message.pb.go
@@ -1,6 +1,6 @@
 // Code generated by protoc-gen-go. DO NOT EDIT.
 // versions:
-// 	protoc-gen-go v1.36.10
+// 	protoc-gen-go v1.36.11
 // 	protoc        v6.33.0
 // source: message.proto

diff --git a/command/proto/command.pb.go b/command/proto/command.pb.go
index 1502ee7b..f6c826be 100644
--- a/command/proto/command.pb.go
+++ b/command/proto/command.pb.go
@@ -1,6 +1,6 @@
 // Code generated by protoc-gen-go. DO NOT EDIT.
 // versions:
-// 	protoc-gen-go v1.36.10
+// 	protoc-gen-go v1.36.11
 // 	protoc        v6.33.0
 // source: command.proto

@@ -439,6 +439,7 @@ type Statement struct {
 	Sql           string       `protobuf:"bytes,1,opt,name=sql,proto3" json:"sql,omitempty"`
 	Parameters    []*Parameter `protobuf:"bytes,2,rep,name=parameters,proto3" json:"parameters,omitempty"`
 	ForceQuery    bool         `protobuf:"varint,3,opt,name=forceQuery,proto3" json:"forceQuery,omitempty"`
+	ForceStall    bool         `protobuf:"varint,4,opt,name=forceStall,proto3" json:"forceStall,omitempty"`
 	unknownFields protoimpl.UnknownFields
 	sizeCache     protoimpl.SizeCache
 }
@@ -494,6 +495,13 @@ func (x *Statement) GetForceQuery() bool {
 	return false
 }

+func (x *Statement) GetForceStall() bool {
+	if x != nil {
+		return x.ForceStall
+	}
+	return false
+}
+
 type Request struct {
 	state       protoimpl.MessageState `protogen:"open.v1"`
 	Transaction bool                   `protobuf:"varint,1,opt,name=transaction,proto3" json:"transaction,omitempty"`
@@ -2086,7 +2094,7 @@ const file_command_proto_rawDesc = "" +
 	"\x01y\x18\x04 \x01(\fH\x00R\x01y\x12\x0e\n" +
 	"\x01s\x18\x05 \x01(\tH\x00R\x01s\x12\x12\n" +
 	"\x04name\x18\x06 \x01(\tR\x04nameB\a\n" +
-	"\x05value\"q\n" +
+	"\x05value\"\x91\x01\n" +
 	"\tStatement\x12\x10\n" +
 	"\x03sql\x18\x01 \x01(\tR\x03sql\x122\n" +
 	"\n" +
@@ -2094,7 +2102,10 @@ const file_command_proto_rawDesc = "" +
 	"parameters\x12\x1e\n" +
 	"\n" +
 	"forceQuery\x18\x03 \x01(\bR\n" +
-	"forceQuery\"\xa7\x01\n" +
+	"forceQuery\x12\x1e\n" +
+	"\n" +
+	"forceStall\x18\x04 \x01(\bR\n" +
+	"forceStall\"\xa7\x01\n" +
 	"\aRequest\x12 \n" +
 	"\vtransaction\x18\x01 \x01(\bR\vtransaction\x122\n" +
 	"\n" +
diff --git a/command/proto/command.proto b/command/proto/command.proto
index e4604901..c0e52bb4 100644
--- a/command/proto/command.proto
+++ b/command/proto/command.proto
@@ -19,6 +19,7 @@ message Statement {
   string sql = 1;
   repeated Parameter parameters = 2;
   bool forceQuery = 3;
+  bool forceStall = 4;
 }

 message Request {
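For context, the new forceStall field surfaces in the generated Go code as Statement.ForceStall, with the GetForceStall accessor shown above. A minimal sketch of how test code might set it when building a request, mirroring the tests later in this diff (the table name is illustrative; command is the github.com/rqlite/rqlite/v9/command/proto package):

	stmt := &command.Statement{
		Sql:        "SELECT * FROM foo", // illustrative query
		ForceStall: true,                // fault injection: the query blocks indefinitely
	}
	req := &command.Request{
		Statements: []*command.Statement{stmt},
	}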
diff --git a/db/db.go b/db/db.go
index 5875efaf..a0e321b7 100644
--- a/db/db.go
+++ b/db/db.go
@@ -166,6 +166,23 @@ type PoolStats struct {
 	MaxLifetimeClosed int64         `json:"max_lifetime_closed"`
 }

+// CheckpointMeta contains metadata about a WAL checkpoint operation.
+type CheckpointMeta struct {
+	Code  int
+	Pages int
+	Moved int
+}
+
+// String returns a string representation of the CheckpointMeta.
+func (cm *CheckpointMeta) String() string {
+	return fmt.Sprintf("Code=%d, Pages=%d, Moved=%d", cm.Code, cm.Pages, cm.Moved)
+}
+
+// Success returns true if the checkpoint completed successfully.
+func (cm *CheckpointMeta) Success() bool {
+	return cm.Code == 0
+}
+
 // Open opens a file-based database using the default driver.
 func Open(dbPath string, fkEnabled, wal bool) (retDB *DB, retErr error) {
 	return OpenWithDriver(DefaultDriver(), dbPath, fkEnabled, wal)
 }
@@ -641,7 +658,7 @@ func (db *DB) BusyTimeout() (rwMs, roMs int, err error) {

 // Checkpoint checkpoints the WAL file. If the WAL file is not enabled, this
 // function is a no-op.
-func (db *DB) Checkpoint(mode CheckpointMode) error {
+func (db *DB) Checkpoint(mode CheckpointMode) (*CheckpointMeta, error) {
 	return db.CheckpointWithTimeout(mode, 0)
 }

@@ -649,7 +666,7 @@ func (db *DB) Checkpoint(mode CheckpointMode) error {
 // run to completion within the given duration, an error is returned. If the
 // duration is 0, the busy timeout is not modified before executing the
 // checkpoint.
-func (db *DB) CheckpointWithTimeout(mode CheckpointMode, dur time.Duration) (err error) {
+func (db *DB) CheckpointWithTimeout(mode CheckpointMode, dur time.Duration) (meta *CheckpointMeta, err error) {
 	start := time.Now()
 	defer func() {
 		if err != nil {
@@ -663,10 +680,10 @@ func (db *DB) CheckpointWithTimeout(mode CheckpointMode, dur time.Duration) (err
 	if dur > 0 {
 		rwBt, _, err := db.BusyTimeout()
 		if err != nil {
-			return fmt.Errorf("failed to get busy_timeout on checkpointing connection: %s", err.Error())
+			return nil, fmt.Errorf("failed to get busy_timeout on checkpointing connection: %s", err.Error())
 		}
 		if err := db.SetBusyTimeout(int(dur.Milliseconds()), -1); err != nil {
-			return fmt.Errorf("failed to set busy_timeout on checkpointing connection: %s", err.Error())
+			return nil, fmt.Errorf("failed to set busy_timeout on checkpointing connection: %s", err.Error())
 		}
 		defer func() {
 			// Reset back to default
@@ -678,15 +695,15 @@ func (db *DB) CheckpointWithTimeout(mode CheckpointMode, dur time.Duration) (err

 	ok, nPages, nMoved, err := checkpointDB(db.rwDB, mode)
 	if err != nil {
-		return fmt.Errorf("error checkpointing WAL: %s", err.Error())
+		return nil, fmt.Errorf("error checkpointing WAL: %s", err.Error())
 	}
 	stats.Add(numCheckpointedPages, int64(nPages))
 	stats.Add(numCheckpointedMoves, int64(nMoved))
-	if ok != 0 {
-		return fmt.Errorf("failed to completely checkpoint WAL (%d ok, %d pages, %d moved)",
-			ok, nPages, nMoved)
-	}
-	return nil
+	return &CheckpointMeta{
+		Code:  ok,
+		Pages: nPages,
+		Moved: nMoved,
+	}, nil
 }

 // DisableCheckpointing disables the automatic checkpointing that occurs when
@@ -1233,6 +1250,13 @@ func (db *DB) queryStmtWithConn(ctx context.Context, stmt *command.Statement, xT
 		Parameters: params,
 	})

+	// Used to simulate slow or blocked queries during testing. This field
+	// should never be set by production code and exists only for
+	// fault-injection testing purposes.
+	if stmt.ForceStall {
+		<-make(chan struct{})
+	}
+
 	// One-time population of any empty types. Best effort, ignore
 	// error.
 	if needsQueryTypes {
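The API change above is worth spelling out: Checkpoint and CheckpointWithTimeout now distinguish "the checkpoint could not run" (a non-nil error) from "the checkpoint ran but could not complete" (a nil error plus a CheckpointMeta whose Success() is false). Code carries the non-zero value that was previously folded into the "failed to completely checkpoint WAL" error. A sketch of the intended caller pattern, using a hypothetical helper assumed to live in the db package:

	// checkpointAndReport illustrates the new two-part contract.
	func checkpointAndReport(db *DB) error {
		meta, err := db.Checkpoint(CheckpointTruncate)
		if err != nil {
			return err // the checkpoint could not be attempted at all
		}
		if !meta.Success() {
			// Blocked, typically by a reader. Pages and Moved report how far
			// it got; the WAL is left intact, so the caller can retry later.
			return fmt.Errorf("incomplete checkpoint: %s", meta.String())
		}
		return nil
	}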
diff --git a/db/db_checkpoint_test.go b/db/db_checkpoint_test.go
index 534abf34..9eb4396a 100644
--- a/db/db_checkpoint_test.go
+++ b/db/db_checkpoint_test.go
@@ -7,7 +7,9 @@ import (
 	"testing"
 	"time"

+	command "github.com/rqlite/rqlite/v9/command/proto"
 	"github.com/rqlite/rqlite/v9/db/wal"
+	"github.com/rqlite/rqlite/v9/internal/rsum"
 )

 // Test_WALDatabaseCheckpointOKNoWrites tests that a checkpoint succeeds
@@ -22,9 +24,140 @@ func Test_WALDatabaseCheckpointOKNoWrites(t *testing.T) {
 		t.Fatalf("failed to open database in WAL mode: %s", err.Error())
 	}
 	defer db.Close()
-	if err := db.Checkpoint(CheckpointTruncate); err != nil {
+	meta, err := db.Checkpoint(CheckpointTruncate)
+	if err != nil {
 		t.Fatalf("failed to checkpoint database in WAL mode with nonexistent WAL: %s", err.Error())
 	}
+	if !meta.Success() {
+		t.Fatalf("expected checkpoint to complete successfully")
+	}
+	if meta.Moved != 0 {
+		t.Fatalf("expected MOVED to be 0, got %d", meta.Moved)
+	}
+	if meta.Pages != 0 {
+		t.Fatalf("expected PAGES to be 0, got %d", meta.Pages)
+	}
+}
+
+// Test_WALDatabaseCheckpointOK tests that a checkpoint succeeds
+// with a write.
+func Test_WALDatabaseCheckpointOK(t *testing.T) {
+	path := mustTempFile()
+	defer os.Remove(path)
+
+	db, err := Open(path, false, true)
+	if err != nil {
+		t.Fatalf("failed to open database in WAL mode: %s", err.Error())
+	}
+	defer db.Close()
+
+	_, err = db.ExecuteStringStmt(`CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)`)
+	if err != nil {
+		t.Fatalf("failed to execute on single node: %s", err.Error())
+	}
+
+	meta, err := db.Checkpoint(CheckpointTruncate)
+	if err != nil {
+		t.Fatalf("failed to checkpoint database: %s", err.Error())
+	}
+	if !meta.Success() {
+		t.Fatalf("expected checkpoint to complete successfully")
+	}
+	if meta.Moved != 0 {
+		t.Fatalf("expected MOVED to be 0 since WAL was truncated, got %d", meta.Moved)
+	}
+	if meta.Pages != 0 {
+		t.Fatalf("expected PAGES to be 0 since WAL was truncated, got %d", meta.Pages)
+	}
+
+	// Ensure idempotency by checkpointing again.
+	meta, err = db.Checkpoint(CheckpointTruncate)
+	if err != nil {
+		t.Fatalf("failed to checkpoint database: %s", err.Error())
+	}
+	if !meta.Success() {
+		t.Fatalf("expected checkpoint to complete successfully")
+	}
+}
+
+// Test_WALDatabaseCheckpointOK_NoWALChange tests that a checkpoint
+// that is blocked by a long-running read does not result in a
+// change to the WAL file. This is to show that we can safely retry
+// the truncate checkpoint later.
+func Test_WALDatabaseCheckpointOK_NoWALChange(t *testing.T) {
+	path := mustTempFile()
+	defer os.Remove(path)
+
+	db, err := Open(path, false, true)
+	if err != nil {
+		t.Fatalf("failed to open database in WAL mode: %s", err.Error())
+	}
+	defer db.Close()
+
+	_, err = db.ExecuteStringStmt(`CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)`)
+	if err != nil {
+		t.Fatalf("failed to execute on single node: %s", err.Error())
+	}
+	_, err = db.ExecuteStringStmt(`INSERT INTO foo(name) VALUES("alice")`)
+	if err != nil {
+		t.Fatalf("failed to execute INSERT on single node: %s", err.Error())
+	}
+
+	// Issue a long-running read that should block the checkpoint.
+	qr := &command.Request{
+		Statements: []*command.Statement{
+			{
+				Sql:        "SELECT * FROM foo",
+				ForceStall: true,
+			},
+		},
+	}
+	go func() {
+		db.Query(qr, false)
+	}()
+	time.Sleep(2 * time.Second)
+
+	_, err = db.ExecuteStringStmt(`INSERT INTO foo(name) VALUES("alice")`)
+	if err != nil {
+		t.Fatalf("failed to execute INSERT on single node: %s", err.Error())
+	}
+
+	// Get the hash of the WAL file before the checkpoint.
+	h1, err := rsum.MD5(db.WALPath())
+	if err != nil {
+		t.Fatalf("failed to hash WAL file: %s", err.Error())
+	}
+
+	_, err = db.ExecuteStringStmt(`PRAGMA BUSY_TIMEOUT = 1`)
+	if err != nil {
+		t.Fatalf("failed to execute on single node: %s", err.Error())
+	}
+	meta, err := db.Checkpoint(CheckpointTruncate)
+	if err != nil {
+		t.Fatalf("failed to checkpoint database: %s", err.Error())
+	}
+	if meta.Success() {
+		t.Fatalf("expected checkpoint to be unsuccessful due to blocking read")
+	}
+	if meta.Moved == 0 {
+		t.Fatalf("expected MOVED to be > 0 since some pages should have been moved")
+	}
+	if meta.Pages == 0 {
+		t.Fatalf("expected PAGES to be > 0 since WAL should have pages")
+	}
+	if meta.Moved >= meta.Pages {
+		t.Fatalf("expected MOVED to be < PAGES since checkpoint incomplete")
+	}
+
+	// Check hash again.
+	h2, err := rsum.MD5(db.WALPath())
+	if err != nil {
+		t.Fatalf("failed to hash WAL file: %s", err.Error())
+	}
+
+	if h1 != h2 {
+		t.Fatalf("expected WAL file to be unchanged after incomplete checkpoint")
+	}
+}

 // Test_WALDatabaseCheckpointOKDelete tests that a checkpoint returns no error
@@ -41,7 +174,7 @@ func Test_WALDatabaseCheckpointOKDelete(t *testing.T) {
 		t.Fatalf("WAL mode enabled")
 	}
 	defer db.Close()
-	if err := db.Checkpoint(CheckpointTruncate); err != nil {
+	if _, err := db.Checkpoint(CheckpointTruncate); err != nil {
 		t.Fatalf("failed to checkpoint database in DELETE mode: %s", err.Error())
 	}
 }
@@ -71,8 +204,14 @@ func Test_WALDatabaseCheckpoint_RestartTruncate(t *testing.T) {
 	}

 	walPreBytes := mustReadBytes(db.WALPath())
-	if err := db.Checkpoint(CheckpointRestart); err != nil {
+	if meta, err := db.Checkpoint(CheckpointRestart); err != nil {
 		t.Fatalf("failed to checkpoint database: %s", err.Error())
+	} else if !meta.Success() {
+		t.Fatalf("expected checkpoint to complete successfully")
+	} else if meta.Moved == 0 {
+		t.Fatalf("expected some pages to be moved during RESTART checkpoint")
+	} else if meta.Pages == 0 {
+		t.Fatalf("expected some pages to be in the WAL during RESTART checkpoint")
 	}
 	walPostBytes := mustReadBytes(db.WALPath())
 	if !bytes.Equal(walPreBytes, walPostBytes) {
@@ -88,8 +227,12 @@ func Test_WALDatabaseCheckpoint_RestartTruncate(t *testing.T) {
 		t.Fatalf("expected %s, got %s", exp, got)
 	}

-	if err := db.Checkpoint(CheckpointTruncate); err != nil {
+	if meta, err := db.Checkpoint(CheckpointTruncate); err != nil {
 		t.Fatalf("failed to checkpoint database: %s", err.Error())
+	} else if !meta.Success() {
+		t.Fatalf("expected checkpoint to complete successfully")
+	} else if meta.Moved != 0 {
+		t.Fatalf("expected 0 pages to be moved during TRUNCATE checkpoint since there have been no writes since the RESTART checkpoint")
 	}
 	sz, err := fileSize(db.WALPath())
 	if err != nil {
@@ -148,8 +291,12 @@ func Test_WALDatabaseCheckpoint_RestartTimeout(t *testing.T) {
 		t.Fatalf("expected %s, got %s", exp, got)
 	}

-	if err := db.CheckpointWithTimeout(CheckpointRestart, 250*time.Millisecond); err == nil {
-		t.Fatal("expected error due to failure to checkpoint")
+	meta, err := db.CheckpointWithTimeout(CheckpointRestart, 250*time.Millisecond)
+	if err != nil {
+		t.Fatal("expected no error when checkpoint times out due to a blocking read transaction")
+	}
+	if meta.Success() {
+		t.Fatal("expected checkpoint to be unsuccessful")
 	}

 	// Get some information on the WAL file before the checkpoint. The goal here is
@@ -174,7 +321,7 @@ func Test_WALDatabaseCheckpoint_RestartTimeout(t *testing.T) {
 	}
 	blockingDB.Close()

-	if err := db.CheckpointWithTimeout(CheckpointRestart, 250*time.Millisecond); err != nil {
+	if _, err := db.CheckpointWithTimeout(CheckpointRestart, 250*time.Millisecond); err != nil {
 		t.Fatalf("failed to checkpoint database: %s", err.Error())
 	}
 }
@@ -220,9 +367,14 @@ func Test_WALDatabaseCheckpoint_TruncateTimeout(t *testing.T) {
 		t.Fatalf("expected %s, got %s", exp, got)
 	}

-	if err := db.CheckpointWithTimeout(CheckpointTruncate, 250*time.Millisecond); err == nil {
-		t.Fatal("expected error due to failure to checkpoint")
+	meta, err := db.CheckpointWithTimeout(CheckpointTruncate, 250*time.Millisecond)
+	if err != nil {
+		t.Fatal("expected no error when checkpoint times out due to a blocking COMMIT")
+	}
+	if meta.Success() {
+		t.Fatal("expected checkpoint to be unsuccessful")
 	}
+
 	postWALBytes := mustReadBytes(db.WALPath())
 	if !bytes.Equal(preWALBytes, postWALBytes) {
 		t.Fatalf("wal file should be unchanged after checkpoint failure")
@@ -248,7 +400,7 @@ func Test_WALDatabaseCheckpoint_TruncateTimeout(t *testing.T) {
 	}
 	blockingDB.Close()

-	if err := db.CheckpointWithTimeout(CheckpointTruncate, 250*time.Millisecond); err != nil {
+	if _, err := db.CheckpointWithTimeout(CheckpointTruncate, 250*time.Millisecond); err != nil {
 		t.Fatalf("failed to checkpoint database: %s", err.Error())
 	}
 	if mustFileSize(db.WALPath()) != 0 {
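Test_WALDatabaseCheckpointOK_NoWALChange leans on standard SQLite behavior: a connection holding an open read transaction pins the WAL, so a TRUNCATE checkpoint can copy only the frames the reader's snapshot permits, then reports busy instead of truncating. A standalone sketch of that behavior, independent of rqlite — the driver and its DSN parameter are assumptions, and error handling is elided:

	package main

	import (
		"database/sql"
		"fmt"

		_ "github.com/mattn/go-sqlite3" // assumed SQLite driver
	)

	func main() {
		db, _ := sql.Open("sqlite3", "file:demo.db?_journal_mode=WAL")
		defer db.Close()
		db.Exec("CREATE TABLE foo (id INTEGER PRIMARY KEY)")

		tx, _ := db.Begin()
		tx.QueryRow("SELECT COUNT(*) FROM foo").Scan(new(int)) // pin a read snapshot

		db.Exec("INSERT INTO foo(id) VALUES (1)") // WAL grows past the snapshot

		// busy=1: the checkpoint could not complete, and the WAL is left
		// unchanged, so the TRUNCATE checkpoint is safe to retry later.
		var busy, logFrames, checkpointed int
		db.QueryRow("PRAGMA wal_checkpoint(TRUNCATE)").Scan(&busy, &logFrames, &checkpointed)
		fmt.Println(busy, logFrames, checkpointed)
		tx.Rollback()
	}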
diff --git a/db/db_test.go b/db/db_test.go
index d1dd00d7..e1432832 100644
--- a/db/db_test.go
+++ b/db/db_test.go
@@ -166,7 +166,7 @@ func Test_WALNotCheckpointedOnClose(t *testing.T) {
 	}

 	// Checkpoint to table exists in main database file.
-	if err := db.Checkpoint(CheckpointTruncate); err != nil {
+	if _, err := db.Checkpoint(CheckpointTruncate); err != nil {
 		t.Fatalf("failed to checkpoint database: %s", err.Error())
 	}

@@ -299,7 +299,7 @@ func Test_TableCreation(t *testing.T) {
 	testQ()

 	// Confirm checkpoint works without error.
-	if err := db.Checkpoint(CheckpointRestart); err != nil {
+	if _, err := db.Checkpoint(CheckpointRestart); err != nil {
 		t.Fatalf("failed to checkpoint database: %s", err.Error())
 	}
 	testQ()
@@ -381,7 +381,7 @@ func Test_DBSums(t *testing.T) {
 		t.Fatalf("WAL sum did not change after insertion")
 	}

-	if err := db.Checkpoint(CheckpointRestart); err != nil {
+	if _, err := db.Checkpoint(CheckpointRestart); err != nil {
 		t.Fatalf("failed to checkpoint database: %s", err.Error())
 	}

@@ -439,7 +439,7 @@ func Test_DBLastModified(t *testing.T) {
 	// Checkpoint, and check time is later. On some platforms the time resolution isn't that
 	// high, so we sleep so the test won't suffer a false failure.
 	time.Sleep(1 * time.Second)
-	if err := db.Checkpoint(CheckpointRestart); err != nil {
+	if _, err := db.Checkpoint(CheckpointRestart); err != nil {
 		t.Fatalf("failed to checkpoint database: %s", err.Error())
 	}
 	lm3, err := db.LastModified()
@@ -944,14 +944,14 @@ func Test_WALDatabaseCreatedOK(t *testing.T) {
 		t.Fatalf("WAL file exists but is empty")
 	}

-	if err := db.Checkpoint(CheckpointTruncate); err != nil {
+	if _, err := db.Checkpoint(CheckpointTruncate); err != nil {
 		t.Fatalf("failed to checkpoint database in WAL mode: %s", err.Error())
 	}
 	if mustFileSize(walPath) != 0 {
 		t.Fatalf("WAL file exists but is non-empty")
 	}
 	// Checkpoint a second time, to ensure it's idempotent.
-	if err := db.Checkpoint(CheckpointTruncate); err != nil {
+	if _, err := db.Checkpoint(CheckpointTruncate); err != nil {
 		t.Fatalf("failed to checkpoint database in WAL mode: %s", err.Error())
 	}
 }
@@ -1081,7 +1081,7 @@ func test_FileCreationOnDisk(t *testing.T, db *DB) {

 	// Confirm checkpoint works on all types of on-disk databases. Worst case, this
 	// should be ignored.
-	if err := db.Checkpoint(CheckpointRestart); err != nil {
+	if _, err := db.Checkpoint(CheckpointRestart); err != nil {
 		t.Fatalf("failed to checkpoint database in DELETE mode: %s", err.Error())
 	}
 }
t.Fatal("expected no error when checkpoint times out due to a blocking read transaction") + } + if meta.Success() { + t.Fatal("expected checkpoint to be unsuccessful") } // Get some information on the WAL file before the checkpoint. The goal here is @@ -174,7 +321,7 @@ func Test_WALDatabaseCheckpoint_RestartTimeout(t *testing.T) { } blockingDB.Close() - if err := db.CheckpointWithTimeout(CheckpointRestart, 250*time.Millisecond); err != nil { + if _, err := db.CheckpointWithTimeout(CheckpointRestart, 250*time.Millisecond); err != nil { t.Fatalf("failed to checkpoint database: %s", err.Error()) } } @@ -220,9 +367,14 @@ func Test_WALDatabaseCheckpoint_TruncateTimeout(t *testing.T) { t.Fatalf("expected %s, got %s", exp, got) } - if err := db.CheckpointWithTimeout(CheckpointTruncate, 250*time.Millisecond); err == nil { - t.Fatal("expected error due to failure to checkpoint") + meta, err := db.CheckpointWithTimeout(CheckpointRestart, 250*time.Millisecond) + if err != nil { + t.Fatal("expected no error due to failure to checkpoint due to COMMIT") } + if meta.Success() { + t.Fatal("expected checkpoint to be unsuccessful") + } + postWALBytes := mustReadBytes(db.WALPath()) if !bytes.Equal(preWALBytes, postWALBytes) { t.Fatalf("wal file should be unchanged after checkpoint failure") @@ -248,7 +400,7 @@ func Test_WALDatabaseCheckpoint_TruncateTimeout(t *testing.T) { } blockingDB.Close() - if err := db.CheckpointWithTimeout(CheckpointTruncate, 250*time.Millisecond); err != nil { + if _, err := db.CheckpointWithTimeout(CheckpointTruncate, 250*time.Millisecond); err != nil { t.Fatalf("failed to checkpoint database: %s", err.Error()) } if mustFileSize(db.WALPath()) != 0 { diff --git a/db/db_test.go b/db/db_test.go index d1dd00d7..e1432832 100644 --- a/db/db_test.go +++ b/db/db_test.go @@ -166,7 +166,7 @@ func Test_WALNotCheckpointedOnClose(t *testing.T) { } // Checkpoint to table exists in main database file. - if err := db.Checkpoint(CheckpointTruncate); err != nil { + if _, err := db.Checkpoint(CheckpointTruncate); err != nil { t.Fatalf("failed to checkpoint database: %s", err.Error()) } @@ -299,7 +299,7 @@ func Test_TableCreation(t *testing.T) { testQ() // Confirm checkpoint works without error. - if err := db.Checkpoint(CheckpointRestart); err != nil { + if _, err := db.Checkpoint(CheckpointRestart); err != nil { t.Fatalf("failed to checkpoint database: %s", err.Error()) } testQ() @@ -381,7 +381,7 @@ func Test_DBSums(t *testing.T) { t.Fatalf("WAL sum did not change after insertion") } - if err := db.Checkpoint(CheckpointRestart); err != nil { + if _, err := db.Checkpoint(CheckpointRestart); err != nil { t.Fatalf("failed to checkpoint database: %s", err.Error()) } @@ -439,7 +439,7 @@ func Test_DBLastModified(t *testing.T) { // Checkpoint, and check time is later. On some platforms the time resolution isn't that // high, so we sleep so the test won't suffer a false failure. 
diff --git a/db/swappable_db.go b/db/swappable_db.go
index 45cb9be6..58b6facc 100644
--- a/db/swappable_db.go
+++ b/db/swappable_db.go
@@ -135,7 +135,7 @@ func (s *SwappableDB) StmtReadOnly(sql string) (bool, error) {
 }

 // Checkpoint calls Checkpoint on the underlying database.
-func (s *SwappableDB) Checkpoint(mode CheckpointMode) error {
+func (s *SwappableDB) Checkpoint(mode CheckpointMode) (*CheckpointMeta, error) {
 	s.dbMu.RLock()
 	defer s.dbMu.RUnlock()
 	return s.db.Checkpoint(mode)
diff --git a/store/state.go b/store/state.go
index 06cc5269..974bf484 100644
--- a/store/state.go
+++ b/store/state.go
@@ -210,9 +210,13 @@ func RecoverNode(dataDir string, extensions []string, logger *log.Logger, logs r

 	// Create a new snapshot, placing the configuration in as if it was
 	// committed at index 1.
-	if err := db.Checkpoint(sql.CheckpointTruncate); err != nil {
+	meta, err := db.Checkpoint(sql.CheckpointTruncate)
+	if err != nil {
 		return fmt.Errorf("failed to checkpoint database: %s", err)
 	}
+	if !meta.Success() {
+		return fmt.Errorf("database checkpoint was not successful: %s", meta.String())
+	}
 	tmpDBFD, err := os.Open(tmpDBPath)
 	if err != nil {
 		return fmt.Errorf("failed to open temporary database file: %s", err)
 	}
diff --git a/store/store.go b/store/store.go
index 31de057d..63d576b0 100644
--- a/store/store.go
+++ b/store/store.go
@@ -133,6 +133,8 @@ const (
 	backupCASRetryDelay      = 100 * time.Millisecond
 	connectionPoolCount      = 5
 	connectionTimeout        = 10 * time.Second
+	mustWALCheckpointDelay   = 50 * time.Millisecond
+	mustWALCheckpointTimeout = 5 * time.Minute
 	raftLogCacheSize         = 512
 	trailingScale            = 1.25
 	observerChanLen          = 50
@@ -154,6 +156,8 @@ const (
 	numSnapshotsIncremental        = "num_snapshots_incremental"
 	numFullCheckpointFailed        = "num_full_checkpoint_failed"
 	numWALCheckpointTruncateFailed = "num_wal_checkpoint_truncate_failed"
+	numWALCheckpointIncomplete     = "num_wal_checkpoint_incomplete"
+	numWALMustCheckpoint           = "num_wal_must_checkpoint"
 	numAutoVacuums                 = "num_auto_vacuums"
 	numAutoVacuumsFailed           = "num_auto_vacuums_failed"
 	autoVacuumDuration             = "auto_vacuum_duration"
@@ -221,6 +225,8 @@ func ResetStats() {
 	stats.Add(numSnapshotsIncremental, 0)
 	stats.Add(numFullCheckpointFailed, 0)
 	stats.Add(numWALCheckpointTruncateFailed, 0)
+	stats.Add(numWALCheckpointIncomplete, 0)
+	stats.Add(numWALMustCheckpoint, 0)
 	stats.Add(numAutoVacuums, 0)
 	stats.Add(numAutoVacuumsFailed, 0)
 	stats.Add(autoVacuumDuration, 0)
@@ -349,6 +355,10 @@ type Store struct {
 	fsmTerm       atomic.Uint64
 	fsmUpdateTime *rsync.AtomicTime // This is node-local time.

+	// readerMu allows blocking of all reads. This is used to handle
+	// specific, very rare edge cases around WAL checkpointing.
+	readerMu sync.RWMutex
+
 	// appendedAtTime is the Leader's clock time when that Leader appended the log entry.
 	// The Leader that actually appended the log entry is not necessarily the current Leader.
 	appendedAtTime *rsync.AtomicTime
@@ -1419,6 +1429,9 @@ func (s *Store) execute(ex *proto.ExecuteRequest) ([]*proto.ExecuteQueryResponse
 // upgraded to STRONG if the Store determines that is necessary to guarantee
 // a linearizable read.
 func (s *Store) Query(qr *proto.QueryRequest) (rows []*proto.QueryRows, level proto.ConsistencyLevel, raftIndex uint64, retErr error) {
+	s.readerMu.RLock()
+	defer s.readerMu.RUnlock()
+
 	p := (*PragmaCheckRequest)(qr.Request)
 	if err := p.Check(); err != nil {
 		return nil, 0, 0, err
 	}
@@ -1530,6 +1543,9 @@ func (s *Store) VerifyLeader() (retErr error) {

 // Request processes a request that may contain both Executes and Queries.
 func (s *Store) Request(eqr *proto.ExecuteQueryRequest) ([]*proto.ExecuteQueryResponse, uint64, uint64, error) {
+	s.readerMu.RLock()
+	defer s.readerMu.RUnlock()
+
 	p := (*PragmaCheckRequest)(eqr.Request)
 	if err := p.Check(); err != nil {
 		return nil, 0, 0, err
 	}
@@ -1633,6 +1649,9 @@ func (s *Store) Request(eqr *proto.ExecuteQueryRequest) ([]*proto.ExecuteQueryRe
 // will be written directly to that file. Otherwise a temporary file will be created,
 // and that temporary file copied to dst.
 func (s *Store) Backup(br *proto.BackupRequest, dst io.Writer) (retErr error) {
+	s.readerMu.RLock()
+	defer s.readerMu.RUnlock()
+
 	if !s.open.Is() {
 		return ErrNotOpen
 	}
@@ -1906,6 +1925,9 @@ func (s *Store) Vacuum() error {
 // http://sqlite.org/howtocorrupt.html states it is safe to do this
 // as long as the database is not written to during the call.
 func (s *Store) Database(leader bool) ([]byte, error) {
+	s.readerMu.RLock()
+	defer s.readerMu.RUnlock()
+
 	if leader && s.raft.State() != raft.Leader {
 		return nil, ErrNotLeader
 	}
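The readerMu additions above give the Store a single point at which every read path (Query, Request, Backup, Database) can be drained and held off while a forced WAL truncation runs; writes are already serialized through Raft. The locking pattern in isolation, as a sketch:

	import "sync"

	var readerMu sync.RWMutex

	// Readers proceed concurrently under the shared lock.
	func doRead(read func()) {
		readerMu.RLock()
		defer readerMu.RUnlock()
		read()
	}

	// The forced checkpoint takes the exclusive lock, waiting for in-flight
	// readers to finish and blocking new ones until it completes.
	func forceCheckpoint(checkpoint func()) {
		readerMu.Lock()
		defer readerMu.Unlock()
		checkpoint()
	}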
@@ -2506,9 +2528,19 @@ func (s *Store) fsmSnapshot() (fSnap raft.FSMSnapshot, retErr error) {
 	var fsmSnapshot raft.FSMSnapshot
 	if fullNeeded {
 		chkStartTime := time.Now()
-		if err := s.db.Checkpoint(sql.CheckpointTruncate); err != nil {
+		meta, err := s.db.Checkpoint(sql.CheckpointTruncate)
+		if err != nil {
 			stats.Add(numFullCheckpointFailed, 1)
-			return nil, err
+			return nil, fmt.Errorf("snapshot can't complete due to FULL snapshot checkpoint error (will retry): %s",
+				err.Error())
+		}
+		if !meta.Success() {
+			if meta.Moved < meta.Pages {
+				stats.Add(numWALCheckpointIncomplete, 1)
+				return nil, fmt.Errorf("snapshot can't complete due to FULL snapshot checkpoint being incomplete (will retry): %s",
+					meta.String())
+			}
+			s.mustTruncateCheckpoint()
 		}
 		stats.Get(snapshotCreateChkTruncateDuration).(*expvar.Int).Set(time.Since(chkStartTime).Milliseconds())
 		dbFD, err := os.Open(s.db.Path())
@@ -2567,11 +2599,20 @@ func (s *Store) fsmSnapshot() (fSnap raft.FSMSnapshot, retErr error) {
 			return nil, err
 		}
 		chkTStartTime := time.Now()
-		if err := s.db.Checkpoint(sql.CheckpointTruncate); err != nil {
+		meta, err := s.db.Checkpoint(sql.CheckpointTruncate)
+		if err != nil {
 			stats.Add(numWALCheckpointTruncateFailed, 1)
-			return nil, fmt.Errorf("snapshot can't complete due to WAL checkpoint failure (will retry): %s",
+			return nil, fmt.Errorf("snapshot can't complete due to WAL checkpoint error (will retry): %s",
 				err.Error())
 		}
+		if !meta.Success() {
+			if meta.Moved < meta.Pages {
+				stats.Add(numWALCheckpointIncomplete, 1)
+				return nil, fmt.Errorf("snapshot can't complete due to WAL checkpoint being incomplete (will retry): %s",
+					meta.String())
+			}
+			s.mustTruncateCheckpoint()
+		}
 		stats.Get(snapshotCreateChkTruncateDuration).(*expvar.Int).Set(time.Since(chkTStartTime).Milliseconds())
 		stats.Get(snapshotPrecompactWALSize).(*expvar.Int).Set(walSzPre)
 		stats.Get(snapshotWALSize).(*expvar.Int).Set(walSzPost)
@@ -2850,6 +2891,43 @@ func (s *Store) runWALSnapshotting() (closeCh, doneCh chan struct{}) {
 	return closeCh, doneCh
 }

+// mustTruncateCheckpoint truncates the checkpointed WAL, retrying until successful or
+// timing out.
+//
+// This should be called if we hit a specific edge case where all pages were moved but some
+// reader blocked truncation. The next write could start overwriting WAL frames at the start
+// of the WAL, which would mean we would lose WAL data, so we need to forcibly truncate here.
+// We do this by blocking all readers (writes are already blocked). This handling is based on
+// research into SQLite behavior; the condition has not actually been observed in practice.
+//
+// Finally, we could still panic here if we time out trying to truncate. This could happen if
+// a reader external to rqlite just won't let go.
+func (s *Store) mustTruncateCheckpoint() {
+	startT := time.Now()
+	defer func() {
+		s.logger.Printf("forced WAL truncate checkpoint took %s", time.Since(startT))
+	}()
+
+	stats.Add(numWALMustCheckpoint, 1)
+	s.readerMu.Lock()
+	defer s.readerMu.Unlock()
+
+	ticker := time.NewTicker(mustWALCheckpointDelay)
+	defer ticker.Stop()
+	timeout := time.After(mustWALCheckpointTimeout)
+	for {
+		select {
+		case <-ticker.C:
+			meta, err := s.db.Checkpoint(sql.CheckpointTruncate)
+			if err == nil && meta.Success() {
+				return
+			}
+		case <-timeout:
+			panic("timed out trying to truncate checkpointed WAL")
+		}
+	}
+}
+
 // selfLeaderChange is called when this node detects that its leadership
 // status has changed.
 func (s *Store) selfLeaderChange(leader bool) {
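One Go subtlety in mustTruncateCheckpoint's retry loop: the overall deadline must be created once, before the loop, as above. A time.After placed directly in the select case would be re-created on every 50 ms tick, so the five-minute timeout could never fire. The general shape, with a hypothetical tryOnce standing in for the checkpoint attempt:

	ticker := time.NewTicker(50 * time.Millisecond) // retry interval (illustrative)
	defer ticker.Stop()
	deadline := time.After(5 * time.Minute) // created once, so it can actually fire
	for {
		select {
		case <-ticker.C:
			if tryOnce() {
				return
			}
		case <-deadline:
			panic("timed out")
		}
	}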
diff --git a/store/store_test.go b/store/store_test.go
index 27e99853..eae1472d 100644
--- a/store/store_test.go
+++ b/store/store_test.go
@@ -2652,6 +2652,54 @@ func Test_SingleNode_WALTriggeredSnapshot(t *testing.T) {
 	}
 }

+func Test_SingleNode_SnapshotFailRetry(t *testing.T) {
+	s, ln := mustNewStore(t)
+	defer ln.Close()
+
+	s.SnapshotThreshold = 8192
+	s.SnapshotInterval = time.Hour
+	s.NoSnapshotOnClose = true
+	if err := s.Open(); err != nil {
+		t.Fatalf("failed to open single-node store: %s", err.Error())
+	}
+	defer s.Close(true)
+	if err := s.Bootstrap(NewServer(s.ID(), s.Addr(), true)); err != nil {
+		t.Fatalf("failed to bootstrap single-node store: %s", err.Error())
+	}
+	if _, err := s.WaitForLeader(10 * time.Second); err != nil {
+		t.Fatalf("Error waiting for leader: %s", err)
+	}
+	er := executeRequestFromString(`CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)`,
+		false, false)
+	_, _, err := s.Execute(er)
+	if err != nil {
+		t.Fatalf("failed to execute on single node: %s", err.Error())
+	}
+
+	er = executeRequestFromString(`INSERT INTO foo(name) VALUES("fiona")`, false, false)
+	_, _, err = s.Execute(er)
+	if err != nil {
+		t.Fatalf("failed to execute on single node: %s", err.Error())
+	}
+
+	go func() {
+		qr := queryRequestFromString("SELECT * FROM foo", false, false)
+		qr.GetRequest().Statements[0].ForceStall = true
+		s.Query(qr)
+	}()
+
+	time.Sleep(2 * time.Second)
+	er = executeRequestFromString(`INSERT INTO foo(name) VALUES("bob")`, false, false)
+	_, _, err = s.Execute(er)
+	if err != nil {
+		t.Fatalf("failed to execute on single node: %s", err.Error())
+	}
+
+	if err := s.Snapshot(0); err == nil {
+		t.Fatalf("expected error snapshotting single-node store with stalled query")
+	}
+}
+
 func Test_OpenStoreSingleNode_OptimizeTimes(t *testing.T) {
 	s0, ln0 := mustNewStore(t)
 	defer s0.Close(true)