package oss

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"net/url"
	"os"
	"path"
	"regexp"
	"strings"
	"sync"
	"time"

	"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"
	"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials"
	"github.com/superfly/ltx"
	"golang.org/x/sync/errgroup"
	"golang.org/x/sync/semaphore"

	"github.com/benbjohnson/litestream"
	"github.com/benbjohnson/litestream/internal"
)

func init() {
	litestream.RegisterReplicaClientFactory("oss", NewReplicaClientFromURL)
}

// ReplicaClientType is the client type for this package.
const ReplicaClientType = "oss"

// MetadataKeyTimestamp is the metadata key for storing LTX file timestamps in OSS.
// Note: OSS SDK automatically adds "x-oss-meta-" prefix when setting metadata.
const MetadataKeyTimestamp = "litestream-timestamp"

// MaxKeys is the maximum number of keys OSS can operate on per batch.
const MaxKeys = 1000

// DefaultRegion is the region used if one is not specified.
const DefaultRegion = "cn-hangzhou"

// DefaultMetadataConcurrency is the default number of concurrent HeadObject calls
// for fetching accurate timestamps during timestamp-based restore.
const DefaultMetadataConcurrency = 50

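// For example (illustrative, not from the original source): with the key above,
// an uploaded object carries the header "x-oss-meta-litestream-timestamp" whose
// value is an RFC 3339 timestamp such as "2024-01-02T03:04:05.123456789Z".
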
var _ litestream.ReplicaClient = (*ReplicaClient)(nil)

// ReplicaClient is a client for writing LTX files to Alibaba Cloud OSS.
type ReplicaClient struct {
	mu       sync.Mutex
	client   *oss.Client
	uploader *oss.Uploader
	logger   *slog.Logger

	// Alibaba Cloud authentication keys.
	AccessKeyID     string
	AccessKeySecret string

	// OSS bucket information
	Region   string
	Bucket   string
	Path     string
	Endpoint string

	// Upload configuration
	PartSize    int64 // Part size for multipart uploads (default: 5MB)
	Concurrency int   // Number of concurrent parts to upload (default: 3)

	// MetadataConcurrency controls parallel HeadObject calls for timestamp-based restore.
	// Higher values improve restore speed for large backup histories.
	// Default: 50
	MetadataConcurrency int
}

// NewReplicaClient returns a new instance of ReplicaClient.
func NewReplicaClient() *ReplicaClient {
	return &ReplicaClient{
		logger: slog.Default().WithGroup(ReplicaClientType),
	}
}

// NewReplicaClientFromURL creates a new ReplicaClient from URL components.
// This is used by the replica client factory registration.
// URL format: oss://bucket[.oss-region.aliyuncs.com]/path
func NewReplicaClientFromURL(scheme, host, urlPath string, query url.Values, userinfo *url.Userinfo) (litestream.ReplicaClient, error) {
	client := NewReplicaClient()

	bucket, region, _ := ParseHost(host)
	if bucket == "" {
		return nil, fmt.Errorf("bucket required for oss replica URL")
	}

	client.Bucket = bucket
	client.Region = region
	client.Path = urlPath

	return client, nil
}

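// For illustration (values are assumptions, not from the original source): the URL
// "oss://db.oss-cn-hangzhou.aliyuncs.com/backups" yields Bucket="db",
// Region="cn-hangzhou", and Path="/backups", while "oss://db/backups" leaves
// Region empty so Init falls back to DefaultRegion.
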
// Type returns "oss" as the client type.
func (c *ReplicaClient) Type() string {
	return ReplicaClientType
}

// Init initializes the connection to OSS. No-op if already initialized.
func (c *ReplicaClient) Init(ctx context.Context) (err error) {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.client != nil {
		return nil
	}

	// Validate required configuration
	if c.Bucket == "" {
		return fmt.Errorf("oss: bucket name is required")
	}

	// Use default region if not specified
	region := c.Region
	if region == "" {
		region = DefaultRegion
	}

	// Build configuration
	cfg := oss.LoadDefaultConfig()

	// Configure credentials
	if c.AccessKeyID != "" && c.AccessKeySecret != "" {
		cfg = cfg.WithCredentialsProvider(
			credentials.NewStaticCredentialsProvider(c.AccessKeyID, c.AccessKeySecret),
		)
	} else {
		// Use environment variable credentials provider
		cfg = cfg.WithCredentialsProvider(
			credentials.NewEnvironmentVariableCredentialsProvider(),
		)
	}

	// Configure region
	cfg = cfg.WithRegion(region)

	// Configure custom endpoint if specified
	if c.Endpoint != "" {
		endpoint := c.Endpoint
		// Add scheme if not present
		if !strings.HasPrefix(endpoint, "http://") && !strings.HasPrefix(endpoint, "https://") {
			endpoint = "https://" + endpoint
		}
		cfg = cfg.WithEndpoint(endpoint)
	}

	// Create OSS client
	c.client = oss.NewClient(cfg)

	// Create uploader with configurable part size and concurrency
	uploaderOpts := []func(*oss.UploaderOptions){}
	if c.PartSize > 0 {
		uploaderOpts = append(uploaderOpts, func(o *oss.UploaderOptions) {
			o.PartSize = c.PartSize
		})
	}
	if c.Concurrency > 0 {
		uploaderOpts = append(uploaderOpts, func(o *oss.UploaderOptions) {
			o.ParallelNum = c.Concurrency
		})
	}
	c.uploader = c.client.NewUploader(uploaderOpts...)

	return nil
}

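// A minimal usage sketch (illustrative; all field values are assumptions):
//
//	c := NewReplicaClient()
//	c.Bucket = "db-backups"
//	c.Region = "cn-shanghai"
//	c.Endpoint = "oss-cn-shanghai.aliyuncs.com" // "https://" is prepended automatically
//	if err := c.Init(context.Background()); err != nil {
//		log.Fatal(err)
//	}
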
// LTXFiles returns an iterator over all LTX files on the replica for the given level.
// When useMetadata is true, fetches accurate timestamps from OSS metadata via HeadObject.
// This uses parallel batched requests (controlled by MetadataConcurrency) to avoid hangs
// with large backup histories (see issue #930).
// When false, uses fast LastModified timestamps from LIST operation.
func (c *ReplicaClient) LTXFiles(ctx context.Context, level int, seek ltx.TXID, useMetadata bool) (ltx.FileIterator, error) {
	if err := c.Init(ctx); err != nil {
		return nil, err
	}
	return newFileIterator(ctx, c, level, seek, useMetadata), nil
}

// OpenLTXFile returns a reader for an LTX file.
// Returns os.ErrNotExist if no matching file is found.
func (c *ReplicaClient) OpenLTXFile(ctx context.Context, level int, minTXID, maxTXID ltx.TXID, offset, size int64) (io.ReadCloser, error) {
	if err := c.Init(ctx); err != nil {
		return nil, err
	}

	// Build the key from the file info
	filename := ltx.FormatFilename(minTXID, maxTXID)
	key := c.ltxPath(level, filename)

	request := &oss.GetObjectRequest{
		Bucket: oss.Ptr(c.Bucket),
		Key:    oss.Ptr(key),
	}

	// Set range header if a size or offset is specified
	if size > 0 {
		request.RangeBehavior = oss.Ptr("standard")
		request.Range = oss.Ptr(fmt.Sprintf("bytes=%d-%d", offset, offset+size-1))
	} else if offset > 0 {
		request.RangeBehavior = oss.Ptr("standard")
		request.Range = oss.Ptr(fmt.Sprintf("bytes=%d-", offset))
	}

	result, err := c.client.GetObject(ctx, request)
	if err != nil {
		if isNotExists(err) {
			return nil, os.ErrNotExist
		}
		return nil, fmt.Errorf("oss: get object %s: %w", key, err)
	}

	internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "GET").Inc()

	return result.Body, nil
}

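// For example (illustrative): offset=100, size=50 produces the header
// "Range: bytes=100-149", while offset=100, size=0 produces "Range: bytes=100-"
// to read from the offset to the end of the object.
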
// WriteLTXFile writes an LTX file to the replica.
// Extracts timestamp from LTX header and stores it in OSS metadata to preserve original creation time.
// Uses multipart upload for large files via the uploader.
func (c *ReplicaClient) WriteLTXFile(ctx context.Context, level int, minTXID, maxTXID ltx.TXID, r io.Reader) (*ltx.FileInfo, error) {
	if err := c.Init(ctx); err != nil {
		return nil, err
	}

	// Use TeeReader to peek at LTX header while preserving data for upload
	var buf bytes.Buffer
	teeReader := io.TeeReader(r, &buf)

	// Extract timestamp from LTX header
	hdr, _, err := ltx.PeekHeader(teeReader)
	if err != nil {
		return nil, fmt.Errorf("extract timestamp from LTX header: %w", err)
	}
	timestamp := time.UnixMilli(hdr.Timestamp).UTC()

	// Combine buffered data with rest of reader
	rc := internal.NewReadCounter(io.MultiReader(&buf, r))

	filename := ltx.FormatFilename(minTXID, maxTXID)
	key := c.ltxPath(level, filename)

	// Store timestamp in OSS metadata for accurate timestamp retrieval
	metadata := map[string]string{
		MetadataKeyTimestamp: timestamp.Format(time.RFC3339Nano),
	}

	// Use uploader for automatic multipart handling (files >5GB)
	result, err := c.uploader.UploadFrom(ctx, &oss.PutObjectRequest{
		Bucket:   oss.Ptr(c.Bucket),
		Key:      oss.Ptr(key),
		Metadata: metadata,
	}, rc)
	if err != nil {
		return nil, fmt.Errorf("oss: upload to %s: %w", key, err)
	}

	// Build file info from the uploaded file
	info := &ltx.FileInfo{
		Level:     level,
		MinTXID:   minTXID,
		MaxTXID:   maxTXID,
		Size:      rc.N(),
		CreatedAt: timestamp,
	}

	internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "PUT").Inc()
	internal.OperationBytesCounterVec.WithLabelValues(ReplicaClientType, "PUT").Add(float64(rc.N()))

	// ETag indicates successful upload
	if result.ETag == nil || *result.ETag == "" {
		return nil, fmt.Errorf("oss: upload failed: no ETag returned")
	}

	return info, nil
}

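// Note on the header peek above (explanatory, not part of the original source):
// ltx.PeekHeader consumes the fixed-size header from teeReader, and io.TeeReader
// mirrors every consumed byte into buf, so io.MultiReader(&buf, r) replays the
// header bytes ahead of the remaining stream and the uploader sees the full file.
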
// DeleteLTXFiles deletes one or more LTX files.
func (c *ReplicaClient) DeleteLTXFiles(ctx context.Context, a []*ltx.FileInfo) error {
	if err := c.Init(ctx); err != nil {
		return err
	}

	if len(a) == 0 {
		return nil
	}

	// Convert file infos to object identifiers
	objects := make([]oss.DeleteObject, 0, len(a))
	for _, info := range a {
		filename := ltx.FormatFilename(info.MinTXID, info.MaxTXID)
		key := c.ltxPath(info.Level, filename)
		objects = append(objects, oss.DeleteObject{Key: oss.Ptr(key)})

		c.logger.Debug("deleting ltx file", "level", info.Level, "minTXID", info.MinTXID, "maxTXID", info.MaxTXID, "key", key)
	}

	// Delete in batches
	for len(objects) > 0 {
		n := min(len(objects), MaxKeys)
		batch := objects[:n]

		request := &oss.DeleteMultipleObjectsRequest{
			Bucket:  oss.Ptr(c.Bucket),
			Objects: batch,
		}

		out, err := c.client.DeleteMultipleObjects(ctx, request)
		if err != nil {
			return fmt.Errorf("oss: delete batch of %d objects: %w", n, err)
		} else if err := deleteResultError(batch, out); err != nil {
			return err
		}

		internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc()

		objects = objects[n:]
	}

	return nil
}

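// For example (illustrative): deleting 2,500 files issues three
// DeleteMultipleObjects calls of 1,000, 1,000, and 500 keys, since MaxKeys
// caps each batch at 1,000.
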
// DeleteAll deletes all files.
func (c *ReplicaClient) DeleteAll(ctx context.Context) error {
	if err := c.Init(ctx); err != nil {
		return err
	}

	var objects []oss.DeleteObject

	// Create paginator for listing objects
	prefix := c.Path + "/"
	paginator := c.client.NewListObjectsV2Paginator(&oss.ListObjectsV2Request{
		Bucket: oss.Ptr(c.Bucket),
		Prefix: oss.Ptr(prefix),
	})

	// Iterate through all pages
	for paginator.HasNext() {
		page, err := paginator.NextPage(ctx)
		if err != nil {
			return fmt.Errorf("oss: list objects page: %w", err)
		}

		// Collect object identifiers
		for _, obj := range page.Contents {
			if obj.Key != nil {
				objects = append(objects, oss.DeleteObject{Key: obj.Key})
			}
		}
	}

	// Delete all collected objects in batches
	for len(objects) > 0 {
		n := min(len(objects), MaxKeys)
		batch := objects[:n]

		request := &oss.DeleteMultipleObjectsRequest{
			Bucket:  oss.Ptr(c.Bucket),
			Objects: batch,
		}

		out, err := c.client.DeleteMultipleObjects(ctx, request)
		if err != nil {
			return fmt.Errorf("oss: delete all batch of %d objects: %w", n, err)
		} else if err := deleteResultError(batch, out); err != nil {
			return err
		}

		objects = objects[n:]
	}

	return nil
}

// ltxPath returns the full path to an LTX file.
func (c *ReplicaClient) ltxPath(level int, filename string) string {
	return c.Path + "/" + fmt.Sprintf("%04x/%s", level, filename)
}

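// For example (illustrative): with Path="/backups", level=1, and a file spanning
// TXIDs 0x1 through 0xa, the resulting key is
// "/backups/0001/0000000000000001-000000000000000a.ltx".
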
// fileIterator represents an iterator over LTX files in OSS.
type fileIterator struct {
	ctx    context.Context
	cancel context.CancelFunc
	client *ReplicaClient
	level  int
	seek   ltx.TXID

	useMetadata   bool                 // When true, fetch accurate timestamps from metadata
	metadataCache map[string]time.Time // key -> timestamp cache for batch fetches

	paginator   *oss.ListObjectsV2Paginator
	page        *oss.ListObjectsV2Result
	pageIndex   int
	initialized bool

	closed bool
	err    error
	info   *ltx.FileInfo
}

func newFileIterator(ctx context.Context, client *ReplicaClient, level int, seek ltx.TXID, useMetadata bool) *fileIterator {
	ctx, cancel := context.WithCancel(ctx)

	itr := &fileIterator{
		ctx:           ctx,
		cancel:        cancel,
		client:        client,
		level:         level,
		seek:          seek,
		useMetadata:   useMetadata,
		metadataCache: make(map[string]time.Time),
	}

	return itr
}

// fetchMetadataBatch fetches timestamps from OSS metadata for a batch of keys in parallel.
func (itr *fileIterator) fetchMetadataBatch(keys []string) error {
	if len(keys) == 0 {
		return nil
	}

	// Determine concurrency limit
	concurrency := itr.client.MetadataConcurrency
	if concurrency <= 0 {
		concurrency = DefaultMetadataConcurrency
	}

	// Pre-allocate results map to avoid lock contention during writes
	results := make(map[string]time.Time, len(keys))
	var mu sync.Mutex

	// Use x/sync/semaphore for precise concurrency control with context support
	sem := semaphore.NewWeighted(int64(concurrency))
	g, ctx := errgroup.WithContext(itr.ctx)

	for _, key := range keys {
		key := key // capture for goroutine

		g.Go(func() error {
			// Acquire semaphore slot (blocking with context cancellation)
			if err := sem.Acquire(ctx, 1); err != nil {
				return err // context cancelled
			}
			defer sem.Release(1)

			head, err := itr.client.client.HeadObject(ctx, &oss.HeadObjectRequest{
				Bucket: oss.Ptr(itr.client.Bucket),
				Key:    oss.Ptr(key),
			})
			if err != nil {
				// Non-fatal: file might not have metadata, use LastModified
				return nil
			}

			if head.Metadata != nil {
				if ts, ok := head.Metadata[MetadataKeyTimestamp]; ok {
					if parsed, err := time.Parse(time.RFC3339Nano, ts); err == nil {
						mu.Lock()
						results[key] = parsed
						mu.Unlock()
					}
				}
			}
			return nil
		})
	}

	if err := g.Wait(); err != nil {
		return err
	}

	// Merge results into cache
	for k, v := range results {
		itr.metadataCache[k] = v
	}
	return nil
}

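// Rough arithmetic (illustrative): with the default MetadataConcurrency of 50
// and a full LIST page of 1,000 keys, at most 50 HeadObject requests are in
// flight at once, so a page resolves in about 20 sequential waves.
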
// initPaginator initializes the paginator lazily.
func (itr *fileIterator) initPaginator() {
	if itr.initialized {
		return
	}
	itr.initialized = true

	// Create paginator for listing objects with level prefix
	prefix := itr.client.ltxPath(itr.level, "")
	itr.paginator = itr.client.client.NewListObjectsV2Paginator(&oss.ListObjectsV2Request{
		Bucket: oss.Ptr(itr.client.Bucket),
		Prefix: oss.Ptr(prefix),
	})
}

// Close stops iteration and returns any error that occurred during iteration.
func (itr *fileIterator) Close() (err error) {
	itr.closed = true
	itr.cancel()
	return itr.err
}

// Next returns the next file. Returns false when no more files are available.
func (itr *fileIterator) Next() bool {
	if itr.closed || itr.err != nil {
		return false
	}

	// Initialize paginator on first call
	itr.initPaginator()

	// Process objects until we find a valid LTX file
	for {
		// Load next page if needed
		if itr.page == nil || itr.pageIndex >= len(itr.page.Contents) {
			if !itr.paginator.HasNext() {
				return false
			}

			var err error
			itr.page, err = itr.paginator.NextPage(itr.ctx)
			if err != nil {
				itr.err = err
				return false
			}
			itr.pageIndex = 0

			// Batch fetch metadata for the entire page when useMetadata is true.
			// This uses parallel HeadObject calls controlled by MetadataConcurrency
			// to avoid the O(N) sequential calls that caused restore hangs (issue #930).
			if itr.useMetadata && len(itr.page.Contents) > 0 {
				keys := make([]string, 0, len(itr.page.Contents))
				for _, obj := range itr.page.Contents {
					if obj.Key != nil {
						keys = append(keys, *obj.Key)
					}
				}
				if err := itr.fetchMetadataBatch(keys); err != nil {
					itr.err = err
					return false
				}
			}
		}

		// Process current object
		if itr.pageIndex < len(itr.page.Contents) {
			obj := itr.page.Contents[itr.pageIndex]
			itr.pageIndex++

			if obj.Key == nil {
				continue
			}

			// Extract file info from key
			fullKey := *obj.Key
			key := path.Base(fullKey)
			minTXID, maxTXID, err := ltx.ParseFilename(key)
			if err != nil {
				continue // Skip non-LTX files
			}

			// Build file info
			info := &ltx.FileInfo{
				Level:   itr.level,
				MinTXID: minTXID,
				MaxTXID: maxTXID,
			}

			// Skip if below seek TXID
			if info.MinTXID < itr.seek {
				continue
			}

			// Set file info
			info.Size = obj.Size

			// Use cached metadata timestamp if available (from batch fetch),
			// otherwise fallback to LastModified from LIST operation.
			if itr.useMetadata {
				if ts, ok := itr.metadataCache[fullKey]; ok {
					info.CreatedAt = ts
				} else if obj.LastModified != nil {
					info.CreatedAt = obj.LastModified.UTC()
				} else {
					info.CreatedAt = time.Now().UTC()
				}
			} else {
				if obj.LastModified != nil {
					info.CreatedAt = obj.LastModified.UTC()
				} else {
					info.CreatedAt = time.Now().UTC()
				}
			}

			itr.info = info
			return true
		}
	}
}

// Item returns the metadata for the current file.
func (itr *fileIterator) Item() *ltx.FileInfo {
	return itr.info
}

// Err returns any error that occurred during iteration.
func (itr *fileIterator) Err() error {
	return itr.err
}

// ParseURL parses an OSS URL into its bucket, region, and key parts.
func ParseURL(s string) (bucket, region, key string, err error) {
	u, err := url.Parse(s)
	if err != nil {
		return "", "", "", err
	}

	if u.Scheme != "oss" {
		return "", "", "", fmt.Errorf("oss: invalid url scheme")
	}

	// Parse host to extract bucket and region
	bucket, region, _ = ParseHost(u.Host)
	if bucket == "" {
		bucket = u.Host
	}

	key = strings.TrimPrefix(u.Path, "/")
	return bucket, region, key, nil
}

// ParseHost parses the host/endpoint for an OSS storage system.
// Supports formats like:
//   - bucket.oss-cn-hangzhou.aliyuncs.com
//   - bucket.oss-cn-hangzhou-internal.aliyuncs.com
//   - bucket (just bucket name)
func ParseHost(host string) (bucket, region, endpoint string) {
	// Check for internal OSS URL format first (more specific pattern)
	if a := ossInternalRegex.FindStringSubmatch(host); len(a) > 1 {
		bucket = a[1]
		if len(a) > 2 && a[2] != "" {
			region = a[2]
		}
		return bucket, region, ""
	}

	// Check for standard OSS URL format
	if a := ossRegex.FindStringSubmatch(host); len(a) > 1 {
		bucket = a[1]
		if len(a) > 2 && a[2] != "" {
			region = a[2]
		}
		return bucket, region, ""
	}

	// For other hosts, assume it's just the bucket name
	return host, "", ""
}

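// For example (illustrative):
//
//	ParseHost("db.oss-cn-hangzhou.aliyuncs.com")          // "db", "cn-hangzhou", ""
//	ParseHost("db.oss-cn-hangzhou-internal.aliyuncs.com") // "db", "cn-hangzhou", ""
//	ParseHost("db")                                       // "db", "", ""
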
var (
	// oss-cn-hangzhou.aliyuncs.com or bucket.oss-cn-hangzhou.aliyuncs.com
	ossRegex = regexp.MustCompile(`^(?:([^.]+)\.)?oss-([^.]+)\.aliyuncs\.com$`)
	// oss-cn-hangzhou-internal.aliyuncs.com or bucket.oss-cn-hangzhou-internal.aliyuncs.com
	// Uses non-greedy .+? to correctly extract region without -internal suffix
	ossInternalRegex = regexp.MustCompile(`^(?:([^.]+)\.)?oss-(.+?)-internal\.aliyuncs\.com$`)
)

func isNotExists(err error) bool {
	var serviceErr *oss.ServiceError
	if errors.As(err, &serviceErr) {
		return serviceErr.Code == "NoSuchKey"
	}
	return false
}

// deleteResultError checks if all requested objects were deleted.
// OSS SDK doesn't have explicit per-object error reporting like S3, so we verify
// all requested keys appear in the deleted list.
func deleteResultError(requested []oss.DeleteObject, out *oss.DeleteMultipleObjectsResult) error {
	if out == nil {
		return nil
	}

	// Build set of deleted keys for quick lookup
	deleted := make(map[string]struct{}, len(out.DeletedObjects))
	for _, obj := range out.DeletedObjects {
		if obj.Key != nil {
			deleted[*obj.Key] = struct{}{}
		}
	}

	// Check that all requested keys were deleted
	var failed []string
	for _, obj := range requested {
		if obj.Key == nil {
			continue
		}
		if _, ok := deleted[*obj.Key]; !ok {
			failed = append(failed, *obj.Key)
		}
	}

	if len(failed) == 0 {
		return nil
	}

	// Build error message listing failed keys
	var b strings.Builder
	b.WriteString("oss: failed to delete files:")
	for _, key := range failed {
		fmt.Fprintf(&b, "\n%s", key)
	}
	return errors.New(b.String())
}