diff --git a/cmd/litestream/main.go b/cmd/litestream/main.go index 4dd3aed..335beb5 100644 --- a/cmd/litestream/main.go +++ b/cmd/litestream/main.go @@ -1174,9 +1174,11 @@ func NewReplicaFromConfig(c *ReplicaConfig, db *litestream.DB) (_ *litestream.Re return nil, err } case "s3": - if r.Client, err = NewS3ReplicaClientFromConfig(c, r); err != nil { + var client *s3.ReplicaClient + if client, err = NewS3ReplicaClientFromConfig(c, r); err != nil { return nil, err } + r.Client = client case "gs": if r.Client, err = newGSReplicaClientFromConfig(c, r); err != nil { return nil, err diff --git a/db.go b/db.go index 5cae409..98ee35b 100644 --- a/db.go +++ b/db.go @@ -1793,7 +1793,7 @@ func (db *DB) Snapshot(ctx context.Context) (*ltx.FileInfo, error) { if err != nil { return nil, err } - info, err := db.Replica.Client.WriteLTXFile(ctx, SnapshotLevel, 1, pos.TXID, r) + info, err := db.Replica.WriteLTXFile(ctx, SnapshotLevel, 1, pos.TXID, r) if err != nil { return info, err } @@ -1841,7 +1841,7 @@ func (db *DB) EnforceSnapshotRetention(ctx context.Context, timestamp time.Time) } // Remove all files marked for deletion from both remote and local storage. - if err := db.Replica.Client.DeleteLTXFiles(ctx, deleted); err != nil { + if err := db.Replica.DeleteLTXFiles(ctx, deleted); err != nil { return 0, fmt.Errorf("remove ltx files: %w", err) } @@ -1931,7 +1931,7 @@ func (db *DB) EnforceL0RetentionByTime(ctx context.Context) error { return nil } - if err := db.Replica.Client.DeleteLTXFiles(ctx, deleted); err != nil { + if err := db.Replica.DeleteLTXFiles(ctx, deleted); err != nil { return fmt.Errorf("remove expired l0 files: %w", err) } diff --git a/leaser.go b/leaser.go index df7f64e..11aa462 100644 --- a/leaser.go +++ b/leaser.go @@ -9,13 +9,14 @@ import ( // DefaultLeaseTimeout is the default duration a lease is held before expiring. const DefaultLeaseTimeout = 30 * time.Second -// LeaseRetryInterval is the interval between lease acquisition attempts when -// an existing lease is held by another instance. -const LeaseRetryInterval = 1 * time.Second - // Leaser represents a client for a distributed leasing service. // It uses epoch-based leader election where each leadership change increments // the epoch, providing a fencing token to prevent split-brain scenarios. +// +// Leases are not zero-downtime: a graceful shutdown should ReleaseLease for +// immediate takeover, otherwise a standby must wait for the lease to expire. +// Leases are intended to be scoped globally across all databases managed by a +// single process. type Leaser interface { // Type returns the name of the implementation (e.g. "s3"). Type() string diff --git a/replica.go b/replica.go index 2b18f97..e8d2e89 100644 --- a/replica.go +++ b/replica.go @@ -121,6 +121,7 @@ func (r *Replica) Stop(hard bool) (err error) { err = e } } + return err } @@ -180,7 +181,7 @@ func (r *Replica) uploadLTXFile(ctx context.Context, level int, minTXID, maxTXID } defer func() { _ = f.Close() }() - if _, err := r.Client.WriteLTXFile(ctx, level, minTXID, maxTXID, f); err != nil { + if _, err := r.WriteLTXFile(ctx, level, minTXID, maxTXID, f); err != nil { return fmt.Errorf("write ltx file: %w", err) } r.Logger().Debug("ltx file uploaded", "filename", filename, "minTXID", minTXID, "maxTXID", maxTXID) @@ -192,6 +193,21 @@ func (r *Replica) uploadLTXFile(ctx context.Context, level int, minTXID, maxTXID return nil } +// WriteLTXFile writes an LTX file to the replica. +func (r *Replica) WriteLTXFile(ctx context.Context, level int, minTXID, maxTXID ltx.TXID, rd io.Reader) (*ltx.FileInfo, error) { + return r.Client.WriteLTXFile(ctx, level, minTXID, maxTXID, rd) +} + +// DeleteLTXFiles deletes LTX files from the replica. +func (r *Replica) DeleteLTXFiles(ctx context.Context, files []*ltx.FileInfo) error { + return r.Client.DeleteLTXFiles(ctx, files) +} + +// DeleteAll removes all replica data. +func (r *Replica) DeleteAll(ctx context.Context) error { + return r.Client.DeleteAll(ctx) +} + // calcPos returns the last position saved to the replica for level 0. func (r *Replica) calcPos(ctx context.Context) (pos ltx.Pos, err error) { info, err := r.MaxLTXFileInfo(ctx, 0) diff --git a/s3/leaser.go b/s3/leaser.go index bc9ae8c..31b8be8 100644 --- a/s3/leaser.go +++ b/s3/leaser.go @@ -53,8 +53,9 @@ type Leaser struct { SecretAccessKey string // S3 bucket information - Region string - Bucket string + Region string + Bucket string + // Path is the lease prefix in the bucket (global across DBs). Path string Endpoint string ForcePathStyle bool @@ -65,6 +66,7 @@ type Leaser struct { func NewLeaser() *Leaser { return &Leaser{ LeaseTimeout: litestream.DefaultLeaseTimeout, + Owner: defaultOwner(), logger: slog.Default().WithGroup("s3-leaser"), } } @@ -84,12 +86,23 @@ func (l *Leaser) Init(ctx context.Context) error { if l.Bucket == "" { return fmt.Errorf("s3 leaser: bucket name is required") } + if l.Owner == "" { + l.Owner = defaultOwner() + } + if l.Path == "" { + l.Path = "leases" + } else { + l.Path = strings.Trim(l.Path, "/") + if l.Path == "" { + l.Path = "leases" + } + } region := l.Region if region == "" { if l.Endpoint == "" { var err error - if region, err = findBucketRegion(ctx, l.Bucket); err != nil { + if region, err = l.findBucketRegion(ctx, l.Bucket); err != nil { return fmt.Errorf("s3 leaser: cannot lookup bucket region: %w", err) } } else { @@ -97,27 +110,7 @@ func (l *Leaser) Init(ctx context.Context) error { } } - httpClient := &http.Client{ - Timeout: 30 * time.Second, - } - - if l.SkipVerify { - httpClient.Transport = &http.Transport{ - Proxy: http.ProxyFromEnvironment, - DialContext: (&net.Dialer{ - Timeout: 30 * time.Second, - KeepAlive: 30 * time.Second, - }).DialContext, - ForceAttemptHTTP2: true, - MaxIdleConns: 100, - IdleConnTimeout: 90 * time.Second, - TLSHandshakeTimeout: 10 * time.Second, - ExpectContinueTimeout: 1 * time.Second, - TLSClientConfig: &tls.Config{ - InsecureSkipVerify: true, - }, - } - } + httpClient := l.newHTTPClient() configOpts := []func(*config.LoadOptions) error{ config.WithRegion(region), @@ -143,9 +136,12 @@ func (l *Leaser) Init(ctx context.Context) error { }, } - if l.Endpoint != "" { + if endpoint, disableHTTPS := l.endpointWithScheme(); endpoint != "" { s3Opts = append(s3Opts, func(o *s3.Options) { - o.BaseEndpoint = aws.String(l.Endpoint) + o.BaseEndpoint = aws.String(endpoint) + if disableHTTPS { + o.EndpointOptions.DisableHTTPS = true + } o.RequestChecksumCalculation = aws.RequestChecksumCalculationWhenRequired }) } @@ -213,8 +209,16 @@ func (l *Leaser) acquireLease(ctx context.Context, prevEpoch int64) (*litestream if len(epochs) > 0 { epoch = epochs[len(epochs)-1] - // Check current lease only if not renewing or epoch doesn't match - if prevEpoch == 0 || epoch != prevEpoch { + if prevEpoch != 0 && epoch == prevEpoch { + lease, err := l.fetchLease(ctx, epoch) + if os.IsNotExist(err) { + // Lease was reaped, continue to acquire + } else if err != nil { + return nil, fmt.Errorf("fetch lease (epoch %d): %w", epoch, err) + } else if !lease.Expired() && lease.Owner != "" && l.Owner != "" && lease.Owner != l.Owner { + return nil, fmt.Errorf("cannot renew lease: owner mismatch (have %q, lease owner %q)", l.Owner, lease.Owner) + } + } else { if lease, err := l.fetchLease(ctx, epoch); os.IsNotExist(err) { // Lease was reaped, continue to acquire } else if err != nil { @@ -260,6 +264,9 @@ func (l *Leaser) ReleaseLease(ctx context.Context, epoch int64) error { } else if lease.Timeout <= 0 { return nil // already released } + if lease.Owner != "" && l.Owner != "" && lease.Owner != l.Owner { + return fmt.Errorf("release lease: owner mismatch (have %q, lease owner %q)", l.Owner, lease.Owner) + } lease.Timeout = 0 @@ -326,6 +333,13 @@ func (l *Leaser) createLease(ctx context.Context, epoch int64) (*litestream.Leas return nil, fmt.Errorf("put lease: %w", err) } + modTime, err := l.leaseModTime(ctx, epoch) + if err != nil { + _ = l.DeleteLease(ctx, epoch) + return nil, fmt.Errorf("get lease modtime: %w", err) + } + lease.ModTime = modTime + return lease, nil } @@ -353,6 +367,19 @@ func (l *Leaser) leaseKey(epoch int64) string { return fmt.Sprintf("%s/%016x%s", l.Path, epoch, LockFileExt) } +func (l *Leaser) leaseModTime(ctx context.Context, epoch int64) (time.Time, error) { + output, err := l.s3.HeadObject(ctx, &s3.HeadObjectInput{ + Bucket: aws.String(l.Bucket), + Key: aws.String(l.leaseKey(epoch)), + }) + if isNotExists(err) { + return time.Time{}, os.ErrNotExist + } else if err != nil { + return time.Time{}, fmt.Errorf("head lease: %w", err) + } + return aws.ToTime(output.LastModified), nil +} + // isPreconditionFailed checks if the error is a 412 Precondition Failed response. func isPreconditionFailed(err smithy.APIError) bool { code := err.ErrorCode() @@ -361,15 +388,38 @@ func isPreconditionFailed(err smithy.APIError) bool { var lockFileRegex = regexp.MustCompile(`^[0-9a-f]{16}\.lock$`) -// findBucketRegion looks up the AWS region for a bucket. -func findBucketRegion(ctx context.Context, bucket string) (string, error) { - cfg, err := config.LoadDefaultConfig(ctx) +func (l *Leaser) findBucketRegion(ctx context.Context, bucket string) (string, error) { + configOpts := []func(*config.LoadOptions) error{ + config.WithHTTPClient(l.newHTTPClient()), + } + + if l.AccessKeyID != "" && l.SecretAccessKey != "" { + configOpts = append(configOpts, config.WithCredentialsProvider( + credentials.NewStaticCredentialsProvider(l.AccessKeyID, l.SecretAccessKey, ""), + )) + } + + cfg, err := config.LoadDefaultConfig(ctx, configOpts...) if err != nil { - return "", fmt.Errorf("cannot load aws config for region lookup: %w", err) + return "", fmt.Errorf("s3 leaser: cannot load aws config for region lookup: %w", err) } cfg.Region = DefaultRegion - client := s3.NewFromConfig(cfg) + s3Opts := []func(*s3.Options){ + func(o *s3.Options) { + o.UsePathStyle = l.ForcePathStyle + }, + } + if endpoint, disableHTTPS := l.endpointWithScheme(); endpoint != "" { + s3Opts = append(s3Opts, func(o *s3.Options) { + o.BaseEndpoint = aws.String(endpoint) + if disableHTTPS { + o.EndpointOptions.DisableHTTPS = true + } + }) + } + + client := s3.NewFromConfig(cfg, s3Opts...) out, err := client.GetBucketLocation(ctx, &s3.GetBucketLocationInput{ Bucket: aws.String(bucket), @@ -383,3 +433,48 @@ func findBucketRegion(ctx context.Context, bucket string) (string, error) { } return string(out.LocationConstraint), nil } + +func (l *Leaser) endpointWithScheme() (string, bool) { + if l.Endpoint == "" { + return "", false + } + endpoint := l.Endpoint + if !strings.HasPrefix(endpoint, "http://") && !strings.HasPrefix(endpoint, "https://") { + endpoint = "https://" + endpoint + } + return endpoint, strings.HasPrefix(endpoint, "http://") +} + +func (l *Leaser) newHTTPClient() *http.Client { + httpClient := &http.Client{ + Timeout: 30 * time.Second, + } + + if l.SkipVerify { + httpClient.Transport = &http.Transport{ + Proxy: http.ProxyFromEnvironment, + DialContext: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + }).DialContext, + ForceAttemptHTTP2: true, + MaxIdleConns: 100, + IdleConnTimeout: 90 * time.Second, + TLSHandshakeTimeout: 10 * time.Second, + ExpectContinueTimeout: 1 * time.Second, + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, + }, + } + } + + return httpClient +} + +func defaultOwner() string { + host, err := os.Hostname() + if err != nil || host == "" { + host = "unknown" + } + return fmt.Sprintf("%s:%d", host, os.Getpid()) +} diff --git a/s3/leaser_test.go b/s3/leaser_test.go index 2dbe5d8..6dc2a6a 100644 --- a/s3/leaser_test.go +++ b/s3/leaser_test.go @@ -62,6 +62,10 @@ func TestLeaser_AcquireLease_Fresh(t *testing.T) { w.Header().Set("ETag", `"abc123"`) w.WriteHeader(http.StatusOK) + case r.Method == http.MethodHead && strings.Contains(r.URL.Path, ".lock"): + w.Header().Set("Last-Modified", time.Now().UTC().Format(http.TimeFormat)) + w.WriteHeader(http.StatusOK) + default: t.Errorf("unexpected request: %s %s", r.Method, r.URL.Path) w.WriteHeader(http.StatusBadRequest) @@ -77,6 +81,9 @@ func TestLeaser_AcquireLease_Fresh(t *testing.T) { l.ForcePathStyle = true l.AccessKeyID = "test-key" l.SecretAccessKey = "test-secret" + l.Owner = "my-instance" + l.Owner = "my-instance" + l.Owner = "my-instance" l.Owner = "test-owner" ctx := context.Background() @@ -152,6 +159,7 @@ func TestLeaser_AcquireLease_ExistingActive(t *testing.T) { l.ForcePathStyle = true l.AccessKeyID = "test-key" l.SecretAccessKey = "test-secret" + l.Owner = "my-instance" ctx := context.Background() _, err := l.AcquireLease(ctx) @@ -205,6 +213,10 @@ func TestLeaser_AcquireLease_ExpiredLease(t *testing.T) { w.Header().Set("ETag", `"abc123"`) w.WriteHeader(http.StatusOK) + case r.Method == http.MethodHead && strings.Contains(r.URL.Path, ".lock"): + w.Header().Set("Last-Modified", time.Now().UTC().Format(http.TimeFormat)) + w.WriteHeader(http.StatusOK) + case r.Method == http.MethodDelete: // Background reaping of old lease w.WriteHeader(http.StatusNoContent) @@ -224,6 +236,7 @@ func TestLeaser_AcquireLease_ExpiredLease(t *testing.T) { l.ForcePathStyle = true l.AccessKeyID = "test-key" l.SecretAccessKey = "test-secret" + l.Owner = "my-instance" ctx := context.Background() lease, err := l.AcquireLease(ctx) @@ -311,6 +324,12 @@ func TestLeaser_RenewLease(t *testing.T) { var putPath string var deleteReceived atomic.Bool + existingLease := &litestream.Lease{ + Epoch: 5, + Timeout: 30 * time.Second, + Owner: "my-instance", + } + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { switch { case r.Method == http.MethodGet && strings.Contains(r.URL.RawQuery, "list-type=2"): @@ -332,6 +351,17 @@ func TestLeaser_RenewLease(t *testing.T) { w.Header().Set("ETag", `"abc123"`) w.WriteHeader(http.StatusOK) + case r.Method == http.MethodGet && strings.Contains(r.URL.Path, ".lock"): + body, _ := json.Marshal(existingLease) + w.Header().Set("Content-Type", "application/json") + w.Header().Set("Last-Modified", time.Now().UTC().Format(http.TimeFormat)) + w.WriteHeader(http.StatusOK) + _, _ = w.Write(body) + + case r.Method == http.MethodHead && strings.Contains(r.URL.Path, ".lock"): + w.Header().Set("Last-Modified", time.Now().UTC().Format(http.TimeFormat)) + w.WriteHeader(http.StatusOK) + case r.Method == http.MethodDelete: deleteReceived.Store(true) w.WriteHeader(http.StatusNoContent) @@ -351,12 +381,7 @@ func TestLeaser_RenewLease(t *testing.T) { l.ForcePathStyle = true l.AccessKeyID = "test-key" l.SecretAccessKey = "test-secret" - - existingLease := &litestream.Lease{ - Epoch: 5, - Timeout: 30 * time.Second, - Owner: "my-instance", - } + l.Owner = "my-instance" ctx := context.Background() newLease, err := l.RenewLease(ctx, existingLease) @@ -416,6 +441,7 @@ func TestLeaser_ReleaseLease(t *testing.T) { l.ForcePathStyle = true l.AccessKeyID = "test-key" l.SecretAccessKey = "test-secret" + l.Owner = "my-instance" ctx := context.Background() err := l.ReleaseLease(ctx, 3) @@ -468,6 +494,7 @@ func TestLeaser_ReleaseLease_AlreadyReleased(t *testing.T) { l.ForcePathStyle = true l.AccessKeyID = "test-key" l.SecretAccessKey = "test-secret" + l.Owner = "my-instance" ctx := context.Background() err := l.ReleaseLease(ctx, 3) @@ -476,6 +503,49 @@ func TestLeaser_ReleaseLease_AlreadyReleased(t *testing.T) { } } +func TestLeaser_ReleaseLease_OwnerMismatch(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch { + case r.Method == http.MethodGet && strings.Contains(r.URL.Path, ".lock"): + activeLease := &litestream.Lease{ + Epoch: 3, + Timeout: 30 * time.Second, + Owner: "other-instance", + } + body, _ := json.Marshal(activeLease) + w.Header().Set("Content-Type", "application/json") + w.Header().Set("Last-Modified", time.Now().Format(http.TimeFormat)) + w.WriteHeader(http.StatusOK) + _, _ = w.Write(body) + + case r.Method == http.MethodPut: + t.Error("should not PUT when owner mismatches") + w.WriteHeader(http.StatusOK) + + default: + t.Errorf("unexpected request: %s %s", r.Method, r.URL.Path) + w.WriteHeader(http.StatusBadRequest) + } + })) + defer server.Close() + + l := NewLeaser() + l.Bucket = "test-bucket" + l.Path = "leases" + l.Region = "us-east-1" + l.Endpoint = server.URL + l.ForcePathStyle = true + l.AccessKeyID = "test-key" + l.SecretAccessKey = "test-secret" + l.Owner = "my-instance" + + ctx := context.Background() + err := l.ReleaseLease(ctx, 3) + if err == nil { + t.Fatal("ReleaseLease() expected owner mismatch error") + } +} + func TestLeaser_ReleaseLease_NotFound(t *testing.T) { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if r.Method == http.MethodGet && strings.Contains(r.URL.Path, ".lock") { @@ -646,6 +716,10 @@ func TestLeaser_ConcurrentAcquire(t *testing.T) { w.WriteHeader(http.StatusOK) _, _ = w.Write(body) + case r.Method == http.MethodHead && strings.Contains(r.URL.Path, ".lock"): + w.Header().Set("Last-Modified", time.Now().UTC().Format(http.TimeFormat)) + w.WriteHeader(http.StatusOK) + default: w.WriteHeader(http.StatusOK) } diff --git a/tests/integration/README.md b/tests/integration/README.md index b349cc6..478e59d 100644 --- a/tests/integration/README.md +++ b/tests/integration/README.md @@ -129,6 +129,27 @@ go test -v -tags=integration ./tests/integration/... -run=TestRapidCheckpoints # 1GB boundary test go test -v -tags=integration ./tests/integration/... -run=Test1GBBoundary + +# S3 lease failover test (requires Docker) +go test -v -tags=integration ./tests/integration/... -run=TestS3LeaseFailover + +# S3 lease single-writer test (requires Docker) +go test -v -tags=integration ./tests/integration/... -run=TestS3LeaseSingleWriter + +# S3 lease pause/resume failover test (requires Docker) +go test -v -tags=integration ./tests/integration/... -run=TestS3LeasePauseLeader + +# S3 lease renewal partition test (requires Docker) +go test -v -tags=integration ./tests/integration/... -run=TestS3LeaseRenewalPartition + +# S3 lease clock skew test (requires Docker) +go test -v -tags=integration ./tests/integration/... -run=TestS3LeaseClockSkew + +# S3 lease rapid restart churn test (requires Docker) +go test -v -tags=integration ./tests/integration/... -run=TestS3LeaseRapidRestart + +# S3 lease stress churn test (requires Docker) +go test -v -tags=integration ./tests/integration/... -run=TestS3LeaseStressChurn ``` ### Short Mode @@ -151,6 +172,15 @@ Core functionality tests that run in seconds to minutes: - `TestDatabaseIntegrity` - Complex schema and data integrity - `TestDatabaseDeletion` - Source database deletion during replication +### S3 Leader Election Tests + +- `TestS3LeaseFailover` - MinIO-backed lease failover with crash recovery +- `TestS3LeaseSingleWriter` - Concurrent writers, single lease epoch enforcement +- `TestS3LeasePauseLeader` - Leader pause/resume with lease takeover +- `TestS3LeaseRenewalPartition` - Lease renewal during S3 outage proxy +- `TestS3LeaseClockSkew` - Lease validity skew delaying failover +- `TestS3LeaseRapidRestart` - Rapid leader restart churn with monotonic epochs + ### Concurrent Tests Stress and concurrency tests: