diff --git a/cmd/litestream-vfs/vfs_write_integration_test.go b/cmd/litestream-vfs/vfs_write_integration_test.go index cf9150a..723aa8b 100644 --- a/cmd/litestream-vfs/vfs_write_integration_test.go +++ b/cmd/litestream-vfs/vfs_write_integration_test.go @@ -971,6 +971,238 @@ func TestVFS_InvalidPageSize(t *testing.T) { require.NoError(t, err) } +// TestVFS_RollbackRestoresOriginalState tests that rolling back a transaction +// restores the database to its original state, even after large writes. +func TestVFS_RollbackRestoresOriginalState(t *testing.T) { + replicaDir := t.TempDir() + client := file.NewReplicaClient(replicaDir) + + // Create database directly via writable VFS (no external setup needed) + vfs := newWritableVFS(t, client, 100*time.Millisecond, t.TempDir()) + vfsName := fmt.Sprintf("litestream-rollback-%d", time.Now().UnixNano()) + require.NoError(t, sqlite3vfs.RegisterVFS(vfsName, vfs)) + + sqldb, err := sql.Open("sqlite3", fmt.Sprintf("file:test.db?vfs=%s", vfsName)) + require.NoError(t, err) + defer sqldb.Close() + + // Create initial data via the VFS + _, err = sqldb.Exec("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)") + require.NoError(t, err) + _, err = sqldb.Exec("INSERT INTO users (id, name) VALUES (1, 'Alice')") + require.NoError(t, err) + + // Verify initial state + var initialCount int + err = sqldb.QueryRow("SELECT COUNT(*) FROM users").Scan(&initialCount) + require.NoError(t, err) + require.Equal(t, 1, initialCount, "expected 1 initial row") + + // Start a transaction and write a large amount of data (spanning multiple pages) + _, err = sqldb.Exec("BEGIN") + require.NoError(t, err) + + // Insert 1000 rows with large payloads (~1KB each) to span multiple pages + for i := 2; i <= 1001; i++ { + payload := fmt.Sprintf("rollback_test_user_%d_%s", i, string(make([]byte, 900))) + _, err = sqldb.Exec("INSERT INTO users (id, name) VALUES (?, ?)", i, payload) + require.NoError(t, err) + } + + // Verify the data is visible within the transaction + var countDuringTx int + err = sqldb.QueryRow("SELECT COUNT(*) FROM users").Scan(&countDuringTx) + require.NoError(t, err) + require.Equal(t, 1001, countDuringTx, "expected 1001 rows during transaction") + + // ROLLBACK the transaction + _, err = sqldb.Exec("ROLLBACK") + require.NoError(t, err) + + // Verify original state is restored + var afterRollbackCount int + err = sqldb.QueryRow("SELECT COUNT(*) FROM users").Scan(&afterRollbackCount) + require.NoError(t, err) + require.Equal(t, 1, afterRollbackCount, "expected 1 row after rollback") + + // Verify original data is intact + var afterRollbackName string + err = sqldb.QueryRow("SELECT name FROM users WHERE id = 1").Scan(&afterRollbackName) + require.NoError(t, err) + require.Equal(t, "Alice", afterRollbackName, "original data should be intact after rollback") +} + +// TestVFS_RollbackAfterUpdate tests that rolling back UPDATE operations +// restores the original values. +func TestVFS_RollbackAfterUpdate(t *testing.T) { + replicaDir := t.TempDir() + client := file.NewReplicaClient(replicaDir) + + // Create database directly via writable VFS + vfs := newWritableVFS(t, client, 100*time.Millisecond, t.TempDir()) + vfsName := fmt.Sprintf("litestream-rollback-update-%d", time.Now().UnixNano()) + require.NoError(t, sqlite3vfs.RegisterVFS(vfsName, vfs)) + + sqldb, err := sql.Open("sqlite3", fmt.Sprintf("file:test.db?vfs=%s", vfsName)) + require.NoError(t, err) + defer sqldb.Close() + + // Create initial data via the VFS + _, err = sqldb.Exec("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)") + require.NoError(t, err) + _, err = sqldb.Exec("INSERT INTO users (id, name) VALUES (1, 'Alice')") + require.NoError(t, err) + + // Get original value + var originalName string + err = sqldb.QueryRow("SELECT name FROM users WHERE id = 1").Scan(&originalName) + require.NoError(t, err) + require.Equal(t, "Alice", originalName) + + // Start transaction and update + _, err = sqldb.Exec("BEGIN") + require.NoError(t, err) + + _, err = sqldb.Exec("UPDATE users SET name = 'MODIFIED_' || name") + require.NoError(t, err) + + // Verify modification is visible + var modifiedName string + err = sqldb.QueryRow("SELECT name FROM users WHERE id = 1").Scan(&modifiedName) + require.NoError(t, err) + require.Equal(t, "MODIFIED_Alice", modifiedName) + + // ROLLBACK + _, err = sqldb.Exec("ROLLBACK") + require.NoError(t, err) + + // Verify original value is restored + var restoredName string + err = sqldb.QueryRow("SELECT name FROM users WHERE id = 1").Scan(&restoredName) + require.NoError(t, err) + require.Equal(t, "Alice", restoredName, "name should be restored after rollback") +} + +// TestVFS_RollbackAfterDelete tests that rolling back DELETE operations +// restores the deleted rows. +func TestVFS_RollbackAfterDelete(t *testing.T) { + replicaDir := t.TempDir() + client := file.NewReplicaClient(replicaDir) + + // Create database directly via writable VFS + vfs := newWritableVFS(t, client, 100*time.Millisecond, t.TempDir()) + vfsName := fmt.Sprintf("litestream-rollback-delete-%d", time.Now().UnixNano()) + require.NoError(t, sqlite3vfs.RegisterVFS(vfsName, vfs)) + + sqldb, err := sql.Open("sqlite3", fmt.Sprintf("file:test.db?vfs=%s", vfsName)) + require.NoError(t, err) + defer sqldb.Close() + + // Create initial data via the VFS + _, err = sqldb.Exec("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)") + require.NoError(t, err) + _, err = sqldb.Exec("INSERT INTO users (id, name) VALUES (1, 'Alice')") + require.NoError(t, err) + + // Verify initial count + var initialCount int + err = sqldb.QueryRow("SELECT COUNT(*) FROM users").Scan(&initialCount) + require.NoError(t, err) + require.Equal(t, 1, initialCount) + + // Start transaction and delete all rows + _, err = sqldb.Exec("BEGIN") + require.NoError(t, err) + + _, err = sqldb.Exec("DELETE FROM users") + require.NoError(t, err) + + // Verify deletion + var countAfterDelete int + err = sqldb.QueryRow("SELECT COUNT(*) FROM users").Scan(&countAfterDelete) + require.NoError(t, err) + require.Equal(t, 0, countAfterDelete) + + // ROLLBACK + _, err = sqldb.Exec("ROLLBACK") + require.NoError(t, err) + + // Verify rows are restored + var restoredCount int + err = sqldb.QueryRow("SELECT COUNT(*) FROM users").Scan(&restoredCount) + require.NoError(t, err) + require.Equal(t, 1, restoredCount, "all rows should be restored after rollback") + + // Verify the data itself is correct + var name string + err = sqldb.QueryRow("SELECT name FROM users WHERE id = 1").Scan(&name) + require.NoError(t, err) + require.Equal(t, "Alice", name) +} + +// TestVFS_CommitAfterRollbackWorks tests that commits work correctly after +// a previous rollback in the same session. +func TestVFS_CommitAfterRollbackWorks(t *testing.T) { + replicaDir := t.TempDir() + client := file.NewReplicaClient(replicaDir) + + // Create database directly via writable VFS + vfs := newWritableVFS(t, client, 100*time.Millisecond, t.TempDir()) + vfsName := fmt.Sprintf("litestream-commit-after-rollback-%d", time.Now().UnixNano()) + require.NoError(t, sqlite3vfs.RegisterVFS(vfsName, vfs)) + + sqldb, err := sql.Open("sqlite3", fmt.Sprintf("file:test.db?vfs=%s", vfsName)) + require.NoError(t, err) + defer sqldb.Close() + + // Create initial data via the VFS + _, err = sqldb.Exec("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)") + require.NoError(t, err) + _, err = sqldb.Exec("INSERT INTO users (id, name) VALUES (1, 'Alice')") + require.NoError(t, err) + + // Transaction 1: Insert and rollback + _, err = sqldb.Exec("BEGIN") + require.NoError(t, err) + _, err = sqldb.Exec("INSERT INTO users (id, name) VALUES (100, 'should_not_exist')") + require.NoError(t, err) + _, err = sqldb.Exec("ROLLBACK") + require.NoError(t, err) + + // Transaction 2: Insert and commit + _, err = sqldb.Exec("BEGIN") + require.NoError(t, err) + _, err = sqldb.Exec("INSERT INTO users (id, name) VALUES (200, 'should_exist')") + require.NoError(t, err) + _, err = sqldb.Exec("COMMIT") + require.NoError(t, err) + + // Verify only the committed data exists + var count int + err = sqldb.QueryRow("SELECT COUNT(*) FROM users WHERE name = 'should_not_exist'").Scan(&count) + require.NoError(t, err) + require.Equal(t, 0, count, "rolled back data should not exist") + + err = sqldb.QueryRow("SELECT COUNT(*) FROM users WHERE name = 'should_exist'").Scan(&count) + require.NoError(t, err) + require.Equal(t, 1, count, "committed data should exist") + + // Close and reopen to verify persistence + sqldb.Close() + + vfs2 := newWritableVFS(t, client, 0, "") + vfsName2 := fmt.Sprintf("litestream-verify-%d", time.Now().UnixNano()) + require.NoError(t, sqlite3vfs.RegisterVFS(vfsName2, vfs2)) + + sqldb2, err := sql.Open("sqlite3", fmt.Sprintf("file:test.db?vfs=%s", vfsName2)) + require.NoError(t, err) + defer sqldb2.Close() + + err = sqldb2.QueryRow("SELECT COUNT(*) FROM users WHERE name = 'should_exist'").Scan(&count) + require.NoError(t, err) + require.Equal(t, 1, count, "committed data should persist") +} + // ============================================================================= // Helper Functions // ============================================================================= @@ -987,7 +1219,10 @@ func newWritableVFS(tb testing.TB, client litestream.ReplicaClient, syncInterval vfs.PollInterval = 100 * time.Millisecond vfs.WriteEnabled = true vfs.WriteSyncInterval = syncInterval - vfs.WriteBufferPath = localPath + // If localPath is provided as a directory, append a buffer filename + if localPath != "" { + vfs.WriteBufferPath = filepath.Join(localPath, ".litestream-buffer") + } return vfs } @@ -1012,10 +1247,10 @@ func setupInitialDB(t *testing.T, client litestream.ReplicaClient) { dbDir := t.TempDir() db := testingutil.NewDB(t, filepath.Join(dbDir, "source.db")) - db.MonitorInterval = 100 * time.Millisecond + db.MonitorInterval = 50 * time.Millisecond db.Replica = litestream.NewReplica(db) db.Replica.Client = client - db.Replica.SyncInterval = 100 * time.Millisecond + db.Replica.SyncInterval = 50 * time.Millisecond require.NoError(t, db.Open()) sqldb := testingutil.MustOpenSQLDB(t, db.Path()) @@ -1025,8 +1260,13 @@ func setupInitialDB(t *testing.T, client litestream.ReplicaClient) { _, err = sqldb.Exec("INSERT INTO users (id, name) VALUES (1, 'Alice')") require.NoError(t, err) + // Wait for initial LTX file waitForLTXFiles(t, client, 10*time.Second, db.MonitorInterval) + // Force a DB sync and then replica sync to ensure all data is uploaded + require.NoError(t, db.Sync(context.Background())) + require.NoError(t, db.Replica.Sync(context.Background())) + require.NoError(t, db.Replica.Stop(false)) testingutil.MustCloseSQLDB(t, sqldb) require.NoError(t, db.Close(context.Background()))