fix(metrics): update db_size, wal_size, and total_wal_bytes metrics (#882)
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
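Editorial note, not part of the commit: the fix only matters once the collectors are scraped. A minimal sketch, assuming the collectors are registered on the default Prometheus registry and that the embedding program wires up an HTTP endpoint itself (the handler and port below are illustrative); after a sync, the litestream_db_size, litestream_wal_size, and litestream_total_wal_bytes series checked by the new tests should report non-zero values:

package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	// Expose the default registry; the metric names match the test assertions below.
	http.Handle("/metrics", promhttp.Handler())
	log.Fatal(http.ListenAndServe(":9090", nil)) // port is an arbitrary example
}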
12 db.go
@@ -613,7 +613,12 @@ func (db *DB) Sync(ctx context.Context) (err error) {
		return fmt.Errorf("pos: %w", err)
	}
	db.txIDGauge.Set(float64(pos.TXID))
	// db.shadowWALSizeGauge.Set(float64(size))

	// Update file size metrics.
	if fi, err := os.Stat(db.path); err == nil {
		db.dbSizeGauge.Set(float64(fi.Size()))
	}
	db.walSizeGauge.Set(float64(newWALSize))

	// Notify replicas of WAL changes.
	// if changed {
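Editorial note, not part of the diff: the tests below read package-level vectors such as dbSizeGaugeVec curried by database path, while this hunk sets the per-DB gauges like db.dbSizeGauge. A minimal sketch of that pattern, with the metric name taken from the test assertions and the help text and "db" label key assumed:

package litestream

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// Sketch only: a path-labeled vector plus a per-DB gauge bound at open time.
var dbSizeGaugeVec = promauto.NewGaugeVec(prometheus.GaugeOpts{
	Name: "litestream_db_size",
	Help: "Current size of the database file in bytes (help text assumed).",
}, []string{"db"})

// e.g. when the DB is opened: db.dbSizeGauge = dbSizeGaugeVec.WithLabelValues(db.path)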
@@ -1091,6 +1096,11 @@ func (db *DB) sync(ctx context.Context, checkpointing bool, info syncInfo) error
	}
	assert(sz >= 0, fmt.Sprintf("wal size must be positive: sz=%d, maxOffset=%d, info.offset=%d", sz, maxOffset, info.offset))

	// Track total WAL bytes synced.
	if sz > 0 {
		db.totalWALBytesCounter.Add(float64(sz))
	}

	// Exit if we have no new WAL pages and we aren't snapshotting.
	if !info.snapshotting && sz == 0 {
		db.Logger.Log(ctx, internal.LevelTrace, "sync: skip", "reason", "no new wal pages")
@@ -1,6 +1,88 @@
package litestream

import "testing"
import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"testing"

	"github.com/prometheus/client_golang/prometheus/testutil"
	"github.com/superfly/ltx"
	_ "modernc.org/sqlite"
)

// testReplicaClient is a minimal mock for testing that doesn't cause import cycles.
type testReplicaClient struct {
	dir string
}

func (c *testReplicaClient) Type() string { return "test" }

func (c *testReplicaClient) LTXFiles(_ context.Context, _ int, _ ltx.TXID, _ bool) (ltx.FileIterator, error) {
	return ltx.NewFileInfoSliceIterator(nil), nil
}

func (c *testReplicaClient) OpenLTXFile(_ context.Context, _ int, _, _ ltx.TXID, _, _ int64) (io.ReadCloser, error) {
	return nil, os.ErrNotExist
}

func (c *testReplicaClient) WriteLTXFile(_ context.Context, level int, minTXID, maxTXID ltx.TXID, r io.Reader) (*ltx.FileInfo, error) {
	data, err := io.ReadAll(r)
	if err != nil {
		return nil, err
	}
	levelDir := filepath.Join(c.dir, fmt.Sprintf("l%d", level))
	if err := os.MkdirAll(levelDir, 0o755); err != nil {
		return nil, err
	}
	path := filepath.Join(levelDir, ltx.FormatFilename(minTXID, maxTXID))
	if err := os.WriteFile(path, data, 0o600); err != nil {
		return nil, err
	}
	return &ltx.FileInfo{Level: level, MinTXID: minTXID, MaxTXID: maxTXID, Size: int64(len(data))}, nil
}

func (c *testReplicaClient) DeleteLTXFiles(_ context.Context, _ []*ltx.FileInfo) error {
	return nil
}

func (c *testReplicaClient) DeleteAll(_ context.Context) error {
	return nil
}

// errorReplicaClient is a replica client that returns errors for testing.
type errorReplicaClient struct {
	writeErr error
}

func (c *errorReplicaClient) Type() string { return "error" }

func (c *errorReplicaClient) LTXFiles(_ context.Context, _ int, _ ltx.TXID, _ bool) (ltx.FileIterator, error) {
	return ltx.NewFileInfoSliceIterator(nil), nil
}

func (c *errorReplicaClient) OpenLTXFile(_ context.Context, _ int, _, _ ltx.TXID, _, _ int64) (io.ReadCloser, error) {
	return nil, os.ErrNotExist
}

func (c *errorReplicaClient) WriteLTXFile(_ context.Context, _ int, _, _ ltx.TXID, _ io.Reader) (*ltx.FileInfo, error) {
	if c.writeErr != nil {
		return nil, c.writeErr
	}
	return nil, nil
}

func (c *errorReplicaClient) DeleteLTXFiles(_ context.Context, _ []*ltx.FileInfo) error {
	return nil
}

func (c *errorReplicaClient) DeleteAll(_ context.Context) error {
	return nil
}

// TestCalcWALSize ensures calcWALSize doesn't overflow with large page sizes.
// Regression test for uint32 overflow bug where large page sizes (>=16KB)
@@ -69,3 +151,235 @@ func TestCalcWALSize(t *testing.T) {
		})
	}
}

// TestDB_Sync_UpdatesMetrics verifies that DB size, WAL size, and total WAL bytes
// metrics are properly updated during sync operations.
// Regression test for issue #876: metrics were defined but never updated.
func TestDB_Sync_UpdatesMetrics(t *testing.T) {
	// Set up database manually (can't use testingutil due to import cycle)
	dir := t.TempDir()
	dbPath := filepath.Join(dir, "db")

	// Create and open litestream DB
	db := NewDB(dbPath)
	db.MonitorInterval = 0 // disable background goroutine
	db.Replica = NewReplica(db)
	db.Replica.Client = &testReplicaClient{dir: t.TempDir()}
	db.Replica.MonitorEnabled = false
	if err := db.Open(); err != nil {
		t.Fatal(err)
	}
	defer func() {
		if err := db.Close(context.Background()); err != nil {
			t.Fatal(err)
		}
	}()

	// Open SQL connection
	sqldb, err := sql.Open("sqlite", dbPath)
	if err != nil {
		t.Fatal(err)
	}
	defer sqldb.Close()

	if _, err := sqldb.Exec(`PRAGMA journal_mode = wal;`); err != nil {
		t.Fatal(err)
	}

	// Insert data to create DB and WAL content
	if _, err := sqldb.Exec(`CREATE TABLE t (id INT, data TEXT)`); err != nil {
		t.Fatal(err)
	}
	if _, err := sqldb.Exec(`INSERT INTO t VALUES (1, 'test data')`); err != nil {
		t.Fatal(err)
	}

	// Sync to trigger metric updates
	if err := db.Sync(context.Background()); err != nil {
		t.Fatal(err)
	}

	// Verify DB size metric matches actual file size
	dbSizeMetric := dbSizeGaugeVec.WithLabelValues(db.Path())
	dbSizeValue := testutil.ToFloat64(dbSizeMetric)
	dbFileInfo, err := os.Stat(db.Path())
	if err != nil {
		t.Fatalf("failed to stat db file: %v", err)
	}
	if dbSizeValue != float64(dbFileInfo.Size()) {
		t.Fatalf("litestream_db_size=%v, want %v", dbSizeValue, dbFileInfo.Size())
	}

	// Verify WAL size metric matches actual file size
	walSizeMetric := walSizeGaugeVec.WithLabelValues(db.Path())
	walSizeValue := testutil.ToFloat64(walSizeMetric)
	walFileInfo, err := os.Stat(db.WALPath())
	if err != nil {
		t.Fatalf("failed to stat wal file: %v", err)
	}
	if walSizeValue != float64(walFileInfo.Size()) {
		t.Fatalf("litestream_wal_size=%v, want %v", walSizeValue, walFileInfo.Size())
	}

	// Verify total WAL bytes counter was incremented
	totalWALMetric := totalWALBytesCounterVec.WithLabelValues(db.Path())
	totalWALValue := testutil.ToFloat64(totalWALMetric)
	if totalWALValue <= 0 {
		t.Fatalf("litestream_total_wal_bytes=%v, want > 0", totalWALValue)
	}

	// Verify txid metric was updated (should be > 0 after writes)
	txidMetric := txIDIndexGaugeVec.WithLabelValues(db.Path())
	txidValue := testutil.ToFloat64(txidMetric)
	if txidValue <= 0 {
		t.Fatalf("litestream_txid=%v, want > 0", txidValue)
	}

	// Verify sync count was incremented
	syncCountMetric := syncNCounterVec.WithLabelValues(db.Path())
	syncCountValue := testutil.ToFloat64(syncCountMetric)
	if syncCountValue <= 0 {
		t.Fatalf("litestream_sync_count=%v, want > 0", syncCountValue)
	}

	// Verify sync seconds was recorded
	syncSecondsMetric := syncSecondsCounterVec.WithLabelValues(db.Path())
	syncSecondsValue := testutil.ToFloat64(syncSecondsMetric)
	if syncSecondsValue <= 0 {
		t.Fatalf("litestream_sync_seconds=%v, want > 0", syncSecondsValue)
	}
}

// TestDB_Checkpoint_UpdatesMetrics verifies that checkpoint metrics are updated.
func TestDB_Checkpoint_UpdatesMetrics(t *testing.T) {
	dir := t.TempDir()
	dbPath := filepath.Join(dir, "db")

	db := NewDB(dbPath)
	db.MonitorInterval = 0
	db.Replica = NewReplica(db)
	db.Replica.Client = &testReplicaClient{dir: t.TempDir()}
	db.Replica.MonitorEnabled = false
	if err := db.Open(); err != nil {
		t.Fatal(err)
	}
	defer func() {
		if err := db.Close(context.Background()); err != nil {
			t.Fatal(err)
		}
	}()

	sqldb, err := sql.Open("sqlite", dbPath)
	if err != nil {
		t.Fatal(err)
	}
	defer sqldb.Close()

	if _, err := sqldb.Exec(`PRAGMA journal_mode = wal;`); err != nil {
		t.Fatal(err)
	}

	if _, err := sqldb.Exec(`CREATE TABLE t (id INT, data TEXT)`); err != nil {
		t.Fatal(err)
	}
	if _, err := sqldb.Exec(`INSERT INTO t VALUES (1, 'test data')`); err != nil {
		t.Fatal(err)
	}

	// Sync first to initialize database state
	if err := db.Sync(context.Background()); err != nil {
		t.Fatal(err)
	}

	// Get baseline checkpoint metrics
	baselineCount := testutil.ToFloat64(checkpointNCounterVec.WithLabelValues(db.Path(), "PASSIVE"))
	baselineSeconds := testutil.ToFloat64(checkpointSecondsCounterVec.WithLabelValues(db.Path(), "PASSIVE"))

	// Force checkpoint
	if err := db.Checkpoint(context.Background(), "PASSIVE"); err != nil {
		t.Fatal(err)
	}

	// Verify checkpoint_count was incremented
	checkpointCountMetric := checkpointNCounterVec.WithLabelValues(db.Path(), "PASSIVE")
	checkpointCountValue := testutil.ToFloat64(checkpointCountMetric)
	if checkpointCountValue <= baselineCount {
		t.Fatalf("litestream_checkpoint_count=%v, want > %v", checkpointCountValue, baselineCount)
	}

	// Verify checkpoint_seconds was recorded
	checkpointSecondsMetric := checkpointSecondsCounterVec.WithLabelValues(db.Path(), "PASSIVE")
	checkpointSecondsValue := testutil.ToFloat64(checkpointSecondsMetric)
	if checkpointSecondsValue <= baselineSeconds {
		t.Fatalf("litestream_checkpoint_seconds=%v, want > %v", checkpointSecondsValue, baselineSeconds)
	}
}

// TestDB_Sync_ErrorMetrics verifies that sync error counter is incremented on failure.
func TestDB_Sync_ErrorMetrics(t *testing.T) {
	dir := t.TempDir()
	dbPath := filepath.Join(dir, "db")

	errorClient := &errorReplicaClient{writeErr: errors.New("simulated write error")}
	workingClient := &testReplicaClient{dir: t.TempDir()}

	db := NewDB(dbPath)
	db.MonitorInterval = 0
	db.Replica = NewReplica(db)
	db.Replica.Client = workingClient // Start with working client
	db.Replica.MonitorEnabled = false
	if err := db.Open(); err != nil {
		t.Fatal(err)
	}
	defer func() {
		// Switch back to working client for clean close
		db.Replica.Client = workingClient
		_ = db.Close(context.Background())
	}()

	sqldb, err := sql.Open("sqlite", dbPath)
	if err != nil {
		t.Fatal(err)
	}
	defer sqldb.Close()

	if _, err := sqldb.Exec(`PRAGMA journal_mode = wal;`); err != nil {
		t.Fatal(err)
	}

	if _, err := sqldb.Exec(`CREATE TABLE t (id INT, data TEXT)`); err != nil {
		t.Fatal(err)
	}
	if _, err := sqldb.Exec(`INSERT INTO t VALUES (1, 'test data')`); err != nil {
		t.Fatal(err)
	}

	// First sync with working client to initialize
	if err := db.Sync(context.Background()); err != nil {
		t.Fatal(err)
	}

	// Insert more data
	if _, err := sqldb.Exec(`INSERT INTO t VALUES (2, 'more data')`); err != nil {
		t.Fatal(err)
	}

	// Get baseline error count
	baselineErrors := testutil.ToFloat64(syncErrorNCounterVec.WithLabelValues(db.Path()))

	// Switch to error client
	db.Replica.Client = errorClient

	// Sync should fail due to error replica client
	err = db.Sync(context.Background())
	if err == nil {
		t.Skip("sync did not return error, skipping error metric test")
	}

	// Verify sync_error_count was incremented
	syncErrorMetric := syncErrorNCounterVec.WithLabelValues(db.Path())
	syncErrorValue := testutil.ToFloat64(syncErrorMetric)
	if syncErrorValue <= baselineErrors {
		t.Fatalf("litestream_sync_error_count=%v, want > %v", syncErrorValue, baselineErrors)
	}
}
51 internal/internal_test.go Normal file
@@ -0,0 +1,51 @@
package internal

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus/testutil"
)

// TestReplicaOperationMetrics verifies that replica operation metrics are properly
// incremented when operations are performed.
func TestReplicaOperationMetrics(t *testing.T) {
	// Get baseline counts for a unique test label to avoid interference
	testType := "test-metrics"
	initialTotal := testutil.ToFloat64(
		OperationTotalCounterVec.WithLabelValues(testType, "PUT"))
	initialBytes := testutil.ToFloat64(
		OperationBytesCounterVec.WithLabelValues(testType, "PUT"))

	// Simulate PUT operation
	OperationTotalCounterVec.WithLabelValues(testType, "PUT").Inc()
	OperationBytesCounterVec.WithLabelValues(testType, "PUT").Add(1024)

	// Verify total counter was incremented
	newTotal := testutil.ToFloat64(
		OperationTotalCounterVec.WithLabelValues(testType, "PUT"))
	if newTotal != initialTotal+1 {
		t.Fatalf("litestream_replica_operation_total=%v, want %v", newTotal, initialTotal+1)
	}

	// Verify bytes counter was incremented
	newBytes := testutil.ToFloat64(
		OperationBytesCounterVec.WithLabelValues(testType, "PUT"))
	if newBytes != initialBytes+1024 {
		t.Fatalf("litestream_replica_operation_bytes=%v, want %v", newBytes, initialBytes+1024)
	}

	// Test other operations
	operations := []string{"GET", "DELETE", "LIST"}
	for _, op := range operations {
		baseTotal := testutil.ToFloat64(
			OperationTotalCounterVec.WithLabelValues(testType, op))

		OperationTotalCounterVec.WithLabelValues(testType, op).Inc()

		afterTotal := testutil.ToFloat64(
			OperationTotalCounterVec.WithLabelValues(testType, op))
		if afterTotal != baseTotal+1 {
			t.Fatalf("litestream_replica_operation_total[%s]=%v, want %v", op, afterTotal, baseTotal+1)
		}
	}
}
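Usage note (a hypothetical sketch, not part of this commit): a replica client implementation inside the module could feed the same counters this test exercises. The helper function, package name, and the import path github.com/benbjohnson/litestream/internal are assumptions; the (type, operation) labels and the "PUT" value mirror TestReplicaOperationMetrics above.

package example // hypothetical helper inside the litestream module

import "github.com/benbjohnson/litestream/internal"

// recordPut is illustrative: it bumps the operation counters the way a real
// client's WriteLTXFile might after successfully uploading n bytes.
func recordPut(replicaType string, n int64) {
	internal.OperationTotalCounterVec.WithLabelValues(replicaType, "PUT").Inc()
	internal.OperationBytesCounterVec.WithLabelValues(replicaType, "PUT").Add(float64(n))
}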