mirror of
https://github.com/benbjohnson/litestream.git
synced 2026-01-25 05:06:30 +00:00
fix: improve error recovery for missing/corrupted LTX files (#963)
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -157,6 +157,8 @@ func (m *Main) Run(ctx context.Context, args []string) (err error) {
|
|||||||
slog.Info("litestream shut down")
|
slog.Info("litestream shut down")
|
||||||
return err
|
return err
|
||||||
|
|
||||||
|
case "reset":
|
||||||
|
return (&ResetCommand{}).Run(ctx, args)
|
||||||
case "restore":
|
case "restore":
|
||||||
return (&RestoreCommand{}).Run(ctx, args)
|
return (&RestoreCommand{}).Run(ctx, args)
|
||||||
case "status":
|
case "status":
|
||||||
@@ -192,6 +194,7 @@ The commands are:
|
|||||||
databases list databases specified in config file
|
databases list databases specified in config file
|
||||||
ltx list available LTX files for a database
|
ltx list available LTX files for a database
|
||||||
replicate runs a server to replicate databases
|
replicate runs a server to replicate databases
|
||||||
|
reset reset local state for a database
|
||||||
restore recovers database backup from a replica
|
restore recovers database backup from a replica
|
||||||
status display replication status for databases
|
status display replication status for databases
|
||||||
version prints the binary version
|
version prints the binary version
|
||||||
@@ -920,6 +923,11 @@ type ReplicaSettings struct {
|
|||||||
SyncInterval *time.Duration `yaml:"sync-interval"`
|
SyncInterval *time.Duration `yaml:"sync-interval"`
|
||||||
ValidationInterval *time.Duration `yaml:"validation-interval"`
|
ValidationInterval *time.Duration `yaml:"validation-interval"`
|
||||||
|
|
||||||
|
// If true, automatically reset local state when LTX errors are detected.
|
||||||
|
// This allows recovery from corrupted/missing LTX files by forcing a fresh sync.
|
||||||
|
// Disabled by default to prevent silent data loss scenarios.
|
||||||
|
AutoRecover *bool `yaml:"auto-recover"`
|
||||||
|
|
||||||
// S3 settings
|
// S3 settings
|
||||||
AccessKeyID string `yaml:"access-key-id"`
|
AccessKeyID string `yaml:"access-key-id"`
|
||||||
SecretAccessKey string `yaml:"secret-access-key"`
|
SecretAccessKey string `yaml:"secret-access-key"`
|
||||||
@@ -995,6 +1003,11 @@ func (rs *ReplicaSettings) SetDefaults(src *ReplicaSettings) {
|
|||||||
rs.ValidationInterval = src.ValidationInterval
|
rs.ValidationInterval = src.ValidationInterval
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Recovery settings
|
||||||
|
if rs.AutoRecover == nil && src.AutoRecover != nil {
|
||||||
|
rs.AutoRecover = src.AutoRecover
|
||||||
|
}
|
||||||
|
|
||||||
// S3 settings
|
// S3 settings
|
||||||
if rs.AccessKeyID == "" {
|
if rs.AccessKeyID == "" {
|
||||||
rs.AccessKeyID = src.AccessKeyID
|
rs.AccessKeyID = src.AccessKeyID
|
||||||
@@ -1144,6 +1157,9 @@ func NewReplicaFromConfig(c *ReplicaConfig, db *litestream.DB) (_ *litestream.Re
|
|||||||
if v := c.SyncInterval; v != nil {
|
if v := c.SyncInterval; v != nil {
|
||||||
r.SyncInterval = *v
|
r.SyncInterval = *v
|
||||||
}
|
}
|
||||||
|
if v := c.AutoRecover; v != nil {
|
||||||
|
r.AutoRecoverEnabled = *v
|
||||||
|
}
|
||||||
|
|
||||||
// Build and set client on replica.
|
// Build and set client on replica.
|
||||||
switch c.ReplicaType() {
|
switch c.ReplicaType() {
|
||||||
|
|||||||
134
cmd/litestream/reset.go
Normal file
134
cmd/litestream/reset.go
Normal file
@@ -0,0 +1,134 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"flag"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
|
||||||
|
"github.com/benbjohnson/litestream"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ResetCommand is a command for resetting local Litestream state for a database.
|
||||||
|
type ResetCommand struct{}
|
||||||
|
|
||||||
|
// Run executes the command.
|
||||||
|
func (c *ResetCommand) Run(ctx context.Context, args []string) (err error) {
|
||||||
|
fs := flag.NewFlagSet("litestream-reset", flag.ContinueOnError)
|
||||||
|
configPath, noExpandEnv := registerConfigFlag(fs)
|
||||||
|
fs.Usage = c.Usage
|
||||||
|
if err := fs.Parse(args); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Validate arguments - need exactly one database path
|
||||||
|
if fs.NArg() == 0 {
|
||||||
|
return fmt.Errorf("database path required")
|
||||||
|
} else if fs.NArg() > 1 {
|
||||||
|
return fmt.Errorf("too many arguments")
|
||||||
|
}
|
||||||
|
|
||||||
|
dbPath := fs.Arg(0)
|
||||||
|
|
||||||
|
// Make absolute if needed
|
||||||
|
if !filepath.IsAbs(dbPath) {
|
||||||
|
if dbPath, err = filepath.Abs(dbPath); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Load configuration to find the database (if config exists)
|
||||||
|
var dbConfig *DBConfig
|
||||||
|
if *configPath != "" {
|
||||||
|
config, configErr := ReadConfigFile(*configPath, !*noExpandEnv)
|
||||||
|
if configErr != nil {
|
||||||
|
return fmt.Errorf("cannot read config: %w", configErr)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Find database config
|
||||||
|
for _, dbc := range config.DBs {
|
||||||
|
expandedPath := dbc.Path
|
||||||
|
if !filepath.IsAbs(expandedPath) {
|
||||||
|
expandedPath, _ = filepath.Abs(expandedPath)
|
||||||
|
}
|
||||||
|
if expandedPath == dbPath {
|
||||||
|
dbConfig = dbc
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// If no config found, check if database file exists
|
||||||
|
if _, err := os.Stat(dbPath); os.IsNotExist(err) {
|
||||||
|
return fmt.Errorf("database does not exist: %s", dbPath)
|
||||||
|
} else if err != nil {
|
||||||
|
return fmt.Errorf("cannot access database: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create DB instance
|
||||||
|
var db *litestream.DB
|
||||||
|
if dbConfig != nil {
|
||||||
|
db, err = NewDBFromConfig(dbConfig)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("cannot create database from config: %w", err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
db = litestream.NewDB(dbPath)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if meta path exists
|
||||||
|
metaPath := db.MetaPath()
|
||||||
|
if _, err := os.Stat(metaPath); os.IsNotExist(err) {
|
||||||
|
fmt.Printf("No local state to reset for %s\n", dbPath)
|
||||||
|
fmt.Printf("Meta directory does not exist: %s\n", metaPath)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Perform the reset
|
||||||
|
fmt.Printf("Resetting local Litestream state for: %s\n", dbPath)
|
||||||
|
fmt.Printf("Removing: %s\n", db.LTXDir())
|
||||||
|
|
||||||
|
if err := db.ResetLocalState(ctx); err != nil {
|
||||||
|
return fmt.Errorf("reset failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Println("Reset complete. Next replication sync will create a fresh snapshot.")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Usage prints the help screen to STDOUT.
|
||||||
|
func (c *ResetCommand) Usage() {
|
||||||
|
fmt.Printf(`
|
||||||
|
The reset command clears local Litestream state for a database.
|
||||||
|
|
||||||
|
This is useful for recovering from corrupted or missing LTX files. The reset
|
||||||
|
removes local LTX files from the metadata directory, forcing Litestream to
|
||||||
|
create a fresh snapshot on the next sync. The database file itself is not
|
||||||
|
modified.
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
|
||||||
|
litestream reset [arguments] <path>
|
||||||
|
|
||||||
|
Arguments:
|
||||||
|
|
||||||
|
-config PATH
|
||||||
|
Specifies the configuration file.
|
||||||
|
Defaults to %s
|
||||||
|
|
||||||
|
-no-expand-env
|
||||||
|
Disables environment variable expansion in configuration file.
|
||||||
|
|
||||||
|
Examples:
|
||||||
|
|
||||||
|
# Reset local state for a specific database
|
||||||
|
litestream reset /path/to/database.db
|
||||||
|
|
||||||
|
# Reset using a specific configuration file
|
||||||
|
litestream reset -config /etc/litestream.yml /path/to/database.db
|
||||||
|
|
||||||
|
`[1:],
|
||||||
|
DefaultConfigPath(),
|
||||||
|
)
|
||||||
|
}
|
||||||
34
db.go
34
db.go
@@ -253,6 +253,28 @@ func (db *DB) LTXDir() string {
|
|||||||
return filepath.Join(db.metaPath, "ltx")
|
return filepath.Join(db.metaPath, "ltx")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ResetLocalState removes local LTX files, forcing a fresh snapshot on next sync.
|
||||||
|
// This is useful for recovering from corrupted or missing LTX files.
|
||||||
|
// The database file itself is not modified.
|
||||||
|
func (db *DB) ResetLocalState(ctx context.Context) error {
|
||||||
|
db.Logger.Info("resetting local litestream state",
|
||||||
|
"meta_path", db.metaPath,
|
||||||
|
"ltx_dir", db.LTXDir())
|
||||||
|
|
||||||
|
// Remove all LTX files
|
||||||
|
if err := os.RemoveAll(db.LTXDir()); err != nil && !os.IsNotExist(err) {
|
||||||
|
return fmt.Errorf("remove ltx directory: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clear cached LTX file info
|
||||||
|
db.maxLTXFileInfos.Lock()
|
||||||
|
db.maxLTXFileInfos.m = make(map[int]*ltx.FileInfo)
|
||||||
|
db.maxLTXFileInfos.Unlock()
|
||||||
|
|
||||||
|
db.Logger.Info("local state reset complete, next sync will create fresh snapshot")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// LTXLevelDir returns path of the given LTX compaction level.
|
// LTXLevelDir returns path of the given LTX compaction level.
|
||||||
// Panics if level is negative.
|
// Panics if level is negative.
|
||||||
func (db *DB) LTXLevelDir(level int) string {
|
func (db *DB) LTXLevelDir(level int) string {
|
||||||
@@ -1041,15 +1063,21 @@ func (db *DB) verify(ctx context.Context) (info syncInfo, err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Determine last WAL offset we save from.
|
// Determine last WAL offset we save from.
|
||||||
ltxFile, err := os.Open(db.LTXPath(0, pos.TXID, pos.TXID))
|
ltxPath := db.LTXPath(0, pos.TXID, pos.TXID)
|
||||||
|
ltxFile, err := os.Open(ltxPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return info, fmt.Errorf("open ltx file: %w", err)
|
if os.IsNotExist(err) {
|
||||||
|
return info, NewLTXError("open", ltxPath, 0, uint64(pos.TXID), uint64(pos.TXID), err)
|
||||||
|
}
|
||||||
|
return info, fmt.Errorf("open ltx file %s: %w", ltxPath, err)
|
||||||
}
|
}
|
||||||
defer func() { _ = ltxFile.Close() }()
|
defer func() { _ = ltxFile.Close() }()
|
||||||
|
|
||||||
dec := ltx.NewDecoder(ltxFile)
|
dec := ltx.NewDecoder(ltxFile)
|
||||||
if err := dec.DecodeHeader(); err != nil {
|
if err := dec.DecodeHeader(); err != nil {
|
||||||
return info, fmt.Errorf("decode ltx file: %w", err)
|
// Decode failure indicates corruption
|
||||||
|
ltxErr := NewLTXError("decode", ltxPath, 0, uint64(pos.TXID), uint64(pos.TXID), fmt.Errorf("%w: %w", ErrLTXCorrupted, err))
|
||||||
|
return info, ltxErr
|
||||||
}
|
}
|
||||||
info.offset = dec.Header().WALOffset + dec.Header().WALSize
|
info.offset = dec.Header().WALOffset + dec.Header().WALSize
|
||||||
info.salt1 = dec.Header().WALSalt1
|
info.salt1 = dec.Header().WALSalt1
|
||||||
|
|||||||
51
db_test.go
51
db_test.go
@@ -1209,3 +1209,54 @@ func TestDB_DelayedCheckpointAfterWrite(t *testing.T) {
|
|||||||
posAfterInsert.TXID, posAfterDelayedCheckpoint.TXID)
|
posAfterInsert.TXID, posAfterDelayedCheckpoint.TXID)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestDB_ResetLocalState verifies that ResetLocalState clears the LTX directory.
|
||||||
|
func TestDB_ResetLocalState(t *testing.T) {
|
||||||
|
db, sqldb := testingutil.MustOpenDBs(t)
|
||||||
|
defer testingutil.MustCloseDBs(t, db, sqldb)
|
||||||
|
|
||||||
|
// Create table and insert some data to create LTX files
|
||||||
|
if _, err := sqldb.Exec(`CREATE TABLE t (x TEXT)`); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if _, err := sqldb.Exec(`INSERT INTO t (x) VALUES ('foo')`); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if err := db.Sync(t.Context()); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify LTX directory exists and has files
|
||||||
|
ltxDir := db.LTXDir()
|
||||||
|
if _, err := os.Stat(ltxDir); os.IsNotExist(err) {
|
||||||
|
t.Fatal("LTX directory should exist after sync")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get position before reset
|
||||||
|
posBefore, err := db.Pos()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if posBefore.TXID == 0 {
|
||||||
|
t.Fatal("expected non-zero TXID before reset")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reset local state
|
||||||
|
if err := db.ResetLocalState(t.Context()); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify LTX directory is gone
|
||||||
|
if _, err := os.Stat(ltxDir); !os.IsNotExist(err) {
|
||||||
|
t.Fatal("LTX directory should not exist after reset")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get position after reset - should be zero since no LTX files
|
||||||
|
posAfter, err := db.Pos()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if posAfter.TXID != 0 {
|
||||||
|
t.Fatalf("expected zero TXID after reset, got %d", posAfter.TXID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -127,9 +127,13 @@ func (c *ReplicaClient) LTXFiles(ctx context.Context, level int, seek ltx.TXID,
|
|||||||
// OpenLTXFile returns a reader for an LTX file at the given position.
|
// OpenLTXFile returns a reader for an LTX file at the given position.
|
||||||
// Returns os.ErrNotExist if no matching index/offset is found.
|
// Returns os.ErrNotExist if no matching index/offset is found.
|
||||||
func (c *ReplicaClient) OpenLTXFile(ctx context.Context, level int, minTXID, maxTXID ltx.TXID, offset, size int64) (io.ReadCloser, error) {
|
func (c *ReplicaClient) OpenLTXFile(ctx context.Context, level int, minTXID, maxTXID ltx.TXID, offset, size int64) (io.ReadCloser, error) {
|
||||||
f, err := os.Open(c.LTXFilePath(level, minTXID, maxTXID))
|
path := c.LTXFilePath(level, minTXID, maxTXID)
|
||||||
|
f, err := os.Open(path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
if os.IsNotExist(err) {
|
||||||
|
return nil, litestream.NewLTXError("open", path, level, uint64(minTXID), uint64(maxTXID), err)
|
||||||
|
}
|
||||||
|
return nil, fmt.Errorf("open ltx file %s: %w", path, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if offset > 0 {
|
if offset > 0 {
|
||||||
|
|||||||
@@ -32,8 +32,52 @@ const (
|
|||||||
// Sentinel errors. Compare against them with errors.Is, since they are
// commonly wrapped with additional context.
var (
	ErrNoSnapshots      = errors.New("no snapshots available")
	ErrChecksumMismatch = errors.New("invalid replica, checksum mismatch")
	ErrLTXCorrupted     = errors.New("ltx file corrupted")
	ErrLTXMissing       = errors.New("ltx file missing")
)

// LTXError provides detailed context for LTX file errors with recovery hints.
type LTXError struct {
	Op      string // Operation that failed (e.g., "open", "read", "validate")
	Path    string // File path
	Level   int    // LTX level (0 = L0, etc.)
	MinTXID uint64 // Minimum transaction ID
	MaxTXID uint64 // Maximum transaction ID
	Err     error  // Underlying error
	Hint    string // Recovery hint for users
}

// Error returns "<op> ltx file[ <path>][: <underlying error>]".
// The path and the underlying error are omitted when unset, so a value built
// without an Err does not panic.
func (e *LTXError) Error() string {
	msg := e.Op + " ltx file"
	if e.Path != "" {
		msg += " " + e.Path
	}
	if e.Err != nil {
		msg += ": " + e.Err.Error()
	}
	return msg
}

// Unwrap returns the underlying error so errors.Is and errors.As can inspect it.
func (e *LTXError) Unwrap() error { return e.Err }

// NewLTXError creates a new LTX error with appropriate hints based on the error type.
//
// The hint is chosen by inspecting err's whole wrap chain: a not-exist error
// or ErrLTXMissing selects the "missing" hint, ErrLTXCorrupted or
// ErrChecksumMismatch selects the "corrupted" hint, and anything else leaves
// Hint empty.
func NewLTXError(op, path string, level int, minTXID, maxTXID uint64, err error) *LTXError {
	ltxErr := &LTXError{
		Op:      op,
		Path:    path,
		Level:   level,
		MinTXID: minTXID,
		MaxTXID: maxTXID,
		Err:     err,
	}

	// Set appropriate hint based on error type. errors.Is is used instead of
	// os.IsNotExist because the latter does not see through fmt.Errorf("%w")
	// wrapping, which callers routinely apply before reaching this point.
	switch {
	case errors.Is(err, os.ErrNotExist), errors.Is(err, ErrLTXMissing):
		ltxErr.Hint = "LTX file is missing. This can happen after VACUUM, manual checkpoint, or state corruption. " +
			"Run 'litestream reset <db>' or delete the .sqlite-litestream directory and restart."
	case errors.Is(err, ErrLTXCorrupted), errors.Is(err, ErrChecksumMismatch):
		ltxErr.Hint = "LTX file is corrupted. Delete the .sqlite-litestream directory and restart to recover from replica."
	}

	return ltxErr
}
|
||||||
|
|
||||||
// SQLite WAL constants.
|
// SQLite WAL constants.
|
||||||
const (
|
const (
|
||||||
WALHeaderChecksumOffset = 24
|
WALHeaderChecksumOffset = 24
|
||||||
|
|||||||
@@ -3,6 +3,9 @@ package litestream_test
|
|||||||
import (
|
import (
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
|
"errors"
|
||||||
|
"os"
|
||||||
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/superfly/ltx"
|
"github.com/superfly/ltx"
|
||||||
@@ -67,3 +70,67 @@ func MustDecodeHexString(s string) []byte {
|
|||||||
}
|
}
|
||||||
return b
|
return b
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestNewLTXError(t *testing.T) {
|
||||||
|
t.Run("MissingFile", func(t *testing.T) {
|
||||||
|
err := litestream.NewLTXError("open", "/path/to/file.ltx", 0, 1, 1, os.ErrNotExist)
|
||||||
|
if err.Hint == "" {
|
||||||
|
t.Fatal("expected hint for missing file error")
|
||||||
|
}
|
||||||
|
if !strings.Contains(err.Hint, "missing") {
|
||||||
|
t.Errorf("hint should mention missing file, got: %s", err.Hint)
|
||||||
|
}
|
||||||
|
if !strings.Contains(err.Hint, "litestream reset") {
|
||||||
|
t.Errorf("hint should mention reset command, got: %s", err.Hint)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("CorruptedFile", func(t *testing.T) {
|
||||||
|
err := litestream.NewLTXError("decode", "/path/to/file.ltx", 0, 1, 1, litestream.ErrLTXCorrupted)
|
||||||
|
if err.Hint == "" {
|
||||||
|
t.Fatal("expected hint for corrupted file error")
|
||||||
|
}
|
||||||
|
if !strings.Contains(err.Hint, "corrupted") {
|
||||||
|
t.Errorf("hint should mention corruption, got: %s", err.Hint)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("ChecksumMismatch", func(t *testing.T) {
|
||||||
|
err := litestream.NewLTXError("validate", "/path/to/file.ltx", 0, 1, 1, litestream.ErrChecksumMismatch)
|
||||||
|
if err.Hint == "" {
|
||||||
|
t.Fatal("expected hint for checksum mismatch error")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("ErrorString", func(t *testing.T) {
|
||||||
|
err := litestream.NewLTXError("open", "/path/to/file.ltx", 0, 1, 1, os.ErrNotExist)
|
||||||
|
errStr := err.Error()
|
||||||
|
if !strings.Contains(errStr, "open") {
|
||||||
|
t.Errorf("error should contain operation, got: %s", errStr)
|
||||||
|
}
|
||||||
|
if !strings.Contains(errStr, "/path/to/file.ltx") {
|
||||||
|
t.Errorf("error should contain path, got: %s", errStr)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("Unwrap", func(t *testing.T) {
|
||||||
|
underlying := errors.New("underlying error")
|
||||||
|
err := litestream.NewLTXError("read", "/path/to/file.ltx", 0, 1, 1, underlying)
|
||||||
|
if !errors.Is(err, underlying) {
|
||||||
|
t.Error("LTXError should unwrap to underlying error")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestLTXErrorHints(t *testing.T) {
|
||||||
|
// Test that ErrLTXMissing also triggers appropriate hints
|
||||||
|
t.Run("ErrLTXMissing", func(t *testing.T) {
|
||||||
|
err := litestream.NewLTXError("open", "/path/to/file.ltx", 0, 1, 1, litestream.ErrLTXMissing)
|
||||||
|
if err.Hint == "" {
|
||||||
|
t.Fatal("expected hint for ErrLTXMissing")
|
||||||
|
}
|
||||||
|
if !strings.Contains(err.Hint, "litestream reset") {
|
||||||
|
t.Errorf("hint should mention reset command, got: %s", err.Hint)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|||||||
56
replica.go
56
replica.go
@@ -47,6 +47,12 @@ type Replica struct {
|
|||||||
// If true, replica monitors database for changes automatically.
|
// If true, replica monitors database for changes automatically.
|
||||||
// Set to false if replica is being used synchronously (such as in tests).
|
// Set to false if replica is being used synchronously (such as in tests).
|
||||||
MonitorEnabled bool
|
MonitorEnabled bool
|
||||||
|
|
||||||
|
// If true, automatically reset local state when LTX errors are detected.
|
||||||
|
// This allows recovery from corrupted/missing LTX files by removing the
|
||||||
|
// local LTX files and clearing cached LTX file info, forcing a fresh sync.
|
||||||
|
// Disabled by default to prevent silent data loss scenarios.
|
||||||
|
AutoRecoverEnabled bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewReplica(db *DB) *Replica {
|
func NewReplica(db *DB) *Replica {
|
||||||
@@ -363,13 +369,49 @@ func (r *Replica) monitor(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Log with rate limiting to avoid log spam during persistent errors.
|
// Check for LTX errors and include recovery hints
|
||||||
if time.Since(lastLogTime) >= SyncErrorLogInterval {
|
var ltxErr *LTXError
|
||||||
r.Logger().Error("monitor error",
|
if errors.As(err, &ltxErr) {
|
||||||
"error", err,
|
// Log with rate limiting to avoid log spam during persistent errors.
|
||||||
"consecutive_errors", consecutiveErrs,
|
if time.Since(lastLogTime) >= SyncErrorLogInterval {
|
||||||
"backoff", backoff)
|
if ltxErr.Hint != "" {
|
||||||
lastLogTime = time.Now()
|
r.Logger().Error("monitor error",
|
||||||
|
"error", err,
|
||||||
|
"path", ltxErr.Path,
|
||||||
|
"hint", ltxErr.Hint,
|
||||||
|
"consecutive_errors", consecutiveErrs,
|
||||||
|
"backoff", backoff)
|
||||||
|
} else {
|
||||||
|
r.Logger().Error("monitor error",
|
||||||
|
"error", err,
|
||||||
|
"path", ltxErr.Path,
|
||||||
|
"consecutive_errors", consecutiveErrs,
|
||||||
|
"backoff", backoff)
|
||||||
|
}
|
||||||
|
lastLogTime = time.Now()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Attempt auto-recovery if enabled
|
||||||
|
if r.AutoRecoverEnabled {
|
||||||
|
r.Logger().Warn("auto-recovery enabled, resetting local state")
|
||||||
|
if resetErr := r.db.ResetLocalState(ctx); resetErr != nil {
|
||||||
|
r.Logger().Error("auto-recovery failed", "error", resetErr)
|
||||||
|
} else {
|
||||||
|
r.Logger().Info("auto-recovery complete, resuming replication")
|
||||||
|
// Reset backoff after successful recovery
|
||||||
|
backoff = 0
|
||||||
|
consecutiveErrs = 0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Log with rate limiting to avoid log spam during persistent errors.
|
||||||
|
if time.Since(lastLogTime) >= SyncErrorLogInterval {
|
||||||
|
r.Logger().Error("monitor error",
|
||||||
|
"error", err,
|
||||||
|
"consecutive_errors", consecutiveErrs,
|
||||||
|
"backoff", backoff)
|
||||||
|
lastLogTime = time.Now()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ package litestream_test
|
|||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
@@ -191,7 +192,7 @@ func TestReplicaClient_OpenLTXFile(t *testing.T) {
|
|||||||
t.Helper()
|
t.Helper()
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
if _, err := c.OpenLTXFile(context.Background(), 0, ltx.TXID(1), ltx.TXID(1), 0, 0); !os.IsNotExist(err) {
|
if _, err := c.OpenLTXFile(context.Background(), 0, ltx.TXID(1), ltx.TXID(1), 0, 0); !errors.Is(err, os.ErrNotExist) {
|
||||||
t.Fatalf("expected not exist, got %#v", err)
|
t.Fatalf("expected not exist, got %#v", err)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
@@ -216,10 +217,10 @@ func TestReplicaClient_DeleteWALSegments(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := c.OpenLTXFile(context.Background(), 0, ltx.TXID(1), ltx.TXID(2), 0, 0); !os.IsNotExist(err) {
|
if _, err := c.OpenLTXFile(context.Background(), 0, ltx.TXID(1), ltx.TXID(2), 0, 0); !errors.Is(err, os.ErrNotExist) {
|
||||||
t.Fatalf("expected not exist, got %#v", err)
|
t.Fatalf("expected not exist, got %#v", err)
|
||||||
}
|
}
|
||||||
if _, err := c.OpenLTXFile(context.Background(), 0, ltx.TXID(3), ltx.TXID(4), 0, 0); !os.IsNotExist(err) {
|
if _, err := c.OpenLTXFile(context.Background(), 0, ltx.TXID(3), ltx.TXID(4), 0, 0); !errors.Is(err, os.ErrNotExist) {
|
||||||
t.Fatalf("expected not exist, got %#v", err)
|
t.Fatalf("expected not exist, got %#v", err)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
@@ -388,7 +389,7 @@ func TestReplicaClient_S3_ErrorContext(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Should return os.ErrNotExist for S3 NoSuchKey
|
// Should return os.ErrNotExist for S3 NoSuchKey
|
||||||
if !os.IsNotExist(err) {
|
if !errors.Is(err, os.ErrNotExist) {
|
||||||
t.Errorf("expected os.ErrNotExist, got %v", err)
|
t.Errorf("expected os.ErrNotExist, got %v", err)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|||||||
Reference in New Issue
Block a user