feat(vfs): add background database hydration for improved read performance (#978)

Co-authored-by: Sprite <sprite@example.com>
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Ben Johnson
2026-01-09 14:17:32 -07:00
committed by GitHub
parent 6d2d87f1d6
commit e45a2922b9
3 changed files with 589 additions and 0 deletions

1
.gitignore vendored
View File

@@ -2,6 +2,7 @@
/src/litestream-vfs.h
/dist
.vscode
.sprite
# Claude-related files (force include despite global gitignore)
!.claude/

415
vfs.go
View File

@@ -74,6 +74,15 @@ type VFS struct {
// If empty, uses a temp file.
WriteBufferPath string
// HydrationEnabled activates background hydration of the database to a local file.
// When enabled, the VFS will restore the database in the background and serve
// reads from the local file once complete, eliminating remote fetch latency.
HydrationEnabled bool
// HydrationPath is the file path for local hydration file.
// If empty and HydrationEnabled is true, a temp file will be used.
HydrationPath string
tempDirOnce sync.Once
tempDir string
tempDirErr error
@@ -130,6 +139,21 @@ func (vfs *VFS) openMainDB(name string, flags sqlite3vfs.OpenFlag) (sqlite3vfs.F
}
}
// Initialize hydration support if enabled
if vfs.HydrationEnabled {
if vfs.HydrationPath != "" {
// Use provided path directly
f.hydrationPath = vfs.HydrationPath
} else {
// Use a temp file if no path specified
dir, err := vfs.ensureTempDir()
if err != nil {
return nil, 0, fmt.Errorf("create temp dir for hydration: %w", err)
}
f.hydrationPath = filepath.Join(dir, "hydration.db")
}
}
if err := f.Open(); err != nil {
return nil, 0, err
}
@@ -467,6 +491,9 @@ type VFSFile struct {
syncInterval time.Duration // Interval for periodic sync
inTransaction bool // True during active write transaction
hydrator *Hydrator // Background hydration (nil if disabled)
hydrationPath string // Path for hydration file (set during Open)
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
@@ -477,6 +504,279 @@ type VFSFile struct {
CacheSize int
}
// Hydrator handles background hydration of the database to a local file.
type Hydrator struct {
path string // Full path to hydration file
file *os.File // Local database file
complete atomic.Bool // True when restore completes
txid ltx.TXID // TXID the hydrated file is at
mu sync.Mutex // Protects hydration file writes
err error // Stores fatal hydration error
pageSize uint32 // Page size of the database
client ReplicaClient
logger *slog.Logger
}
// NewHydrator creates a new Hydrator instance.
func NewHydrator(path string, pageSize uint32, client ReplicaClient, logger *slog.Logger) *Hydrator {
return &Hydrator{
path: path,
pageSize: pageSize,
client: client,
logger: logger,
}
}
// Init opens or creates the hydration file.
func (h *Hydrator) Init() error {
// Create parent directory if needed
if err := os.MkdirAll(filepath.Dir(h.path), 0755); err != nil {
return fmt.Errorf("create hydration directory: %w", err)
}
// Create/truncate local database file
file, err := os.OpenFile(h.path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600)
if err != nil {
return fmt.Errorf("create hydration file: %w", err)
}
h.file = file
return nil
}
// Complete returns true if hydration has completed.
func (h *Hydrator) Complete() bool {
return h.complete.Load()
}
// SetComplete marks hydration as complete.
func (h *Hydrator) SetComplete() {
h.complete.Store(true)
}
// Disable temporarily disables hydrated reads (used during time travel).
func (h *Hydrator) Disable() {
h.complete.Store(false)
}
// TXID returns the current hydration TXID.
func (h *Hydrator) TXID() ltx.TXID {
h.mu.Lock()
defer h.mu.Unlock()
return h.txid
}
// SetTXID sets the hydration TXID.
func (h *Hydrator) SetTXID(txid ltx.TXID) {
h.mu.Lock()
defer h.mu.Unlock()
h.txid = txid
}
// Err returns any fatal hydration error.
func (h *Hydrator) Err() error {
h.mu.Lock()
defer h.mu.Unlock()
return h.err
}
// SetErr sets a fatal hydration error.
func (h *Hydrator) SetErr(err error) {
h.mu.Lock()
defer h.mu.Unlock()
h.err = err
}
// Restore restores the database from LTX files to the hydration file.
func (h *Hydrator) Restore(ctx context.Context, infos []*ltx.FileInfo) error {
// Open all LTX files as readers
rdrs := make([]io.Reader, 0, len(infos))
defer func() {
for _, rd := range rdrs {
if closer, ok := rd.(io.Closer); ok {
_ = closer.Close()
}
}
}()
for _, info := range infos {
h.logger.Debug("opening ltx file for hydration", "level", info.Level, "min", info.MinTXID, "max", info.MaxTXID)
rc, err := h.client.OpenLTXFile(ctx, info.Level, info.MinTXID, info.MaxTXID, 0, 0)
if err != nil {
return fmt.Errorf("open ltx file: %w", err)
}
rdrs = append(rdrs, rc)
}
if len(rdrs) == 0 {
return fmt.Errorf("no ltx files for hydration")
}
// Compact and decode using io.Pipe pattern
pr, pw := io.Pipe()
go func() {
c, err := ltx.NewCompactor(pw, rdrs)
if err != nil {
pw.CloseWithError(fmt.Errorf("new ltx compactor: %w", err))
return
}
c.HeaderFlags = ltx.HeaderFlagNoChecksum
_ = pw.CloseWithError(c.Compact(ctx))
}()
h.mu.Lock()
defer h.mu.Unlock()
dec := ltx.NewDecoder(pr)
if err := dec.DecodeDatabaseTo(h.file); err != nil {
return fmt.Errorf("decode database: %w", err)
}
h.txid = infos[len(infos)-1].MaxTXID
return nil
}
// CatchUp applies updates from LTX files between fromTXID and toTXID.
func (h *Hydrator) CatchUp(ctx context.Context, fromTXID, toTXID ltx.TXID) error {
h.logger.Debug("catching up hydration", "from", fromTXID, "to", toTXID)
// Fetch LTX files from fromTXID+1 to toTXID
itr, err := h.client.LTXFiles(ctx, 0, fromTXID+1, false)
if err != nil {
return fmt.Errorf("list ltx files for catch-up: %w", err)
}
defer itr.Close()
for itr.Next() {
info := itr.Item()
if info.MaxTXID > toTXID {
break
}
if err := h.ApplyLTX(ctx, info); err != nil {
return fmt.Errorf("apply ltx to hydrated file: %w", err)
}
h.mu.Lock()
h.txid = info.MaxTXID
h.mu.Unlock()
}
return nil
}
// ApplyLTX fetches an entire LTX file and applies its pages to the hydration file.
func (h *Hydrator) ApplyLTX(ctx context.Context, info *ltx.FileInfo) error {
h.logger.Debug("applying ltx to hydration file", "level", info.Level, "min", info.MinTXID, "max", info.MaxTXID)
// Fetch entire LTX file
rc, err := h.client.OpenLTXFile(ctx, info.Level, info.MinTXID, info.MaxTXID, 0, 0)
if err != nil {
return fmt.Errorf("open ltx file: %w", err)
}
defer rc.Close()
dec := ltx.NewDecoder(rc)
if err := dec.DecodeHeader(); err != nil {
return fmt.Errorf("decode header: %w", err)
}
h.mu.Lock()
defer h.mu.Unlock()
// Apply each page to the hydration file
for {
var phdr ltx.PageHeader
data := make([]byte, h.pageSize)
if err := dec.DecodePage(&phdr, data); err == io.EOF {
break
} else if err != nil {
return fmt.Errorf("decode page: %w", err)
}
off := int64(phdr.Pgno-1) * int64(h.pageSize)
if _, err := h.file.WriteAt(data, off); err != nil {
return fmt.Errorf("write page %d: %w", phdr.Pgno, err)
}
}
return nil
}
// ReadAt reads data from the hydrated local file.
func (h *Hydrator) ReadAt(p []byte, off int64) (int, error) {
h.mu.Lock()
n, err := h.file.ReadAt(p, off)
h.mu.Unlock()
if err != nil && err != io.EOF {
return n, fmt.Errorf("read hydrated file: %w", err)
}
// Update the first page to pretend like we are in journal mode
if off == 0 && len(p) >= 28 {
p[18], p[19] = 0x01, 0x01
_, _ = rand.Read(p[24:28])
}
return n, nil
}
// ApplyUpdates fetches updated pages and writes them to the hydration file.
func (h *Hydrator) ApplyUpdates(ctx context.Context, updates map[uint32]ltx.PageIndexElem) error {
h.mu.Lock()
defer h.mu.Unlock()
for pgno, elem := range updates {
_, data, err := FetchPage(ctx, h.client, elem.Level, elem.MinTXID, elem.MaxTXID, elem.Offset, elem.Size)
if err != nil {
return fmt.Errorf("fetch updated page %d: %w", pgno, err)
}
off := int64(pgno-1) * int64(h.pageSize)
if _, err := h.file.WriteAt(data, off); err != nil {
return fmt.Errorf("write updated page %d: %w", pgno, err)
}
}
return nil
}
// WritePage writes a single page to the hydration file.
func (h *Hydrator) WritePage(pgno uint32, data []byte) error {
h.mu.Lock()
defer h.mu.Unlock()
off := int64(pgno-1) * int64(h.pageSize)
if _, err := h.file.WriteAt(data, off); err != nil {
return fmt.Errorf("write page %d to hydrated file: %w", pgno, err)
}
return nil
}
// Truncate truncates the hydration file to the specified size.
func (h *Hydrator) Truncate(size int64) error {
h.mu.Lock()
defer h.mu.Unlock()
return h.file.Truncate(size)
}
// Close closes and removes the hydration file.
func (h *Hydrator) Close() error {
if h.file == nil {
return nil
}
if err := h.file.Close(); err != nil {
return err
}
if err := os.Remove(h.path); err != nil && !os.IsNotExist(err) {
return err
}
return nil
}
func NewVFSFile(client ReplicaClient, name string, logger *slog.Logger) *VFSFile {
f := &VFSFile{
client: client,
@@ -601,6 +901,14 @@ func (f *VFSFile) Open() error {
return fmt.Errorf("cannot build index: %w", err)
}
// Start background hydration if enabled
if f.hydrationPath != "" {
if err := f.initHydration(infos); err != nil {
f.logger.Warn("hydration initialization failed, continuing without hydration", "error", err)
f.hydrationPath = ""
}
}
// Continuously monitor the replica client for new LTX files.
f.wg.Add(1)
go func() { defer f.wg.Done(); f.monitorReplicaClient(f.ctx) }()
@@ -677,6 +985,12 @@ func (f *VFSFile) SetTargetTime(ctx context.Context, timestamp time.Time) error
return fmt.Errorf("no backup files available")
}
// Disable hydrated reads during time travel - hydrated file is at latest state
if f.hydrator != nil && f.hydrator.Complete() {
f.hydrator.Disable()
f.logger.Debug("hydration disabled for time travel", "target", timestamp)
}
return f.rebuildIndex(ctx, infos, &timestamp)
}
@@ -780,6 +1094,70 @@ func (f *VFSFile) buildIndex(ctx context.Context, infos []*ltx.FileInfo) error {
return f.rebuildIndex(ctx, infos, nil)
}
// initHydration starts the background hydration process.
func (f *VFSFile) initHydration(infos []*ltx.FileInfo) error {
f.hydrator = NewHydrator(f.hydrationPath, f.pageSize, f.client, f.logger)
if err := f.hydrator.Init(); err != nil {
return err
}
// Start background restore
f.wg.Add(1)
go f.runHydration(infos)
return nil
}
// runHydration performs the background hydration process.
func (f *VFSFile) runHydration(infos []*ltx.FileInfo) {
defer f.wg.Done()
if err := f.hydrator.Restore(f.ctx, infos); err != nil {
f.hydrator.SetErr(err)
f.logger.Error("hydration failed", "error", err)
return
}
// Check if we need to catch up with polling
f.mu.Lock()
currentTXID := f.pos.TXID
f.mu.Unlock()
hydrationTXID := f.hydrator.TXID()
if currentTXID > hydrationTXID {
if err := f.hydrator.CatchUp(f.ctx, hydrationTXID, currentTXID); err != nil {
f.hydrator.SetErr(err)
f.logger.Error("hydration catch-up failed", "error", err)
return
}
}
f.hydrator.SetComplete()
// Clear cache since we'll now read from hydration file
f.cache.Purge()
f.logger.Info("hydration complete", "path", f.hydrationPath, "txid", f.hydrator.TXID().String())
}
// applySyncedPagesToHydratedFile writes synced dirty pages to the hydrated file.
// Must be called with f.mu held.
func (f *VFSFile) applySyncedPagesToHydratedFile() error {
for pgno, bufferOff := range f.dirty {
data := make([]byte, f.pageSize)
if _, err := f.bufferFile.ReadAt(data, bufferOff); err != nil {
return fmt.Errorf("read dirty page %d from buffer: %w", pgno, err)
}
if err := f.hydrator.WritePage(pgno, data); err != nil {
return err
}
}
f.hydrator.SetTXID(f.expectedTXID)
return nil
}
func (f *VFSFile) Close() error {
f.logger.Debug("closing file")
@@ -803,6 +1181,13 @@ func (f *VFSFile) Close() error {
f.bufferFile.Close()
}
// Close and remove hydration file
if f.hydrator != nil {
if err := f.hydrator.Close(); err != nil {
f.logger.Warn("failed to close hydration file", "error", err)
}
}
return nil
}
@@ -841,6 +1226,11 @@ func (f *VFSFile) ReadAt(p []byte, off int64) (n int, err error) {
f.mu.Unlock()
}
// If hydration complete, read from local file
if f.hydrator != nil && f.hydrator.Complete() {
return f.hydrator.ReadAt(p, off)
}
// Check cache (cache is thread-safe)
if data, ok := f.cache.Get(pgno); ok {
n = copy(p, data[pageOffset:])
@@ -1037,6 +1427,15 @@ func (f *VFSFile) Truncate(size int64) error {
f.commit = newCommit
f.logger.Debug("truncated", "newCommit", newCommit)
// Truncate hydrated file if hydration is complete
if f.hydrator != nil && f.hydrator.Complete() {
if err := f.hydrator.Truncate(size); err != nil {
f.logger.Error("failed to truncate hydration file", "error", err)
// Don't fail the operation - continue with degraded performance
}
}
return nil
}
@@ -1126,6 +1525,15 @@ func (f *VFSFile) syncToRemote() error {
f.cache.Add(pgno, cachedData)
}
// Apply synced pages to hydrated file if hydration is complete
// Must be done before clearing f.dirty since we need the page offsets
if f.hydrator != nil && f.hydrator.Complete() {
if err := f.applySyncedPagesToHydratedFile(); err != nil {
f.logger.Error("failed to apply synced pages to hydrated file", "error", err)
// Don't fail the sync - hydration will catch up on next poll
}
}
// Clear dirty pages
f.dirty = make(map[uint32]int64)
@@ -1676,6 +2084,13 @@ func (f *VFSFile) pollReplicaClient(ctx context.Context) error {
f.maxTXID1 = maxTXID1
f.logger.Debug("txid updated", "txid", f.pos.TXID.String(), "maxTXID1", f.maxTXID1.String())
// Apply updates to hydrated file if hydration is complete
if f.hydrator != nil && f.hydrator.Complete() && len(combined) > 0 {
if err := f.hydrator.ApplyUpdates(f.ctx, combined); err != nil {
f.logger.Error("failed to apply updates to hydrated file", "error", err)
}
}
return nil
}

View File

@@ -1043,3 +1043,176 @@ func buildLTXFixtureWithPages(tb testing.TB, txid ltx.TXID, pageSize uint32, pgn
return &ltxFixture{info: info, data: buf.Bytes()}
}
// TestVFSFile_Hydration_Basic tests that hydration completes and reads from local file.
func TestVFSFile_Hydration_Basic(t *testing.T) {
client := newMockReplicaClient()
client.addFixture(t, buildLTXFixture(t, 1, 'a'))
// Create temp directory for hydration
hydrationDir := t.TempDir()
// Create VFSFile with hydration enabled
f := NewVFSFile(client, "test.db", slog.Default())
f.hydrationPath = filepath.Join(hydrationDir, "test.db.hydration.db")
f.PollInterval = 100 * time.Millisecond
if err := f.Open(); err != nil {
t.Fatalf("open vfs file: %v", err)
}
defer f.Close()
// Wait for hydration to complete
deadline := time.Now().Add(5 * time.Second)
for f.hydrator == nil || !f.hydrator.Complete() {
if time.Now().After(deadline) {
t.Fatalf("hydration did not complete in time")
}
time.Sleep(10 * time.Millisecond)
}
// Verify hydration file exists
if _, err := os.Stat(f.hydrationPath); err != nil {
t.Fatalf("hydration file not found: %v", err)
}
// Read a page - should come from hydrated file
buf := make([]byte, 4096)
if _, err := f.ReadAt(buf, 0); err != nil {
t.Fatalf("read at: %v", err)
}
// Check that the data matches (excluding modified header bytes)
for i := 28; i < len(buf); i++ {
if buf[i] != 'a' {
t.Fatalf("expected byte 'a' at position %d, got %q", i, buf[i])
}
}
}
// TestVFSFile_Hydration_ReadsDuringHydration tests that reads work via cache/remote during hydration.
func TestVFSFile_Hydration_ReadsDuringHydration(t *testing.T) {
client := newMockReplicaClient()
client.addFixture(t, buildLTXFixture(t, 1, 'b'))
hydrationDir := t.TempDir()
f := NewVFSFile(client, "test.db", slog.Default())
f.hydrationPath = filepath.Join(hydrationDir, "test.db.hydration.db")
f.PollInterval = 100 * time.Millisecond
if err := f.Open(); err != nil {
t.Fatalf("open vfs file: %v", err)
}
defer f.Close()
// Read immediately - should work even if hydration is still in progress
buf := make([]byte, 4096)
if _, err := f.ReadAt(buf, 0); err != nil {
t.Fatalf("read at during hydration: %v", err)
}
// Data should be correct regardless of hydration status
for i := 28; i < len(buf); i++ {
if buf[i] != 'b' {
t.Fatalf("expected byte 'b' at position %d, got %q", i, buf[i])
}
}
}
// TestVFSFile_Hydration_CloseEarly tests clean shutdown during hydration.
func TestVFSFile_Hydration_CloseEarly(t *testing.T) {
client := newMockReplicaClient()
client.addFixture(t, buildLTXFixture(t, 1, 'c'))
hydrationDir := t.TempDir()
f := NewVFSFile(client, "test.db", slog.Default())
f.hydrationPath = filepath.Join(hydrationDir, "test.db.hydration.db")
f.PollInterval = 100 * time.Millisecond
if err := f.Open(); err != nil {
t.Fatalf("open vfs file: %v", err)
}
// Close immediately without waiting for hydration
if err := f.Close(); err != nil {
t.Fatalf("close: %v", err)
}
// Hydration file should be removed
if _, err := os.Stat(f.hydrationPath); !os.IsNotExist(err) {
t.Fatalf("hydration file should be removed after close")
}
}
// TestVFSFile_Hydration_Disabled tests that hydration has no effect when disabled.
func TestVFSFile_Hydration_Disabled(t *testing.T) {
client := newMockReplicaClient()
client.addFixture(t, buildLTXFixture(t, 1, 'd'))
f := NewVFSFile(client, "test.db", slog.Default())
// hydrationPath is empty by default (hydration disabled)
f.PollInterval = 100 * time.Millisecond
if err := f.Open(); err != nil {
t.Fatalf("open vfs file: %v", err)
}
defer f.Close()
// Hydrator should be nil when hydration is disabled
if f.hydrator != nil {
t.Fatalf("hydrator should be nil when disabled")
}
// Reads should still work via cache/remote
buf := make([]byte, 4096)
if _, err := f.ReadAt(buf, 0); err != nil {
t.Fatalf("read at: %v", err)
}
}
// TestVFSFile_Hydration_IncrementalUpdates tests that new LTX files are applied to hydrated file.
func TestVFSFile_Hydration_IncrementalUpdates(t *testing.T) {
client := newMockReplicaClient()
client.addFixture(t, buildLTXFixture(t, 1, 'e'))
hydrationDir := t.TempDir()
f := NewVFSFile(client, "test.db", slog.Default())
f.hydrationPath = filepath.Join(hydrationDir, "test.db.hydration.db")
f.PollInterval = 50 * time.Millisecond
if err := f.Open(); err != nil {
t.Fatalf("open vfs file: %v", err)
}
defer f.Close()
// Wait for hydration to complete
deadline := time.Now().Add(5 * time.Second)
for f.hydrator == nil || !f.hydrator.Complete() {
if time.Now().After(deadline) {
t.Fatalf("hydration did not complete in time")
}
time.Sleep(10 * time.Millisecond)
}
// Add a new LTX file
client.addFixture(t, buildLTXFixture(t, 2, 'f'))
// Wait for poll to pick up the update
time.Sleep(200 * time.Millisecond)
// Read the page - should have updated data
buf := make([]byte, 4096)
if _, err := f.ReadAt(buf, 0); err != nil {
t.Fatalf("read at: %v", err)
}
// Data should be updated (excluding header bytes)
for i := 28; i < len(buf); i++ {
if buf[i] != 'f' {
t.Fatalf("expected byte 'f' at position %d, got %q", i, buf[i])
}
}
}