mirror of
https://github.com/benbjohnson/litestream.git
synced 2026-01-25 05:06:30 +00:00
feat(vfs): support creating new databases in write mode (#972)
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -142,7 +142,7 @@ echo "=== Testing Builds ==="
|
||||
go build -o bin/litestream ./cmd/litestream
|
||||
|
||||
# Test VFS build (requires CGO)
|
||||
CGO_ENABLED=1 go build -tags vfs -o bin/litestream-vfs ./cmd/litestream-vfs
|
||||
make vfs
|
||||
|
||||
# Test cross-compilation
|
||||
GOOS=linux GOARCH=amd64 go build -o bin/litestream-linux-amd64 ./cmd/litestream
|
||||
|
||||
77
vfs.go
77
vfs.go
@@ -30,6 +30,7 @@ import (
|
||||
const (
|
||||
DefaultPollInterval = 1 * time.Second
|
||||
DefaultCacheSize = 10 * 1024 * 1024 // 10MB
|
||||
DefaultPageSize = 4096 // SQLite default page size
|
||||
|
||||
pageFetchRetryAttempts = 6
|
||||
pageFetchRetryDelay = 15 * time.Millisecond
|
||||
@@ -545,8 +546,15 @@ func (f *VFSFile) hasTargetTime() bool {
|
||||
func (f *VFSFile) Open() error {
|
||||
f.logger.Debug("opening file")
|
||||
|
||||
// Try to get restore plan. For write-enabled VFS, we can create a new database
|
||||
// if no LTX files exist yet.
|
||||
infos, err := f.waitForRestorePlan()
|
||||
if err != nil {
|
||||
// If write mode is enabled and no files exist, we can create a new database
|
||||
if f.writeEnabled && errors.Is(err, ErrTxNotAvailable) {
|
||||
f.logger.Info("no existing database found, creating new database")
|
||||
return f.openNewDatabase()
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -607,6 +615,55 @@ func (f *VFSFile) Open() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// openNewDatabase initializes the VFSFile for a brand new database with no existing data.
|
||||
// This is called when write mode is enabled and no LTX files exist yet.
|
||||
func (f *VFSFile) openNewDatabase() error {
|
||||
f.logger.Debug("initializing new database")
|
||||
|
||||
// Use default page size for new databases
|
||||
f.pageSize = DefaultPageSize
|
||||
|
||||
// Initialize page cache
|
||||
cacheEntries := f.CacheSize / int(f.pageSize)
|
||||
if cacheEntries < 1 {
|
||||
cacheEntries = 1
|
||||
}
|
||||
cache, err := lru.New[uint32, []byte](cacheEntries)
|
||||
if err != nil {
|
||||
return fmt.Errorf("create page cache: %w", err)
|
||||
}
|
||||
f.cache = cache
|
||||
|
||||
// Initialize empty index - no pages exist yet
|
||||
f.index = make(map[uint32]ltx.PageIndexElem)
|
||||
f.pending = make(map[uint32]ltx.PageIndexElem)
|
||||
f.pos = ltx.Pos{TXID: 0}
|
||||
f.commit = 0
|
||||
|
||||
// Initialize write support for new database
|
||||
f.expectedTXID = 0
|
||||
f.pendingTXID = 1
|
||||
f.logger.Debug("write support enabled for new database", "expectedTXID", f.expectedTXID, "pendingTXID", f.pendingTXID)
|
||||
|
||||
// Initialize write buffer file for durability
|
||||
if err := f.initWriteBuffer(); err != nil {
|
||||
f.logger.Warn("failed to initialize write buffer", "error", err)
|
||||
}
|
||||
|
||||
// Start monitoring for new LTX files (in case another writer creates the database)
|
||||
f.wg.Add(1)
|
||||
go func() { defer f.wg.Done(); f.monitorReplicaClient(f.ctx) }()
|
||||
|
||||
// Start periodic sync goroutine
|
||||
if f.syncInterval > 0 {
|
||||
f.syncTicker = time.NewTicker(f.syncInterval)
|
||||
f.wg.Add(1)
|
||||
go func() { defer f.wg.Done(); f.syncLoop() }()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetTargetTime rebuilds the page index to view the database at a specific time.
|
||||
func (f *VFSFile) SetTargetTime(ctx context.Context, timestamp time.Time) error {
|
||||
if timestamp.IsZero() {
|
||||
@@ -804,6 +861,15 @@ func (f *VFSFile) ReadAt(p []byte, off int64) (n int, err error) {
|
||||
f.mu.Unlock()
|
||||
|
||||
if !ok {
|
||||
// For write-enabled VFS with a new database (no existing pages),
|
||||
// return zeros to indicate empty page. SQLite will initialize the database.
|
||||
if f.writeEnabled {
|
||||
f.logger.Debug("page not found, returning zeros for new database", "page", pgno)
|
||||
for i := range p {
|
||||
p[i] = 0
|
||||
}
|
||||
return len(p), nil
|
||||
}
|
||||
f.logger.Error("page not found", "page", pgno)
|
||||
return 0, fmt.Errorf("page not found: %d", pgno)
|
||||
}
|
||||
@@ -1733,6 +1799,17 @@ func isSupportedPageSize(pageSize uint32) bool {
|
||||
}
|
||||
|
||||
func (f *VFSFile) waitForRestorePlan() ([]*ltx.FileInfo, error) {
|
||||
// If write mode is enabled, don't wait - return immediately so we can
|
||||
// create a new database if no files exist.
|
||||
if f.writeEnabled {
|
||||
infos, err := CalcRestorePlan(f.ctx, f.client, 0, time.Time{}, f.logger)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return infos, nil
|
||||
}
|
||||
|
||||
// For read-only mode, wait for files to become available
|
||||
for {
|
||||
infos, err := CalcRestorePlan(f.ctx, f.client, 0, time.Time{}, f.logger)
|
||||
if err == nil {
|
||||
|
||||
@@ -638,3 +638,190 @@ func TestVFSFile_WriteBufferClearAfterSync(t *testing.T) {
|
||||
t.Errorf("buffer should be empty after sync, got size %d", stat.Size())
|
||||
}
|
||||
}
|
||||
|
||||
func TestVFSFile_OpenNewDatabase(t *testing.T) {
|
||||
// Test opening a VFSFile with write mode enabled when no LTX files exist (new database)
|
||||
client := newWriteTestReplicaClient()
|
||||
// Note: No LTX files created - simulating a brand new database
|
||||
|
||||
// Create temp directory for buffer
|
||||
tmpDir := t.TempDir()
|
||||
bufferPath := tmpDir + "/.litestream-write-buffer"
|
||||
|
||||
// Create VFSFile with write support - no existing data
|
||||
logger := slog.Default()
|
||||
f := NewVFSFile(client, "new.db", logger)
|
||||
f.writeEnabled = true
|
||||
f.dirty = make(map[uint32]int64)
|
||||
f.syncInterval = 0
|
||||
f.bufferPath = bufferPath
|
||||
|
||||
if err := f.Open(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
// Verify it opened successfully as a new database
|
||||
if f.pageSize != DefaultPageSize {
|
||||
t.Errorf("expected page size %d, got %d", DefaultPageSize, f.pageSize)
|
||||
}
|
||||
|
||||
if f.pos.TXID != 0 {
|
||||
t.Errorf("expected TXID 0 for new database, got %d", f.pos.TXID)
|
||||
}
|
||||
|
||||
if f.expectedTXID != 0 {
|
||||
t.Errorf("expected expectedTXID 0, got %d", f.expectedTXID)
|
||||
}
|
||||
|
||||
if f.pendingTXID != 1 {
|
||||
t.Errorf("expected pendingTXID 1, got %d", f.pendingTXID)
|
||||
}
|
||||
|
||||
if f.commit != 0 {
|
||||
t.Errorf("expected commit 0 for new database, got %d", f.commit)
|
||||
}
|
||||
}
|
||||
|
||||
func TestVFSFile_NewDatabase_ReadReturnsZeros(t *testing.T) {
|
||||
// Test that reading from a new database returns zeros
|
||||
client := newWriteTestReplicaClient()
|
||||
|
||||
tmpDir := t.TempDir()
|
||||
bufferPath := tmpDir + "/.litestream-write-buffer"
|
||||
|
||||
logger := slog.Default()
|
||||
f := NewVFSFile(client, "new.db", logger)
|
||||
f.writeEnabled = true
|
||||
f.dirty = make(map[uint32]int64)
|
||||
f.syncInterval = 0
|
||||
f.bufferPath = bufferPath
|
||||
|
||||
if err := f.Open(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
// Read page 1 - should return zeros for new database
|
||||
readBuf := make([]byte, 100)
|
||||
n, err := f.ReadAt(readBuf, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("expected no error reading from new database, got: %v", err)
|
||||
}
|
||||
if n != len(readBuf) {
|
||||
t.Errorf("expected %d bytes, got %d", len(readBuf), n)
|
||||
}
|
||||
|
||||
// Verify all zeros
|
||||
for i, b := range readBuf {
|
||||
if b != 0 {
|
||||
t.Errorf("expected zero at position %d, got %d", i, b)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestVFSFile_NewDatabase_WriteAndSync(t *testing.T) {
|
||||
// Test writing to a new database and syncing to remote
|
||||
client := newWriteTestReplicaClient()
|
||||
|
||||
tmpDir := t.TempDir()
|
||||
bufferPath := tmpDir + "/.litestream-write-buffer"
|
||||
|
||||
logger := slog.Default()
|
||||
f := NewVFSFile(client, "new.db", logger)
|
||||
f.writeEnabled = true
|
||||
f.dirty = make(map[uint32]int64)
|
||||
f.syncInterval = 0
|
||||
f.bufferPath = bufferPath
|
||||
|
||||
if err := f.Open(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
// Write data to page 1
|
||||
writeData := []byte("new database content")
|
||||
n, err := f.WriteAt(writeData, 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if n != len(writeData) {
|
||||
t.Errorf("expected %d bytes written, got %d", len(writeData), n)
|
||||
}
|
||||
|
||||
// Verify dirty page exists
|
||||
if len(f.dirty) != 1 {
|
||||
t.Errorf("expected 1 dirty page, got %d", len(f.dirty))
|
||||
}
|
||||
|
||||
// Sync to remote
|
||||
if err := f.Sync(0); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Verify TXID advanced
|
||||
if f.expectedTXID != 1 {
|
||||
t.Errorf("expected expectedTXID 1 after sync, got %d", f.expectedTXID)
|
||||
}
|
||||
if f.pendingTXID != 2 {
|
||||
t.Errorf("expected pendingTXID 2 after sync, got %d", f.pendingTXID)
|
||||
}
|
||||
|
||||
// Verify LTX file was written
|
||||
client.mu.Lock()
|
||||
if len(client.ltxFiles[0]) != 1 {
|
||||
t.Errorf("expected 1 LTX file after sync, got %d", len(client.ltxFiles[0]))
|
||||
}
|
||||
if len(client.ltxFiles[0]) > 0 {
|
||||
info := client.ltxFiles[0][0]
|
||||
if info.MinTXID != 1 || info.MaxTXID != 1 {
|
||||
t.Errorf("expected TXID 1, got min=%d max=%d", info.MinTXID, info.MaxTXID)
|
||||
}
|
||||
}
|
||||
client.mu.Unlock()
|
||||
}
|
||||
|
||||
func TestVFSFile_NewDatabase_FileSize(t *testing.T) {
|
||||
// Test that FileSize returns 0 for a new empty database
|
||||
client := newWriteTestReplicaClient()
|
||||
|
||||
tmpDir := t.TempDir()
|
||||
bufferPath := tmpDir + "/.litestream-write-buffer"
|
||||
|
||||
logger := slog.Default()
|
||||
f := NewVFSFile(client, "new.db", logger)
|
||||
f.writeEnabled = true
|
||||
f.dirty = make(map[uint32]int64)
|
||||
f.syncInterval = 0
|
||||
f.bufferPath = bufferPath
|
||||
|
||||
if err := f.Open(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
// FileSize should be 0 for empty database
|
||||
size, err := f.FileSize()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if size != 0 {
|
||||
t.Errorf("expected size 0 for new database, got %d", size)
|
||||
}
|
||||
|
||||
// Write a page
|
||||
data := make([]byte, DefaultPageSize)
|
||||
if _, err := f.WriteAt(data, 0); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// FileSize should now reflect the dirty page
|
||||
size, err = f.FileSize()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if size != int64(DefaultPageSize) {
|
||||
t.Errorf("expected size %d after write, got %d", DefaultPageSize, size)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user