Mirror of https://github.com/benbjohnson/litestream.git, synced 2026-01-25 05:06:30 +00:00
feat(s3): improve observability for S3 operations and L0 retention (#996)
Co-authored-by: Claude Sonnet 4.5 <noreply@anthropic.com>
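In short: the S3 client's DeleteLTXFiles now times the DeleteObjects call with a duration histogram and counts per-key failures by error code, EnforceL0RetentionByTime exports a gauge breaking L0 files down into eligible / not_compacted / too_recent, and both paths gain more detailed structured debug logging.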
db.go (38 lines changed)
@@ -1893,7 +1893,9 @@ func (db *DB) EnforceL0RetentionByTime(ctx context.Context) error {
 		return nil
 	}

-	db.Logger.Debug("enforcing l0 retention", "retention", db.L0Retention)
+	db.Logger.Debug("starting l0 retention enforcement", "retention", db.L0Retention)

+	dbName := filepath.Base(db.Path())
+
 	// Determine the highest TXID that has been compacted into L1.
 	itr, err := db.Replica.Client.LTXFiles(ctx, 1, 0, false)
@@ -1911,6 +1913,9 @@ func (db *DB) EnforceL0RetentionByTime(ctx context.Context) error {
 		return fmt.Errorf("close l1 iterator: %w", err)
 	}
 	if maxL1TXID == 0 {
+		internal.L0RetentionGaugeVec.WithLabelValues(dbName, "eligible").Set(0)
+		internal.L0RetentionGaugeVec.WithLabelValues(dbName, "not_compacted").Set(0)
+		internal.L0RetentionGaugeVec.WithLabelValues(dbName, "too_recent").Set(0)
 		return nil
 	}

@@ -1922,13 +1927,17 @@ func (db *DB) EnforceL0RetentionByTime(ctx context.Context) error {
 	defer itr.Close()

 	var (
 		deleted      []*ltx.FileInfo
 		lastInfo     *ltx.FileInfo
 		processedAll = true
+		totalFiles        int
+		notCompactedCount int
+		tooRecentCount    int
 	)
 	for itr.Next() {
 		info := itr.Item()
 		lastInfo = info
+		totalFiles++

 		createdAt := info.CreatedAt
 		if createdAt.IsZero() {
@@ -1943,11 +1952,21 @@ func (db *DB) EnforceL0RetentionByTime(ctx context.Context) error {
 			// L0 entries are ordered; once we reach a newer file we stop so we don't
 			// create gaps between retained files. VFS expects contiguous coverage.
 			processedAll = false
+			tooRecentCount++
 			break
 		}

 		if info.MaxTXID <= maxL1TXID {
 			deleted = append(deleted, info)
+		} else {
+			notCompactedCount++
+		}
+	}
+
+	// Count remaining files as too_recent if we stopped early
+	if !processedAll {
+		for itr.Next() {
+			tooRecentCount++
 		}
 	}

@@ -1956,6 +1975,17 @@ func (db *DB) EnforceL0RetentionByTime(ctx context.Context) error {
 		deleted = deleted[:len(deleted)-1]
 	}

+	internal.L0RetentionGaugeVec.WithLabelValues(dbName, "eligible").Set(float64(len(deleted)))
+	internal.L0RetentionGaugeVec.WithLabelValues(dbName, "not_compacted").Set(float64(notCompactedCount))
+	internal.L0RetentionGaugeVec.WithLabelValues(dbName, "too_recent").Set(float64(tooRecentCount))
+
+	db.Logger.Debug("l0 retention scan complete",
+		"total_l0_files", totalFiles,
+		"eligible_for_deletion", len(deleted),
+		"not_compacted_yet", notCompactedCount,
+		"too_recent", tooRecentCount,
+		"max_l1_txid", maxL1TXID)
+
 	if len(deleted) == 0 {
 		return nil
 	}
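As a quick sanity check on the new gauge, a test along these lines could read the per-status values back through prometheus/client_golang's testutil package. This is a sketch, not part of the commit; the internal import path and the "mydb" label value are assumptions.

package internal_test

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus/testutil"

	"github.com/benbjohnson/litestream/internal" // assumed import path
)

// Reads back each status of the L0 retention gauge for a hypothetical
// database label. Label combinations that were never set read as zero.
func TestL0RetentionGaugeStatuses(t *testing.T) {
	for _, status := range []string{"eligible", "not_compacted", "too_recent"} {
		v := testutil.ToFloat64(internal.L0RetentionGaugeVec.WithLabelValues("mydb", status))
		t.Logf("l0 retention %s: %v", status, v)
	}
}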
@@ -148,4 +148,20 @@ var (
 		Name: "litestream_replica_operation_bytes",
 		Help: "The number of bytes used by replica operations",
 	}, []string{"replica_type", "operation"})

+	OperationDurationHistogramVec = promauto.NewHistogramVec(prometheus.HistogramOpts{
+		Name:    "litestream_replica_operation_duration_seconds",
+		Help:    "Duration of replica operations by type and operation",
+		Buckets: []float64{0.01, 0.05, 0.1, 0.5, 1, 5, 10, 30, 60},
+	}, []string{"replica_type", "operation"})
+
+	OperationErrorCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
+		Name: "litestream_replica_operation_errors_total",
+		Help: "Number of replica operation errors by type, operation, and error code",
+	}, []string{"replica_type", "operation", "code"})
+
+	L0RetentionGaugeVec = promauto.NewGaugeVec(prometheus.GaugeOpts{
+		Name: "litestream_l0_retention_files_total",
+		Help: "Number of L0 files by status during retention enforcement",
+	}, []string{"db", "status"})
 )
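These vectors are meant to be driven from the replica clients; the DeleteLTXFiles changes below do the bookkeeping by hand with time.Now/time.Since. As a rough illustration of the same pattern, a hypothetical helper (not in this commit) could use prometheus.NewTimer instead:

package internal

import "github.com/prometheus/client_golang/prometheus"

// timedOp is a hypothetical helper illustrating the intended pattern:
// always observe the operation's duration, and count failures under an
// error code. The "unknown" code here is a placeholder.
func timedOp(replicaType, op string, fn func() error) error {
	timer := prometheus.NewTimer(OperationDurationHistogramVec.WithLabelValues(replicaType, op))
	defer timer.ObserveDuration()

	if err := fn(); err != nil {
		// A real caller would extract the provider's error code; see the
		// S3 client below, which uses the per-object codes from DeleteObjects.
		OperationErrorCounterVec.WithLabelValues(replicaType, op, "unknown").Inc()
		return err
	}
	return nil
}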
@@ -978,10 +978,14 @@ func (c *ReplicaClient) DeleteLTXFiles(ctx context.Context, a []*ltx.FileInfo) e
 	c.logger.Debug("deleting ltx files batch", "count", n)

+	start := time.Now()
 	out, err := c.s3.DeleteObjects(ctx, &s3.DeleteObjectsInput{
 		Bucket: aws.String(c.Bucket),
 		Delete: &types.Delete{Objects: objIDs[:n]},
 	})
+	duration := time.Since(start)
+	internal.OperationDurationHistogramVec.WithLabelValues(ReplicaClientType, "DELETE").Observe(duration.Seconds())
+
 	if err != nil {
 		return fmt.Errorf("s3: delete batch of %d objects: %w", n, err)
 	}
@@ -990,9 +994,30 @@ func (c *ReplicaClient) DeleteLTXFiles(ctx context.Context, a []*ltx.FileInfo) e
 	if out != nil {
 		deleted = len(out.Deleted)
 	}
-	c.logger.Debug("delete batch completed", "requested", n, "deleted", deleted, "errors", len(out.Errors))
+	c.logger.Debug("delete batch completed",
+		"requested", n,
+		"deleted", deleted,
+		"errors", len(out.Errors),
+		"duration_ms", duration.Milliseconds())
 	internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Add(float64(deleted))

+	if len(out.Errors) > 0 {
+		for i, e := range out.Errors {
+			code := aws.ToString(e.Code)
+			internal.OperationErrorCounterVec.WithLabelValues(ReplicaClientType, "DELETE", code).Inc()
+
+			if i < 5 {
+				c.logger.Warn("delete object failed",
+					"key", aws.ToString(e.Key),
+					"code", code,
+					"message", aws.ToString(e.Message))
+			}
+		}
+		if len(out.Errors) > 5 {
+			c.logger.Warn("additional delete errors suppressed", "count", len(out.Errors)-5)
+		}
+	}
+
 	if err := deleteOutputError(out); err != nil {
 		return err
 	}
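With these series exported, the new instrumentation is straightforward to alert on: for example, a nonzero rate(litestream_replica_operation_errors_total[5m]) flags failing deletes, and histogram_quantile over the litestream_replica_operation_duration_seconds buckets charts delete latency.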