mirror of
https://github.com/benbjohnson/litestream.git
synced 2026-01-24 20:56:48 +00:00
fix(wal): filter stale pages when database shrinks (#881)
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
66
db_test.go
66
db_test.go
@@ -1005,3 +1005,69 @@ func TestDB_EnforceL0RetentionByTime(t *testing.T) {
|
||||
}
|
||||
checkExists(true)
|
||||
}
|
||||
|
||||
// TestDB_SyncAfterVacuum verifies that syncing works correctly after a database
|
||||
// shrinks via VACUUM. This tests the fix for issue #875 where page numbers from
|
||||
// earlier transactions in the WAL could exceed the new commit size after shrinking.
|
||||
func TestDB_SyncAfterVacuum(t *testing.T) {
|
||||
db, sqldb := testingutil.MustOpenDBs(t)
|
||||
defer testingutil.MustCloseDBs(t, db, sqldb)
|
||||
|
||||
// Create a table and insert enough data to create multiple pages
|
||||
if _, err := sqldb.ExecContext(t.Context(), `CREATE TABLE t (id INTEGER PRIMARY KEY, data BLOB)`); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Insert enough rows to create many pages
|
||||
for i := 0; i < 100; i++ {
|
||||
if _, err := sqldb.ExecContext(t.Context(), `INSERT INTO t (data) VALUES (?)`, strings.Repeat("x", 4000)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
// Initial sync
|
||||
if err := db.Sync(t.Context()); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Get initial page count
|
||||
var initialPageCount int
|
||||
if err := sqldb.QueryRowContext(t.Context(), `PRAGMA page_count`).Scan(&initialPageCount); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Logf("Initial page count: %d", initialPageCount)
|
||||
|
||||
// Delete most data and VACUUM to shrink the database
|
||||
if _, err := sqldb.ExecContext(t.Context(), `DELETE FROM t WHERE id > 10`); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err := sqldb.ExecContext(t.Context(), `VACUUM`); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Get new page count
|
||||
var newPageCount int
|
||||
if err := sqldb.QueryRowContext(t.Context(), `PRAGMA page_count`).Scan(&newPageCount); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Logf("Page count after VACUUM: %d", newPageCount)
|
||||
|
||||
if newPageCount >= initialPageCount {
|
||||
t.Skip("VACUUM did not shrink database, skipping test")
|
||||
}
|
||||
|
||||
// This sync should succeed without "page number out-of-bounds" error
|
||||
if err := db.Sync(t.Context()); err != nil {
|
||||
t.Fatalf("sync after VACUUM failed: %v", err)
|
||||
}
|
||||
|
||||
// Verify position advanced
|
||||
pos, err := db.Pos()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if pos.TXID < 2 {
|
||||
t.Fatalf("expected TXID >= 2, got %d", pos.TXID)
|
||||
}
|
||||
t.Logf("Final position: TXID=%d", pos.TXID)
|
||||
}
|
||||
|
||||
@@ -215,6 +215,14 @@ func (r *WALReader) PageMap(ctx context.Context) (m map[uint32]int64, maxOffset
|
||||
}
|
||||
}
|
||||
|
||||
// Remove pages that exceed the final commit size. This can occur when the
|
||||
// database shrinks (e.g., via VACUUM) between transactions in the WAL.
|
||||
for pgno := range m {
|
||||
if pgno > commit {
|
||||
delete(m, pgno)
|
||||
}
|
||||
}
|
||||
|
||||
// If full transactions available, return the original offset.
|
||||
if len(m) == 0 {
|
||||
return m, 0, 0, nil
|
||||
|
||||
Reference in New Issue
Block a user