Mirror of https://github.com/benbjohnson/litestream.git (synced 2026-01-25 05:06:30 +00:00)
fix(store): retry compaction when db not ready instead of waiting full interval (#888)
Some checks failed
Commit / Lint (push) Has been cancelled
Commit / VFS Build Test (macOS) (push) Has been cancelled
Commit / VFS Build Test (Linux) (push) Has been cancelled
Commit / Build Windows (push) Has been cancelled
Commit / Docker Build Test (push) Has been cancelled
Commit / Build & Unit Test (push) Has been cancelled
Commit / Run S3 Mock Tests (push) Has been cancelled
Commit / Run MinIO Integration Tests (push) Has been cancelled
Commit / Run NATS Integration Tests (push) Has been cancelled
Commit / Run S3 Integration Tests (push) Has been cancelled
Commit / Run GCP Integration Tests (push) Has been cancelled
Commit / Run Azure Blob Store Integration Tests (push) Has been cancelled
Commit / Run SFTP Integration Tests (push) Has been cancelled
Commit / Run WebDAV Integration Tests (push) Has been cancelled
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
store.go (79 lines changed)
@@ -263,52 +263,61 @@ func (s *Store) SnapshotLevel() *CompactionLevel {
 func (s *Store) monitorCompactionLevel(ctx context.Context, lvl *CompactionLevel) {
 	slog.Info("starting compaction monitor", "level", lvl.Level, "interval", lvl.Interval)
 
 	// Start first compaction immediately to check for any missed compactions from shutdown
+	retryDeadline := time.Time{}
 	timer := time.NewTimer(time.Nanosecond)
 	defer timer.Stop()
 
-LOOP:
 	for {
 		select {
 		case <-ctx.Done():
-			timer.Stop()
-			break LOOP
+			return
 		case <-timer.C:
-			// Reset timer before we start compactions so we don't delay it
-			// from long compactions.
-			timer = time.NewTimer(time.Until(lvl.NextCompactionAt(time.Now())))
+			// proceed
 		}
 
-		for _, db := range s.DBs() {
-			// First attempt to compact the database.
-			if _, err := s.CompactDB(ctx, db, lvl); errors.Is(err, ErrNoCompaction) {
-				slog.Debug("no compaction", "level", lvl.Level, "path", db.Path())
-				continue
-			} else if errors.Is(err, ErrCompactionTooEarly) {
-				slog.Debug("recently compacted, skipping", "level", lvl.Level, "path", db.Path())
-				continue
-			} else if errors.Is(err, ErrDBNotReady) {
-				slog.Debug("db not ready, skipping", "level", lvl.Level, "path", db.Path())
-				continue
-			} else if err != nil {
-				// Don't log or sleep on context cancellation errors during shutdown
-				if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
-					slog.Error("compaction failed", "level", lvl.Level, "error", err)
-					time.Sleep(1 * time.Second) // wait so we don't rack up S3 charges
-				}
-			}
-
-			// Each time we snapshot, clean up everything before the oldest snapshot.
-			if lvl.Level == SnapshotLevel {
-				if err := s.EnforceSnapshotRetention(ctx, db); err != nil {
-					// Don't log or sleep on context cancellation errors during shutdown
-					if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
-						slog.Error("retention enforcement failed", "error", err)
-						time.Sleep(1 * time.Second) // wait so we don't rack up S3 charges
-					}
-				}
-			}
-		}
+		now := time.Now()
+		nextDelay := time.Until(lvl.NextCompactionAt(now))
+
+		var anyNotReady bool
+
+		for _, db := range s.DBs() {
+			_, err := s.CompactDB(ctx, db, lvl)
+			switch {
+			case errors.Is(err, ErrNoCompaction), errors.Is(err, ErrCompactionTooEarly):
+				slog.Debug("no compaction", "level", lvl.Level, "path", db.Path())
+			case errors.Is(err, ErrDBNotReady):
+				slog.Debug("db not ready, skipping", "level", lvl.Level, "path", db.Path())
+				anyNotReady = true
+			case err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded):
+				slog.Error("compaction failed", "level", lvl.Level, "error", err)
+			}
+
+			if lvl.Level == SnapshotLevel {
+				if err := s.EnforceSnapshotRetention(ctx, db); err != nil &&
+					!errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
+					slog.Error("retention enforcement failed", "error", err)
+				}
+			}
+		}
+
+		timedOut := !retryDeadline.IsZero() && now.After(retryDeadline)
+		if anyNotReady && !timedOut {
+			if retryDeadline.IsZero() {
+				retryDeadline = now.Add(30 * time.Second)
+			}
+			nextDelay = time.Second
+			slog.Debug("scheduling retry for unready dbs", "level", lvl.Level)
+		} else {
+			if timedOut {
+				slog.Warn("timeout waiting for db initialization", "level", lvl.Level)
+			}
+			retryDeadline = time.Time{}
+		}
+
+		if nextDelay < 0 {
+			nextDelay = 0
+		}
+		timer.Reset(nextDelay)
 	}
 }
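The scheduling change above can be summarized in isolation. The sketch below is not part of the commit; it is a minimal illustration of the retry logic the new code inlines into monitorCompactionLevel, under the assumption that the helper name nextCompactionDelay and the retryWindow constant are hypothetical and exist only for this example.

// Illustrative sketch only, not part of the commit. It mirrors the decision the
// diff adds: when any database reports "not ready", retry in one second instead
// of waiting for the level's full interval, but only within a bounded window.
package main

import (
	"fmt"
	"time"
)

const retryWindow = 30 * time.Second // assumed to match the 30s deadline in the diff

// nextCompactionDelay picks the delay until the next compaction attempt.
// retryDeadline tracks how long the short retries are allowed to continue.
func nextCompactionDelay(anyNotReady bool, retryDeadline *time.Time, now, nextCompactionAt time.Time) time.Duration {
	delay := nextCompactionAt.Sub(now)

	timedOut := !retryDeadline.IsZero() && now.After(*retryDeadline)
	if anyNotReady && !timedOut {
		if retryDeadline.IsZero() {
			*retryDeadline = now.Add(retryWindow) // start the retry window on the first miss
		}
		delay = time.Second // retry soon instead of waiting the full interval
	} else {
		*retryDeadline = time.Time{} // reset once DBs are ready or the window expires
	}

	if delay < 0 {
		delay = 0
	}
	return delay
}

func main() {
	var deadline time.Time
	now := time.Now()
	next := now.Add(time.Hour) // pretend the level's interval is one hour

	fmt.Println(nextCompactionDelay(true, &deadline, now, next))  // 1s: a db was not ready
	fmt.Println(nextCompactionDelay(false, &deadline, now, next)) // ~1h: normal scheduling resumes
}

The design tradeoff, as reflected in the diff: a one-second retry avoids stalling a whole compaction interval when a database simply has not finished initializing, while the 30-second deadline keeps a permanently unready database from holding the level in a tight retry loop; once the window expires, a warning is logged and normal interval scheduling resumes.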
||||