mirror of
https://github.com/benbjohnson/litestream.git
synced 2026-01-25 05:06:30 +00:00
test(vfs): add rollback verification tests for writable VFS (#1005)
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -971,6 +971,238 @@ func TestVFS_InvalidPageSize(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
// TestVFS_RollbackRestoresOriginalState tests that rolling back a transaction
|
||||
// restores the database to its original state, even after large writes.
|
||||
func TestVFS_RollbackRestoresOriginalState(t *testing.T) {
|
||||
replicaDir := t.TempDir()
|
||||
client := file.NewReplicaClient(replicaDir)
|
||||
|
||||
// Create database directly via writable VFS (no external setup needed)
|
||||
vfs := newWritableVFS(t, client, 100*time.Millisecond, t.TempDir())
|
||||
vfsName := fmt.Sprintf("litestream-rollback-%d", time.Now().UnixNano())
|
||||
require.NoError(t, sqlite3vfs.RegisterVFS(vfsName, vfs))
|
||||
|
||||
sqldb, err := sql.Open("sqlite3", fmt.Sprintf("file:test.db?vfs=%s", vfsName))
|
||||
require.NoError(t, err)
|
||||
defer sqldb.Close()
|
||||
|
||||
// Create initial data via the VFS
|
||||
_, err = sqldb.Exec("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
|
||||
require.NoError(t, err)
|
||||
_, err = sqldb.Exec("INSERT INTO users (id, name) VALUES (1, 'Alice')")
|
||||
require.NoError(t, err)
|
||||
|
||||
// Verify initial state
|
||||
var initialCount int
|
||||
err = sqldb.QueryRow("SELECT COUNT(*) FROM users").Scan(&initialCount)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, initialCount, "expected 1 initial row")
|
||||
|
||||
// Start a transaction and write a large amount of data (spanning multiple pages)
|
||||
_, err = sqldb.Exec("BEGIN")
|
||||
require.NoError(t, err)
|
||||
|
||||
// Insert 1000 rows with large payloads (~1KB each) to span multiple pages
|
||||
for i := 2; i <= 1001; i++ {
|
||||
payload := fmt.Sprintf("rollback_test_user_%d_%s", i, string(make([]byte, 900)))
|
||||
_, err = sqldb.Exec("INSERT INTO users (id, name) VALUES (?, ?)", i, payload)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
// Verify the data is visible within the transaction
|
||||
var countDuringTx int
|
||||
err = sqldb.QueryRow("SELECT COUNT(*) FROM users").Scan(&countDuringTx)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1001, countDuringTx, "expected 1001 rows during transaction")
|
||||
|
||||
// ROLLBACK the transaction
|
||||
_, err = sqldb.Exec("ROLLBACK")
|
||||
require.NoError(t, err)
|
||||
|
||||
// Verify original state is restored
|
||||
var afterRollbackCount int
|
||||
err = sqldb.QueryRow("SELECT COUNT(*) FROM users").Scan(&afterRollbackCount)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, afterRollbackCount, "expected 1 row after rollback")
|
||||
|
||||
// Verify original data is intact
|
||||
var afterRollbackName string
|
||||
err = sqldb.QueryRow("SELECT name FROM users WHERE id = 1").Scan(&afterRollbackName)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, "Alice", afterRollbackName, "original data should be intact after rollback")
|
||||
}
|
||||
|
||||
// TestVFS_RollbackAfterUpdate tests that rolling back UPDATE operations
|
||||
// restores the original values.
|
||||
func TestVFS_RollbackAfterUpdate(t *testing.T) {
|
||||
replicaDir := t.TempDir()
|
||||
client := file.NewReplicaClient(replicaDir)
|
||||
|
||||
// Create database directly via writable VFS
|
||||
vfs := newWritableVFS(t, client, 100*time.Millisecond, t.TempDir())
|
||||
vfsName := fmt.Sprintf("litestream-rollback-update-%d", time.Now().UnixNano())
|
||||
require.NoError(t, sqlite3vfs.RegisterVFS(vfsName, vfs))
|
||||
|
||||
sqldb, err := sql.Open("sqlite3", fmt.Sprintf("file:test.db?vfs=%s", vfsName))
|
||||
require.NoError(t, err)
|
||||
defer sqldb.Close()
|
||||
|
||||
// Create initial data via the VFS
|
||||
_, err = sqldb.Exec("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
|
||||
require.NoError(t, err)
|
||||
_, err = sqldb.Exec("INSERT INTO users (id, name) VALUES (1, 'Alice')")
|
||||
require.NoError(t, err)
|
||||
|
||||
// Get original value
|
||||
var originalName string
|
||||
err = sqldb.QueryRow("SELECT name FROM users WHERE id = 1").Scan(&originalName)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, "Alice", originalName)
|
||||
|
||||
// Start transaction and update
|
||||
_, err = sqldb.Exec("BEGIN")
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = sqldb.Exec("UPDATE users SET name = 'MODIFIED_' || name")
|
||||
require.NoError(t, err)
|
||||
|
||||
// Verify modification is visible
|
||||
var modifiedName string
|
||||
err = sqldb.QueryRow("SELECT name FROM users WHERE id = 1").Scan(&modifiedName)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, "MODIFIED_Alice", modifiedName)
|
||||
|
||||
// ROLLBACK
|
||||
_, err = sqldb.Exec("ROLLBACK")
|
||||
require.NoError(t, err)
|
||||
|
||||
// Verify original value is restored
|
||||
var restoredName string
|
||||
err = sqldb.QueryRow("SELECT name FROM users WHERE id = 1").Scan(&restoredName)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, "Alice", restoredName, "name should be restored after rollback")
|
||||
}
|
||||
|
||||
// TestVFS_RollbackAfterDelete tests that rolling back DELETE operations
|
||||
// restores the deleted rows.
|
||||
func TestVFS_RollbackAfterDelete(t *testing.T) {
|
||||
replicaDir := t.TempDir()
|
||||
client := file.NewReplicaClient(replicaDir)
|
||||
|
||||
// Create database directly via writable VFS
|
||||
vfs := newWritableVFS(t, client, 100*time.Millisecond, t.TempDir())
|
||||
vfsName := fmt.Sprintf("litestream-rollback-delete-%d", time.Now().UnixNano())
|
||||
require.NoError(t, sqlite3vfs.RegisterVFS(vfsName, vfs))
|
||||
|
||||
sqldb, err := sql.Open("sqlite3", fmt.Sprintf("file:test.db?vfs=%s", vfsName))
|
||||
require.NoError(t, err)
|
||||
defer sqldb.Close()
|
||||
|
||||
// Create initial data via the VFS
|
||||
_, err = sqldb.Exec("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
|
||||
require.NoError(t, err)
|
||||
_, err = sqldb.Exec("INSERT INTO users (id, name) VALUES (1, 'Alice')")
|
||||
require.NoError(t, err)
|
||||
|
||||
// Verify initial count
|
||||
var initialCount int
|
||||
err = sqldb.QueryRow("SELECT COUNT(*) FROM users").Scan(&initialCount)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, initialCount)
|
||||
|
||||
// Start transaction and delete all rows
|
||||
_, err = sqldb.Exec("BEGIN")
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = sqldb.Exec("DELETE FROM users")
|
||||
require.NoError(t, err)
|
||||
|
||||
// Verify deletion
|
||||
var countAfterDelete int
|
||||
err = sqldb.QueryRow("SELECT COUNT(*) FROM users").Scan(&countAfterDelete)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 0, countAfterDelete)
|
||||
|
||||
// ROLLBACK
|
||||
_, err = sqldb.Exec("ROLLBACK")
|
||||
require.NoError(t, err)
|
||||
|
||||
// Verify rows are restored
|
||||
var restoredCount int
|
||||
err = sqldb.QueryRow("SELECT COUNT(*) FROM users").Scan(&restoredCount)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, restoredCount, "all rows should be restored after rollback")
|
||||
|
||||
// Verify the data itself is correct
|
||||
var name string
|
||||
err = sqldb.QueryRow("SELECT name FROM users WHERE id = 1").Scan(&name)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, "Alice", name)
|
||||
}
|
||||
|
||||
// TestVFS_CommitAfterRollbackWorks tests that commits work correctly after
|
||||
// a previous rollback in the same session.
|
||||
func TestVFS_CommitAfterRollbackWorks(t *testing.T) {
|
||||
replicaDir := t.TempDir()
|
||||
client := file.NewReplicaClient(replicaDir)
|
||||
|
||||
// Create database directly via writable VFS
|
||||
vfs := newWritableVFS(t, client, 100*time.Millisecond, t.TempDir())
|
||||
vfsName := fmt.Sprintf("litestream-commit-after-rollback-%d", time.Now().UnixNano())
|
||||
require.NoError(t, sqlite3vfs.RegisterVFS(vfsName, vfs))
|
||||
|
||||
sqldb, err := sql.Open("sqlite3", fmt.Sprintf("file:test.db?vfs=%s", vfsName))
|
||||
require.NoError(t, err)
|
||||
defer sqldb.Close()
|
||||
|
||||
// Create initial data via the VFS
|
||||
_, err = sqldb.Exec("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
|
||||
require.NoError(t, err)
|
||||
_, err = sqldb.Exec("INSERT INTO users (id, name) VALUES (1, 'Alice')")
|
||||
require.NoError(t, err)
|
||||
|
||||
// Transaction 1: Insert and rollback
|
||||
_, err = sqldb.Exec("BEGIN")
|
||||
require.NoError(t, err)
|
||||
_, err = sqldb.Exec("INSERT INTO users (id, name) VALUES (100, 'should_not_exist')")
|
||||
require.NoError(t, err)
|
||||
_, err = sqldb.Exec("ROLLBACK")
|
||||
require.NoError(t, err)
|
||||
|
||||
// Transaction 2: Insert and commit
|
||||
_, err = sqldb.Exec("BEGIN")
|
||||
require.NoError(t, err)
|
||||
_, err = sqldb.Exec("INSERT INTO users (id, name) VALUES (200, 'should_exist')")
|
||||
require.NoError(t, err)
|
||||
_, err = sqldb.Exec("COMMIT")
|
||||
require.NoError(t, err)
|
||||
|
||||
// Verify only the committed data exists
|
||||
var count int
|
||||
err = sqldb.QueryRow("SELECT COUNT(*) FROM users WHERE name = 'should_not_exist'").Scan(&count)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 0, count, "rolled back data should not exist")
|
||||
|
||||
err = sqldb.QueryRow("SELECT COUNT(*) FROM users WHERE name = 'should_exist'").Scan(&count)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, count, "committed data should exist")
|
||||
|
||||
// Close and reopen to verify persistence
|
||||
sqldb.Close()
|
||||
|
||||
vfs2 := newWritableVFS(t, client, 0, "")
|
||||
vfsName2 := fmt.Sprintf("litestream-verify-%d", time.Now().UnixNano())
|
||||
require.NoError(t, sqlite3vfs.RegisterVFS(vfsName2, vfs2))
|
||||
|
||||
sqldb2, err := sql.Open("sqlite3", fmt.Sprintf("file:test.db?vfs=%s", vfsName2))
|
||||
require.NoError(t, err)
|
||||
defer sqldb2.Close()
|
||||
|
||||
err = sqldb2.QueryRow("SELECT COUNT(*) FROM users WHERE name = 'should_exist'").Scan(&count)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, count, "committed data should persist")
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
// Helper Functions
|
||||
// =============================================================================
|
||||
@@ -987,7 +1219,10 @@ func newWritableVFS(tb testing.TB, client litestream.ReplicaClient, syncInterval
|
||||
vfs.PollInterval = 100 * time.Millisecond
|
||||
vfs.WriteEnabled = true
|
||||
vfs.WriteSyncInterval = syncInterval
|
||||
vfs.WriteBufferPath = localPath
|
||||
// If localPath is provided as a directory, append a buffer filename
|
||||
if localPath != "" {
|
||||
vfs.WriteBufferPath = filepath.Join(localPath, ".litestream-buffer")
|
||||
}
|
||||
|
||||
return vfs
|
||||
}
|
||||
@@ -1012,10 +1247,10 @@ func setupInitialDB(t *testing.T, client litestream.ReplicaClient) {
|
||||
|
||||
dbDir := t.TempDir()
|
||||
db := testingutil.NewDB(t, filepath.Join(dbDir, "source.db"))
|
||||
db.MonitorInterval = 100 * time.Millisecond
|
||||
db.MonitorInterval = 50 * time.Millisecond
|
||||
db.Replica = litestream.NewReplica(db)
|
||||
db.Replica.Client = client
|
||||
db.Replica.SyncInterval = 100 * time.Millisecond
|
||||
db.Replica.SyncInterval = 50 * time.Millisecond
|
||||
require.NoError(t, db.Open())
|
||||
|
||||
sqldb := testingutil.MustOpenSQLDB(t, db.Path())
|
||||
@@ -1025,8 +1260,13 @@ func setupInitialDB(t *testing.T, client litestream.ReplicaClient) {
|
||||
_, err = sqldb.Exec("INSERT INTO users (id, name) VALUES (1, 'Alice')")
|
||||
require.NoError(t, err)
|
||||
|
||||
// Wait for initial LTX file
|
||||
waitForLTXFiles(t, client, 10*time.Second, db.MonitorInterval)
|
||||
|
||||
// Force a DB sync and then replica sync to ensure all data is uploaded
|
||||
require.NoError(t, db.Sync(context.Background()))
|
||||
require.NoError(t, db.Replica.Sync(context.Background()))
|
||||
|
||||
require.NoError(t, db.Replica.Stop(false))
|
||||
testingutil.MustCloseSQLDB(t, sqldb)
|
||||
require.NoError(t, db.Close(context.Background()))
|
||||
|
||||
Reference in New Issue
Block a user