From b65e4fe4873867ae1c4329d1349f070f711c6aa0 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Fri, 9 Jan 2026 16:13:52 -0600 Subject: [PATCH] fix: improve error recovery for missing/corrupted LTX files (#963) Co-authored-by: Claude Opus 4.5 --- cmd/litestream/main.go | 16 +++++ cmd/litestream/reset.go | 134 ++++++++++++++++++++++++++++++++++++++++ db.go | 34 +++++++++- db_test.go | 51 +++++++++++++++ file/replica_client.go | 8 ++- litestream.go | 44 +++++++++++++ litestream_test.go | 67 ++++++++++++++++++++ replica.go | 56 ++++++++++++++--- replica_client_test.go | 9 +-- 9 files changed, 403 insertions(+), 16 deletions(-) create mode 100644 cmd/litestream/reset.go diff --git a/cmd/litestream/main.go b/cmd/litestream/main.go index 2a1cbf0..28177dc 100644 --- a/cmd/litestream/main.go +++ b/cmd/litestream/main.go @@ -157,6 +157,8 @@ func (m *Main) Run(ctx context.Context, args []string) (err error) { slog.Info("litestream shut down") return err + case "reset": + return (&ResetCommand{}).Run(ctx, args) case "restore": return (&RestoreCommand{}).Run(ctx, args) case "status": @@ -192,6 +194,7 @@ The commands are: databases list databases specified in config file ltx list available LTX files for a database replicate runs a server to replicate databases + reset reset local state for a database restore recovers database backup from a replica status display replication status for databases version prints the binary version @@ -920,6 +923,11 @@ type ReplicaSettings struct { SyncInterval *time.Duration `yaml:"sync-interval"` ValidationInterval *time.Duration `yaml:"validation-interval"` + // If true, automatically reset local state when LTX errors are detected. + // This allows recovery from corrupted/missing LTX files by forcing a fresh sync. + // Disabled by default to prevent silent data loss scenarios. 
+ AutoRecover *bool `yaml:"auto-recover"` + // S3 settings AccessKeyID string `yaml:"access-key-id"` SecretAccessKey string `yaml:"secret-access-key"` @@ -995,6 +1003,11 @@ func (rs *ReplicaSettings) SetDefaults(src *ReplicaSettings) { rs.ValidationInterval = src.ValidationInterval } + // Recovery settings + if rs.AutoRecover == nil && src.AutoRecover != nil { + rs.AutoRecover = src.AutoRecover + } + // S3 settings if rs.AccessKeyID == "" { rs.AccessKeyID = src.AccessKeyID @@ -1144,6 +1157,9 @@ func NewReplicaFromConfig(c *ReplicaConfig, db *litestream.DB) (_ *litestream.Re if v := c.SyncInterval; v != nil { r.SyncInterval = *v } + if v := c.AutoRecover; v != nil { + r.AutoRecoverEnabled = *v + } // Build and set client on replica. switch c.ReplicaType() { diff --git a/cmd/litestream/reset.go b/cmd/litestream/reset.go new file mode 100644 index 0000000..c63631d --- /dev/null +++ b/cmd/litestream/reset.go @@ -0,0 +1,134 @@ +package main + +import ( + "context" + "flag" + "fmt" + "os" + "path/filepath" + + "github.com/benbjohnson/litestream" +) + +// ResetCommand is a command for resetting local Litestream state for a database. +type ResetCommand struct{} + +// Run executes the command. 
+func (c *ResetCommand) Run(ctx context.Context, args []string) (err error) { + fs := flag.NewFlagSet("litestream-reset", flag.ContinueOnError) + configPath, noExpandEnv := registerConfigFlag(fs) + fs.Usage = c.Usage + if err := fs.Parse(args); err != nil { + return err + } + + // Validate arguments - need exactly one database path + if fs.NArg() == 0 { + return fmt.Errorf("database path required") + } else if fs.NArg() > 1 { + return fmt.Errorf("too many arguments") + } + + dbPath := fs.Arg(0) + + // Make absolute if needed + if !filepath.IsAbs(dbPath) { + if dbPath, err = filepath.Abs(dbPath); err != nil { + return err + } + } + + // Load configuration to find the database (if config exists) + var dbConfig *DBConfig + if *configPath != "" { + config, configErr := ReadConfigFile(*configPath, !*noExpandEnv) + if configErr != nil { + return fmt.Errorf("cannot read config: %w", configErr) + } + + // Find database config + for _, dbc := range config.DBs { + expandedPath := dbc.Path + if !filepath.IsAbs(expandedPath) { + expandedPath, _ = filepath.Abs(expandedPath) + } + if expandedPath == dbPath { + dbConfig = dbc + break + } + } + } + + // If no config found, check if database file exists + if _, err := os.Stat(dbPath); os.IsNotExist(err) { + return fmt.Errorf("database does not exist: %s", dbPath) + } else if err != nil { + return fmt.Errorf("cannot access database: %w", err) + } + + // Create DB instance + var db *litestream.DB + if dbConfig != nil { + db, err = NewDBFromConfig(dbConfig) + if err != nil { + return fmt.Errorf("cannot create database from config: %w", err) + } + } else { + db = litestream.NewDB(dbPath) + } + + // Check if meta path exists + metaPath := db.MetaPath() + if _, err := os.Stat(metaPath); os.IsNotExist(err) { + fmt.Printf("No local state to reset for %s\n", dbPath) + fmt.Printf("Meta directory does not exist: %s\n", metaPath) + return nil + } + + // Perform the reset + fmt.Printf("Resetting local Litestream state for: %s\n", dbPath) + 
fmt.Printf("Removing: %s\n", db.LTXDir()) + + if err := db.ResetLocalState(ctx); err != nil { + return fmt.Errorf("reset failed: %w", err) + } + + fmt.Println("Reset complete. Next replication sync will create a fresh snapshot.") + return nil +} + +// Usage prints the help screen to STDOUT. +func (c *ResetCommand) Usage() { + fmt.Printf(` +The reset command clears local Litestream state for a database. + +This is useful for recovering from corrupted or missing LTX files. The reset +removes local LTX files from the metadata directory, forcing Litestream to +create a fresh snapshot on the next sync. The database file itself is not +modified. + +Usage: + + litestream reset [arguments] + +Arguments: + + -config PATH + Specifies the configuration file. + Defaults to %s + + -no-expand-env + Disables environment variable expansion in configuration file. + +Examples: + + # Reset local state for a specific database + litestream reset /path/to/database.db + + # Reset using a specific configuration file + litestream reset -config /etc/litestream.yml /path/to/database.db + +`[1:], + DefaultConfigPath(), + ) +} diff --git a/db.go b/db.go index 7903b56..5cae409 100644 --- a/db.go +++ b/db.go @@ -253,6 +253,28 @@ func (db *DB) LTXDir() string { return filepath.Join(db.metaPath, "ltx") } +// ResetLocalState removes local LTX files, forcing a fresh snapshot on next sync. +// This is useful for recovering from corrupted or missing LTX files. +// The database file itself is not modified. 
+func (db *DB) ResetLocalState(ctx context.Context) error { + db.Logger.Info("resetting local litestream state", + "meta_path", db.metaPath, + "ltx_dir", db.LTXDir()) + + // Remove all LTX files + if err := os.RemoveAll(db.LTXDir()); err != nil && !os.IsNotExist(err) { + return fmt.Errorf("remove ltx directory: %w", err) + } + + // Clear cached LTX file info + db.maxLTXFileInfos.Lock() + db.maxLTXFileInfos.m = make(map[int]*ltx.FileInfo) + db.maxLTXFileInfos.Unlock() + + db.Logger.Info("local state reset complete, next sync will create fresh snapshot") + return nil +} + // LTXLevelDir returns path of the given LTX compaction level. // Panics if level is negative. func (db *DB) LTXLevelDir(level int) string { @@ -1041,15 +1063,21 @@ func (db *DB) verify(ctx context.Context) (info syncInfo, err error) { } // Determine last WAL offset we save from. - ltxFile, err := os.Open(db.LTXPath(0, pos.TXID, pos.TXID)) + ltxPath := db.LTXPath(0, pos.TXID, pos.TXID) + ltxFile, err := os.Open(ltxPath) if err != nil { - return info, fmt.Errorf("open ltx file: %w", err) + if os.IsNotExist(err) { + return info, NewLTXError("open", ltxPath, 0, uint64(pos.TXID), uint64(pos.TXID), err) + } + return info, fmt.Errorf("open ltx file %s: %w", ltxPath, err) } defer func() { _ = ltxFile.Close() }() dec := ltx.NewDecoder(ltxFile) if err := dec.DecodeHeader(); err != nil { - return info, fmt.Errorf("decode ltx file: %w", err) + // Decode failure indicates corruption + ltxErr := NewLTXError("decode", ltxPath, 0, uint64(pos.TXID), uint64(pos.TXID), fmt.Errorf("%w: %w", ErrLTXCorrupted, err)) + return info, ltxErr } info.offset = dec.Header().WALOffset + dec.Header().WALSize info.salt1 = dec.Header().WALSalt1 diff --git a/db_test.go b/db_test.go index 28f0689..0c54676 100644 --- a/db_test.go +++ b/db_test.go @@ -1209,3 +1209,54 @@ func TestDB_DelayedCheckpointAfterWrite(t *testing.T) { posAfterInsert.TXID, posAfterDelayedCheckpoint.TXID) } } + +// TestDB_ResetLocalState verifies that 
ResetLocalState clears the LTX directory. +func TestDB_ResetLocalState(t *testing.T) { + db, sqldb := testingutil.MustOpenDBs(t) + defer testingutil.MustCloseDBs(t, db, sqldb) + + // Create table and insert some data to create LTX files + if _, err := sqldb.Exec(`CREATE TABLE t (x TEXT)`); err != nil { + t.Fatal(err) + } + if _, err := sqldb.Exec(`INSERT INTO t (x) VALUES ('foo')`); err != nil { + t.Fatal(err) + } + if err := db.Sync(t.Context()); err != nil { + t.Fatal(err) + } + + // Verify LTX directory exists and has files + ltxDir := db.LTXDir() + if _, err := os.Stat(ltxDir); os.IsNotExist(err) { + t.Fatal("LTX directory should exist after sync") + } + + // Get position before reset + posBefore, err := db.Pos() + if err != nil { + t.Fatal(err) + } + if posBefore.TXID == 0 { + t.Fatal("expected non-zero TXID before reset") + } + + // Reset local state + if err := db.ResetLocalState(t.Context()); err != nil { + t.Fatal(err) + } + + // Verify LTX directory is gone + if _, err := os.Stat(ltxDir); !os.IsNotExist(err) { + t.Fatal("LTX directory should not exist after reset") + } + + // Get position after reset - should be zero since no LTX files + posAfter, err := db.Pos() + if err != nil { + t.Fatal(err) + } + if posAfter.TXID != 0 { + t.Fatalf("expected zero TXID after reset, got %d", posAfter.TXID) + } +} diff --git a/file/replica_client.go b/file/replica_client.go index 3f04d4d..2365401 100644 --- a/file/replica_client.go +++ b/file/replica_client.go @@ -127,9 +127,13 @@ func (c *ReplicaClient) LTXFiles(ctx context.Context, level int, seek ltx.TXID, // OpenLTXFile returns a reader for an LTX file at the given position. // Returns os.ErrNotExist if no matching index/offset is found. 
func (c *ReplicaClient) OpenLTXFile(ctx context.Context, level int, minTXID, maxTXID ltx.TXID, offset, size int64) (io.ReadCloser, error) { - f, err := os.Open(c.LTXFilePath(level, minTXID, maxTXID)) + path := c.LTXFilePath(level, minTXID, maxTXID) + f, err := os.Open(path) if err != nil { - return nil, err + if os.IsNotExist(err) { + return nil, litestream.NewLTXError("open", path, level, uint64(minTXID), uint64(maxTXID), err) + } + return nil, fmt.Errorf("open ltx file %s: %w", path, err) } if offset > 0 { diff --git a/litestream.go b/litestream.go index b27de3c..337056c 100644 --- a/litestream.go +++ b/litestream.go @@ -32,8 +32,52 @@ const ( var ( ErrNoSnapshots = errors.New("no snapshots available") ErrChecksumMismatch = errors.New("invalid replica, checksum mismatch") + ErrLTXCorrupted = errors.New("ltx file corrupted") + ErrLTXMissing = errors.New("ltx file missing") ) +// LTXError provides detailed context for LTX file errors with recovery hints. +type LTXError struct { + Op string // Operation that failed (e.g., "open", "read", "validate") + Path string // File path + Level int // LTX level (0 = L0, etc.) + MinTXID uint64 // Minimum transaction ID + MaxTXID uint64 // Maximum transaction ID + Err error // Underlying error + Hint string // Recovery hint for users +} + +func (e *LTXError) Error() string { + if e.Path != "" { + return e.Op + " ltx file " + e.Path + ": " + e.Err.Error() + } + return e.Op + " ltx file: " + e.Err.Error() +} + +func (e *LTXError) Unwrap() error { return e.Err } + +// NewLTXError creates a new LTX error with appropriate hints based on the error type. +func NewLTXError(op, path string, level int, minTXID, maxTXID uint64, err error) *LTXError { + ltxErr := &LTXError{ + Op: op, + Path: path, + Level: level, + MinTXID: minTXID, + MaxTXID: maxTXID, + Err: err, + } + + // Set appropriate hint based on error type + if os.IsNotExist(err) || errors.Is(err, ErrLTXMissing) { + ltxErr.Hint = "LTX file is missing. 
This can happen after VACUUM, manual checkpoint, or state corruption. " + + "Run 'litestream reset ' or delete the .sqlite-litestream directory and restart." + } else if errors.Is(err, ErrLTXCorrupted) || errors.Is(err, ErrChecksumMismatch) { + ltxErr.Hint = "LTX file is corrupted. Delete the .sqlite-litestream directory and restart to recover from replica." + } + + return ltxErr +} + // SQLite WAL constants. const ( WALHeaderChecksumOffset = 24 diff --git a/litestream_test.go b/litestream_test.go index fe6d089..1043aca 100644 --- a/litestream_test.go +++ b/litestream_test.go @@ -3,6 +3,9 @@ package litestream_test import ( "encoding/binary" "encoding/hex" + "errors" + "os" + "strings" "testing" "github.com/superfly/ltx" @@ -67,3 +70,67 @@ func MustDecodeHexString(s string) []byte { } return b } + +func TestNewLTXError(t *testing.T) { + t.Run("MissingFile", func(t *testing.T) { + err := litestream.NewLTXError("open", "/path/to/file.ltx", 0, 1, 1, os.ErrNotExist) + if err.Hint == "" { + t.Fatal("expected hint for missing file error") + } + if !strings.Contains(err.Hint, "missing") { + t.Errorf("hint should mention missing file, got: %s", err.Hint) + } + if !strings.Contains(err.Hint, "litestream reset") { + t.Errorf("hint should mention reset command, got: %s", err.Hint) + } + }) + + t.Run("CorruptedFile", func(t *testing.T) { + err := litestream.NewLTXError("decode", "/path/to/file.ltx", 0, 1, 1, litestream.ErrLTXCorrupted) + if err.Hint == "" { + t.Fatal("expected hint for corrupted file error") + } + if !strings.Contains(err.Hint, "corrupted") { + t.Errorf("hint should mention corruption, got: %s", err.Hint) + } + }) + + t.Run("ChecksumMismatch", func(t *testing.T) { + err := litestream.NewLTXError("validate", "/path/to/file.ltx", 0, 1, 1, litestream.ErrChecksumMismatch) + if err.Hint == "" { + t.Fatal("expected hint for checksum mismatch error") + } + }) + + t.Run("ErrorString", func(t *testing.T) { + err := litestream.NewLTXError("open", "/path/to/file.ltx", 0, 
1, 1, os.ErrNotExist) + errStr := err.Error() + if !strings.Contains(errStr, "open") { + t.Errorf("error should contain operation, got: %s", errStr) + } + if !strings.Contains(errStr, "/path/to/file.ltx") { + t.Errorf("error should contain path, got: %s", errStr) + } + }) + + t.Run("Unwrap", func(t *testing.T) { + underlying := errors.New("underlying error") + err := litestream.NewLTXError("read", "/path/to/file.ltx", 0, 1, 1, underlying) + if !errors.Is(err, underlying) { + t.Error("LTXError should unwrap to underlying error") + } + }) +} + +func TestLTXErrorHints(t *testing.T) { + // Test that ErrLTXMissing also triggers appropriate hints + t.Run("ErrLTXMissing", func(t *testing.T) { + err := litestream.NewLTXError("open", "/path/to/file.ltx", 0, 1, 1, litestream.ErrLTXMissing) + if err.Hint == "" { + t.Fatal("expected hint for ErrLTXMissing") + } + if !strings.Contains(err.Hint, "litestream reset") { + t.Errorf("hint should mention reset command, got: %s", err.Hint) + } + }) +} diff --git a/replica.go b/replica.go index 3ca4848..2b18f97 100644 --- a/replica.go +++ b/replica.go @@ -47,6 +47,12 @@ type Replica struct { // If true, replica monitors database for changes automatically. // Set to false if replica is being used synchronously (such as in tests). MonitorEnabled bool + + // If true, automatically reset local state when LTX errors are detected. + // This allows recovery from corrupted/missing LTX files by resetting + // the position file and removing local LTX files, forcing a fresh sync. + // Disabled by default to prevent silent data loss scenarios. + AutoRecoverEnabled bool } func NewReplica(db *DB) *Replica { @@ -363,13 +369,49 @@ func (r *Replica) monitor(ctx context.Context) { } } - // Log with rate limiting to avoid log spam during persistent errors. 
- if time.Since(lastLogTime) >= SyncErrorLogInterval { - r.Logger().Error("monitor error", - "error", err, - "consecutive_errors", consecutiveErrs, - "backoff", backoff) - lastLogTime = time.Now() + // Check for LTX errors and include recovery hints + var ltxErr *LTXError + if errors.As(err, &ltxErr) { + // Log with rate limiting to avoid log spam during persistent errors. + if time.Since(lastLogTime) >= SyncErrorLogInterval { + if ltxErr.Hint != "" { + r.Logger().Error("monitor error", + "error", err, + "path", ltxErr.Path, + "hint", ltxErr.Hint, + "consecutive_errors", consecutiveErrs, + "backoff", backoff) + } else { + r.Logger().Error("monitor error", + "error", err, + "path", ltxErr.Path, + "consecutive_errors", consecutiveErrs, + "backoff", backoff) + } + lastLogTime = time.Now() + } + + // Attempt auto-recovery if enabled + if r.AutoRecoverEnabled { + r.Logger().Warn("auto-recovery enabled, resetting local state") + if resetErr := r.db.ResetLocalState(ctx); resetErr != nil { + r.Logger().Error("auto-recovery failed", "error", resetErr) + } else { + r.Logger().Info("auto-recovery complete, resuming replication") + // Reset backoff after successful recovery + backoff = 0 + consecutiveErrs = 0 + } + } + } else { + // Log with rate limiting to avoid log spam during persistent errors. 
+ if time.Since(lastLogTime) >= SyncErrorLogInterval { + r.Logger().Error("monitor error", + "error", err, + "consecutive_errors", consecutiveErrs, + "backoff", backoff) + lastLogTime = time.Now() + } } } continue diff --git a/replica_client_test.go b/replica_client_test.go index 06aa042..507719d 100644 --- a/replica_client_test.go +++ b/replica_client_test.go @@ -3,6 +3,7 @@ package litestream_test import ( "bytes" "context" + "errors" "fmt" "io" "log/slog" @@ -191,7 +192,7 @@ func TestReplicaClient_OpenLTXFile(t *testing.T) { t.Helper() t.Parallel() - if _, err := c.OpenLTXFile(context.Background(), 0, ltx.TXID(1), ltx.TXID(1), 0, 0); !os.IsNotExist(err) { + if _, err := c.OpenLTXFile(context.Background(), 0, ltx.TXID(1), ltx.TXID(1), 0, 0); !errors.Is(err, os.ErrNotExist) { t.Fatalf("expected not exist, got %#v", err) } }) @@ -216,10 +217,10 @@ func TestReplicaClient_DeleteWALSegments(t *testing.T) { t.Fatal(err) } - if _, err := c.OpenLTXFile(context.Background(), 0, ltx.TXID(1), ltx.TXID(2), 0, 0); !os.IsNotExist(err) { + if _, err := c.OpenLTXFile(context.Background(), 0, ltx.TXID(1), ltx.TXID(2), 0, 0); !errors.Is(err, os.ErrNotExist) { t.Fatalf("expected not exist, got %#v", err) } - if _, err := c.OpenLTXFile(context.Background(), 0, ltx.TXID(3), ltx.TXID(4), 0, 0); !os.IsNotExist(err) { + if _, err := c.OpenLTXFile(context.Background(), 0, ltx.TXID(3), ltx.TXID(4), 0, 0); !errors.Is(err, os.ErrNotExist) { t.Fatalf("expected not exist, got %#v", err) } }) @@ -388,7 +389,7 @@ func TestReplicaClient_S3_ErrorContext(t *testing.T) { } // Should return os.ErrNotExist for S3 NoSuchKey - if !os.IsNotExist(err) { + if !errors.Is(err, os.ErrNotExist) { t.Errorf("expected os.ErrNotExist, got %v", err) } })