fix: remove local LTX files during retention enforcement to prevent unbounded disk usage (#795)

Co-authored-by: Claude <noreply@anthropic.com>
This commit is contained in:
Cory LaNou
2025-10-14 17:07:31 -05:00
committed by GitHub
parent 5008d4541c
commit d4dfb33e46
8 changed files with 145 additions and 13 deletions

View File

@@ -41,6 +41,7 @@ var _ litestream.ReplicaClient = (*ReplicaClient)(nil)
type ReplicaClient struct {
mu sync.Mutex
client *azblob.Client
logger *slog.Logger
// Azure credentials
AccountName string
@@ -54,7 +55,9 @@ type ReplicaClient struct {
// NewReplicaClient returns a new instance of ReplicaClient.
func NewReplicaClient() *ReplicaClient {
return &ReplicaClient{}
return &ReplicaClient{
logger: slog.Default().WithGroup(ReplicaClientType),
}
}
// Type returns "abs" as the client type.
@@ -243,6 +246,9 @@ func (c *ReplicaClient) DeleteLTXFiles(ctx context.Context, a []*ltx.FileInfo) e
for _, info := range a {
key := litestream.LTXFilePath(c.Path, info.Level, info.MinTXID, info.MaxTXID)
c.logger.Debug("deleting ltx file", "level", info.Level, "minTXID", info.MinTXID, "maxTXID", info.MaxTXID, "key", key)
_, err := c.client.DeleteBlob(ctx, c.Bucket, key, nil)
if isNotExists(err) {
continue

28
db.go
View File

@@ -1537,14 +1537,20 @@ func (db *DB) EnforceSnapshotRetention(ctx context.Context, timestamp time.Time)
deleted = deleted[:len(deleted)-1]
}
// Remove all files marked for deletion.
for _, info := range deleted {
db.Logger.Info("deleting ltx file", "level", SnapshotLevel, "minTXID", info.MinTXID, "maxTXID", info.MaxTXID)
}
// Remove all files marked for deletion from both remote and local storage.
if err := db.Replica.Client.DeleteLTXFiles(ctx, deleted); err != nil {
return 0, fmt.Errorf("remove ltx files: %w", err)
}
for _, info := range deleted {
localPath := db.LTXPath(SnapshotLevel, info.MinTXID, info.MaxTXID)
db.Logger.Debug("deleting local ltx file", "level", SnapshotLevel, "minTXID", info.MinTXID, "maxTXID", info.MaxTXID, "path", localPath)
if err := os.Remove(localPath); err != nil && !os.IsNotExist(err) {
db.Logger.Error("failed to remove local ltx file", "path", localPath, "error", err)
}
}
return minSnapshotTXID, nil
}
@@ -1578,14 +1584,20 @@ func (db *DB) EnforceRetentionByTXID(ctx context.Context, level int, txID ltx.TX
deleted = deleted[:len(deleted)-1]
}
// Remove all files marked for deletion.
for _, info := range deleted {
db.Logger.Info("deleting ltx file", "level", level, "minTXID", info.MinTXID, "maxTXID", info.MaxTXID)
}
// Remove all files marked for deletion from both remote and local storage.
if err := db.Replica.Client.DeleteLTXFiles(ctx, deleted); err != nil {
return fmt.Errorf("remove ltx files: %w", err)
}
for _, info := range deleted {
localPath := db.LTXPath(level, info.MinTXID, info.MaxTXID)
db.Logger.Debug("deleting local ltx file", "level", level, "minTXID", info.MinTXID, "maxTXID", info.MaxTXID, "path", localPath)
if err := os.Remove(localPath); err != nil && !os.IsNotExist(err) {
db.Logger.Error("failed to remove local ltx file", "path", localPath, "error", err)
}
}
return nil
}

View File

@@ -757,3 +757,88 @@ func TestCompaction_PreservesLastTimestamp(t *testing.T) {
t.Error("L1 file timestamp should preserve last source file timestamp")
}
}
// TestDB_EnforceRetentionByTXID_LocalCleanup verifies that local L0 LTX files
// written before a first compaction to L1 are no longer present on disk after
// a second compaction to L1 whose range starts past them.
//
// NOTE(review): the test never calls EnforceRetentionByTXID directly; it
// presumably relies on db.Compact invoking it for level 0 — confirm.
func TestDB_EnforceRetentionByTXID_LocalCleanup(t *testing.T) {
	ctx := context.Background()

	db, sqldb := testingutil.MustOpenDBs(t)
	defer testingutil.MustCloseDBs(t, db, sqldb)

	// Replicate to a file-based client rooted in a per-test temp directory.
	// Monitoring is turned off; the test drives every replica sync explicitly.
	replicaPath := filepath.Join(t.TempDir(), "replica")
	client := file.NewReplicaClient(replicaPath)
	db.Replica = litestream.NewReplicaWithClient(db, client)
	db.Replica.MonitorEnabled = false

	if _, err := sqldb.ExecContext(ctx, `CREATE TABLE t (id INTEGER PRIMARY KEY, val TEXT)`); err != nil {
		t.Fatalf("create table: %v", err)
	}

	// localFile records the on-disk location and TXID range of one L0 file.
	type localFile struct {
		path    string
		minTXID ltx.TXID
		maxTXID ltx.TXID
	}

	// Batch 1: three inserts, each synced on its own, remembering the local
	// path reported for the newest LTX range after every sync.
	var firstBatchL0Files []localFile
	for i := 0; i < 3; i++ {
		if _, err := sqldb.ExecContext(ctx, `INSERT INTO t (val) VALUES (?)`, fmt.Sprintf("batch1-value-%d", i)); err != nil {
			t.Fatalf("insert batch1 %d: %v", i, err)
		}
		if err := db.Sync(ctx); err != nil {
			t.Fatalf("sync db batch1 %d: %v", i, err)
		}

		minTXID, maxTXID, err := db.MaxLTX()
		if err != nil {
			t.Fatalf("get max ltx: %v", err)
		}
		firstBatchL0Files = append(firstBatchL0Files, localFile{
			path:    db.LTXPath(0, minTXID, maxTXID),
			minTXID: minTXID,
			maxTXID: maxTXID,
		})

		if err := db.Replica.Sync(ctx); err != nil {
			t.Fatalf("sync replica batch1 %d: %v", i, err)
		}
	}

	// Precondition: every batch-1 file must be present before compacting.
	// Any stat failure is fatal here; previously only "not exist" was
	// detected and other errors were silently treated as "file exists".
	for _, lf := range firstBatchL0Files {
		if _, err := os.Stat(lf.path); os.IsNotExist(err) {
			t.Fatalf("local L0 file should exist before first compaction: %s", lf.path)
		} else if err != nil {
			t.Fatalf("unexpected error checking local file before first compaction: %v", err)
		}
	}

	if _, err := db.Compact(ctx, 1); err != nil {
		t.Fatalf("compact batch1 to L1: %v", err)
	}

	// Batch 2: three more inserts so the second compaction has new input.
	for i := 0; i < 3; i++ {
		if _, err := sqldb.ExecContext(ctx, `INSERT INTO t (val) VALUES (?)`, fmt.Sprintf("batch2-value-%d", i)); err != nil {
			t.Fatalf("insert batch2 %d: %v", i, err)
		}
		if err := db.Sync(ctx); err != nil {
			t.Fatalf("sync db batch2 %d: %v", i, err)
		}
		if err := db.Replica.Sync(ctx); err != nil {
			t.Fatalf("sync replica batch2 %d: %v", i, err)
		}
	}

	secondCompactInfo, err := db.Compact(ctx, 1)
	if err != nil {
		t.Fatalf("compact batch2 to L1: %v", err)
	}

	// Every batch-1 file that lies entirely below the second compaction's
	// minimum TXID must be gone from local disk.
	checked := 0
	for _, lf := range firstBatchL0Files {
		if lf.maxTXID >= secondCompactInfo.MinTXID {
			continue
		}
		checked++

		_, err := os.Stat(lf.path)
		switch {
		case err == nil:
			t.Errorf("local L0 file should be removed after second compaction: %s (maxTXID=%s < minTXID=%s)",
				lf.path, lf.maxTXID, secondCompactInfo.MinTXID)
		case !os.IsNotExist(err):
			t.Fatalf("unexpected error checking local file: %v", err)
		}
	}

	// Guard against a vacuous pass: if no file qualified, the cleanup path
	// was never actually asserted on.
	if checked == 0 {
		t.Fatalf("no first-batch L0 file is below second compaction minTXID=%s; local cleanup was not exercised",
			secondCompactInfo.MinTXID)
	}
}

View File

@@ -5,6 +5,7 @@ import (
"context"
"fmt"
"io"
"log/slog"
"os"
"path/filepath"
"time"
@@ -25,12 +26,14 @@ type ReplicaClient struct {
path string // destination path
Replica *litestream.Replica
logger *slog.Logger
}
// NewReplicaClient returns a new instance of ReplicaClient.
func NewReplicaClient(path string) *ReplicaClient {
return &ReplicaClient{
path: path,
logger: slog.Default().WithGroup(ReplicaClientType),
path: path,
}
}
@@ -199,6 +202,8 @@ func (c *ReplicaClient) DeleteLTXFiles(ctx context.Context, a []*ltx.FileInfo) e
for _, info := range a {
filename := c.LTXFilePath(info.Level, info.MinTXID, info.MaxTXID)
c.logger.Debug("deleting ltx file", "level", info.Level, "minTXID", info.MinTXID, "maxTXID", info.MaxTXID, "path", filename)
if err := os.Remove(filename); err != nil && !os.IsNotExist(err) {
return err
}

View File

@@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"io"
"log/slog"
"os"
"path"
"sync"
@@ -32,6 +33,7 @@ type ReplicaClient struct {
mu sync.Mutex
client *storage.Client // gs client
bkt *storage.BucketHandle // gs bucket handle
logger *slog.Logger
// GS bucket information
Bucket string
@@ -40,7 +42,9 @@ type ReplicaClient struct {
// NewReplicaClient returns a new instance of ReplicaClient.
func NewReplicaClient() *ReplicaClient {
return &ReplicaClient{}
return &ReplicaClient{
logger: slog.Default().WithGroup(ReplicaClientType),
}
}
// Type returns "gs" as the client type.
@@ -190,6 +194,9 @@ func (c *ReplicaClient) DeleteLTXFiles(ctx context.Context, a []*ltx.FileInfo) e
for _, info := range a {
key := litestream.LTXFilePath(c.Path, info.Level, info.MinTXID, info.MaxTXID)
c.logger.Debug("deleting ltx file", "level", info.Level, "minTXID", info.MinTXID, "maxTXID", info.MaxTXID, "key", key)
if err := c.bkt.Object(key).Delete(ctx); err != nil && !isNotExists(err) {
return fmt.Errorf("gs: cannot delete ltx file %q: %w", key, err)
}

View File

@@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"io"
"log/slog"
"os"
"sort"
"strconv"
@@ -31,7 +32,8 @@ var _ litestream.ReplicaClient = (*ReplicaClient)(nil)
// ReplicaClient is a client for writing LTX files to NATS JetStream Object Store.
type ReplicaClient struct {
mu sync.Mutex
mu sync.Mutex
logger *slog.Logger
// NATS connection and JetStream context
nc *nats.Conn
@@ -72,6 +74,7 @@ type ReplicaClient struct {
// NewReplicaClient returns a new instance of ReplicaClient.
func NewReplicaClient() *ReplicaClient {
return &ReplicaClient{
logger: slog.Default().WithGroup(ReplicaClientType),
MaxReconnects: -1, // Unlimited
ReconnectWait: 2 * time.Second,
Timeout: 10 * time.Second,
@@ -395,6 +398,8 @@ func (c *ReplicaClient) DeleteLTXFiles(ctx context.Context, a []*ltx.FileInfo) e
for _, fileInfo := range a {
objectPath := c.ltxPath(fileInfo.Level, fileInfo.MinTXID, fileInfo.MaxTXID)
c.logger.Debug("deleting ltx file", "level", fileInfo.Level, "minTXID", fileInfo.MinTXID, "maxTXID", fileInfo.MaxTXID, "path", objectPath)
if err := c.objectStore.Delete(ctx, objectPath); err != nil {
if !isNotFoundError(err) {
return fmt.Errorf("failed to delete object %s: %w", objectPath, err)

View File

@@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"io"
"log/slog"
"net"
"net/http"
"net/url"
@@ -51,6 +52,7 @@ type ReplicaClient struct {
mu sync.Mutex
s3 *s3.Client // s3 service
uploader *manager.Uploader
logger *slog.Logger
// AWS authentication keys.
AccessKeyID string
@@ -71,7 +73,9 @@ type ReplicaClient struct {
// NewReplicaClient returns a new instance of ReplicaClient.
func NewReplicaClient() *ReplicaClient {
return &ReplicaClient{}
return &ReplicaClient{
logger: slog.Default().WithGroup(ReplicaClientType),
}
}
// Type returns "s3" as the client type.
@@ -390,6 +394,8 @@ func (c *ReplicaClient) DeleteLTXFiles(ctx context.Context, a []*ltx.FileInfo) e
filename := ltx.FormatFilename(info.MinTXID, info.MaxTXID)
key := c.Path + "/" + fmt.Sprintf("%04x/%s", info.Level, filename)
objIDs = append(objIDs, types.ObjectIdentifier{Key: aws.String(key)})
c.logger.Debug("deleting ltx file", "level", info.Level, "minTXID", info.MinTXID, "maxTXID", info.MaxTXID, "key", key)
}
// Delete in batches

View File

@@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"io"
"log/slog"
"net"
"os"
"path"
@@ -35,6 +36,7 @@ type ReplicaClient struct {
mu sync.Mutex
sshClient *ssh.Client
sftpClient *sftp.Client
logger *slog.Logger
// SFTP connection info
Host string
@@ -52,6 +54,7 @@ type ReplicaClient struct {
// NewReplicaClient returns a new instance of ReplicaClient.
func NewReplicaClient() *ReplicaClient {
return &ReplicaClient{
logger: slog.Default().WithGroup(ReplicaClientType),
DialTimeout: DefaultDialTimeout,
ConcurrentWrites: true, // Default to true for better performance
}
@@ -308,6 +311,9 @@ func (c *ReplicaClient) DeleteLTXFiles(ctx context.Context, a []*ltx.FileInfo) (
for _, info := range a {
filename := litestream.LTXFilePath(c.Path, info.Level, info.MinTXID, info.MaxTXID)
c.logger.Debug("deleting ltx file", "level", info.Level, "minTXID", info.MinTXID, "maxTXID", info.MaxTXID, "path", filename)
if err := sftpClient.Remove(filename); err != nil && !os.IsNotExist(err) {
return fmt.Errorf("sftp: cannot delete ltx file %q: %w", filename, err)
}