litestream/store.go

package litestream
import (
"context"
"errors"
"fmt"
"log/slog"
"slices"
"sync"
"time"
"github.com/superfly/ltx"
)
var (
// ErrNoCompaction is returned when no new files are available from the previous level.
ErrNoCompaction = errors.New("no compaction")
// ErrCompactionTooEarly is returned when a compaction is attempted too soon
// after the previous compaction. This prevents frequent re-compaction when
// the process is restarted.
ErrCompactionTooEarly = errors.New("compaction too early")
// ErrTxNotAvailable is returned when a transaction does not exist.
ErrTxNotAvailable = errors.New("transaction not available")
// ErrDBNotReady is returned when compaction is attempted before the
// database has been initialized (e.g., page size not yet known).
ErrDBNotReady = errors.New("db not ready")
)
// Store defaults
const (
DefaultSnapshotInterval = 24 * time.Hour
DefaultSnapshotRetention = 24 * time.Hour
DefaultRetention = 24 * time.Hour
DefaultRetentionCheckInterval = 1 * time.Hour
// DefaultL0Retention is the default time that L0 files are kept around
// after they have been compacted into L1 files.
DefaultL0Retention = 5 * time.Minute
// DefaultL0RetentionCheckInterval controls how frequently L0 retention is
// enforced. Retention checks should run more often than L1 compactions so
// that VFS read replicas have time to observe new files before expired L0
// files are removed.
DefaultL0RetentionCheckInterval = 15 * time.Second
// DefaultHeartbeatCheckInterval controls how frequently the heartbeat
// monitor checks if heartbeat pings should be sent.
DefaultHeartbeatCheckInterval = 15 * time.Second
// DefaultDBInitTimeout is the maximum time to wait for a database to be
// initialized (page size known) before logging a warning.
DefaultDBInitTimeout = 30 * time.Second
)
// Store represents the top-level container for databases.
//
// It manages async background tasks like compactions so that the system
// is not overloaded by too many concurrent tasks.
type Store struct {
mu sync.Mutex
dbs []*DB
levels CompactionLevels
wg sync.WaitGroup
ctx context.Context
cancel func()
// The frequency of snapshots.
SnapshotInterval time.Duration
// The duration of time that snapshots are kept before being deleted.
SnapshotRetention time.Duration
// The duration that L0 files are kept after being compacted into L1.
L0Retention time.Duration
// How often to check for expired L0 files.
L0RetentionCheckInterval time.Duration
// If true, compaction is run in the background according to compaction levels.
CompactionMonitorEnabled bool
// If true, verify TXID consistency at destination level after each compaction.
VerifyCompaction bool
// Shutdown sync retry settings.
ShutdownSyncTimeout time.Duration
ShutdownSyncInterval time.Duration
// How often to check if heartbeat pings should be sent.
HeartbeatCheckInterval time.Duration
// Heartbeat client for health check pings. Sends pings only when
// all databases have synced successfully within the heartbeat interval.
Heartbeat *HeartbeatClient
// heartbeatMonitorRunning tracks whether the heartbeat monitor goroutine is running.
heartbeatMonitorRunning bool
}
// NewStore returns a new Store for the given databases and compaction levels,
// with default settings applied to each database.
func NewStore(dbs []*DB, levels CompactionLevels) *Store {
s := &Store{
dbs: dbs,
levels: levels,
SnapshotInterval: DefaultSnapshotInterval,
SnapshotRetention: DefaultSnapshotRetention,
L0Retention: DefaultL0Retention,
L0RetentionCheckInterval: DefaultL0RetentionCheckInterval,
CompactionMonitorEnabled: true,
ShutdownSyncTimeout: DefaultShutdownSyncTimeout,
ShutdownSyncInterval: DefaultShutdownSyncInterval,
HeartbeatCheckInterval: DefaultHeartbeatCheckInterval,
}
for _, db := range dbs {
db.L0Retention = s.L0Retention
db.ShutdownSyncTimeout = s.ShutdownSyncTimeout
db.ShutdownSyncInterval = s.ShutdownSyncInterval
db.VerifyCompaction = s.VerifyCompaction
}
s.ctx, s.cancel = context.WithCancel(context.Background())
return s
}
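
// Typical lifecycle (a minimal sketch, not part of this file): the databases
// and compaction levels are constructed elsewhere; NewDB is assumed to be the
// package's DB constructor, and ctx and levels are assumed caller values.
//
//	db := litestream.NewDB("/var/lib/app/app.db")
//	store := litestream.NewStore([]*litestream.DB{db}, levels)
//	store.SnapshotInterval = 6 * time.Hour
//	if err := store.Open(ctx); err != nil {
//		log.Fatal(err)
//	}
//	defer store.Close(context.Background())
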
// Open validates the compaction levels, opens all databases, and starts the
// background monitors for compaction, snapshots, L0 retention, and heartbeats.
func (s *Store) Open(ctx context.Context) error {
if err := s.levels.Validate(); err != nil {
return err
}
for _, db := range s.dbs {
if err := db.Open(); err != nil {
return err
}
}
// Start monitors for compactions & snapshots.
if s.CompactionMonitorEnabled {
// Start compaction monitors for all levels except L0.
for _, lvl := range s.levels {
lvl := lvl
if lvl.Level == 0 {
continue
}
s.wg.Add(1)
go func() {
defer s.wg.Done()
s.monitorCompactionLevel(s.ctx, lvl)
}()
}
// Start the snapshot monitor.
s.wg.Add(1)
go func() {
defer s.wg.Done()
s.monitorCompactionLevel(s.ctx, s.SnapshotLevel())
}()
}
if s.L0Retention > 0 && s.L0RetentionCheckInterval > 0 {
s.wg.Add(1)
go func() {
defer s.wg.Done()
s.monitorL0Retention(s.ctx)
}()
}
// Start heartbeat monitor if any database has heartbeat configured.
s.startHeartbeatMonitorIfNeeded()
return nil
}
// Close closes all managed databases, then cancels and waits for background tasks.
func (s *Store) Close(ctx context.Context) (err error) {
s.mu.Lock()
dbs := slices.Clone(s.dbs)
s.mu.Unlock()
for _, db := range dbs {
if e := db.Close(ctx); e != nil && err == nil {
err = e
}
}
// Cancel and wait for background tasks to complete.
s.cancel()
s.wg.Wait()
return err
}
// DBs returns a copy of the list of managed databases.
func (s *Store) DBs() []*DB {
s.mu.Lock()
defer s.mu.Unlock()
return slices.Clone(s.dbs)
}
// AddDB registers a new database with the store and starts monitoring it.
func (s *Store) AddDB(db *DB) error {
if db == nil {
return fmt.Errorf("db required")
}
// First check: see if database already exists
s.mu.Lock()
for _, existing := range s.dbs {
if existing.Path() == db.Path() {
s.mu.Unlock()
return nil
}
}
s.mu.Unlock()
// Apply store-wide settings before opening the database.
db.L0Retention = s.L0Retention
db.ShutdownSyncTimeout = s.ShutdownSyncTimeout
db.ShutdownSyncInterval = s.ShutdownSyncInterval
db.VerifyCompaction = s.VerifyCompaction
// Open the database without holding the lock to avoid blocking other operations.
// The double-check pattern below handles the race condition.
if err := db.Open(); err != nil {
return fmt.Errorf("open db: %w", err)
}
// Second check: verify database wasn't added by another goroutine while we were opening.
// If it was, close our instance and return without error.
s.mu.Lock()
for _, existing := range s.dbs {
if existing.Path() == db.Path() {
// Another goroutine added this database while we were opening.
// Release lock before closing to avoid potential deadlock.
s.mu.Unlock()
if err := db.Close(context.Background()); err != nil {
slog.Error("close duplicate db", "path", db.Path(), "error", err)
}
return nil
}
}
s.dbs = append(s.dbs, db)
s.mu.Unlock()
// Start heartbeat monitor if heartbeat is configured and monitor isn't running.
s.startHeartbeatMonitorIfNeeded()
return nil
}
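
// Adding a database at runtime (sketch; NewDB and the store variable are
// assumed): AddDB applies the store-wide settings, opens the database, and is
// safe to call concurrently thanks to the double-check pattern above.
//
//	db := litestream.NewDB("/var/lib/app/tenant-42.db")
//	if err := store.AddDB(db); err != nil {
//		return fmt.Errorf("add db: %w", err)
//	}
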
// RemoveDB stops monitoring the database at the provided path and closes it.
func (s *Store) RemoveDB(ctx context.Context, path string) error {
if path == "" {
return fmt.Errorf("db path required")
}
s.mu.Lock()
idx := -1
var db *DB
for i, existing := range s.dbs {
if existing.Path() == path {
idx = i
db = existing
break
}
}
if db == nil {
s.mu.Unlock()
return nil
}
s.dbs = slices.Delete(s.dbs, idx, idx+1)
s.mu.Unlock()
if err := db.Close(ctx); err != nil {
return fmt.Errorf("close db: %w", err)
}
return nil
}
// EnableDB starts replication for a registered database.
// The context is checked for cancellation before opening.
// Note: db.Open() itself does not support cancellation.
func (s *Store) EnableDB(ctx context.Context, path string) error {
db := s.FindDB(path)
if db == nil {
return fmt.Errorf("database not found: %s", path)
}
if db.IsOpen() {
return fmt.Errorf("database already enabled: %s", path)
}
// Check for cancellation before starting open
if err := ctx.Err(); err != nil {
return fmt.Errorf("enable database: %w", err)
}
if err := db.Open(); err != nil {
return fmt.Errorf("open database: %w", err)
}
return nil
}
// DisableDB stops replication for a database.
func (s *Store) DisableDB(ctx context.Context, path string) error {
db := s.FindDB(path)
if db == nil {
return fmt.Errorf("database not found: %s", path)
}
if !db.IsOpen() {
return fmt.Errorf("database already disabled: %s", path)
}
if err := db.Close(ctx); err != nil {
return fmt.Errorf("close database: %w", err)
}
return nil
}
// FindDB returns the database with the given path.
func (s *Store) FindDB(path string) *DB {
s.mu.Lock()
defer s.mu.Unlock()
for _, db := range s.dbs {
if db.Path() == path {
return db
}
}
return nil
}
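
// Toggling replication for a registered database (sketch; store and ctx are
// assumed): EnableDB and DisableDB look the database up by path, so the path
// must match db.Path() exactly.
//
//	if err := store.DisableDB(ctx, "/var/lib/app/app.db"); err != nil {
//		log.Printf("disable: %v", err)
//	}
//	// ... perform offline maintenance on the SQLite file ...
//	if err := store.EnableDB(ctx, "/var/lib/app/app.db"); err != nil {
//		log.Printf("enable: %v", err)
//	}
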
// SetL0Retention updates the retention window for L0 files and propagates it to
// all managed databases.
func (s *Store) SetL0Retention(d time.Duration) {
s.mu.Lock()
defer s.mu.Unlock()
s.L0Retention = d
for _, db := range s.dbs {
db.L0Retention = d
}
}
// SetShutdownSyncTimeout updates the shutdown sync timeout and propagates it to
// all managed databases.
func (s *Store) SetShutdownSyncTimeout(d time.Duration) {
s.mu.Lock()
defer s.mu.Unlock()
s.ShutdownSyncTimeout = d
for _, db := range s.dbs {
db.ShutdownSyncTimeout = d
}
}
// SetShutdownSyncInterval updates the shutdown sync interval and propagates it to
// all managed databases.
func (s *Store) SetShutdownSyncInterval(d time.Duration) {
s.mu.Lock()
defer s.mu.Unlock()
s.ShutdownSyncInterval = d
for _, db := range s.dbs {
db.ShutdownSyncInterval = d
}
}
// SetVerifyCompaction updates the verify compaction flag and propagates it to
// all managed databases.
func (s *Store) SetVerifyCompaction(v bool) {
s.mu.Lock()
defer s.mu.Unlock()
s.VerifyCompaction = v
for _, db := range s.dbs {
db.VerifyCompaction = v
db.compactor.VerifyCompaction = v
}
}
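
// Runtime tuning (sketch; store is assumed): the setters above may be called
// while the store is open; each one propagates the new value to every managed
// database under the store lock.
//
//	store.SetL0Retention(10 * time.Minute)
//	store.SetVerifyCompaction(true)
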
// SnapshotLevel returns a pseudo compaction level based on snapshot settings.
func (s *Store) SnapshotLevel() *CompactionLevel {
return &CompactionLevel{
Level: SnapshotLevel,
Interval: s.SnapshotInterval,
}
}
// monitorCompactionLevel runs compactions (or snapshots) for all open databases
// at the given level on that level's interval, retrying databases that are not
// yet initialized for up to DefaultDBInitTimeout.
func (s *Store) monitorCompactionLevel(ctx context.Context, lvl *CompactionLevel) {
slog.Info("starting compaction monitor", "level", lvl.Level, "interval", lvl.Interval)
retryDeadline := time.Time{}
timer := time.NewTimer(time.Nanosecond)
defer timer.Stop()
for {
select {
case <-ctx.Done():
return
case <-timer.C:
// proceed
}
now := time.Now()
nextDelay := time.Until(lvl.NextCompactionAt(now))
var notReadyDBs []string
for _, db := range s.DBs() {
if !db.IsOpen() {
continue // skip disabled DBs
}
_, err := s.CompactDB(ctx, db, lvl)
switch {
case errors.Is(err, ErrNoCompaction), errors.Is(err, ErrCompactionTooEarly):
slog.Debug("no compaction", "level", lvl.Level, "path", db.Path())
case errors.Is(err, ErrDBNotReady):
slog.Debug("db not ready, skipping", "level", lvl.Level, "path", db.Path())
notReadyDBs = append(notReadyDBs, db.Path())
case err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded):
slog.Error("compaction failed", "level", lvl.Level, "error", err)
}
if lvl.Level == SnapshotLevel {
if err := s.EnforceSnapshotRetention(ctx, db); err != nil &&
!errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
slog.Error("retention enforcement failed", "error", err)
}
}
}
timedOut := !retryDeadline.IsZero() && now.After(retryDeadline)
if len(notReadyDBs) > 0 && !timedOut {
if retryDeadline.IsZero() {
retryDeadline = now.Add(DefaultDBInitTimeout)
}
nextDelay = time.Second
slog.Debug("scheduling retry for unready dbs", "level", lvl.Level)
} else {
if timedOut {
slog.Warn("timeout waiting for db initialization",
"level", lvl.Level,
"dbs", notReadyDBs,
"timeout", DefaultDBInitTimeout,
"hint", "database may have corrupted local state or blocked transactions; try removing -litestream directory and restarting")
}
retryDeadline = time.Time{}
}
if nextDelay < 0 {
nextDelay = 0
}
timer.Reset(nextDelay)
}
}
// monitorL0Retention periodically enforces time-based L0 retention across all
// open databases.
func (s *Store) monitorL0Retention(ctx context.Context) {
slog.Info("starting L0 retention monitor", "interval", s.L0RetentionCheckInterval, "retention", s.L0Retention)
ticker := time.NewTicker(s.L0RetentionCheckInterval)
defer ticker.Stop()
LOOP:
for {
select {
case <-ctx.Done():
break LOOP
case <-ticker.C:
}
for _, db := range s.DBs() {
if !db.IsOpen() {
continue // skip disabled DBs
}
if err := db.EnforceL0RetentionByTime(ctx); err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
continue
}
slog.Error("l0 retention enforcement failed", "path", db.Path(), "error", err)
}
}
}
}
// startHeartbeatMonitorIfNeeded starts the heartbeat monitor goroutine if:
// - HeartbeatCheckInterval is configured
// - Heartbeat is configured on the Store
// - The monitor is not already running
func (s *Store) startHeartbeatMonitorIfNeeded() {
s.mu.Lock()
defer s.mu.Unlock()
if s.heartbeatMonitorRunning {
return
}
if s.HeartbeatCheckInterval <= 0 {
return
}
if !s.hasHeartbeatConfigLocked() {
return
}
s.heartbeatMonitorRunning = true
s.wg.Add(1)
go func() {
defer s.wg.Done()
s.monitorHeartbeats(s.ctx)
}()
}
// hasHeartbeatConfigLocked returns true if heartbeat is configured on the Store.
// Must be called with s.mu held.
func (s *Store) hasHeartbeatConfigLocked() bool {
return s.Heartbeat != nil && s.Heartbeat.URL != ""
}
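
// Configuring heartbeats (sketch): only the URL and Interval fields of
// HeartbeatClient are referenced in this file; any other fields or a dedicated
// constructor are assumptions, and the ping URL is hypothetical. Setting
// Heartbeat before Open ensures the monitor starts with the store.
//
//	store.Heartbeat = &litestream.HeartbeatClient{
//		URL:      "https://hc-ping.com/your-uuid",
//		Interval: time.Minute,
//	}
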
// monitorHeartbeats periodically checks if heartbeat pings should be sent.
// Heartbeat pings are only sent when ALL databases have synced successfully
// within the heartbeat interval.
func (s *Store) monitorHeartbeats(ctx context.Context) {
slog.Info("starting heartbeat monitor", "interval", s.HeartbeatCheckInterval)
ticker := time.NewTicker(s.HeartbeatCheckInterval)
defer ticker.Stop()
LOOP:
for {
select {
case <-ctx.Done():
break LOOP
case <-ticker.C:
}
s.sendHeartbeatIfNeeded(ctx)
}
}
// sendHeartbeatIfNeeded sends a heartbeat ping if:
// - Heartbeat is configured on the Store
// - Enough time has passed since the last ping attempt
// - ALL databases have synced successfully within the heartbeat interval
func (s *Store) sendHeartbeatIfNeeded(ctx context.Context) {
hb := s.Heartbeat
if hb == nil || hb.URL == "" {
return
}
if !hb.ShouldPing() {
return
}
// Check that all databases have synced successfully within the heartbeat interval.
healthySince := time.Now().Add(-hb.Interval)
if !s.allDatabasesHealthy(healthySince) {
return
}
// Record ping attempt time before making the request to ensure we respect
// the configured interval even if the ping fails. This prevents rapid
// retries that could overwhelm the endpoint.
hb.RecordPing()
if err := hb.Ping(ctx); err != nil {
slog.Error("heartbeat ping failed", "url", hb.URL, "error", err)
return
}
slog.Debug("heartbeat ping sent", "url", hb.URL)
}
// allDatabasesHealthy returns true if all databases have synced successfully
// since the given time. Returns false if there are no databases or no enabled databases.
func (s *Store) allDatabasesHealthy(since time.Time) bool {
dbs := s.DBs()
if len(dbs) == 0 {
return false
}
enabledCount := 0
for _, db := range dbs {
if !db.IsOpen() {
continue // skip disabled DBs
}
enabledCount++
lastSync := db.LastSuccessfulSyncAt()
if lastSync.IsZero() || lastSync.Before(since) {
return false
}
}
return enabledCount > 0
}
// CompactDB performs a compaction or snapshot for a given database on a single destination level.
// It only proceeds if no file has been written to the destination level since
// the most recent scheduled compaction time for that level.
func (s *Store) CompactDB(ctx context.Context, db *DB, lvl *CompactionLevel) (*ltx.FileInfo, error) {
// Skip if database is not yet initialized (page size unknown).
if db.PageSize() == 0 {
return nil, ErrDBNotReady
}
dstLevel := lvl.Level
// Ensure we are not re-compacting before the most recent compaction time.
prevCompactionAt := lvl.PrevCompactionAt(time.Now())
dstInfo, err := db.MaxLTXFileInfo(ctx, dstLevel)
if err != nil {
return nil, fmt.Errorf("fetch dst level info: %w", err)
} else if dstInfo.CreatedAt.After(prevCompactionAt) {
return nil, ErrCompactionTooEarly
}
// Shortcut if this is a snapshot since we are not pulling from a previous level.
if dstLevel == SnapshotLevel {
info, err := db.Snapshot(ctx)
if err != nil {
return info, err
}
slog.InfoContext(ctx, "snapshot complete", "txid", info.MaxTXID.String(), "size", info.Size)
return info, nil
}
// Fetch the latest LTX file info for the source level so we can see whether
// there is anything new to compact into the destination.
srcLevel := s.levels.PrevLevel(dstLevel)
srcInfo, err := db.MaxLTXFileInfo(ctx, srcLevel)
if err != nil {
return nil, fmt.Errorf("fetch src level info: %w", err)
}
// Skip if there are no new files to compact.
if srcInfo.MaxTXID <= dstInfo.MaxTXID {
return nil, ErrNoCompaction
}
info, err := db.Compact(ctx, dstLevel)
if err != nil {
return info, err
}
slog.InfoContext(ctx, "compaction complete",
"level", dstLevel,
slog.Group("txid",
"min", info.MinTXID.String(),
"max", info.MaxTXID.String(),
),
"size", info.Size,
)
return info, nil
}
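
// Driving a compaction manually (sketch; store, db, ctx, and the levels slice
// are assumed): callers can run a single pass outside the background monitor,
// treating the sentinel errors as non-fatal.
//
//	for _, lvl := range levels { // the CompactionLevels passed to NewStore
//		if lvl.Level == 0 {
//			continue
//		}
//		if _, err := store.CompactDB(ctx, db, lvl); err != nil &&
//			!errors.Is(err, litestream.ErrNoCompaction) &&
//			!errors.Is(err, litestream.ErrCompactionTooEarly) {
//			return err
//		}
//	}
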
// EnforceSnapshotRetention removes old snapshots by timestamp and then
// cleans up all lower levels based on minimum snapshot TXID.
func (s *Store) EnforceSnapshotRetention(ctx context.Context, db *DB) error {
// Enforce retention for the snapshot level.
minSnapshotTXID, err := db.EnforceSnapshotRetention(ctx, time.Now().Add(-s.SnapshotRetention))
if err != nil {
return fmt.Errorf("enforce snapshot retention: %w", err)
}
// Enforce TXID-based retention for the remaining compaction levels.
for _, lvl := range s.levels {
// Skip L0 since it is enforced on a more frequent basis.
if lvl.Level == 0 {
continue
}
if err := db.EnforceRetentionByTXID(ctx, lvl.Level, minSnapshotTXID); err != nil {
return fmt.Errorf("enforce L%d retention: %w", lvl.Level, err)
}
}
return nil
}