fix(s3): improve S3-compatible provider compatibility (#899)

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
Cory LaNou committed 2025-12-16 14:32:22 -06:00 (committed by GitHub)
parent feb65fb2a1, commit e2cbaf452f
7 changed files with 429 additions and 22 deletions

@@ -1261,10 +1261,38 @@ func NewS3ReplicaClientFromConfig(c *ReplicaConfig, _ *litestream.Replica) (_ *s
return nil, fmt.Errorf("bucket required for s3 replica")
}
// Detect S3-compatible provider endpoints for applying appropriate defaults.
// These providers require specific settings to work correctly with AWS SDK v2.
isTigris := litestream.IsTigrisEndpoint(endpoint)
if !isTigris && !endpointWasSet && litestream.IsTigrisEndpoint(c.Endpoint) {
isTigris = true
}
isDigitalOcean := litestream.IsDigitalOceanEndpoint(endpoint)
isBackblaze := litestream.IsBackblazeEndpoint(endpoint)
isFilebase := litestream.IsFilebaseEndpoint(endpoint)
isScaleway := litestream.IsScalewayEndpoint(endpoint)
isMinIO := litestream.IsMinIOEndpoint(endpoint)
// Track if forcePathStyle was explicitly set by user (config or URL query param).
forcePathStyleSet := c.ForcePathStyle != nil
// Apply provider-specific defaults for S3-compatible providers.
// These settings ensure compatibility with each provider's S3 implementation.
if isTigris {
// Tigris: requires signed payloads, no MD5
signSetting.ApplyDefault(true)
requireSetting.ApplyDefault(false)
}
if isDigitalOcean || isBackblaze || isFilebase || isScaleway || isMinIO {
// All these providers require signed payloads (don't support UNSIGNED-PAYLOAD)
signSetting.ApplyDefault(true)
}
if !forcePathStyleSet {
// Filebase, Backblaze B2, and MinIO require path-style URLs
if isFilebase || isBackblaze || isMinIO {
forcePathStyle = true
}
}
// Build replica.
client := s3.NewReplicaClient()
@@ -1276,10 +1304,6 @@ func NewS3ReplicaClientFromConfig(c *ReplicaConfig, _ *litestream.Replica) (_ *s
	client.Endpoint = endpoint
	client.ForcePathStyle = forcePathStyle
	client.SkipVerify = skipVerify
-	if isTigris {
-		signSetting.ApplyDefault(true)
-		requireSetting.ApplyDefault(false)
-	}
	client.SignPayload = signSetting.value
	client.RequireContentMD5 = requireSetting.value
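Both hunks above call ApplyDefault on signSetting and requireSetting, whose type is not shown in this diff. A minimal sketch of the semantics the code appears to rely on, assuming a tri-state setting where an explicit user value always wins over a provider default (the type and method names here are illustrative, not the PR's actual helpers):

package s3sketch // illustrative, not code from this PR

// boolSetting is a sketch of a boolean setting that remembers whether it
// was set explicitly.
type boolSetting struct {
	value bool
	set   bool
}

// Set records an explicit user choice (from config or URL query parameter).
func (s *boolSetting) Set(v bool) { s.value, s.set = v, true }

// ApplyDefault fills in a provider default only when no explicit value was
// recorded, so user configuration is never overridden.
func (s *boolSetting) ApplyDefault(v bool) {
	if !s.set {
		s.value, s.set = v, true
	}
}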

docker-compose.test.yml (new file, +15)

@@ -0,0 +1,15 @@
services:
  minio:
    image: minio/minio:latest
    ports:
      - "9000:9000"
      - "9001:9001"
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin
    command: server /data --console-address ":9001"
    healthcheck:
      test: ["CMD", "mc", "ready", "local"]
      interval: 5s
      timeout: 5s
      retries: 5

etc/run-s3-docker-tests.sh (new executable file, +55)

@@ -0,0 +1,55 @@
#!/bin/bash
set -e

# Script to run S3 integration tests against a local MinIO container.
# This provides a more realistic test environment than moto for testing
# S3-compatible provider compatibility.

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
PROJECT_ROOT="$(dirname "$SCRIPT_DIR")"
cd "$PROJECT_ROOT"

echo "Starting MinIO container..."
docker compose -f docker-compose.test.yml up -d

# Wait for MinIO to be ready
echo "Waiting for MinIO to be ready..."
for i in {1..30}; do
    if docker compose -f docker-compose.test.yml exec -T minio mc ready local 2>/dev/null; then
        echo "MinIO is ready"
        break
    fi
    if [ $i -eq 30 ]; then
        echo "MinIO failed to start"
        docker compose -f docker-compose.test.yml logs minio
        docker compose -f docker-compose.test.yml down
        exit 1
    fi
    sleep 1
done

# Create test bucket using mc client inside the container
echo "Creating test bucket..."
docker compose -f docker-compose.test.yml exec -T minio mc alias set local http://localhost:9000 minioadmin minioadmin
docker compose -f docker-compose.test.yml exec -T minio mc mb local/test-bucket --ignore-existing

# Set up cleanup trap
cleanup() {
    echo "Cleaning up..."
    docker compose -f docker-compose.test.yml down
}
trap cleanup EXIT

# Export environment variables for the S3 integration tests
export LITESTREAM_S3_ACCESS_KEY_ID=minioadmin
export LITESTREAM_S3_SECRET_ACCESS_KEY=minioadmin
export LITESTREAM_S3_BUCKET=test-bucket
export LITESTREAM_S3_ENDPOINT=http://localhost:9000
export LITESTREAM_S3_FORCE_PATH_STYLE=true
export LITESTREAM_S3_REGION=us-east-1

echo "Running S3 integration tests against MinIO..."
go test -v ./replica_client_test.go -integration -replica-clients=s3 "$@"

echo "Tests completed successfully!"

@@ -189,16 +189,93 @@ func BoolQueryValue(query url.Values, keys ...string) (value bool, ok bool) {
// IsTigrisEndpoint returns true if the endpoint is the Tigris object storage service.
func IsTigrisEndpoint(endpoint string) bool {
	host := extractEndpointHost(endpoint)
	return host == "fly.storage.tigris.dev"
}

// IsDigitalOceanEndpoint returns true if the endpoint is Digital Ocean Spaces.
func IsDigitalOceanEndpoint(endpoint string) bool {
	host := extractEndpointHost(endpoint)
	if host == "" {
		return false
	}
	return strings.HasSuffix(host, ".digitaloceanspaces.com")
}

// IsBackblazeEndpoint returns true if the endpoint is Backblaze B2.
func IsBackblazeEndpoint(endpoint string) bool {
	host := extractEndpointHost(endpoint)
	if host == "" {
		return false
	}
	return strings.HasSuffix(host, ".backblazeb2.com")
}

// IsFilebaseEndpoint returns true if the endpoint is Filebase.
func IsFilebaseEndpoint(endpoint string) bool {
	host := extractEndpointHost(endpoint)
	if host == "" {
		return false
	}
	return host == "s3.filebase.com"
}

// IsScalewayEndpoint returns true if the endpoint is Scaleway Object Storage.
func IsScalewayEndpoint(endpoint string) bool {
	host := extractEndpointHost(endpoint)
	if host == "" {
		return false
	}
	return strings.HasSuffix(host, ".scw.cloud")
}

// IsCloudflareR2Endpoint returns true if the endpoint is Cloudflare R2.
func IsCloudflareR2Endpoint(endpoint string) bool {
	host := extractEndpointHost(endpoint)
	if host == "" {
		return false
	}
	return strings.HasSuffix(host, ".r2.cloudflarestorage.com")
}

// IsMinIOEndpoint returns true if the endpoint appears to be MinIO or similar
// (a custom endpoint with a port number that is not a known cloud provider).
func IsMinIOEndpoint(endpoint string) bool {
	host := extractEndpointHost(endpoint)
	if host == "" {
		return false
	}
	// MinIO typically uses host:port format without .com domain.
	// Check for port number in the host.
	if !strings.Contains(host, ":") {
		return false
	}
	// Exclude known cloud providers.
	if strings.Contains(host, ".amazonaws.com") ||
		strings.Contains(host, ".digitaloceanspaces.com") ||
		strings.Contains(host, ".backblazeb2.com") ||
		strings.Contains(host, ".filebase.com") ||
		strings.Contains(host, ".scw.cloud") ||
		strings.Contains(host, ".r2.cloudflarestorage.com") ||
		strings.Contains(host, "tigris.dev") {
		return false
	}
	return true
}
// extractEndpointHost extracts the host from an endpoint URL or returns the
// endpoint as-is if it's not a full URL.
func extractEndpointHost(endpoint string) string {
	endpoint = strings.TrimSpace(strings.ToLower(endpoint))
	if endpoint == "" {
-		return false
+		return ""
	}
	if strings.HasPrefix(endpoint, "http://") || strings.HasPrefix(endpoint, "https://") {
		if u, err := url.Parse(endpoint); err == nil && u.Host != "" {
-			endpoint = u.Host
+			return u.Host
		}
	}
-	return endpoint == "fly.storage.tigris.dev"
+	return endpoint
}
// IsURL returns true if s appears to be a URL (has a scheme).
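As a quick illustration of the normalization above (not part of the diff; the module path is assumed from the issue link later in this commit), the detectors accept full URLs, bare hosts, and mixed case:

package main

import (
	"fmt"

	"github.com/benbjohnson/litestream"
)

func main() {
	// extractEndpointHost trims, lowercases, and strips the scheme, so all
	// of these spellings resolve to recognizable hosts.
	fmt.Println(litestream.IsBackblazeEndpoint("HTTPS://s3.us-west-002.backblazeb2.com")) // true
	fmt.Println(litestream.IsFilebaseEndpoint("s3.filebase.com"))                         // true (bare host)
	fmt.Println(litestream.IsMinIOEndpoint("http://localhost:9000"))                      // true (host:port, unknown provider)
	fmt.Println(litestream.IsMinIOEndpoint("s3.filebase.com"))                            // false (no port)
}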

@@ -630,3 +630,137 @@ func TestCleanReplicaURLPath(t *testing.T) {
		})
	}
}

func TestIsDigitalOceanEndpoint(t *testing.T) {
	tests := []struct {
		endpoint string
		expected bool
	}{
		{"https://sfo3.digitaloceanspaces.com", true},
		{"https://nyc3.digitaloceanspaces.com", true},
		{"sfo3.digitaloceanspaces.com", true},
		{"https://s3.amazonaws.com", false},
		{"https://s3.filebase.com", false},
		{"", false},
	}
	for _, tt := range tests {
		t.Run(tt.endpoint, func(t *testing.T) {
			got := litestream.IsDigitalOceanEndpoint(tt.endpoint)
			if got != tt.expected {
				t.Errorf("IsDigitalOceanEndpoint(%q) = %v, want %v", tt.endpoint, got, tt.expected)
			}
		})
	}
}

func TestIsBackblazeEndpoint(t *testing.T) {
	tests := []struct {
		endpoint string
		expected bool
	}{
		{"https://s3.us-west-002.backblazeb2.com", true},
		{"https://s3.eu-central-003.backblazeb2.com", true},
		{"s3.us-west-002.backblazeb2.com", true},
		{"https://s3.amazonaws.com", false},
		{"https://s3.filebase.com", false},
		{"", false},
	}
	for _, tt := range tests {
		t.Run(tt.endpoint, func(t *testing.T) {
			got := litestream.IsBackblazeEndpoint(tt.endpoint)
			if got != tt.expected {
				t.Errorf("IsBackblazeEndpoint(%q) = %v, want %v", tt.endpoint, got, tt.expected)
			}
		})
	}
}

func TestIsFilebaseEndpoint(t *testing.T) {
	tests := []struct {
		endpoint string
		expected bool
	}{
		{"https://s3.filebase.com", true},
		{"http://s3.filebase.com", true},
		{"s3.filebase.com", true},
		{"https://s3.amazonaws.com", false},
		{"https://sfo3.digitaloceanspaces.com", false},
		{"", false},
	}
	for _, tt := range tests {
		t.Run(tt.endpoint, func(t *testing.T) {
			got := litestream.IsFilebaseEndpoint(tt.endpoint)
			if got != tt.expected {
				t.Errorf("IsFilebaseEndpoint(%q) = %v, want %v", tt.endpoint, got, tt.expected)
			}
		})
	}
}

func TestIsScalewayEndpoint(t *testing.T) {
	tests := []struct {
		endpoint string
		expected bool
	}{
		{"https://s3.fr-par.scw.cloud", true},
		{"https://s3.nl-ams.scw.cloud", true},
		{"s3.fr-par.scw.cloud", true},
		{"https://s3.amazonaws.com", false},
		{"https://s3.filebase.com", false},
		{"", false},
	}
	for _, tt := range tests {
		t.Run(tt.endpoint, func(t *testing.T) {
			got := litestream.IsScalewayEndpoint(tt.endpoint)
			if got != tt.expected {
				t.Errorf("IsScalewayEndpoint(%q) = %v, want %v", tt.endpoint, got, tt.expected)
			}
		})
	}
}

func TestIsCloudflareR2Endpoint(t *testing.T) {
	tests := []struct {
		endpoint string
		expected bool
	}{
		{"https://abcdef123456.r2.cloudflarestorage.com", true},
		{"https://account-id.r2.cloudflarestorage.com", true},
		{"abcdef123456.r2.cloudflarestorage.com", true},
		{"https://s3.amazonaws.com", false},
		{"https://s3.filebase.com", false},
		{"", false},
	}
	for _, tt := range tests {
		t.Run(tt.endpoint, func(t *testing.T) {
			got := litestream.IsCloudflareR2Endpoint(tt.endpoint)
			if got != tt.expected {
				t.Errorf("IsCloudflareR2Endpoint(%q) = %v, want %v", tt.endpoint, got, tt.expected)
			}
		})
	}
}

func TestIsMinIOEndpoint(t *testing.T) {
	tests := []struct {
		endpoint string
		expected bool
	}{
		{"http://localhost:9000", true},
		{"http://192.168.1.100:9000", true},
		{"minio.local:9000", true},
		{"https://s3.amazonaws.com", false},
		{"https://s3.filebase.com", false},
		{"https://sfo3.digitaloceanspaces.com", false},
		{"s3.filebase.com", false}, // No port, not MinIO
		{"", false},
	}
	for _, tt := range tests {
		t.Run(tt.endpoint, func(t *testing.T) {
			got := litestream.IsMinIOEndpoint(tt.endpoint)
			if got != tt.expected {
				t.Errorf("IsMinIOEndpoint(%q) = %v, want %v", tt.endpoint, got, tt.expected)
			}
		})
	}
}

@@ -154,8 +154,17 @@ func NewReplicaClientFromURL(scheme, host, urlPath string, query url.Values, use
return nil, fmt.Errorf("bucket required for s3 replica URL")
}
// Check for Tigris endpoint
// Detect S3-compatible provider endpoints for applying appropriate defaults.
isTigris := litestream.IsTigrisEndpoint(endpoint)
isDigitalOcean := litestream.IsDigitalOceanEndpoint(endpoint)
isBackblaze := litestream.IsBackblazeEndpoint(endpoint)
isFilebase := litestream.IsFilebaseEndpoint(endpoint)
isScaleway := litestream.IsScalewayEndpoint(endpoint)
isCloudflareR2 := litestream.IsCloudflareR2Endpoint(endpoint)
isMinIO := litestream.IsMinIOEndpoint(endpoint)
// Track if forcePathStyle was explicitly set via query parameter.
forcePathStyleSet := query.Get("forcePathStyle") != ""
// Read authentication from environment variables
if v := os.Getenv("AWS_ACCESS_KEY_ID"); v != "" {
@@ -169,16 +178,9 @@ func NewReplicaClientFromURL(scheme, host, urlPath string, query url.Values, use
		client.SecretAccessKey = v
	}

-	// Configure client
-	client.Bucket = bucket
-	client.Path = urlPath
-	client.Region = region
-	client.Endpoint = endpoint
-	client.ForcePathStyle = forcePathStyle
-	client.SkipVerify = skipVerify
-	// Apply Tigris defaults
+	// Apply provider-specific defaults for S3-compatible providers.
	if isTigris {
		// Tigris: requires signed payloads, no MD5
		if !signPayloadSet {
			signPayload, signPayloadSet = true, true
		}
@@ -186,6 +188,26 @@ func NewReplicaClientFromURL(scheme, host, urlPath string, query url.Values, use
			requireMD5, requireMD5Set = false, true
		}
	}
	if isDigitalOcean || isBackblaze || isFilebase || isScaleway || isCloudflareR2 || isMinIO {
		// All these providers require signed payloads (don't support UNSIGNED-PAYLOAD)
		if !signPayloadSet {
			signPayload, signPayloadSet = true, true
		}
	}
	if !forcePathStyleSet {
		// Filebase, Backblaze B2, and MinIO require path-style URLs
		if isFilebase || isBackblaze || isMinIO {
			forcePathStyle = true
		}
	}

	// Configure client
	client.Bucket = bucket
	client.Path = urlPath
	client.Region = region
	client.Endpoint = endpoint
	client.ForcePathStyle = forcePathStyle
	client.SkipVerify = skipVerify

	if signPayloadSet {
		client.SignPayload = signPayload
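One behavior worth noting in the block above: an explicit forcePathStyle query parameter marks the setting as user-chosen, so the path-style default for Filebase, Backblaze B2, and MinIO is skipped. A small self-contained sketch (the URL shape is illustrative):

package main

import (
	"fmt"
	"net/url"
)

func main() {
	// A user explicitly disabling path-style addressing in the replica URL.
	u, _ := url.Parse("s3://test-bucket/replica?forcePathStyle=false")
	query := u.Query()

	// Mirrors the check above: any non-empty value counts as "set by user".
	forcePathStyleSet := query.Get("forcePathStyle") != ""
	fmt.Println(forcePathStyleSet) // true -> the provider default is not applied
}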
@@ -552,12 +574,16 @@ func (c *ReplicaClient) middlewareOption() func(*middleware.Stack) error {
			if err := v4.AddContentSHA256HeaderMiddleware(stack); err != nil {
				return err
			}
		}

		// Disable AWS SDK v2's trailing checksum middleware which uses
-		// aws-chunked encoding that is incompatible with UNSIGNED-PAYLOAD.
-		// AWS S3 rejects requests with: "aws-chunked encoding is not supported
-		// when x-amz-content-sha256 UNSIGNED-PAYLOAD is supplied."
+		// aws-chunked encoding. This is required for:
+		//  1. UNSIGNED-PAYLOAD requests (aws-chunked + UNSIGNED-PAYLOAD is rejected by AWS)
+		//  2. S3-compatible providers (Filebase, MinIO, Backblaze B2, etc.) that don't
+		//     support aws-chunked encoding at all
+		// See: https://github.com/aws/aws-sdk-go-v2/discussions/2960
+		// See: https://github.com/benbjohnson/litestream/issues/895
		if !c.SignPayload || c.Endpoint != "" {
			stack.Finalize.Remove("addInputChecksumTrailer")
		}
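For context, a hedged sketch of how an option with this signature attaches to an AWS SDK v2 client (the construction below is illustrative, not code from this PR): entries in APIOptions are func(*middleware.Stack) error and run for every request, which is how the stack.Finalize.Remove call above takes effect.

package s3sketch // illustrative

import (
	"context"

	awsconfig "github.com/aws/aws-sdk-go-v2/config"
	awss3 "github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/aws/smithy-go/middleware"
)

// newSDKClient shows the assumed wiring: opt would be the value returned by
// middlewareOption(), applied to every request's middleware stack.
func newSDKClient(ctx context.Context, opt func(*middleware.Stack) error) (*awss3.Client, error) {
	cfg, err := awsconfig.LoadDefaultConfig(ctx)
	if err != nil {
		return nil, err
	}
	return awss3.NewFromConfig(cfg, func(o *awss3.Options) {
		o.APIOptions = append(o.APIOptions, opt)
	}), nil
}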

@@ -219,6 +219,82 @@ func TestReplicaClient_UnsignedPayload_NoChunkedEncoding(t *testing.T) {
	}
}

// TestReplicaClient_SignedPayload_CustomEndpoint_NoChunkedEncoding verifies that
// aws-chunked encoding is disabled for custom endpoints even when SignPayload=true.
// This is necessary for S3-compatible providers (Filebase, MinIO, Backblaze B2, etc.)
// that don't support aws-chunked encoding at all. See issue #895.
func TestReplicaClient_SignedPayload_CustomEndpoint_NoChunkedEncoding(t *testing.T) {
	data := mustLTX(t)

	headers := make(chan http.Header, 1)
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		defer r.Body.Close()
		_, _ = io.Copy(io.Discard, r.Body)
		if r.Method == http.MethodPut {
			select {
			case headers <- r.Header.Clone():
			default:
			}
			w.Header().Set("ETag", `"test-etag"`)
			w.WriteHeader(http.StatusOK)
			return
		}
		w.WriteHeader(http.StatusOK)
	}))
	defer server.Close()

	client := NewReplicaClient()
	client.Bucket = "test-bucket"
	client.Path = "replica"
	client.Region = "us-east-1"
	client.Endpoint = server.URL // Custom endpoint (non-AWS)
	client.ForcePathStyle = true
	client.AccessKeyID = "test-access-key"
	client.SecretAccessKey = "test-secret-key"
	client.SignPayload = true // Signed payload, but still using custom endpoint

	ctx := context.Background()
	if err := client.Init(ctx); err != nil {
		t.Fatalf("Init() error: %v", err)
	}

	if _, err := client.WriteLTXFile(ctx, 0, 2, 2, bytes.NewReader(data)); err != nil {
		t.Fatalf("WriteLTXFile() error: %v", err)
	}

	select {
	case hdr := <-headers:
		// With SignPayload=true, we expect an actual SHA256 hash (not UNSIGNED-PAYLOAD)
		sha256Header := hdr.Get("x-amz-content-sha256")
		if sha256Header == "" {
			t.Error("x-amz-content-sha256 header should be set")
		}
		if sha256Header == "UNSIGNED-PAYLOAD" {
			t.Error("x-amz-content-sha256 should be actual hash, not UNSIGNED-PAYLOAD, when SignPayload=true")
		}

		// But aws-chunked encoding should still be disabled for custom endpoints
		contentEnc := hdr.Get("Content-Encoding")
		if strings.Contains(contentEnc, "aws-chunked") {
			t.Errorf("Content-Encoding contains aws-chunked: %q; aws-chunked is not supported by S3-compatible providers", contentEnc)
		}
		transferEnc := hdr.Get("Transfer-Encoding")
		if strings.Contains(transferEnc, "aws-chunked") {
			t.Errorf("Transfer-Encoding contains aws-chunked: %q; aws-chunked is not supported by S3-compatible providers", transferEnc)
		}
		decoded := hdr.Get("X-Amz-Decoded-Content-Length")
		if decoded != "" {
			t.Errorf("X-Amz-Decoded-Content-Length = %q; this header indicates aws-chunked encoding which is not supported by S3-compatible providers", decoded)
		}
	case <-time.After(time.Second):
		t.Fatal("timeout waiting for PUT request")
	}
}

func mustLTX(t *testing.T) []byte {
	t.Helper()