feat(oss): add native Alibaba Cloud OSS storage backend (#862)

Co-authored-by: Claude <noreply@anthropic.com>
This commit is contained in:
Cory LaNou
2025-12-04 08:25:38 -06:00
committed by GitHub
parent f5b166ab21
commit 130fa20432
10 changed files with 1100 additions and 3 deletions

View File

@@ -23,7 +23,7 @@ Litestream is a **disaster recovery tool for SQLite** that runs as a background
- **Multi-level Compaction**: Hierarchical compaction keeps storage efficient (30s → 5m → 1h → snapshots)
- **Single Replica Constraint**: Each database is replicated to exactly one remote destination
- **Pure Go Build**: Uses `modernc.org/sqlite`, so no CGO dependency for the main binary
- **Optional NATS JetStream Support**: Additional replica backend alongside S3/GCS/ABS/File/SFTP
- **Optional NATS JetStream Support**: Additional replica backend alongside S3/GCS/ABS/OSS/File/SFTP
- **Snapshot Compatibility**: Only LTX-based backups are supported—keep legacy v0.3.x binaries to restore old WAL snapshots
**Key Design Principles:**
@@ -728,7 +728,7 @@ if offset < 0 {
```bash
# Test eventual consistency
go test -v ./replica_client_test.go -integration [s3|gcs|abs|sftp]
go test -v ./replica_client_test.go -integration [s3|gcs|abs|oss|sftp]
# Test partial reads
# (Example) add targeted partial-read tests in your backend package

View File

@@ -86,6 +86,7 @@ CGO_ENABLED=1 go build -tags vfs -o bin/litestream-vfs ./cmd/litestream-vfs # T
go test -v ./replica_client_test.go -integration s3
go test -v ./replica_client_test.go -integration gcs
go test -v ./replica_client_test.go -integration abs
go test -v ./replica_client_test.go -integration oss
go test -v ./replica_client_test.go -integration sftp
```
@@ -112,7 +113,7 @@ pre-commit run --all-files
**Replica (`replica.go`)**: Connects a database to replication destinations via ReplicaClient interface. Manages periodic synchronization and maintains replication position.
**ReplicaClient Interface** (`replica_client.go`): Abstraction for different storage backends (S3, GCS, Azure Blob Storage, SFTP, file system, NATS). Each implementation handles snapshot/WAL segment upload and restoration. The `LTXFiles` method includes a `useMetadata` parameter: when true, it fetches accurate timestamps from backend metadata (required for point-in-time restores); when false, it uses fast timestamps for normal operations. During compaction, the system preserves the earliest CreatedAt timestamp from source files to maintain temporal granularity for restoration.
**ReplicaClient Interface** (`replica_client.go`): Abstraction for different storage backends (S3, GCS, Azure Blob Storage, OSS, SFTP, file system, NATS). Each implementation handles snapshot/WAL segment upload and restoration. The `LTXFiles` method includes a `useMetadata` parameter: when true, it fetches accurate timestamps from backend metadata (required for point-in-time restores); when false, it uses fast timestamps for normal operations. During compaction, the system preserves the earliest CreatedAt timestamp from source files to maintain temporal granularity for restoration.
**WAL Processing**: The system monitors SQLite WAL files for changes, segments them into LTX format files, and replicates these segments to configured destinations. Uses SQLite checksums for integrity verification.
@@ -121,6 +122,7 @@ pre-commit run --all-files
- **S3** (`s3/replica_client.go`): AWS S3 and compatible storage
- **GCS** (`gs/replica_client.go`): Google Cloud Storage
- **ABS** (`abs/replica_client.go`): Azure Blob Storage
- **OSS** (`oss/replica_client.go`): Alibaba Cloud Object Storage Service
- **SFTP** (`sftp/replica_client.go`): SSH File Transfer Protocol
- **File** (`file/replica_client.go`): Local file system replication
- **NATS** (`nats/replica_client.go`): NATS JetStream object storage

View File

@@ -29,6 +29,7 @@ import (
"github.com/benbjohnson/litestream/gs"
"github.com/benbjohnson/litestream/internal"
"github.com/benbjohnson/litestream/nats"
"github.com/benbjohnson/litestream/oss"
"github.com/benbjohnson/litestream/s3"
"github.com/benbjohnson/litestream/sftp"
"github.com/benbjohnson/litestream/webdav"
@@ -1059,6 +1060,10 @@ func NewReplicaFromConfig(c *ReplicaConfig, db *litestream.DB) (_ *litestream.Re
if r.Client, err = newNATSReplicaClientFromConfig(c, r); err != nil {
return nil, err
}
case "oss":
if r.Client, err = newOSSReplicaClientFromConfig(c, r); err != nil {
return nil, err
}
default:
return nil, fmt.Errorf("unknown replica type in config: %q", c.Type)
}
@@ -1518,6 +1523,69 @@ func newNATSReplicaClientFromConfig(c *ReplicaConfig, _ *litestream.Replica) (_
return client, nil
}
// newOSSReplicaClientFromConfig builds an oss.ReplicaClient from a replica
// configuration.
//
// The destination may be described either by a URL or by the discrete bucket
// and path fields, never both. When a URL is given, its host is passed through
// oss.ParseHost to obtain the bucket and region, and its path becomes the
// replica path; an explicitly configured region takes precedence over the one
// found in the URL. A bucket is mandatory. Part size and concurrency are copied
// onto the client only when they are present in the config.
func newOSSReplicaClientFromConfig(c *ReplicaConfig, _ *litestream.Replica) (_ *oss.ReplicaClient, err error) {
	// A URL is mutually exclusive with the explicit path and bucket fields.
	hasURL := c.URL != ""
	switch {
	case hasURL && c.Path != "":
		return nil, fmt.Errorf("cannot specify url & path for oss replica")
	case hasURL && c.Bucket != "":
		return nil, fmt.Errorf("cannot specify url & bucket for oss replica")
	}

	// Start from the explicit fields; URL-derived values only fill the gaps.
	bucket := c.Bucket
	configPath := c.Path
	region := c.Region

	if hasURL {
		_, host, urlPath, parseErr := ParseReplicaURL(c.URL)
		if parseErr != nil {
			return nil, parseErr
		}

		// The endpoint component returned by ParseHost is intentionally unused.
		urlBucket, urlRegion, _ := oss.ParseHost(host)

		if configPath == "" {
			configPath = urlPath
		}
		if bucket == "" {
			bucket = urlBucket
		}
		if region == "" {
			region = urlRegion
		}
	}

	// Nothing can be replicated without a bucket.
	if bucket == "" {
		return nil, fmt.Errorf("bucket required for oss replica")
	}

	client := oss.NewReplicaClient()
	client.Bucket = bucket
	client.Path = configPath
	client.Region = region
	client.Endpoint = c.Endpoint
	client.AccessKeyID = c.AccessKeyID
	client.AccessKeySecret = c.SecretAccessKey

	// Optional upload tuning; absent values leave the client's zero values.
	if size := c.PartSize; size != nil {
		client.PartSize = int64(*size)
	}
	if n := c.Concurrency; n != nil {
		client.Concurrency = *n
	}
	return client, nil
}
// applyLitestreamEnv copies "LITESTREAM" prefixed environment variables to
// their AWS counterparts as the "AWS" prefix can be confusing when using a
// non-AWS S3-compatible service.

View File

@@ -20,6 +20,7 @@ import (
"github.com/benbjohnson/litestream/file"
"github.com/benbjohnson/litestream/gs"
"github.com/benbjohnson/litestream/nats"
"github.com/benbjohnson/litestream/oss"
"github.com/benbjohnson/litestream/s3"
"github.com/benbjohnson/litestream/sftp"
)
@@ -183,6 +184,8 @@ func (c *ReplicateCommand) Run(ctx context.Context) (err error) {
slogWith.Info("replicating to", "host", client.Host, "user", client.User, "path", client.Path)
case *nats.ReplicaClient:
slogWith.Info("replicating to", "bucket", client.BucketName, "url", client.URL)
case *oss.ReplicaClient:
slogWith.Info("replicating to", "bucket", client.Bucket, "path", client.Path, "region", client.Region)
default:
slogWith.Info("replicating to")
}

View File

@@ -37,6 +37,7 @@ graph TB
S3[s3/replica_client.go]
GCS[gs/replica_client.go]
ABS[abs/replica_client.go]
OSS[oss/replica_client.go]
File[file/replica_client.go]
SFTP[sftp/replica_client.go]
NATS[nats/replica_client.go]
@@ -54,6 +55,7 @@ graph TB
RC --> S3
RC --> GCS
RC --> ABS
RC --> OSS
RC --> File
RC --> SFTP
RC --> NATS
@@ -61,6 +63,7 @@ graph TB
S3 --> Cloud
GCS --> Cloud
ABS --> Cloud
OSS --> Cloud
```
### Layer Responsibilities

2
go.mod
View File

@@ -52,6 +52,8 @@ require (
modernc.org/memory v1.11.0 // indirect
)
require github.com/aliyun/alibabacloud-oss-go-sdk-v2 v1.3.0
require (
cloud.google.com/go v0.111.0 // indirect
cloud.google.com/go/compute v1.23.3 // indirect

2
go.sum
View File

@@ -36,6 +36,8 @@ github.com/MadAppGang/httplog v1.3.0 h1:1XU54TO8kiqTeO+7oZLKAM3RP/cJ7SadzslRcKsp
github.com/MadAppGang/httplog v1.3.0/go.mod h1:gpYEdkjh/Cda6YxtDy4AB7KY+fR7mb3SqBZw74A5hJ4=
github.com/TylerBrock/colorjson v0.0.0-20200706003622-8a50f05110d2 h1:ZBbLwSJqkHBuFDA6DUhhse0IGJ7T5bemHyNILUjvOq4=
github.com/TylerBrock/colorjson v0.0.0-20200706003622-8a50f05110d2/go.mod h1:VSw57q4QFiWDbRnjdX8Cb3Ow0SFncRw+bA/ofY6Q83w=
github.com/aliyun/alibabacloud-oss-go-sdk-v2 v1.3.0 h1:wQlqotpyjYPjJz+Noh5bRu7Snmydk8SKC5Z6u1CR20Y=
github.com/aliyun/alibabacloud-oss-go-sdk-v2 v1.3.0/go.mod h1:FTzydeQVmR24FI0D6XWUOMKckjXehM/jgMn1xC+DA9M=
github.com/aws/aws-sdk-go-v2 v1.37.1 h1:SMUxeNz3Z6nqGsXv0JuJXc8w5YMtrQMuIBmDx//bBDY=
github.com/aws/aws-sdk-go-v2 v1.37.1/go.mod h1:9Q0OoGQoboYIAJyslFyF1f5K1Ryddop8gqMhWx/n4Wg=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.0 h1:6GMWV6CNpA/6fbFHnoAjrv4+LGfyTqZz2LtCHnspgDg=

View File

@@ -24,6 +24,7 @@ import (
"github.com/benbjohnson/litestream/gs"
"github.com/benbjohnson/litestream/internal"
"github.com/benbjohnson/litestream/nats"
"github.com/benbjohnson/litestream/oss"
"github.com/benbjohnson/litestream/s3"
"github.com/benbjohnson/litestream/sftp"
"github.com/benbjohnson/litestream/webdav"
@@ -104,6 +105,16 @@ var (
natsPassword = flag.String("nats-password", os.Getenv("LITESTREAM_NATS_PASSWORD"), "")
)
// Alibaba Cloud OSS settings.
//
// Every flag takes its default from the matching LITESTREAM_OSS_* environment
// variable (read when these package variables are initialized) and is
// registered with an empty usage string. The values are consumed by
// NewOSSReplicaClient.
var (
	ossAccessKeyID     = flag.String("oss-access-key-id", os.Getenv("LITESTREAM_OSS_ACCESS_KEY_ID"), "")
	ossAccessKeySecret = flag.String("oss-access-key-secret", os.Getenv("LITESTREAM_OSS_ACCESS_KEY_SECRET"), "")
	ossRegion          = flag.String("oss-region", os.Getenv("LITESTREAM_OSS_REGION"), "")
	ossBucket          = flag.String("oss-bucket", os.Getenv("LITESTREAM_OSS_BUCKET"), "")
	ossPath            = flag.String("oss-path", os.Getenv("LITESTREAM_OSS_PATH"), "")
	ossEndpoint        = flag.String("oss-endpoint", os.Getenv("LITESTREAM_OSS_ENDPOINT"), "")
)
// Integration returns the value currently stored behind the package-level
// integration pointer.
// NOTE(review): presumably this is the -integration test flag declared
// elsewhere in this file; confirm against its definition.
func Integration() bool {
	return *integration
}
@@ -215,6 +226,8 @@ func NewReplicaClient(tb testing.TB, typ string) litestream.ReplicaClient {
return NewWebDAVReplicaClient(tb)
case nats.ReplicaClientType:
return NewNATSReplicaClient(tb)
case oss.ReplicaClientType:
return NewOSSReplicaClient(tb)
case "tigris":
return NewTigrisReplicaClient(tb)
default:
@@ -347,6 +360,20 @@ func NewNATSReplicaClient(tb testing.TB) *nats.ReplicaClient {
return c
}
// NewOSSReplicaClient builds an OSS replica client for integration tests from
// the oss-* flags. Each client gets its own random 16-hex-digit subdirectory
// under the configured path.
func NewOSSReplicaClient(tb testing.TB) *oss.ReplicaClient {
	tb.Helper()

	// Random per-client suffix appended to the base path.
	suffix := fmt.Sprintf("%016x", rand.Uint64())

	client := oss.NewReplicaClient()
	client.AccessKeyID, client.AccessKeySecret = *ossAccessKeyID, *ossAccessKeySecret
	client.Region, client.Bucket, client.Endpoint = *ossRegion, *ossBucket, *ossEndpoint
	client.Path = path.Join(*ossPath, suffix)
	return client
}
// MustDeleteAll deletes all objects under the client's path.
func MustDeleteAll(tb testing.TB, c litestream.ReplicaClient) {
tb.Helper()

618
oss/replica_client.go Normal file
View File

@@ -0,0 +1,618 @@
package oss
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"log/slog"
"net/url"
"os"
"path"
"regexp"
"strings"
"sync"
"time"
"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"
"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials"
"github.com/superfly/ltx"
"github.com/benbjohnson/litestream"
"github.com/benbjohnson/litestream/internal"
)
// ReplicaClientType is the client type for this package. It is the value
// returned by ReplicaClient.Type and the label used for this backend's
// operation metrics and logger group.
const ReplicaClientType = "oss"

// MetadataKeyTimestamp is the user-metadata key under which WriteLTXFile stores
// the LTX header timestamp (formatted as RFC 3339 with nanoseconds) and from
// which the file iterator reads it back.
// Note: OSS SDK automatically adds "x-oss-meta-" prefix when setting metadata.
const MetadataKeyTimestamp = "litestream-timestamp"

// MaxKeys is the number of keys OSS can operate on per batch. DeleteLTXFiles
// and DeleteAll split their delete requests into chunks of at most this size.
const MaxKeys = 1000

// DefaultRegion is the region Init falls back to when ReplicaClient.Region is
// empty.
const DefaultRegion = "cn-hangzhou"
// Compile-time check that *ReplicaClient satisfies litestream.ReplicaClient.
var _ litestream.ReplicaClient = (*ReplicaClient)(nil)

// ReplicaClient is a client for writing LTX files to Alibaba Cloud OSS.
//
// The exported fields are configuration and are read by Init when the
// underlying SDK client is first created; Init is a no-op once the client
// exists, so later changes to these fields are not picked up by it.
type ReplicaClient struct {
	mu       sync.Mutex    // held for the duration of Init
	client   *oss.Client   // SDK client; nil until Init succeeds
	uploader *oss.Uploader // created by Init, used by WriteLTXFile
	logger   *slog.Logger

	// Alibaba Cloud authentication keys. Init uses them as static credentials
	// only when both are non-empty; otherwise it installs the SDK's
	// environment-variable credentials provider.
	AccessKeyID     string
	AccessKeySecret string

	// OSS bucket information
	Region   string // empty selects DefaultRegion
	Bucket   string // required; Init fails when empty
	Path     string // key prefix under which LTX files are stored
	Endpoint string // optional custom endpoint; "https://" is prepended when no scheme is given

	// Upload configuration. Values <= 0 are not passed to the SDK uploader,
	// which then keeps its own defaults.
	PartSize    int64 // Part size for multipart uploads
	Concurrency int   // Number of concurrent parts to upload
}
// NewReplicaClient allocates an unconfigured ReplicaClient whose logger is the
// default slog logger scoped to the "oss" group. Callers fill in the exported
// fields before first use.
func NewReplicaClient() *ReplicaClient {
	c := new(ReplicaClient)
	c.logger = slog.Default().WithGroup(ReplicaClientType)
	return c
}
// Type returns "oss" as the client type, i.e. the ReplicaClientType constant.
func (c *ReplicaClient) Type() string {
	return ReplicaClientType
}
// Init builds the OSS SDK client and uploader on first use and does nothing on
// later calls. It runs under the client's mutex.
//
// It returns an error when no bucket is configured. The region defaults to
// DefaultRegion; static credentials are used only when both key fields are
// set, otherwise the SDK's environment-variable provider is installed. A
// custom endpoint without an http:// or https:// scheme is given "https://".
// The ctx argument is not used.
func (c *ReplicaClient) Init(ctx context.Context) (err error) {
	c.mu.Lock()
	defer c.mu.Unlock()

	switch {
	case c.client != nil:
		// Already initialized.
		return nil
	case c.Bucket == "":
		return fmt.Errorf("oss: bucket name is required")
	}

	region := DefaultRegion
	if c.Region != "" {
		region = c.Region
	}
	cfg := oss.LoadDefaultConfig().WithRegion(region)

	// Static keys need both halves; anything less defers to the environment.
	if c.AccessKeyID == "" || c.AccessKeySecret == "" {
		cfg = cfg.WithCredentialsProvider(
			credentials.NewEnvironmentVariableCredentialsProvider(),
		)
	} else {
		cfg = cfg.WithCredentialsProvider(
			credentials.NewStaticCredentialsProvider(c.AccessKeyID, c.AccessKeySecret),
		)
	}

	if endpoint := c.Endpoint; endpoint != "" {
		hasScheme := strings.HasPrefix(endpoint, "http://") || strings.HasPrefix(endpoint, "https://")
		if !hasScheme {
			endpoint = "https://" + endpoint
		}
		cfg = cfg.WithEndpoint(endpoint)
	}

	c.client = oss.NewClient(cfg)

	// Only positive overrides are applied; everything else stays at the
	// uploader's own defaults.
	c.uploader = c.client.NewUploader(func(o *oss.UploaderOptions) {
		if c.PartSize > 0 {
			o.PartSize = c.PartSize
		}
		if c.Concurrency > 0 {
			o.ParallelNum = c.Concurrency
		}
	})
	return nil
}
// LTXFiles initializes the client if needed and returns a lazy iterator over
// the LTX files stored for the given level, skipping files whose minimum TXID
// is below seek.
//
// With useMetadata set, the iterator issues a HeadObject per file to read the
// timestamp stored in object metadata; otherwise it reports the LastModified
// time from the listing.
func (c *ReplicaClient) LTXFiles(ctx context.Context, level int, seek ltx.TXID, useMetadata bool) (ltx.FileIterator, error) {
	err := c.Init(ctx)
	if err != nil {
		return nil, err
	}

	var itr ltx.FileIterator = newFileIterator(ctx, c, level, seek, useMetadata)
	return itr, nil
}
// OpenLTXFile opens the LTX file identified by level and TXID range and returns
// its body; the caller is responsible for closing it.
//
// A positive size requests exactly the bytes [offset, offset+size-1]; with no
// size but a positive offset the read runs from offset to the end of the
// object; otherwise the whole object is fetched. Returns os.ErrNotExist when
// OSS reports the key as missing. The GET metric is incremented only on
// success.
func (c *ReplicaClient) OpenLTXFile(ctx context.Context, level int, minTXID, maxTXID ltx.TXID, offset, size int64) (io.ReadCloser, error) {
	if err := c.Init(ctx); err != nil {
		return nil, err
	}

	key := c.ltxPath(level, ltx.FormatFilename(minTXID, maxTXID))
	req := &oss.GetObjectRequest{
		Bucket: oss.Ptr(c.Bucket),
		Key:    oss.Ptr(key),
	}

	// Work out the Range header value, if a partial read was requested.
	var byteRange string
	switch {
	case size > 0:
		byteRange = fmt.Sprintf("bytes=%d-%d", offset, offset+size-1)
	case offset > 0:
		byteRange = fmt.Sprintf("bytes=%d-", offset)
	}
	if byteRange != "" {
		req.RangeBehavior = oss.Ptr("standard")
		req.Range = oss.Ptr(byteRange)
	}

	out, err := c.client.GetObject(ctx, req)
	if isNotExists(err) {
		return nil, os.ErrNotExist
	} else if err != nil {
		return nil, fmt.Errorf("oss: get object %s: %w", key, err)
	}

	internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "GET").Inc()
	return out.Body, nil
}
// WriteLTXFile uploads an LTX file to the replica and returns its file info.
//
// The LTX header is peeked to extract the file's timestamp, which is stored in
// object metadata (MetadataKeyTimestamp, RFC 3339 with nanoseconds) so the
// original creation time survives the upload. The bytes consumed while peeking
// are buffered and replayed ahead of the remainder of r, so the full stream is
// uploaded. The upload goes through the SDK uploader, which decides between a
// single put and a multipart upload.
//
// The upload result is validated before any metrics are recorded, so an upload
// that comes back without an ETag is reported as an error and is not counted
// as a successful PUT.
func (c *ReplicaClient) WriteLTXFile(ctx context.Context, level int, minTXID, maxTXID ltx.TXID, r io.Reader) (*ltx.FileInfo, error) {
	if err := c.Init(ctx); err != nil {
		return nil, err
	}

	// Use TeeReader to peek at LTX header while preserving data for upload.
	var buf bytes.Buffer
	teeReader := io.TeeReader(r, &buf)

	// Extract timestamp from LTX header.
	hdr, _, err := ltx.PeekHeader(teeReader)
	if err != nil {
		return nil, fmt.Errorf("extract timestamp from LTX header: %w", err)
	}
	timestamp := time.UnixMilli(hdr.Timestamp).UTC()

	// Replay the buffered header bytes, then continue with the rest of r.
	rc := internal.NewReadCounter(io.MultiReader(&buf, r))

	filename := ltx.FormatFilename(minTXID, maxTXID)
	key := c.ltxPath(level, filename)

	// Store timestamp in OSS metadata for accurate timestamp retrieval.
	metadata := map[string]string{
		MetadataKeyTimestamp: timestamp.Format(time.RFC3339Nano),
	}

	result, err := c.uploader.UploadFrom(ctx, &oss.PutObjectRequest{
		Bucket:   oss.Ptr(c.Bucket),
		Key:      oss.Ptr(key),
		Metadata: metadata,
	}, rc)
	if err != nil {
		return nil, fmt.Errorf("oss: upload to %s: %w", key, err)
	}

	// An ETag indicates a successful upload. Check it before touching the
	// metrics so a failed upload is never counted as a PUT.
	if result == nil || result.ETag == nil || *result.ETag == "" {
		return nil, fmt.Errorf("oss: upload failed: no ETag returned")
	}

	internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "PUT").Inc()
	internal.OperationBytesCounterVec.WithLabelValues(ReplicaClientType, "PUT").Add(float64(rc.N()))

	// Build file info from the uploaded file.
	return &ltx.FileInfo{
		Level:     level,
		MinTXID:   minTXID,
		MaxTXID:   maxTXID,
		Size:      rc.N(),
		CreatedAt: timestamp,
	}, nil
}
// DeleteLTXFiles removes the given LTX files from the bucket.
//
// Keys are derived from each file's level and TXID range and are sent in
// batches of at most MaxKeys. A batch fails if the request errors or if any
// requested key is missing from the response's deleted list; batches completed
// before the failure are not rolled back. One DELETE metric is recorded per
// successful batch. An empty slice is a no-op (after initialization).
func (c *ReplicaClient) DeleteLTXFiles(ctx context.Context, a []*ltx.FileInfo) error {
	if err := c.Init(ctx); err != nil {
		return err
	}
	if len(a) == 0 {
		return nil
	}

	// Translate file infos into object keys, logging each one.
	objects := make([]oss.DeleteObject, len(a))
	for i, info := range a {
		key := c.ltxPath(info.Level, ltx.FormatFilename(info.MinTXID, info.MaxTXID))
		c.logger.Debug("deleting ltx file", "level", info.Level, "minTXID", info.MinTXID, "maxTXID", info.MaxTXID, "key", key)
		objects[i] = oss.DeleteObject{Key: oss.Ptr(key)}
	}

	// Walk the key list in MaxKeys-sized windows.
	for start := 0; start < len(objects); start += MaxKeys {
		batch := objects[start:min(start+MaxKeys, len(objects))]

		out, err := c.client.DeleteMultipleObjects(ctx, &oss.DeleteMultipleObjectsRequest{
			Bucket:  oss.Ptr(c.Bucket),
			Objects: batch,
		})
		if err != nil {
			return fmt.Errorf("oss: delete batch of %d objects: %w", len(batch), err)
		}
		if err := deleteResultError(batch, out); err != nil {
			return err
		}
		internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc()
	}
	return nil
}
// DeleteAll removes every object stored under the client's path.
//
// It first lists all keys with the prefix Path + "/" (collecting them in
// memory), then deletes them in batches of at most MaxKeys, verifying each
// batch against the response's deleted list. No operation metrics are recorded
// here.
func (c *ReplicaClient) DeleteAll(ctx context.Context) error {
	if err := c.Init(ctx); err != nil {
		return err
	}

	// Phase 1: gather every key below the replica path.
	pages := c.client.NewListObjectsV2Paginator(&oss.ListObjectsV2Request{
		Bucket: oss.Ptr(c.Bucket),
		Prefix: oss.Ptr(c.Path + "/"),
	})

	var pending []oss.DeleteObject
	for pages.HasNext() {
		page, err := pages.NextPage(ctx)
		if err != nil {
			return fmt.Errorf("oss: list objects page: %w", err)
		}
		for i := range page.Contents {
			if key := page.Contents[i].Key; key != nil {
				pending = append(pending, oss.DeleteObject{Key: key})
			}
		}
	}

	// Phase 2: delete what was found, MaxKeys at a time.
	for start := 0; start < len(pending); start += MaxKeys {
		batch := pending[start:min(start+MaxKeys, len(pending))]

		out, err := c.client.DeleteMultipleObjects(ctx, &oss.DeleteMultipleObjectsRequest{
			Bucket:  oss.Ptr(c.Bucket),
			Objects: batch,
		})
		if err != nil {
			return fmt.Errorf("oss: delete all batch of %d objects: %w", len(batch), err)
		}
		if err := deleteResultError(batch, out); err != nil {
			return err
		}
	}
	return nil
}
// ltxPath builds the object key for an LTX file: the client path, the level as
// four lowercase hex digits, and the filename, joined by "/". An empty filename
// yields the level's key prefix (with a trailing slash).
func (c *ReplicaClient) ltxPath(level int, filename string) string {
	return fmt.Sprintf("%s/%04x/%s", c.Path, level, filename)
}
// fileIterator represents an iterator over LTX files in OSS.
//
// It pages through a ListObjectsV2 listing of one level's key prefix and
// yields one ltx.FileInfo per object whose name parses as an LTX filename and
// whose minimum TXID is not below seek. The paginator is created lazily on the
// first call to Next.
type fileIterator struct {
	ctx         context.Context    // derived from the caller's context; cancelled by Close
	cancel      context.CancelFunc // cancels ctx
	client      *ReplicaClient
	level       int
	seek        ltx.TXID // files with MinTXID below this are skipped
	useMetadata bool     // When true, fetch accurate timestamps from metadata

	paginator   *oss.ListObjectsV2Paginator // nil until initPaginator runs
	page        *oss.ListObjectsV2Result    // page currently being consumed
	pageIndex   int                         // index of the next entry in page.Contents
	initialized bool                        // set once the paginator has been created

	closed bool          // set by Close; makes Next return false
	err    error         // first error hit during iteration, reported by Err
	info   *ltx.FileInfo // current item, reported by Item
}
// newFileIterator creates an iterator for the given level. No request is made
// here; the listing starts on the first call to Next. The iterator works on a
// cancellable child of ctx that Close cancels.
func newFileIterator(ctx context.Context, client *ReplicaClient, level int, seek ltx.TXID, useMetadata bool) *fileIterator {
	itr := &fileIterator{
		client:      client,
		level:       level,
		seek:        seek,
		useMetadata: useMetadata,
	}
	itr.ctx, itr.cancel = context.WithCancel(ctx)
	return itr
}
// initPaginator creates the ListObjectsV2 paginator the first time it is
// called and is a no-op afterwards. The listing is restricted to the key prefix
// of the iterator's level.
func (itr *fileIterator) initPaginator() {
	if itr.initialized {
		return
	}
	itr.initialized = true

	rc := itr.client
	req := &oss.ListObjectsV2Request{
		Bucket: oss.Ptr(rc.Bucket),
		// An empty filename turns ltxPath into the level's directory prefix.
		Prefix: oss.Ptr(rc.ltxPath(itr.level, "")),
	}
	itr.paginator = rc.client.NewListObjectsV2Paginator(req)
}
// Close marks the iterator as closed, so Next returns false from then on, and
// cancels the iterator's context. It always returns nil.
func (itr *fileIterator) Close() (err error) {
	defer itr.cancel()
	itr.closed = true
	return nil
}
// Next advances to the next LTX file and reports whether one is available; the
// file is then returned by Item. It returns false once the listing is
// exhausted, after Close, or after an error (see Err).
//
// Objects with a nil key, with a name that is not an LTX filename, or with a
// minimum TXID below the seek position are skipped. CreatedAt is the listing's
// LastModified time (or the current time when that is absent); when
// useMetadata is set it is replaced by the timestamp stored in the object's
// metadata, if that key is present.
func (itr *fileIterator) Next() bool {
	if itr.err != nil || itr.closed {
		return false
	}
	itr.initPaginator()

	for {
		// Refill until there is an unread entry; empty pages are skipped.
		for itr.page == nil || itr.pageIndex >= len(itr.page.Contents) {
			if !itr.paginator.HasNext() {
				return false
			}
			var err error
			if itr.page, err = itr.paginator.NextPage(itr.ctx); err != nil {
				itr.err = err
				return false
			}
			itr.pageIndex = 0
		}

		obj := itr.page.Contents[itr.pageIndex]
		itr.pageIndex++
		if obj.Key == nil {
			continue
		}

		// Only the final path element carries the TXID range.
		minTXID, maxTXID, err := ltx.ParseFilename(path.Base(*obj.Key))
		if err != nil || minTXID < itr.seek {
			continue
		}

		// Cheap timestamp from the listing by default.
		var createdAt time.Time
		if obj.LastModified == nil {
			createdAt = time.Now().UTC()
		} else {
			createdAt = obj.LastModified.UTC()
		}

		// Accurate timestamp from metadata costs one HeadObject per file, so
		// it is fetched only on request.
		if itr.useMetadata {
			head, err := itr.client.client.HeadObject(itr.ctx, &oss.HeadObjectRequest{
				Bucket: oss.Ptr(itr.client.Bucket),
				Key:    obj.Key,
			})
			if err != nil {
				itr.err = fmt.Errorf("fetch object metadata: %w", err)
				return false
			}
			if raw, ok := head.Metadata[MetadataKeyTimestamp]; ok {
				parsed, err := time.Parse(time.RFC3339Nano, raw)
				if err != nil {
					itr.err = fmt.Errorf("parse timestamp from metadata: %w", err)
					return false
				}
				createdAt = parsed
			}
		}

		itr.info = &ltx.FileInfo{
			Level:     itr.level,
			MinTXID:   minTXID,
			MaxTXID:   maxTXID,
			Size:      obj.Size,
			CreatedAt: createdAt,
		}
		return true
	}
}
// Item returns the metadata for the current file, i.e. the value stored by the
// most recent call to Next that returned true. It is nil before the first such
// call.
func (itr *fileIterator) Item() *ltx.FileInfo {
	return itr.info
}
// Err returns any error that occurred during iteration (a failed page fetch, a
// failed metadata lookup, or an unparsable metadata timestamp), or nil.
func (itr *fileIterator) Err() error {
	return itr.err
}
// ParseURL splits an "oss://" URL into bucket, region and object key.
//
// The host is interpreted by ParseHost; when that yields no bucket, the whole
// host is used as the bucket name. The key is the URL path without its leading
// slash. An error is returned if the string is not a valid URL or its scheme is
// not "oss".
func ParseURL(s string) (bucket, region, key string, err error) {
	u, err := url.Parse(s)
	switch {
	case err != nil:
		return "", "", "", err
	case u.Scheme != "oss":
		return "", "", "", fmt.Errorf("oss: invalid url scheme")
	}

	// Fall back to the raw host when no bucket could be extracted from it.
	if bucket, region, _ = ParseHost(u.Host); bucket == "" {
		bucket = u.Host
	}
	return bucket, region, strings.TrimPrefix(u.Path, "/"), nil
}
// ParseHost extracts the bucket and region from an OSS host name.
//
// Recognized forms:
//   - bucket.oss-cn-hangzhou.aliyuncs.com
//   - bucket.oss-cn-hangzhou-internal.aliyuncs.com
//   - bucket (just bucket name)
//
// The bucket label is optional in the first two forms, in which case the
// returned bucket is empty. Any host matching neither pattern is returned
// unchanged as the bucket with an empty region. The endpoint result is always
// empty.
func ParseHost(host string) (bucket, region, endpoint string) {
	// The internal pattern is the more specific one and must be tried first;
	// otherwise "-internal" would end up inside the region.
	for _, re := range []*regexp.Regexp{ossInternalRegex, ossRegex} {
		if m := re.FindStringSubmatch(host); m != nil {
			return m[1], m[2], ""
		}
	}

	// Anything else is taken to be a bare bucket name.
	return host, "", ""
}

var (
	// ossRegex matches public hosts: oss-cn-hangzhou.aliyuncs.com or
	// bucket.oss-cn-hangzhou.aliyuncs.com. Group 1 is the optional bucket,
	// group 2 the region.
	ossRegex = regexp.MustCompile(`^(?:([^.]+)\.)?oss-([^.]+)\.aliyuncs\.com$`)

	// ossInternalRegex matches internal hosts:
	// oss-cn-hangzhou-internal.aliyuncs.com or
	// bucket.oss-cn-hangzhou-internal.aliyuncs.com. The non-greedy .+? keeps
	// the "-internal" suffix out of the captured region.
	ossInternalRegex = regexp.MustCompile(`^(?:([^.]+)\.)?oss-(.+?)-internal\.aliyuncs\.com$`)
)
// isNotExists reports whether err is, or wraps, an OSS service error whose code
// is "NoSuchKey". Nil and non-service errors yield false.
func isNotExists(err error) bool {
	var svcErr *oss.ServiceError
	return errors.As(err, &svcErr) && svcErr.Code == "NoSuchKey"
}
// deleteResultError verifies a batch delete by checking that every requested
// key shows up in the response's list of deleted objects.
//
// It returns nil when out is nil or when nothing is missing; otherwise the
// error lists the missing keys, one per line. Requested entries with a nil key
// are ignored.
// NOTE(review): the original author states the OSS SDK has no per-object error
// reporting like S3, which is why this set comparison is used; confirm against
// the SDK.
func deleteResultError(requested []oss.DeleteObject, out *oss.DeleteMultipleObjectsResult) error {
	if out == nil {
		return nil
	}

	// Index the keys the service confirmed as deleted.
	confirmed := make(map[string]struct{}, len(out.DeletedObjects))
	for i := range out.DeletedObjects {
		if key := out.DeletedObjects[i].Key; key != nil {
			confirmed[*key] = struct{}{}
		}
	}

	// Anything requested but not confirmed counts as a failure.
	var missing []string
	for i := range requested {
		key := requested[i].Key
		if key == nil {
			continue
		}
		if _, ok := confirmed[*key]; !ok {
			missing = append(missing, *key)
		}
	}
	if len(missing) == 0 {
		return nil
	}

	return errors.New("oss: failed to delete files:" + "\n" + strings.Join(missing, "\n"))
}

372
oss/replica_client_test.go Normal file
View File

@@ -0,0 +1,372 @@
package oss
import (
	"errors"
	"strings"
	"testing"

	"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"
)
// TestReplicaClient_Type checks that Type reports both the package constant
// and the literal "oss".
func TestReplicaClient_Type(t *testing.T) {
	client := NewReplicaClient()
	for _, want := range []string{ReplicaClientType, "oss"} {
		if got := client.Type(); got != want {
			t.Errorf("Type() = %q, want %q", got, want)
		}
	}
}
// TestReplicaClient_Init_BucketValidation checks that Init rejects a missing
// bucket and accepts a bucket with or without an explicit region.
func TestReplicaClient_Init_BucketValidation(t *testing.T) {
	t.Run("EmptyBucket", func(t *testing.T) {
		client := NewReplicaClient()
		client.Bucket = "" // Empty bucket name
		client.Region = "cn-hangzhou"

		err := client.Init(t.Context())
		switch {
		case err == nil:
			t.Fatal("expected error for empty bucket name")
		case err.Error() != "oss: bucket name is required":
			t.Errorf("unexpected error: %v", err)
		}
	})

	// Init only constructs the client, so dummy credentials are enough.
	valid := []struct {
		name    string
		region  string // empty exercises the DefaultRegion fallback
		failFmt string
	}{
		{"ValidBucketWithRegion", "cn-hangzhou", "Init() should succeed with valid bucket: %v"},
		{"ValidBucketDefaultRegion", "", "Init() should succeed with default region: %v"},
	}
	for i := range valid {
		tc := valid[i]
		t.Run(tc.name, func(t *testing.T) {
			client := NewReplicaClient()
			client.Bucket = "test-bucket"
			if tc.region != "" {
				client.Region = tc.region
			}
			client.AccessKeyID = "test-key"
			client.AccessKeySecret = "test-secret"

			if err := client.Init(t.Context()); err != nil {
				t.Errorf(tc.failFmt, err)
			}
		})
	}
}
// TestReplicaClient_Init_Idempotent checks that calling Init a second time on
// an initialized client succeeds.
func TestReplicaClient_Init_Idempotent(t *testing.T) {
	client := NewReplicaClient()
	client.Bucket, client.AccessKeyID, client.AccessKeySecret = "test-bucket", "test-key", "test-secret"
	ctx := t.Context()

	err := client.Init(ctx)
	if err != nil {
		t.Fatalf("first Init() failed: %v", err)
	}

	// The repeat call should be a no-op.
	err = client.Init(ctx)
	if err != nil {
		t.Fatalf("second Init() failed: %v", err)
	}
}
// TestParseURL covers plain-bucket URLs, full OSS host names, URLs without a
// path, and rejection of non-"oss" schemes.
func TestParseURL(t *testing.T) {
	type parts struct{ bucket, region, key string }

	cases := []struct {
		name    string
		url     string
		want    parts
		wantErr bool
	}{
		{"SimpleOSSURL", "oss://my-bucket/path/to/file", parts{"my-bucket", "", "path/to/file"}, false},
		{"OSSURLWithRegion", "oss://my-bucket.oss-cn-hangzhou.aliyuncs.com/backup", parts{"my-bucket", "cn-hangzhou", "backup"}, false},
		{"OSSURLNoPath", "oss://my-bucket", parts{"my-bucket", "", ""}, false},
		{"InvalidScheme", "s3://my-bucket/path", parts{}, true},
		{"HTTPScheme", "http://my-bucket/path", parts{}, true},
	}

	for i := range cases {
		tc := cases[i]
		t.Run(tc.name, func(t *testing.T) {
			bucket, region, key, err := ParseURL(tc.url)
			if failed := err != nil; failed != tc.wantErr {
				t.Errorf("ParseURL() error = %v, wantErr %v", err, tc.wantErr)
				return
			} else if failed {
				// Expected failure; the other results are not meaningful.
				return
			}

			if bucket != tc.want.bucket {
				t.Errorf("bucket = %q, want %q", bucket, tc.want.bucket)
			}
			if region != tc.want.region {
				t.Errorf("region = %q, want %q", region, tc.want.region)
			}
			if key != tc.want.key {
				t.Errorf("key = %q, want %q", key, tc.want.key)
			}
		})
	}
}
// TestParseHost checks bucket/region extraction for public and internal OSS
// hosts across several regions, plus the bare-bucket fallback.
func TestParseHost(t *testing.T) {
	cases := []struct {
		name, host             string
		wantBucket, wantRegion string
	}{
		{"StandardOSSURL", "my-bucket.oss-cn-hangzhou.aliyuncs.com", "my-bucket", "cn-hangzhou"},
		{"OSSURLBeijingRegion", "test-bucket.oss-cn-beijing.aliyuncs.com", "test-bucket", "cn-beijing"},
		{"OSSURLShanghaiRegion", "data-bucket.oss-cn-shanghai.aliyuncs.com", "data-bucket", "cn-shanghai"},
		{"InternalOSSURL", "my-bucket.oss-cn-hangzhou-internal.aliyuncs.com", "my-bucket", "cn-hangzhou"},
		{"InternalOSSURLBeijing", "test-bucket.oss-cn-beijing-internal.aliyuncs.com", "test-bucket", "cn-beijing"},
		{"SimpleBucketName", "my-bucket", "my-bucket", ""},
		{"BucketWithHyphens", "my-test-bucket-2024.oss-cn-shenzhen.aliyuncs.com", "my-test-bucket-2024", "cn-shenzhen"},
		{"OSSURLWithNumbers", "bucket123.oss-cn-hangzhou.aliyuncs.com", "bucket123", "cn-hangzhou"},
		{"OSSURLHongKong", "hk-bucket.oss-cn-hongkong.aliyuncs.com", "hk-bucket", "cn-hongkong"},
		{"OSSURLSingapore", "sg-bucket.oss-ap-southeast-1.aliyuncs.com", "sg-bucket", "ap-southeast-1"},
	}

	for i := range cases {
		tc := cases[i]
		t.Run(tc.name, func(t *testing.T) {
			// The endpoint result is not under test here.
			gotBucket, gotRegion, _ := ParseHost(tc.host)
			if gotBucket != tc.wantBucket {
				t.Errorf("bucket = %q, want %q", gotBucket, tc.wantBucket)
			}
			if gotRegion != tc.wantRegion {
				t.Errorf("region = %q, want %q", gotRegion, tc.wantRegion)
			}
		})
	}
}
// TestIsNotExists checks that isNotExists is false for nil and for errors that
// are not OSS service errors.
func TestIsNotExists(t *testing.T) {
	cases := []struct {
		name string
		err  error
		msg  string
	}{
		{"NilError", nil, "isNotExists should return false for nil error"},
		{"RegularError", errors.New("regular error"), "isNotExists should return false for regular error"},
		{"WrappedError", errors.New("wrapped: something went wrong"), "isNotExists should return false for wrapped non-ServiceError"},
	}

	for i := range cases {
		tc := cases[i]
		t.Run(tc.name, func(t *testing.T) {
			if isNotExists(tc.err) {
				t.Error(tc.msg)
			}
		})
	}
}
// TestLtxPath checks that object keys are built as
// <path>/<level as 4 hex digits>/<filename>.
func TestLtxPath(t *testing.T) {
	client := NewReplicaClient()
	client.Path = "backups"

	cases := []struct {
		level    int
		filename string
		want     string
	}{
		{level: 0, filename: "00000001-00000001.ltx", want: "backups/0000/00000001-00000001.ltx"},
		{level: 1, filename: "00000001-00000010.ltx", want: "backups/0001/00000001-00000010.ltx"},
		{level: 15, filename: "00000001-000000ff.ltx", want: "backups/000f/00000001-000000ff.ltx"},
	}

	for i := range cases {
		tc := cases[i]
		// The expected key doubles as the subtest name.
		t.Run(tc.want, func(t *testing.T) {
			if got := client.ltxPath(tc.level, tc.filename); got != tc.want {
				t.Errorf("ltxPath(%d, %q) = %q, want %q", tc.level, tc.filename, got, tc.want)
			}
		})
	}
}
// TestDeleteResultError checks how deleteResultError compares the requested
// keys with the keys reported as deleted: nil results, full and partial
// deletions, empty requests, and nil request keys.
func TestDeleteResultError(t *testing.T) {
	// request builds the list of objects asked to be deleted.
	request := func(keys ...string) []oss.DeleteObject {
		objs := make([]oss.DeleteObject, 0, len(keys))
		for i := range keys {
			objs = append(objs, oss.DeleteObject{Key: &keys[i]})
		}
		return objs
	}
	// deletedResult builds a response confirming the given keys.
	deletedResult := func(keys ...string) *oss.DeleteMultipleObjectsResult {
		infos := make([]oss.DeletedInfo, 0, len(keys))
		for i := range keys {
			infos = append(infos, oss.DeletedInfo{Key: &keys[i]})
		}
		return &oss.DeleteMultipleObjectsResult{DeletedObjects: infos}
	}

	t.Run("NilResult", func(t *testing.T) {
		err := deleteResultError(request("key1"), nil)
		if err != nil {
			t.Errorf("expected nil error for nil result, got %v", err)
		}
	})

	t.Run("AllDeleted", func(t *testing.T) {
		err := deleteResultError(request("key1", "key2"), deletedResult("key1", "key2"))
		if err != nil {
			t.Errorf("expected nil error when all deleted, got %v", err)
		}
	})

	t.Run("SomeNotDeleted", func(t *testing.T) {
		// key2 and key3 are requested but not confirmed.
		err := deleteResultError(request("key1", "key2", "key3"), deletedResult("key1"))
		if err == nil {
			t.Fatal("expected error when some keys not deleted")
		}

		msg := err.Error()
		if !contains(msg, "key2") {
			t.Errorf("error should mention key2: %s", msg)
		}
		if !contains(msg, "key3") {
			t.Errorf("error should mention key3: %s", msg)
		}
	})

	t.Run("EmptyRequested", func(t *testing.T) {
		err := deleteResultError([]oss.DeleteObject{}, &oss.DeleteMultipleObjectsResult{})
		if err != nil {
			t.Errorf("expected nil error for empty requested, got %v", err)
		}
	})

	t.Run("NilKeyInRequested", func(t *testing.T) {
		// The leading entry has a nil key and should be skipped.
		requested := append([]oss.DeleteObject{{Key: nil}}, request("key1")...)
		err := deleteResultError(requested, deletedResult("key1"))
		if err != nil {
			t.Errorf("expected nil error, got %v", err)
		}
	})
}
// contains reports whether substr occurs within s. It is a thin wrapper over
// strings.Contains, replacing the previous hand-rolled substring scan (and its
// containsHelper loop), and gives the same result for every input, including
// empty strings.
func contains(s, substr string) bool {
	return strings.Contains(s, substr)
}