diff --git a/.claude/commands/run-comprehensive-tests.md b/.claude/commands/run-comprehensive-tests.md index 158a58e..9dd9963 100644 --- a/.claude/commands/run-comprehensive-tests.md +++ b/.claude/commands/run-comprehensive-tests.md @@ -142,7 +142,7 @@ echo "=== Testing Builds ===" go build -o bin/litestream ./cmd/litestream # Test VFS build (requires CGO) -CGO_ENABLED=1 go build -tags vfs -o bin/litestream-vfs ./cmd/litestream-vfs +make vfs # Test cross-compilation GOOS=linux GOARCH=amd64 go build -o bin/litestream-linux-amd64 ./cmd/litestream diff --git a/vfs.go b/vfs.go index c842eda..cac6f2b 100644 --- a/vfs.go +++ b/vfs.go @@ -30,6 +30,7 @@ import ( const ( DefaultPollInterval = 1 * time.Second DefaultCacheSize = 10 * 1024 * 1024 // 10MB + DefaultPageSize = 4096 // SQLite default page size pageFetchRetryAttempts = 6 pageFetchRetryDelay = 15 * time.Millisecond @@ -545,8 +546,15 @@ func (f *VFSFile) hasTargetTime() bool { func (f *VFSFile) Open() error { f.logger.Debug("opening file") + // Try to get restore plan. For write-enabled VFS, we can create a new database + // if no LTX files exist yet. infos, err := f.waitForRestorePlan() if err != nil { + // If write mode is enabled and no files exist, we can create a new database + if f.writeEnabled && errors.Is(err, ErrTxNotAvailable) { + f.logger.Info("no existing database found, creating new database") + return f.openNewDatabase() + } return err } @@ -607,6 +615,55 @@ func (f *VFSFile) Open() error { return nil } +// openNewDatabase initializes the VFSFile for a brand new database with no existing data. +// This is called when write mode is enabled and no LTX files exist yet. +func (f *VFSFile) openNewDatabase() error { + f.logger.Debug("initializing new database") + + // Use default page size for new databases + f.pageSize = DefaultPageSize + + // Initialize page cache + cacheEntries := f.CacheSize / int(f.pageSize) + if cacheEntries < 1 { + cacheEntries = 1 + } + cache, err := lru.New[uint32, []byte](cacheEntries) + if err != nil { + return fmt.Errorf("create page cache: %w", err) + } + f.cache = cache + + // Initialize empty index - no pages exist yet + f.index = make(map[uint32]ltx.PageIndexElem) + f.pending = make(map[uint32]ltx.PageIndexElem) + f.pos = ltx.Pos{TXID: 0} + f.commit = 0 + + // Initialize write support for new database + f.expectedTXID = 0 + f.pendingTXID = 1 + f.logger.Debug("write support enabled for new database", "expectedTXID", f.expectedTXID, "pendingTXID", f.pendingTXID) + + // Initialize write buffer file for durability + if err := f.initWriteBuffer(); err != nil { + f.logger.Warn("failed to initialize write buffer", "error", err) + } + + // Start monitoring for new LTX files (in case another writer creates the database) + f.wg.Add(1) + go func() { defer f.wg.Done(); f.monitorReplicaClient(f.ctx) }() + + // Start periodic sync goroutine + if f.syncInterval > 0 { + f.syncTicker = time.NewTicker(f.syncInterval) + f.wg.Add(1) + go func() { defer f.wg.Done(); f.syncLoop() }() + } + + return nil +} + // SetTargetTime rebuilds the page index to view the database at a specific time. func (f *VFSFile) SetTargetTime(ctx context.Context, timestamp time.Time) error { if timestamp.IsZero() { @@ -804,6 +861,15 @@ func (f *VFSFile) ReadAt(p []byte, off int64) (n int, err error) { f.mu.Unlock() if !ok { + // For write-enabled VFS with a new database (no existing pages), + // return zeros to indicate empty page. SQLite will initialize the database. + if f.writeEnabled { + f.logger.Debug("page not found, returning zeros for new database", "page", pgno) + for i := range p { + p[i] = 0 + } + return len(p), nil + } f.logger.Error("page not found", "page", pgno) return 0, fmt.Errorf("page not found: %d", pgno) } @@ -1733,6 +1799,17 @@ func isSupportedPageSize(pageSize uint32) bool { } func (f *VFSFile) waitForRestorePlan() ([]*ltx.FileInfo, error) { + // If write mode is enabled, don't wait - return immediately so we can + // create a new database if no files exist. + if f.writeEnabled { + infos, err := CalcRestorePlan(f.ctx, f.client, 0, time.Time{}, f.logger) + if err != nil { + return nil, err + } + return infos, nil + } + + // For read-only mode, wait for files to become available for { infos, err := CalcRestorePlan(f.ctx, f.client, 0, time.Time{}, f.logger) if err == nil { diff --git a/vfs_write_test.go b/vfs_write_test.go index e8f7a26..dcb7567 100644 --- a/vfs_write_test.go +++ b/vfs_write_test.go @@ -638,3 +638,190 @@ func TestVFSFile_WriteBufferClearAfterSync(t *testing.T) { t.Errorf("buffer should be empty after sync, got size %d", stat.Size()) } } + +func TestVFSFile_OpenNewDatabase(t *testing.T) { + // Test opening a VFSFile with write mode enabled when no LTX files exist (new database) + client := newWriteTestReplicaClient() + // Note: No LTX files created - simulating a brand new database + + // Create temp directory for buffer + tmpDir := t.TempDir() + bufferPath := tmpDir + "/.litestream-write-buffer" + + // Create VFSFile with write support - no existing data + logger := slog.Default() + f := NewVFSFile(client, "new.db", logger) + f.writeEnabled = true + f.dirty = make(map[uint32]int64) + f.syncInterval = 0 + f.bufferPath = bufferPath + + if err := f.Open(); err != nil { + t.Fatal(err) + } + defer f.Close() + + // Verify it opened successfully as a new database + if f.pageSize != DefaultPageSize { + t.Errorf("expected page size %d, got %d", DefaultPageSize, f.pageSize) + } + + if f.pos.TXID != 0 { + t.Errorf("expected TXID 0 for new database, got %d", f.pos.TXID) + } + + if f.expectedTXID != 0 { + t.Errorf("expected expectedTXID 0, got %d", f.expectedTXID) + } + + if f.pendingTXID != 1 { + t.Errorf("expected pendingTXID 1, got %d", f.pendingTXID) + } + + if f.commit != 0 { + t.Errorf("expected commit 0 for new database, got %d", f.commit) + } +} + +func TestVFSFile_NewDatabase_ReadReturnsZeros(t *testing.T) { + // Test that reading from a new database returns zeros + client := newWriteTestReplicaClient() + + tmpDir := t.TempDir() + bufferPath := tmpDir + "/.litestream-write-buffer" + + logger := slog.Default() + f := NewVFSFile(client, "new.db", logger) + f.writeEnabled = true + f.dirty = make(map[uint32]int64) + f.syncInterval = 0 + f.bufferPath = bufferPath + + if err := f.Open(); err != nil { + t.Fatal(err) + } + defer f.Close() + + // Read page 1 - should return zeros for new database + readBuf := make([]byte, 100) + n, err := f.ReadAt(readBuf, 0) + if err != nil { + t.Fatalf("expected no error reading from new database, got: %v", err) + } + if n != len(readBuf) { + t.Errorf("expected %d bytes, got %d", len(readBuf), n) + } + + // Verify all zeros + for i, b := range readBuf { + if b != 0 { + t.Errorf("expected zero at position %d, got %d", i, b) + break + } + } +} + +func TestVFSFile_NewDatabase_WriteAndSync(t *testing.T) { + // Test writing to a new database and syncing to remote + client := newWriteTestReplicaClient() + + tmpDir := t.TempDir() + bufferPath := tmpDir + "/.litestream-write-buffer" + + logger := slog.Default() + f := NewVFSFile(client, "new.db", logger) + f.writeEnabled = true + f.dirty = make(map[uint32]int64) + f.syncInterval = 0 + f.bufferPath = bufferPath + + if err := f.Open(); err != nil { + t.Fatal(err) + } + defer f.Close() + + // Write data to page 1 + writeData := []byte("new database content") + n, err := f.WriteAt(writeData, 0) + if err != nil { + t.Fatal(err) + } + if n != len(writeData) { + t.Errorf("expected %d bytes written, got %d", len(writeData), n) + } + + // Verify dirty page exists + if len(f.dirty) != 1 { + t.Errorf("expected 1 dirty page, got %d", len(f.dirty)) + } + + // Sync to remote + if err := f.Sync(0); err != nil { + t.Fatal(err) + } + + // Verify TXID advanced + if f.expectedTXID != 1 { + t.Errorf("expected expectedTXID 1 after sync, got %d", f.expectedTXID) + } + if f.pendingTXID != 2 { + t.Errorf("expected pendingTXID 2 after sync, got %d", f.pendingTXID) + } + + // Verify LTX file was written + client.mu.Lock() + if len(client.ltxFiles[0]) != 1 { + t.Errorf("expected 1 LTX file after sync, got %d", len(client.ltxFiles[0])) + } + if len(client.ltxFiles[0]) > 0 { + info := client.ltxFiles[0][0] + if info.MinTXID != 1 || info.MaxTXID != 1 { + t.Errorf("expected TXID 1, got min=%d max=%d", info.MinTXID, info.MaxTXID) + } + } + client.mu.Unlock() +} + +func TestVFSFile_NewDatabase_FileSize(t *testing.T) { + // Test that FileSize returns 0 for a new empty database + client := newWriteTestReplicaClient() + + tmpDir := t.TempDir() + bufferPath := tmpDir + "/.litestream-write-buffer" + + logger := slog.Default() + f := NewVFSFile(client, "new.db", logger) + f.writeEnabled = true + f.dirty = make(map[uint32]int64) + f.syncInterval = 0 + f.bufferPath = bufferPath + + if err := f.Open(); err != nil { + t.Fatal(err) + } + defer f.Close() + + // FileSize should be 0 for empty database + size, err := f.FileSize() + if err != nil { + t.Fatal(err) + } + if size != 0 { + t.Errorf("expected size 0 for new database, got %d", size) + } + + // Write a page + data := make([]byte, DefaultPageSize) + if _, err := f.WriteAt(data, 0); err != nil { + t.Fatal(err) + } + + // FileSize should now reflect the dirty page + size, err = f.FileSize() + if err != nil { + t.Fatal(err) + } + if size != int64(DefaultPageSize) { + t.Errorf("expected size %d after write, got %d", DefaultPageSize, size) + } +}