feat(heartbeat): add notification hooks for health check services (#926)

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Cory LaNou
2026-01-06 11:18:23 -06:00
committed by GitHub
parent 9d2648700e
commit 62b25c13dc
7 changed files with 524 additions and 1 deletions

View File

@@ -51,6 +51,8 @@ var (
ErrInvalidL0RetentionCheckInterval = errors.New("l0 retention check interval must be greater than 0")
ErrInvalidShutdownSyncTimeout = errors.New("shutdown-sync-timeout must be >= 0")
ErrInvalidShutdownSyncInterval = errors.New("shutdown sync interval must be greater than 0")
ErrInvalidHeartbeatURL = errors.New("heartbeat URL must be a valid HTTP or HTTPS URL")
ErrInvalidHeartbeatInterval = errors.New("heartbeat interval must be at least 1 minute")
ErrConfigFileNotFound = errors.New("config file not found")
)
@@ -212,6 +214,10 @@ type Config struct {
L0Retention *time.Duration `yaml:"l0-retention"`
L0RetentionCheckInterval *time.Duration `yaml:"l0-retention-check-interval"`
// Heartbeat settings (global defaults)
HeartbeatURL string `yaml:"heartbeat-url"`
HeartbeatInterval *time.Duration `yaml:"heartbeat-interval"`
// List of databases to manage.
DBs []*DBConfig `yaml:"dbs"`
@@ -331,6 +337,22 @@ func (c *Config) Validate() error {
}
}
// Validate global heartbeat settings
if c.HeartbeatURL != "" && !isValidHeartbeatURL(c.HeartbeatURL) {
return &ConfigValidationError{
Err: ErrInvalidHeartbeatURL,
Field: "heartbeat-url",
Value: c.HeartbeatURL,
}
}
if c.HeartbeatInterval != nil && *c.HeartbeatInterval < litestream.MinHeartbeatInterval {
return &ConfigValidationError{
Err: ErrInvalidHeartbeatInterval,
Field: "heartbeat-interval",
Value: *c.HeartbeatInterval,
}
}
// Validate compaction level intervals
for i, level := range c.Levels {
if level.Interval <= 0 {
@@ -496,7 +518,7 @@ func ParseConfig(r io.Reader, expandEnv bool) (_ Config, err error) {
}
}
// Propage settings from global config to replica configs.
// Propagate settings from global config to individual configs.
config.propagateGlobalSettings()
// Validate configuration
@@ -1792,6 +1814,11 @@ func registerConfigFlag(fs *flag.FlagSet) (configPath *string, noExpandEnv *bool
fs.Bool("no-expand-env", false, "do not expand env vars in config")
}
// isValidHeartbeatURL reports whether u looks like an absolute HTTP or HTTPS
// URL. The value must start with "http://" or "https://" (case-sensitive) and
// must have a non-empty host portion immediately after the scheme.
//
// This is a lightweight syntactic check, not a full URL parse: it exists to
// reject obviously unusable configuration values before a ping is attempted.
func isValidHeartbeatURL(u string) bool {
	for _, scheme := range []string{"http://", "https://"} {
		if !strings.HasPrefix(u, scheme) {
			continue
		}
		rest := u[len(scheme):]
		// Reject scheme-only values ("http://") and URLs with an empty
		// authority ("http:///path", "https://?q=1"): there is no host to
		// send the heartbeat to.
		return rest != "" && !strings.ContainsAny(rest[:1], "/?#")
	}
	return false
}
// expand returns an absolute path for s.
// It also strips SQLite connection string prefixes (sqlite://, sqlite3://).
func expand(s string) (string, error) {

View File

@@ -228,6 +228,13 @@ func (c *ReplicateCommand) Run(ctx context.Context) (err error) {
if c.Config.ShutdownSyncInterval != nil {
c.Store.SetShutdownSyncInterval(*c.Config.ShutdownSyncInterval)
}
if c.Config.HeartbeatURL != "" {
interval := litestream.DefaultHeartbeatInterval
if c.Config.HeartbeatInterval != nil {
interval = *c.Config.HeartbeatInterval
}
c.Store.Heartbeat = litestream.NewHeartbeatClient(c.Config.HeartbeatURL, interval)
}
// Disable all background monitors when running once.
// This must be done after config settings are applied.

20
db.go
View File

@@ -152,6 +152,11 @@ type DB struct {
ShutdownSyncTimeout time.Duration
ShutdownSyncInterval time.Duration
// lastSuccessfulSyncAt tracks when replication last succeeded.
// Used by heartbeat monitoring to determine if a ping should be sent.
lastSuccessfulSyncMu sync.RWMutex
lastSuccessfulSyncAt time.Time
// Where to send log messages, defaults to global slog with database epath.
Logger *slog.Logger
}
@@ -303,6 +308,21 @@ func (db *DB) PageSize() int {
return db.pageSize
}
// RecordSuccessfulSync stamps the database with the current wall-clock time
// as the moment of its most recent successful sync. Heartbeat monitoring
// reads this value to decide whether a ping may be sent.
func (db *DB) RecordSuccessfulSync() {
	db.lastSuccessfulSyncMu.Lock()
	db.lastSuccessfulSyncAt = time.Now()
	db.lastSuccessfulSyncMu.Unlock()
}
// LastSuccessfulSyncAt reports when the most recent successful sync was
// recorded. The zero time is returned if no sync has been recorded yet.
func (db *DB) LastSuccessfulSyncAt() time.Time {
	db.lastSuccessfulSyncMu.RLock()
	at := db.lastSuccessfulSyncAt
	db.lastSuccessfulSyncMu.RUnlock()
	return at
}
// Open initializes the background monitoring goroutine.
func (db *DB) Open() (err error) {
// Validate fields on database.

84
heartbeat.go Normal file
View File

@@ -0,0 +1,84 @@
package litestream
import (
	"context"
	"fmt"
	"io"
	"net/http"
	"sync"
	"time"
)
// Heartbeat timing defaults and limits.
const (
// DefaultHeartbeatInterval is the ping interval used when a heartbeat URL
// is configured without an explicit heartbeat interval.
DefaultHeartbeatInterval = 5 * time.Minute
// DefaultHeartbeatTimeout is the HTTP client timeout that
// NewHeartbeatClient applies to ping requests.
DefaultHeartbeatTimeout = 30 * time.Second
// MinHeartbeatInterval is the smallest accepted ping interval. Config
// validation rejects shorter values and NewHeartbeatClient raises shorter
// values to this minimum.
MinHeartbeatInterval = 1 * time.Minute
)
// HeartbeatClient sends HTTP GET pings to a health check endpoint and tracks
// when the last ping attempt was recorded.
type HeartbeatClient struct {
// mu guards lastPingAt.
mu sync.Mutex
// httpClient performs the ping requests; NewHeartbeatClient creates it with
// Timeout as its request timeout.
httpClient *http.Client
// URL is the endpoint that Ping requests. Ping is a no-op when it is empty.
URL string
// Interval is the minimum time between recorded pings, as evaluated by
// ShouldPing.
Interval time.Duration
// Timeout is the request timeout chosen at construction time.
// NOTE(review): httpClient copies this value in NewHeartbeatClient, so
// changing the field afterwards does not appear to affect requests.
Timeout time.Duration
// lastPingAt is the time set by the most recent RecordPing call; it is the
// zero time until RecordPing is called.
lastPingAt time.Time
}
// NewHeartbeatClient returns a client that pings url. Intervals shorter than
// MinHeartbeatInterval are raised to MinHeartbeatInterval. The request timeout
// is always DefaultHeartbeatTimeout.
func NewHeartbeatClient(url string, interval time.Duration) *HeartbeatClient {
	c := &HeartbeatClient{
		URL:      url,
		Interval: MinHeartbeatInterval,
		Timeout:  DefaultHeartbeatTimeout,
	}
	// Only accept the caller's interval when it meets the minimum.
	if interval >= MinHeartbeatInterval {
		c.Interval = interval
	}
	c.httpClient = &http.Client{Timeout: c.Timeout}
	return c
}
// Ping sends a single GET request to the heartbeat URL.
//
// It returns nil without doing anything when URL is empty. An error is
// returned when the request cannot be created, when the request fails
// (including cancellation of ctx), or when the endpoint answers with a status
// code outside the 2xx range.
func (c *HeartbeatClient) Ping(ctx context.Context) error {
	// Upper bound on how much of the response body is read before closing.
	const maxDrainBytes = 4 << 10

	if c.URL == "" {
		return nil
	}

	// A HeartbeatClient built as a struct literal has no HTTP client; fall
	// back to one that honors the Timeout field instead of dereferencing nil.
	client := c.httpClient
	if client == nil {
		client = &http.Client{Timeout: c.Timeout}
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.URL, nil)
	if err != nil {
		return fmt.Errorf("create request: %w", err)
	}

	resp, err := client.Do(req)
	if err != nil {
		return fmt.Errorf("http request: %w", err)
	}
	defer func() {
		// Read a bounded amount of the body before closing so the transport
		// can keep the connection alive for the next ping. Errors here are
		// deliberately ignored: the ping outcome is decided by the status code.
		_, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, maxDrainBytes))
		_ = resp.Body.Close()
	}()

	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
	}
	return nil
}
// ShouldPing reports whether at least Interval has elapsed since the last
// recorded ping. It returns true when no ping has been recorded yet.
func (c *HeartbeatClient) ShouldPing() bool {
	c.mu.Lock()
	defer c.mu.Unlock()

	elapsed := time.Since(c.lastPingAt)
	return elapsed >= c.Interval
}
// LastPingAt returns the time stored by the most recent RecordPing call, or
// the zero time if RecordPing has not been called.
func (c *HeartbeatClient) LastPingAt() time.Time {
	c.mu.Lock()
	at := c.lastPingAt
	c.mu.Unlock()
	return at
}
// RecordPing stores the current time as the moment of the last ping.
func (c *HeartbeatClient) RecordPing() {
	c.mu.Lock()
	c.lastPingAt = time.Now()
	c.mu.Unlock()
}

256
heartbeat_test.go Normal file
View File

@@ -0,0 +1,256 @@
package litestream_test
import (
"context"
"net/http"
"net/http/httptest"
"path/filepath"
"sync/atomic"
"testing"
"time"
"github.com/benbjohnson/litestream"
"github.com/benbjohnson/litestream/internal/testingutil"
)
// TestHeartbeatClient_Ping covers the outcomes of HeartbeatClient.Ping: a
// successful GET, the empty-URL no-op, a non-2xx response, an unreachable
// address, and an already-canceled context.
func TestHeartbeatClient_Ping(t *testing.T) {
	t.Run("Success", func(t *testing.T) {
		var hits atomic.Int64
		handler := func(w http.ResponseWriter, r *http.Request) {
			if r.Method != http.MethodGet {
				t.Errorf("expected GET, got %s", r.Method)
			}
			hits.Add(1)
			w.WriteHeader(http.StatusOK)
		}
		srv := httptest.NewServer(http.HandlerFunc(handler))
		defer srv.Close()

		hb := litestream.NewHeartbeatClient(srv.URL, 5*time.Minute)
		err := hb.Ping(context.Background())
		if err != nil {
			t.Fatalf("expected no error, got %v", err)
		}
		if got := hits.Load(); got != 1 {
			t.Errorf("expected 1 ping, got %d", got)
		}
	})

	t.Run("EmptyURL", func(t *testing.T) {
		hb := litestream.NewHeartbeatClient("", 5*time.Minute)
		err := hb.Ping(context.Background())
		if err != nil {
			t.Fatalf("expected no error for empty URL, got %v", err)
		}
	})

	t.Run("NonSuccessStatusCode", func(t *testing.T) {
		handler := func(w http.ResponseWriter, r *http.Request) {
			w.WriteHeader(http.StatusInternalServerError)
		}
		srv := httptest.NewServer(http.HandlerFunc(handler))
		defer srv.Close()

		hb := litestream.NewHeartbeatClient(srv.URL, 5*time.Minute)
		if err := hb.Ping(context.Background()); err == nil {
			t.Fatal("expected error for 500 status code")
		}
	})

	t.Run("NetworkError", func(t *testing.T) {
		hb := litestream.NewHeartbeatClient("http://localhost:1", 5*time.Minute)
		if err := hb.Ping(context.Background()); err == nil {
			t.Fatal("expected error for unreachable server")
		}
	})

	t.Run("ContextCanceled", func(t *testing.T) {
		handler := func(w http.ResponseWriter, r *http.Request) {
			time.Sleep(100 * time.Millisecond)
			w.WriteHeader(http.StatusOK)
		}
		srv := httptest.NewServer(http.HandlerFunc(handler))
		defer srv.Close()

		// Cancel before the request is issued so Ping must fail.
		ctx, cancel := context.WithCancel(context.Background())
		cancel()

		hb := litestream.NewHeartbeatClient(srv.URL, 5*time.Minute)
		if err := hb.Ping(ctx); err == nil {
			t.Fatal("expected error for canceled context")
		}
	})
}
// TestHeartbeatClient_ShouldPing checks that a fresh client is due for a ping
// and that a client is not due immediately after a ping has been recorded.
func TestHeartbeatClient_ShouldPing(t *testing.T) {
	t.Run("FirstPing", func(t *testing.T) {
		hb := litestream.NewHeartbeatClient("http://example.com", 5*time.Minute)
		if due := hb.ShouldPing(); !due {
			t.Error("expected ShouldPing to return true for first ping")
		}
	})

	t.Run("AfterRecordPing", func(t *testing.T) {
		hb := litestream.NewHeartbeatClient("http://example.com", 5*time.Minute)
		hb.RecordPing()
		if due := hb.ShouldPing(); due {
			t.Error("expected ShouldPing to return false immediately after RecordPing")
		}
	})
}
// TestHeartbeatClient_MinInterval checks that an interval below the minimum is
// raised to MinHeartbeatInterval by the constructor.
func TestHeartbeatClient_MinInterval(t *testing.T) {
	hb := litestream.NewHeartbeatClient("http://example.com", 30*time.Second)
	if hb.Interval == litestream.MinHeartbeatInterval {
		return
	}
	t.Errorf("expected interval to be clamped to %v, got %v", litestream.MinHeartbeatInterval, hb.Interval)
}
// TestHeartbeatClient_LastPingAt checks that LastPingAt is zero on a fresh
// client and falls inside the call window once RecordPing has run.
func TestHeartbeatClient_LastPingAt(t *testing.T) {
	hb := litestream.NewHeartbeatClient("http://example.com", 5*time.Minute)
	if initial := hb.LastPingAt(); !initial.IsZero() {
		t.Error("expected LastPingAt to be zero initially")
	}

	before := time.Now()
	hb.RecordPing()
	after := time.Now()

	recorded := hb.LastPingAt()
	withinWindow := !recorded.Before(before) && !recorded.After(after)
	if !withinWindow {
		t.Errorf("LastPingAt %v should be between %v and %v", recorded, before, after)
	}
}
// TestStore_Heartbeat_AllDatabasesHealthy exercises the pieces the store's
// heartbeat gating relies on: the heartbeat client attached to a store and
// the per-database last-successful-sync timestamps. Every subtest sets
// HeartbeatCheckInterval to 0, which keeps the background heartbeat monitor
// from starting.
func TestStore_Heartbeat_AllDatabasesHealthy(t *testing.T) {
// NoDatabases: a store without databases must not produce pings.
// NOTE(review): this subtest never opens the store and never invokes a
// heartbeat check, so the final assertion holds trivially; it does not
// exercise the empty-store branch of allDatabasesHealthy.
t.Run("NoDatabases", func(t *testing.T) {
levels := litestream.CompactionLevels{{Level: 0}}
store := litestream.NewStore(nil, levels)
store.CompactionMonitorEnabled = false
store.HeartbeatCheckInterval = 0 // Disable automatic monitoring
var pingCount atomic.Int64
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
pingCount.Add(1)
w.WriteHeader(http.StatusOK)
}))
defer server.Close()
store.Heartbeat = litestream.NewHeartbeatClient(server.URL, 1*time.Minute)
// With no databases, heartbeat should not fire.
// The monitor is disabled and nothing below triggers a check manually.
// The store would not send pings anyway because allDatabasesHealthy
// returns false for empty stores.
if pingCount.Load() != 0 {
t.Errorf("expected no pings with no databases, got %d", pingCount.Load())
}
})
// AllDatabasesSynced: both databases are synced and replicated, then a ping
// is sent and counted on the test server.
// NOTE(review): the ping is issued by calling store.Heartbeat.Ping
// directly, so this verifies the client reaches the server rather than the
// store's decision to send a heartbeat.
t.Run("AllDatabasesSynced", func(t *testing.T) {
db0, sqldb0 := testingutil.MustOpenDBs(t)
defer testingutil.MustCloseDBs(t, db0, sqldb0)
db1, sqldb1 := testingutil.MustOpenDBs(t)
defer testingutil.MustCloseDBs(t, db1, sqldb1)
levels := litestream.CompactionLevels{{Level: 0}, {Level: 1, Interval: time.Second}}
store := litestream.NewStore([]*litestream.DB{db0, db1}, levels)
store.CompactionMonitorEnabled = false
store.HeartbeatCheckInterval = 0
var pingCount atomic.Int64
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
pingCount.Add(1)
w.WriteHeader(http.StatusOK)
}))
defer server.Close()
store.Heartbeat = litestream.NewHeartbeatClient(server.URL, 1*time.Minute)
if err := store.Open(t.Context()); err != nil {
t.Fatalf("open store: %v", err)
}
defer store.Close(t.Context())
// Create tables and sync both databases
if _, err := sqldb0.ExecContext(t.Context(), `CREATE TABLE t (id INT)`); err != nil {
t.Fatal(err)
}
if err := db0.Sync(t.Context()); err != nil {
t.Fatal(err)
}
if err := db0.Replica.Sync(t.Context()); err != nil {
t.Fatal(err)
}
if _, err := sqldb1.ExecContext(t.Context(), `CREATE TABLE t (id INT)`); err != nil {
t.Fatal(err)
}
if err := db1.Sync(t.Context()); err != nil {
t.Fatal(err)
}
if err := db1.Replica.Sync(t.Context()); err != nil {
t.Fatal(err)
}
// Both databases have synced; send a ping through the store's client and
// confirm the server saw exactly one request.
if err := store.Heartbeat.Ping(t.Context()); err != nil {
t.Fatalf("ping failed: %v", err)
}
if got := pingCount.Load(); got != 1 {
t.Errorf("expected 1 ping after all DBs synced, got %d", got)
}
})
// OneDatabaseNotSynced: only db0 is synced; the subtest asserts on the
// LastSuccessfulSyncAt values of both databases.
// NOTE(review): the test server's requests are not counted here, so the
// absence of a heartbeat ping is not asserted.
t.Run("OneDatabaseNotSynced", func(t *testing.T) {
db0, sqldb0 := testingutil.MustOpenDBs(t)
defer testingutil.MustCloseDBs(t, db0, sqldb0)
// Create second DB but don't sync it
db1 := litestream.NewDB(filepath.Join(t.TempDir(), "db1"))
db1.Replica = litestream.NewReplica(db1)
db1.Replica.Client = testingutil.NewFileReplicaClient(t)
db1.Replica.MonitorEnabled = false
db1.MonitorInterval = 0
levels := litestream.CompactionLevels{{Level: 0}, {Level: 1, Interval: time.Second}}
store := litestream.NewStore([]*litestream.DB{db0, db1}, levels)
store.CompactionMonitorEnabled = false
store.HeartbeatCheckInterval = 0
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
}))
defer server.Close()
store.Heartbeat = litestream.NewHeartbeatClient(server.URL, 1*time.Minute)
if err := store.Open(t.Context()); err != nil {
t.Fatalf("open store: %v", err)
}
defer store.Close(t.Context())
// Only sync db0
if _, err := sqldb0.ExecContext(t.Context(), `CREATE TABLE t (id INT)`); err != nil {
t.Fatal(err)
}
if err := db0.Sync(t.Context()); err != nil {
t.Fatal(err)
}
if err := db0.Replica.Sync(t.Context()); err != nil {
t.Fatal(err)
}
// db1 hasn't synced, so LastSuccessfulSyncAt should be zero
if !db1.LastSuccessfulSyncAt().IsZero() {
t.Error("expected db1.LastSuccessfulSyncAt to be zero")
}
// db0 has synced
if db0.LastSuccessfulSyncAt().IsZero() {
t.Error("expected db0.LastSuccessfulSyncAt to be non-zero")
}
})
}

View File

@@ -160,6 +160,9 @@ func (r *Replica) Sync(ctx context.Context) (err error) {
r.SetPos(ltx.Pos{TXID: txID})
}
// Record successful sync for heartbeat monitoring.
r.db.RecordSuccessfulSync()
return nil
}

126
store.go
View File

@@ -44,6 +44,10 @@ const (
// enforced. This interval should be more frequent than the L1 compaction
// interval so that VFS read replicas have time to observe new files.
DefaultL0RetentionCheckInterval = 15 * time.Second
// DefaultHeartbeatCheckInterval controls how frequently the heartbeat
// monitor checks if heartbeat pings should be sent.
DefaultHeartbeatCheckInterval = 15 * time.Second
)
// Store represents the top-level container for databases.
@@ -75,6 +79,16 @@ type Store struct {
// Shutdown sync retry settings.
ShutdownSyncTimeout time.Duration
ShutdownSyncInterval time.Duration
// How often to check if heartbeat pings should be sent.
HeartbeatCheckInterval time.Duration
// Heartbeat client for health check pings. Sends pings only when
// all databases have synced successfully within the heartbeat interval.
Heartbeat *HeartbeatClient
// heartbeatMonitorRunning tracks whether the heartbeat monitor goroutine is running.
heartbeatMonitorRunning bool
}
func NewStore(dbs []*DB, levels CompactionLevels) *Store {
@@ -89,6 +103,7 @@ func NewStore(dbs []*DB, levels CompactionLevels) *Store {
CompactionMonitorEnabled: true,
ShutdownSyncTimeout: DefaultShutdownSyncTimeout,
ShutdownSyncInterval: DefaultShutdownSyncInterval,
HeartbeatCheckInterval: DefaultHeartbeatCheckInterval,
}
for _, db := range dbs {
@@ -143,6 +158,9 @@ func (s *Store) Open(ctx context.Context) error {
}()
}
// Start heartbeat monitor if heartbeat is configured on the store.
s.startHeartbeatMonitorIfNeeded()
return nil
}
@@ -215,6 +233,10 @@ func (s *Store) AddDB(db *DB) error {
s.dbs = append(s.dbs, db)
s.mu.Unlock()
// Start heartbeat monitor if heartbeat is configured and monitor isn't running.
s.startHeartbeatMonitorIfNeeded()
return nil
}
@@ -378,6 +400,110 @@ LOOP:
}
}
// startHeartbeatMonitorIfNeeded launches the heartbeat monitor goroutine
// unless one of the following holds:
//   - the monitor is already running
//   - HeartbeatCheckInterval is zero or negative
//   - no heartbeat is configured on the Store
//
// The goroutine is tracked by the Store's wait group.
func (s *Store) startHeartbeatMonitorIfNeeded() {
	s.mu.Lock()
	defer s.mu.Unlock()

	alreadyRunning := s.heartbeatMonitorRunning
	checksDisabled := s.HeartbeatCheckInterval <= 0
	if alreadyRunning || checksDisabled || !s.hasHeartbeatConfigLocked() {
		return
	}

	// Mark as running while still holding the lock so a concurrent caller
	// cannot start a second monitor.
	s.heartbeatMonitorRunning = true

	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		s.monitorHeartbeats(s.ctx)
	}()
}
// hasHeartbeatConfigLocked reports whether the Store has a heartbeat client
// with a non-empty URL. The caller must hold s.mu.
func (s *Store) hasHeartbeatConfigLocked() bool {
	hb := s.Heartbeat
	if hb == nil {
		return false
	}
	return hb.URL != ""
}
// monitorHeartbeats runs until ctx is done. On every tick of
// HeartbeatCheckInterval it asks sendHeartbeatIfNeeded to decide whether a
// ping is due; pings go out only when ALL databases have synced successfully
// within the heartbeat interval.
func (s *Store) monitorHeartbeats(ctx context.Context) {
	slog.Info("starting heartbeat monitor", "interval", s.HeartbeatCheckInterval)

	tick := time.NewTicker(s.HeartbeatCheckInterval)
	defer tick.Stop()

	for {
		select {
		case <-tick.C:
			s.sendHeartbeatIfNeeded(ctx)
		case <-ctx.Done():
			return
		}
	}
}
// sendHeartbeatIfNeeded pings the heartbeat endpoint when all of these hold:
//   - a heartbeat client with a non-empty URL is set on the Store
//   - the client reports that a ping is due
//   - ALL databases have synced successfully within the heartbeat interval
//
// A failed ping is logged and not retried until the next interval elapses.
func (s *Store) sendHeartbeatIfNeeded(ctx context.Context) {
	hb := s.Heartbeat
	switch {
	case hb == nil:
		return
	case hb.URL == "":
		return
	case !hb.ShouldPing():
		return
	}

	// A database counts as healthy only if its last successful sync is no
	// older than one heartbeat interval.
	cutoff := time.Now().Add(-hb.Interval)
	if healthy := s.allDatabasesHealthy(cutoff); !healthy {
		return
	}

	// Stamp the attempt before sending so that a failing endpoint is still
	// only contacted once per interval instead of on every check.
	hb.RecordPing()

	err := hb.Ping(ctx)
	if err == nil {
		slog.Debug("heartbeat ping sent", "url", hb.URL)
		return
	}
	slog.Error("heartbeat ping failed", "url", hb.URL, "error", err)
}
// allDatabasesHealthy reports whether every database in the store has a
// recorded successful sync at or after since. A store with no databases is
// never considered healthy.
func (s *Store) allDatabasesHealthy(since time.Time) bool {
	dbs := s.DBs()
	for _, db := range dbs {
		// A zero timestamp means the database has never synced.
		if at := db.LastSuccessfulSyncAt(); at.IsZero() || at.Before(since) {
			return false
		}
	}
	return len(dbs) > 0
}
// CompactDB performs a compaction or snapshot for a given database on a single destination level.
// This function will only proceed if a compaction has not occurred before the last compaction time.
func (s *Store) CompactDB(ctx context.Context, db *DB, lvl *CompactionLevel) (*ltx.FileInfo, error) {