mirror of
https://github.com/rqlite/rqlite.git
synced 2026-01-25 04:16:26 +00:00
Unit test forced checkpoint and truncation
This commit is contained in:
@@ -2,6 +2,7 @@
|
||||
### Implementation changes and bug fixes
|
||||
- [PR #2423](https://github.com/rqlite/rqlite/pull/2423): Handle possible WAL checkpoint failure.
|
||||
- [PR #2424](https://github.com/rqlite/rqlite/pull/2424): Add `QueryWithContext()` to DB layer.
|
||||
- [PR #2425](https://github.com/rqlite/rqlite/pull/2425): Unit test forced checkpoint and truncation at Store level.
|
||||
|
||||
## v9.3.8 (January 3rd 2026)
|
||||
### Implementation changes and bug fixes
|
||||
|
||||
12
db/db.go
12
db/db.go
@@ -659,7 +659,7 @@ func (db *DB) BusyTimeout() (rwMs, roMs int, err error) {
|
||||
// Checkpoint checkpoints the WAL file. If the WAL file is not enabled, this
|
||||
// function is a no-op.
|
||||
func (db *DB) Checkpoint(mode CheckpointMode) (*CheckpointMeta, error) {
|
||||
return db.CheckpointWithTimeout(mode, 0)
|
||||
return db.CheckpointWithTimeout(mode, 100)
|
||||
}
|
||||
|
||||
// CheckpointWithTimeout performs a WAL checkpoint. If the checkpoint does not
|
||||
@@ -1204,6 +1204,7 @@ func (db *DB) queryStmtWithConn(ctx context.Context, stmt *command.Statement, xT
|
||||
}()
|
||||
rows := &command.QueryRows{}
|
||||
start := time.Now()
|
||||
forceStall := stmt.ForceStall
|
||||
|
||||
parameters, err := parametersToValues(stmt.Parameters)
|
||||
if err != nil {
|
||||
@@ -1257,8 +1258,13 @@ func (db *DB) queryStmtWithConn(ctx context.Context, stmt *command.Statement, xT
|
||||
// Check for slow query, blocked query, etc testing. This field
|
||||
// should never set by production code and is only for fault-injection
|
||||
// testing purposes.
|
||||
if stmt.ForceStall {
|
||||
<-make(chan struct{})
|
||||
if forceStall {
|
||||
select {
|
||||
case <-make(chan struct{}):
|
||||
case <-ctx.Done():
|
||||
db.logger.Printf("forced stall on query cancelled: %s", ctx.Err().Error())
|
||||
forceStall = false
|
||||
}
|
||||
}
|
||||
|
||||
// One-time population of any empty types. Best effort, ignore
|
||||
|
||||
@@ -2,6 +2,7 @@ package db
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"io"
|
||||
"os"
|
||||
"testing"
|
||||
@@ -80,6 +81,60 @@ func Test_WALDatabaseCheckpointOK(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func Test_WALDatabaseCheckpointFail_Blocked(t *testing.T) {
|
||||
path := mustTempFile()
|
||||
defer os.Remove(path)
|
||||
|
||||
db, err := Open(path, false, true)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to open database in WAL mode: %s", err.Error())
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
_, err = db.ExecuteStringStmt(`CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)`)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to execute on single node: %s", err.Error())
|
||||
}
|
||||
_, err = db.ExecuteStringStmt(`INSERT INTO foo(name) VALUES("alice")`)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to execute INSERT on single node: %s", err.Error())
|
||||
}
|
||||
|
||||
// Issue a long-running read that should block the checkpoint.
|
||||
qr := &command.Request{
|
||||
Statements: []*command.Statement{
|
||||
{
|
||||
Sql: "SELECT * FROM foo",
|
||||
ForceStall: true,
|
||||
},
|
||||
},
|
||||
}
|
||||
ctx, cancelFunc := context.WithCancel(context.Background())
|
||||
go func() {
|
||||
db.QueryWithContext(ctx, qr, false)
|
||||
}()
|
||||
time.Sleep(2 * time.Second)
|
||||
meta, err := db.Checkpoint(CheckpointTruncate)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to checkpoint database: %s", err.Error())
|
||||
}
|
||||
if meta.Success() {
|
||||
t.Fatalf("expected checkpoint to be unsuccessful due to blocking read")
|
||||
}
|
||||
|
||||
// Cancel the blocking read, and try again.
|
||||
cancelFunc()
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
meta, err = db.Checkpoint(CheckpointTruncate)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to checkpoint database: %s", err.Error())
|
||||
}
|
||||
if !meta.Success() {
|
||||
t.Fatalf("expected checkpoint to be successful after blocking read was cancelled")
|
||||
}
|
||||
}
|
||||
|
||||
// Test_WALDatabaseCheckpointOK_NoWALChange tests that a checkpoint
|
||||
// that is blocked by a long-running read does not result in a
|
||||
// change to the WAL file. This is to show that we can safely retry
|
||||
|
||||
@@ -2,6 +2,7 @@ package store
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"errors"
|
||||
"fmt"
|
||||
@@ -2652,7 +2653,7 @@ func Test_SingleNode_WALTriggeredSnapshot(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func Test_SingleNode_SnapshotFailRetry(t *testing.T) {
|
||||
func Test_SingleNode_SnapshotFail_Blocked(t *testing.T) {
|
||||
s, ln := mustNewStore(t)
|
||||
defer ln.Close()
|
||||
|
||||
@@ -2700,6 +2701,115 @@ func Test_SingleNode_SnapshotFailRetry(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// Test_SingleNode_SnapshotFail_Blocked_Retry tests that a snapshot operation
|
||||
// that requires a forced checkpoint and truncation does succeed once the
|
||||
// blocking query unblocks.
|
||||
func Test_SingleNode_SnapshotFail_Blocked_Retry(t *testing.T) {
|
||||
s, ln := mustNewStore(t)
|
||||
defer ln.Close()
|
||||
|
||||
s.SnapshotThreshold = 8192
|
||||
s.SnapshotInterval = time.Hour
|
||||
s.NoSnapshotOnClose = true
|
||||
if err := s.Open(); err != nil {
|
||||
t.Fatalf("failed to open single-node store: %s", err.Error())
|
||||
}
|
||||
defer s.Close(true)
|
||||
if err := s.Bootstrap(NewServer(s.ID(), s.Addr(), true)); err != nil {
|
||||
t.Fatalf("failed to bootstrap single-node store: %s", err.Error())
|
||||
}
|
||||
if _, err := s.WaitForLeader(10 * time.Second); err != nil {
|
||||
t.Fatalf("Error waiting for leader: %s", err)
|
||||
}
|
||||
er := executeRequestFromString(`CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)`,
|
||||
false, false)
|
||||
_, _, err := s.Execute(er)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to execute on single node: %s", err.Error())
|
||||
}
|
||||
|
||||
er = executeRequestFromString(`INSERT INTO foo(name) VALUES("fiona")`, false, false)
|
||||
_, _, err = s.Execute(er)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to execute on single node: %s", err.Error())
|
||||
}
|
||||
|
||||
ctx, cancelFunc := context.WithCancel(context.Background())
|
||||
go func() {
|
||||
qr := queryRequestFromString("SELECT * FROM foo", false, false)
|
||||
qr.GetRequest().Statements[0].ForceStall = true
|
||||
|
||||
blockingDB, err := db.Open(s.dbPath, false, true)
|
||||
if err != nil {
|
||||
t.Errorf("failed to open blocking DB connection: %s", err.Error())
|
||||
}
|
||||
defer blockingDB.Close()
|
||||
|
||||
_, err = blockingDB.QueryWithContext(ctx, qr.GetRequest(), false)
|
||||
if err != nil {
|
||||
t.Errorf("failed to execute stalled query on blocking DB connection: %s", err.Error())
|
||||
}
|
||||
}()
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
success := false
|
||||
var wg sync.WaitGroup
|
||||
wg.Go(func() {
|
||||
if err := s.Snapshot(0); err != nil {
|
||||
t.Errorf("failed to snapshot single-node store with released stalled query: %s", err.Error())
|
||||
} else {
|
||||
success = true
|
||||
}
|
||||
})
|
||||
time.Sleep(1 * time.Second)
|
||||
cancelFunc()
|
||||
wg.Wait()
|
||||
if !success {
|
||||
t.Fatalf("expected snapshot to succeed after blocking query released")
|
||||
}
|
||||
|
||||
// Again, this time with a persistent snapshot.
|
||||
er = executeRequestFromString(`INSERT INTO foo(name) VALUES("fiona")`, false, false)
|
||||
_, _, err = s.Execute(er)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to execute on single node: %s", err.Error())
|
||||
}
|
||||
|
||||
ctx, cancelFunc = context.WithCancel(context.Background())
|
||||
go func() {
|
||||
qr := queryRequestFromString("SELECT * FROM foo", false, false)
|
||||
qr.GetRequest().Statements[0].ForceStall = true
|
||||
|
||||
blockingDB, err := db.Open(s.dbPath, false, true)
|
||||
if err != nil {
|
||||
t.Errorf("failed to open blocking DB connection: %s", err.Error())
|
||||
}
|
||||
defer blockingDB.Close()
|
||||
|
||||
_, err = blockingDB.QueryWithContext(ctx, qr.GetRequest(), false)
|
||||
if err != nil {
|
||||
t.Errorf("failed to execute stalled query on blocking DB connection: %s", err.Error())
|
||||
}
|
||||
}()
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
success = false
|
||||
var wg2 sync.WaitGroup
|
||||
wg2.Go(func() {
|
||||
if err := s.Snapshot(0); err != nil {
|
||||
t.Errorf("failed to snapshot single-node store with second released stalled query: %s", err.Error())
|
||||
} else {
|
||||
success = true
|
||||
}
|
||||
})
|
||||
time.Sleep(1 * time.Second)
|
||||
cancelFunc()
|
||||
wg2.Wait()
|
||||
if !success {
|
||||
t.Fatalf("expected snapshot to succeed after blocking query released")
|
||||
}
|
||||
}
|
||||
|
||||
func Test_OpenStoreSingleNode_OptimizeTimes(t *testing.T) {
|
||||
s0, ln0 := mustNewStore(t)
|
||||
defer s0.Close(true)
|
||||
|
||||
Reference in New Issue
Block a user