mirror of
https://github.com/benbjohnson/litestream.git
synced 2026-01-25 05:06:30 +00:00
Fix: standardize error message prefixes across all replica clients (#711)
Some checks failed
Commit / Lint (push) Has been cancelled
Commit / Build Windows (push) Has been cancelled
Commit / Build & Unit Test (push) Has been cancelled
Commit / Run S3 Mock Tests (push) Has been cancelled
Commit / Run S3 Integration Tests (push) Has been cancelled
Commit / Run GCP Integration Tests (push) Has been cancelled
Commit / Run Azure Blob Store Integration Tests (push) Has been cancelled
Commit / Run SFTP Integration Tests (push) Has been cancelled
Some checks failed
Commit / Lint (push) Has been cancelled
Commit / Build Windows (push) Has been cancelled
Commit / Build & Unit Test (push) Has been cancelled
Commit / Run S3 Mock Tests (push) Has been cancelled
Commit / Run S3 Integration Tests (push) Has been cancelled
Commit / Run GCP Integration Tests (push) Has been cancelled
Commit / Run Azure Blob Store Integration Tests (push) Has been cancelled
Commit / Run SFTP Integration Tests (push) Has been cancelled
Co-authored-by: Claude <noreply@anthropic.com>
This commit is contained in:
@@ -79,7 +79,7 @@ func (c *ReplicaClient) DeleteAll(ctx context.Context) error {
|
|||||||
if err := c.bkt.Object(attrs.Name).Delete(ctx); isNotExists(err) {
|
if err := c.bkt.Object(attrs.Name).Delete(ctx); isNotExists(err) {
|
||||||
continue
|
continue
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
return fmt.Errorf("cannot delete object %q: %w", attrs.Name, err)
|
return fmt.Errorf("gs: cannot delete object %q: %w", attrs.Name, err)
|
||||||
}
|
}
|
||||||
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc()
|
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc()
|
||||||
}
|
}
|
||||||
@@ -166,7 +166,7 @@ func (c *ReplicaClient) DeleteLTXFiles(ctx context.Context, a []*ltx.FileInfo) e
|
|||||||
for _, info := range a {
|
for _, info := range a {
|
||||||
key := litestream.LTXFilePath(c.Path, info.Level, info.MinTXID, info.MaxTXID)
|
key := litestream.LTXFilePath(c.Path, info.Level, info.MinTXID, info.MaxTXID)
|
||||||
if err := c.bkt.Object(key).Delete(ctx); err != nil && !isNotExists(err) {
|
if err := c.bkt.Object(key).Delete(ctx); err != nil && !isNotExists(err) {
|
||||||
return fmt.Errorf("cannot delete ltx file %q: %w", key, err)
|
return fmt.Errorf("gs: cannot delete ltx file %q: %w", key, err)
|
||||||
}
|
}
|
||||||
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc()
|
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc()
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -95,7 +95,7 @@ func (c *ReplicaClient) Init(ctx context.Context) (err error) {
|
|||||||
if region == "" {
|
if region == "" {
|
||||||
if c.Endpoint == "" {
|
if c.Endpoint == "" {
|
||||||
if region, err = c.findBucketRegion(ctx, c.Bucket); err != nil {
|
if region, err = c.findBucketRegion(ctx, c.Bucket); err != nil {
|
||||||
return fmt.Errorf("cannot lookup bucket region: %w", err)
|
return fmt.Errorf("s3: cannot lookup bucket region: %w", err)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
region = DefaultRegion // default for non-S3 object stores
|
region = DefaultRegion // default for non-S3 object stores
|
||||||
@@ -154,7 +154,7 @@ func (c *ReplicaClient) Init(ctx context.Context) (err error) {
|
|||||||
// Load AWS configuration
|
// Load AWS configuration
|
||||||
cfg, err := config.LoadDefaultConfig(ctx, configOpts...)
|
cfg, err := config.LoadDefaultConfig(ctx, configOpts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("cannot load aws config: %w", err)
|
return fmt.Errorf("s3: cannot load aws config: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create S3 client options
|
// Create S3 client options
|
||||||
@@ -233,7 +233,7 @@ func (c *ReplicaClient) findBucketRegion(ctx context.Context, bucket string) (st
|
|||||||
// Load AWS configuration
|
// Load AWS configuration
|
||||||
cfg, err := config.LoadDefaultConfig(ctx, configOpts...)
|
cfg, err := config.LoadDefaultConfig(ctx, configOpts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", fmt.Errorf("cannot load aws config for region lookup: %w", err)
|
return "", fmt.Errorf("s3: cannot load aws config for region lookup: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Use default region for initial region lookup
|
// Use default region for initial region lookup
|
||||||
@@ -287,7 +287,7 @@ func (c *ReplicaClient) OpenLTXFile(ctx context.Context, level int, minTXID, max
|
|||||||
if isNotExists(err) {
|
if isNotExists(err) {
|
||||||
return nil, os.ErrNotExist
|
return nil, os.ErrNotExist
|
||||||
}
|
}
|
||||||
return nil, fmt.Errorf("get object %s: %w", key, err)
|
return nil, fmt.Errorf("s3: get object %s: %w", key, err)
|
||||||
}
|
}
|
||||||
return out.Body, nil
|
return out.Body, nil
|
||||||
}
|
}
|
||||||
@@ -306,7 +306,7 @@ func (c *ReplicaClient) WriteLTXFile(ctx context.Context, level int, minTXID, ma
|
|||||||
Body: r,
|
Body: r,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("upload to %s: %w", key, err)
|
return nil, fmt.Errorf("s3: upload to %s: %w", key, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Build file info from the uploaded file
|
// Build file info from the uploaded file
|
||||||
@@ -319,7 +319,7 @@ func (c *ReplicaClient) WriteLTXFile(ctx context.Context, level int, minTXID, ma
|
|||||||
|
|
||||||
// ETag indicates successful upload
|
// ETag indicates successful upload
|
||||||
if out.ETag == nil {
|
if out.ETag == nil {
|
||||||
return nil, fmt.Errorf("upload failed: no ETag returned")
|
return nil, fmt.Errorf("s3: upload failed: no ETag returned")
|
||||||
}
|
}
|
||||||
|
|
||||||
return info, nil
|
return info, nil
|
||||||
@@ -352,7 +352,7 @@ func (c *ReplicaClient) DeleteLTXFiles(ctx context.Context, a []*ltx.FileInfo) e
|
|||||||
Delete: &types.Delete{Objects: objIDs[:n], Quiet: aws.Bool(true)},
|
Delete: &types.Delete{Objects: objIDs[:n], Quiet: aws.Bool(true)},
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("delete batch of %d objects: %w", n, err)
|
return fmt.Errorf("s3: delete batch of %d objects: %w", n, err)
|
||||||
} else if err := deleteOutputError(out); err != nil {
|
} else if err := deleteOutputError(out); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -381,7 +381,7 @@ func (c *ReplicaClient) DeleteAll(ctx context.Context) error {
|
|||||||
for paginator.HasMorePages() {
|
for paginator.HasMorePages() {
|
||||||
page, err := paginator.NextPage(ctx)
|
page, err := paginator.NextPage(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("list objects page: %w", err)
|
return fmt.Errorf("s3: list objects page: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Collect object identifiers
|
// Collect object identifiers
|
||||||
@@ -399,7 +399,7 @@ func (c *ReplicaClient) DeleteAll(ctx context.Context) error {
|
|||||||
Delete: &types.Delete{Objects: objIDs[:n], Quiet: aws.Bool(true)},
|
Delete: &types.Delete{Objects: objIDs[:n], Quiet: aws.Bool(true)},
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("delete all batch of %d objects: %w", n, err)
|
return fmt.Errorf("s3: delete all batch of %d objects: %w", n, err)
|
||||||
} else if err := deleteOutputError(out); err != nil {
|
} else if err := deleteOutputError(out); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -535,14 +535,14 @@ func ParseURL(s, endpoint string) (bucket, region, key string, err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if u.Scheme != "s3" {
|
if u.Scheme != "s3" {
|
||||||
return "", "", "", fmt.Errorf("invalid s3 url scheme")
|
return "", "", "", fmt.Errorf("s3: invalid url scheme")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Special handling for filebase.com
|
// Special handling for filebase.com
|
||||||
if u.Host == "filebase.com" {
|
if u.Host == "filebase.com" {
|
||||||
parts := strings.SplitN(strings.TrimPrefix(u.Path, "/"), "/", 2)
|
parts := strings.SplitN(strings.TrimPrefix(u.Path, "/"), "/", 2)
|
||||||
if len(parts) == 0 {
|
if len(parts) == 0 {
|
||||||
return "", "", "", fmt.Errorf("s3 bucket required")
|
return "", "", "", fmt.Errorf("s3: bucket required")
|
||||||
}
|
}
|
||||||
bucket = parts[0]
|
bucket = parts[0]
|
||||||
if len(parts) > 1 {
|
if len(parts) > 1 {
|
||||||
|
|||||||
@@ -82,12 +82,12 @@ func (c *ReplicaClient) Init(ctx context.Context) (_ *sftp.Client, err error) {
|
|||||||
if c.KeyPath != "" {
|
if c.KeyPath != "" {
|
||||||
buf, err := os.ReadFile(c.KeyPath)
|
buf, err := os.ReadFile(c.KeyPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("cannot read sftp key path: %w", err)
|
return nil, fmt.Errorf("sftp: cannot read sftp key path: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
signer, err := ssh.ParsePrivateKey(buf)
|
signer, err := ssh.ParsePrivateKey(buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("cannot parse sftp key path: %w", err)
|
return nil, fmt.Errorf("sftp: cannot parse sftp key path: %w", err)
|
||||||
}
|
}
|
||||||
config.Auth = append(config.Auth, ssh.PublicKeys(signer))
|
config.Auth = append(config.Auth, ssh.PublicKeys(signer))
|
||||||
}
|
}
|
||||||
@@ -128,7 +128,7 @@ func (c *ReplicaClient) DeleteAll(ctx context.Context) (err error) {
|
|||||||
if err := walker.Err(); os.IsNotExist(err) {
|
if err := walker.Err(); os.IsNotExist(err) {
|
||||||
continue
|
continue
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
return fmt.Errorf("cannot walk path %q: %w", walker.Path(), err)
|
return fmt.Errorf("sftp: cannot walk path %q: %w", walker.Path(), err)
|
||||||
}
|
}
|
||||||
if walker.Stat().IsDir() {
|
if walker.Stat().IsDir() {
|
||||||
dirs = append(dirs, walker.Path())
|
dirs = append(dirs, walker.Path())
|
||||||
@@ -136,7 +136,7 @@ func (c *ReplicaClient) DeleteAll(ctx context.Context) (err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if err := sftpClient.Remove(walker.Path()); err != nil && !os.IsNotExist(err) {
|
if err := sftpClient.Remove(walker.Path()); err != nil && !os.IsNotExist(err) {
|
||||||
return fmt.Errorf("cannot delete file %q: %w", walker.Path(), err)
|
return fmt.Errorf("sftp: cannot delete file %q: %w", walker.Path(), err)
|
||||||
}
|
}
|
||||||
|
|
||||||
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc()
|
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc()
|
||||||
@@ -146,7 +146,7 @@ func (c *ReplicaClient) DeleteAll(ctx context.Context) (err error) {
|
|||||||
for i := len(dirs) - 1; i >= 0; i-- {
|
for i := len(dirs) - 1; i >= 0; i-- {
|
||||||
filename := dirs[i]
|
filename := dirs[i]
|
||||||
if err := sftpClient.RemoveDirectory(filename); err != nil && !os.IsNotExist(err) {
|
if err := sftpClient.RemoveDirectory(filename); err != nil && !os.IsNotExist(err) {
|
||||||
return fmt.Errorf("cannot delete directory %q: %w", filename, err)
|
return fmt.Errorf("sftp: cannot delete directory %q: %w", filename, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -207,12 +207,12 @@ func (c *ReplicaClient) WriteLTXFile(ctx context.Context, level int, minTXID, ma
|
|||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
|
|
||||||
if err := sftpClient.MkdirAll(path.Dir(filename)); err != nil {
|
if err := sftpClient.MkdirAll(path.Dir(filename)); err != nil {
|
||||||
return nil, fmt.Errorf("cannot make parent snapshot directory %q: %w", path.Dir(filename), err)
|
return nil, fmt.Errorf("sftp: cannot make parent snapshot directory %q: %w", path.Dir(filename), err)
|
||||||
}
|
}
|
||||||
|
|
||||||
f, err := sftpClient.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC)
|
f, err := sftpClient.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("cannot open snapshot file for writing: %w", err)
|
return nil, fmt.Errorf("sftp: cannot open snapshot file for writing: %w", err)
|
||||||
}
|
}
|
||||||
defer f.Close()
|
defer f.Close()
|
||||||
|
|
||||||
@@ -268,7 +268,7 @@ func (c *ReplicaClient) DeleteLTXFiles(ctx context.Context, a []*ltx.FileInfo) (
|
|||||||
for _, info := range a {
|
for _, info := range a {
|
||||||
filename := litestream.LTXFilePath(c.Path, info.Level, info.MinTXID, info.MaxTXID)
|
filename := litestream.LTXFilePath(c.Path, info.Level, info.MinTXID, info.MaxTXID)
|
||||||
if err := sftpClient.Remove(filename); err != nil && !os.IsNotExist(err) {
|
if err := sftpClient.Remove(filename); err != nil && !os.IsNotExist(err) {
|
||||||
return fmt.Errorf("cannot delete ltx file %q: %w", filename, err)
|
return fmt.Errorf("sftp: cannot delete ltx file %q: %w", filename, err)
|
||||||
}
|
}
|
||||||
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc()
|
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc()
|
||||||
}
|
}
|
||||||
@@ -286,7 +286,7 @@ func (c *ReplicaClient) Cleanup(ctx context.Context) (err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if err := sftpClient.RemoveDirectory(c.Path); err != nil && !os.IsNotExist(err) {
|
if err := sftpClient.RemoveDirectory(c.Path); err != nil && !os.IsNotExist(err) {
|
||||||
return fmt.Errorf("cannot delete path: %w", err)
|
return fmt.Errorf("sftp: cannot delete path: %w", err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user