diff --git a/gs/replica_client.go b/gs/replica_client.go index a6e6c8e..fe2c9a5 100644 --- a/gs/replica_client.go +++ b/gs/replica_client.go @@ -79,7 +79,7 @@ func (c *ReplicaClient) DeleteAll(ctx context.Context) error { if err := c.bkt.Object(attrs.Name).Delete(ctx); isNotExists(err) { continue } else if err != nil { - return fmt.Errorf("cannot delete object %q: %w", attrs.Name, err) + return fmt.Errorf("gs: cannot delete object %q: %w", attrs.Name, err) } internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc() } @@ -166,7 +166,7 @@ func (c *ReplicaClient) DeleteLTXFiles(ctx context.Context, a []*ltx.FileInfo) e for _, info := range a { key := litestream.LTXFilePath(c.Path, info.Level, info.MinTXID, info.MaxTXID) if err := c.bkt.Object(key).Delete(ctx); err != nil && !isNotExists(err) { - return fmt.Errorf("cannot delete ltx file %q: %w", key, err) + return fmt.Errorf("gs: cannot delete ltx file %q: %w", key, err) } internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc() } diff --git a/s3/replica_client.go b/s3/replica_client.go index e9cb10c..d544940 100644 --- a/s3/replica_client.go +++ b/s3/replica_client.go @@ -95,7 +95,7 @@ func (c *ReplicaClient) Init(ctx context.Context) (err error) { if region == "" { if c.Endpoint == "" { if region, err = c.findBucketRegion(ctx, c.Bucket); err != nil { - return fmt.Errorf("cannot lookup bucket region: %w", err) + return fmt.Errorf("s3: cannot lookup bucket region: %w", err) } } else { region = DefaultRegion // default for non-S3 object stores @@ -154,7 +154,7 @@ func (c *ReplicaClient) Init(ctx context.Context) (err error) { // Load AWS configuration cfg, err := config.LoadDefaultConfig(ctx, configOpts...) if err != nil { - return fmt.Errorf("cannot load aws config: %w", err) + return fmt.Errorf("s3: cannot load aws config: %w", err) } // Create S3 client options @@ -233,7 +233,7 @@ func (c *ReplicaClient) findBucketRegion(ctx context.Context, bucket string) (st // Load AWS configuration cfg, err := config.LoadDefaultConfig(ctx, configOpts...) if err != nil { - return "", fmt.Errorf("cannot load aws config for region lookup: %w", err) + return "", fmt.Errorf("s3: cannot load aws config for region lookup: %w", err) } // Use default region for initial region lookup @@ -287,7 +287,7 @@ func (c *ReplicaClient) OpenLTXFile(ctx context.Context, level int, minTXID, max if isNotExists(err) { return nil, os.ErrNotExist } - return nil, fmt.Errorf("get object %s: %w", key, err) + return nil, fmt.Errorf("s3: get object %s: %w", key, err) } return out.Body, nil } @@ -306,7 +306,7 @@ func (c *ReplicaClient) WriteLTXFile(ctx context.Context, level int, minTXID, ma Body: r, }) if err != nil { - return nil, fmt.Errorf("upload to %s: %w", key, err) + return nil, fmt.Errorf("s3: upload to %s: %w", key, err) } // Build file info from the uploaded file @@ -319,7 +319,7 @@ func (c *ReplicaClient) WriteLTXFile(ctx context.Context, level int, minTXID, ma // ETag indicates successful upload if out.ETag == nil { - return nil, fmt.Errorf("upload failed: no ETag returned") + return nil, fmt.Errorf("s3: upload failed: no ETag returned") } return info, nil @@ -352,7 +352,7 @@ func (c *ReplicaClient) DeleteLTXFiles(ctx context.Context, a []*ltx.FileInfo) e Delete: &types.Delete{Objects: objIDs[:n], Quiet: aws.Bool(true)}, }) if err != nil { - return fmt.Errorf("delete batch of %d objects: %w", n, err) + return fmt.Errorf("s3: delete batch of %d objects: %w", n, err) } else if err := deleteOutputError(out); err != nil { return err } @@ -381,7 +381,7 @@ func (c *ReplicaClient) DeleteAll(ctx context.Context) error { for paginator.HasMorePages() { page, err := paginator.NextPage(ctx) if err != nil { - return fmt.Errorf("list objects page: %w", err) + return fmt.Errorf("s3: list objects page: %w", err) } // Collect object identifiers @@ -399,7 +399,7 @@ func (c *ReplicaClient) DeleteAll(ctx context.Context) error { Delete: &types.Delete{Objects: objIDs[:n], Quiet: aws.Bool(true)}, }) if err != nil { - return fmt.Errorf("delete all batch of %d objects: %w", n, err) + return fmt.Errorf("s3: delete all batch of %d objects: %w", n, err) } else if err := deleteOutputError(out); err != nil { return err } @@ -535,14 +535,14 @@ func ParseURL(s, endpoint string) (bucket, region, key string, err error) { } if u.Scheme != "s3" { - return "", "", "", fmt.Errorf("invalid s3 url scheme") + return "", "", "", fmt.Errorf("s3: invalid url scheme") } // Special handling for filebase.com if u.Host == "filebase.com" { parts := strings.SplitN(strings.TrimPrefix(u.Path, "/"), "/", 2) if len(parts) == 0 { - return "", "", "", fmt.Errorf("s3 bucket required") + return "", "", "", fmt.Errorf("s3: bucket required") } bucket = parts[0] if len(parts) > 1 { diff --git a/sftp/replica_client.go b/sftp/replica_client.go index 22ca537..6d68aaa 100644 --- a/sftp/replica_client.go +++ b/sftp/replica_client.go @@ -82,12 +82,12 @@ func (c *ReplicaClient) Init(ctx context.Context) (_ *sftp.Client, err error) { if c.KeyPath != "" { buf, err := os.ReadFile(c.KeyPath) if err != nil { - return nil, fmt.Errorf("cannot read sftp key path: %w", err) + return nil, fmt.Errorf("sftp: cannot read sftp key path: %w", err) } signer, err := ssh.ParsePrivateKey(buf) if err != nil { - return nil, fmt.Errorf("cannot parse sftp key path: %w", err) + return nil, fmt.Errorf("sftp: cannot parse sftp key path: %w", err) } config.Auth = append(config.Auth, ssh.PublicKeys(signer)) } @@ -128,7 +128,7 @@ func (c *ReplicaClient) DeleteAll(ctx context.Context) (err error) { if err := walker.Err(); os.IsNotExist(err) { continue } else if err != nil { - return fmt.Errorf("cannot walk path %q: %w", walker.Path(), err) + return fmt.Errorf("sftp: cannot walk path %q: %w", walker.Path(), err) } if walker.Stat().IsDir() { dirs = append(dirs, walker.Path()) @@ -136,7 +136,7 @@ func (c *ReplicaClient) DeleteAll(ctx context.Context) (err error) { } if err := sftpClient.Remove(walker.Path()); err != nil && !os.IsNotExist(err) { - return fmt.Errorf("cannot delete file %q: %w", walker.Path(), err) + return fmt.Errorf("sftp: cannot delete file %q: %w", walker.Path(), err) } internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc() @@ -146,7 +146,7 @@ func (c *ReplicaClient) DeleteAll(ctx context.Context) (err error) { for i := len(dirs) - 1; i >= 0; i-- { filename := dirs[i] if err := sftpClient.RemoveDirectory(filename); err != nil && !os.IsNotExist(err) { - return fmt.Errorf("cannot delete directory %q: %w", filename, err) + return fmt.Errorf("sftp: cannot delete directory %q: %w", filename, err) } } @@ -207,12 +207,12 @@ func (c *ReplicaClient) WriteLTXFile(ctx context.Context, level int, minTXID, ma startTime := time.Now() if err := sftpClient.MkdirAll(path.Dir(filename)); err != nil { - return nil, fmt.Errorf("cannot make parent snapshot directory %q: %w", path.Dir(filename), err) + return nil, fmt.Errorf("sftp: cannot make parent snapshot directory %q: %w", path.Dir(filename), err) } f, err := sftpClient.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC) if err != nil { - return nil, fmt.Errorf("cannot open snapshot file for writing: %w", err) + return nil, fmt.Errorf("sftp: cannot open snapshot file for writing: %w", err) } defer f.Close() @@ -268,7 +268,7 @@ func (c *ReplicaClient) DeleteLTXFiles(ctx context.Context, a []*ltx.FileInfo) ( for _, info := range a { filename := litestream.LTXFilePath(c.Path, info.Level, info.MinTXID, info.MaxTXID) if err := sftpClient.Remove(filename); err != nil && !os.IsNotExist(err) { - return fmt.Errorf("cannot delete ltx file %q: %w", filename, err) + return fmt.Errorf("sftp: cannot delete ltx file %q: %w", filename, err) } internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc() } @@ -286,7 +286,7 @@ func (c *ReplicaClient) Cleanup(ctx context.Context) (err error) { } if err := sftpClient.RemoveDirectory(c.Path); err != nil && !os.IsNotExist(err) { - return fmt.Errorf("cannot delete path: %w", err) + return fmt.Errorf("sftp: cannot delete path: %w", err) } return nil }