mirror of
https://github.com/benbjohnson/litestream.git
synced 2026-01-25 05:06:30 +00:00
feat: add backward compatibility for v0.3.x backup restores (#1034)
Add backward compatibility to restore databases from Litestream v0.3.x format backups. This allows users with existing v0.3.x backups to restore them using newer versions of Litestream. Changes: - Add legacy types (LegacyPos, LegacySnapshotInfo, LegacyWALSegmentInfo) and path helpers for v0.3.x format in litestream.go - Add LegacyDetector interface in replica_client.go for detecting and reading legacy format backups - Add restoreLegacy() function in replica.go with auto-detection in Restore() to delegate to legacy format when detected - Implement LegacyDetector interface in file/replica_client.go - Implement LegacyDetector interface in s3/replica_client.go - Add LZ4 decompression helper in internal/internal.go Legacy restore algorithm: 1. Detect v0.3.x format via generations/ directory 2. Find latest snapshot before target timestamp 3. Download & decompress LZ4 snapshot to temp file 4. Download WAL segments in parallel 5. Apply each WAL using SQLite's PRAGMA wal_checkpoint(TRUNCATE) 6. Rename temp file to final path Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -9,6 +9,7 @@ import (
|
||||
"net/url"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/superfly/ltx"
|
||||
@@ -251,3 +252,143 @@ func (c *ReplicaClient) DeleteAll(ctx context.Context) error {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Ensure ReplicaClient implements the LegacyDetector interface.
|
||||
var _ litestream.LegacyDetector = (*ReplicaClient)(nil)
|
||||
|
||||
// IsLegacyFormat returns true if the replica contains v0.3.x format backups.
|
||||
func (c *ReplicaClient) IsLegacyFormat(ctx context.Context) (bool, error) {
|
||||
generationsPath := filepath.Join(c.path, litestream.LegacyGenerationsDir)
|
||||
fi, err := os.Stat(generationsPath)
|
||||
if os.IsNotExist(err) {
|
||||
return false, nil
|
||||
} else if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return fi.IsDir(), nil
|
||||
}
|
||||
|
||||
// LegacyGenerations returns a list of generation IDs found in the replica.
|
||||
func (c *ReplicaClient) LegacyGenerations(ctx context.Context) ([]string, error) {
|
||||
generationsPath := filepath.Join(c.path, litestream.LegacyGenerationsDir)
|
||||
f, err := os.Open(generationsPath)
|
||||
if os.IsNotExist(err) {
|
||||
return nil, nil
|
||||
} else if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
fis, err := f.Readdir(-1)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var generations []string
|
||||
for _, fi := range fis {
|
||||
if !fi.IsDir() {
|
||||
continue
|
||||
}
|
||||
if !litestream.IsValidGenerationID(fi.Name()) {
|
||||
continue
|
||||
}
|
||||
generations = append(generations, fi.Name())
|
||||
}
|
||||
|
||||
sort.Strings(generations)
|
||||
return generations, nil
|
||||
}
|
||||
|
||||
// LegacySnapshots returns a list of snapshots for a given generation.
|
||||
func (c *ReplicaClient) LegacySnapshots(ctx context.Context, generation string) ([]litestream.LegacySnapshotInfo, error) {
|
||||
snapshotDir := filepath.FromSlash(litestream.LegacySnapshotDir(c.path, generation))
|
||||
f, err := os.Open(snapshotDir)
|
||||
if os.IsNotExist(err) {
|
||||
return nil, nil
|
||||
} else if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
fis, err := f.Readdir(-1)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var infos []litestream.LegacySnapshotInfo
|
||||
for _, fi := range fis {
|
||||
index, err := litestream.ParseLegacySnapshotFilename(fi.Name())
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
infos = append(infos, litestream.LegacySnapshotInfo{
|
||||
Generation: generation,
|
||||
Index: index,
|
||||
Size: fi.Size(),
|
||||
CreatedAt: fi.ModTime().UTC(),
|
||||
})
|
||||
}
|
||||
|
||||
// Sort by index ascending
|
||||
sort.Slice(infos, func(i, j int) bool {
|
||||
return infos[i].Index < infos[j].Index
|
||||
})
|
||||
|
||||
return infos, nil
|
||||
}
|
||||
|
||||
// LegacyWALSegments returns a list of WAL segments for a given generation.
|
||||
func (c *ReplicaClient) LegacyWALSegments(ctx context.Context, generation string) ([]litestream.LegacyWALSegmentInfo, error) {
|
||||
walDir := filepath.FromSlash(litestream.LegacyWALDirPath(c.path, generation))
|
||||
f, err := os.Open(walDir)
|
||||
if os.IsNotExist(err) {
|
||||
return nil, nil
|
||||
} else if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
fis, err := f.Readdir(-1)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var infos []litestream.LegacyWALSegmentInfo
|
||||
for _, fi := range fis {
|
||||
index, offset, err := litestream.ParseLegacyWALSegmentFilename(fi.Name())
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
infos = append(infos, litestream.LegacyWALSegmentInfo{
|
||||
Generation: generation,
|
||||
Index: index,
|
||||
Offset: offset,
|
||||
Size: fi.Size(),
|
||||
CreatedAt: fi.ModTime().UTC(),
|
||||
})
|
||||
}
|
||||
|
||||
// Sort by index and offset ascending
|
||||
sort.Slice(infos, func(i, j int) bool {
|
||||
if infos[i].Index != infos[j].Index {
|
||||
return infos[i].Index < infos[j].Index
|
||||
}
|
||||
return infos[i].Offset < infos[j].Offset
|
||||
})
|
||||
|
||||
return infos, nil
|
||||
}
|
||||
|
||||
// OpenLegacySnapshot opens a reader for a legacy snapshot file.
|
||||
func (c *ReplicaClient) OpenLegacySnapshot(ctx context.Context, generation string, index int) (io.ReadCloser, error) {
|
||||
path := filepath.FromSlash(litestream.LegacySnapshotPath(c.path, generation, index))
|
||||
return os.Open(path)
|
||||
}
|
||||
|
||||
// OpenLegacyWALSegment opens a reader for a legacy WAL segment file.
|
||||
func (c *ReplicaClient) OpenLegacyWALSegment(ctx context.Context, generation string, index int, offset int64) (io.ReadCloser, error) {
|
||||
path := filepath.FromSlash(litestream.LegacyWALSegmentPath(c.path, generation, index, offset))
|
||||
return os.Open(path)
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"os"
|
||||
"syscall"
|
||||
|
||||
"github.com/pierrec/lz4/v4"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promauto"
|
||||
)
|
||||
@@ -60,6 +61,32 @@ func (r *ReadCounter) Read(p []byte) (int, error) {
|
||||
// N returns the total number of bytes read.
|
||||
func (r *ReadCounter) N() int64 { return r.n }
|
||||
|
||||
// NewLZ4Reader returns an io.ReadCloser that decompresses LZ4 data from r.
|
||||
// The returned reader must be closed after use.
|
||||
func NewLZ4Reader(r io.Reader) io.ReadCloser {
|
||||
return &lz4ReadCloser{
|
||||
reader: lz4.NewReader(r),
|
||||
source: r,
|
||||
}
|
||||
}
|
||||
|
||||
type lz4ReadCloser struct {
|
||||
reader *lz4.Reader
|
||||
source io.Reader
|
||||
}
|
||||
|
||||
func (r *lz4ReadCloser) Read(p []byte) (int, error) {
|
||||
return r.reader.Read(p)
|
||||
}
|
||||
|
||||
func (r *lz4ReadCloser) Close() error {
|
||||
// Close source if it implements io.Closer
|
||||
if closer, ok := r.source.(io.Closer); ok {
|
||||
return closer.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// CreateFile creates the file and matches the mode & uid/gid of fi.
|
||||
func CreateFile(filename string, fi os.FileInfo) (*os.File, error) {
|
||||
mode := os.FileMode(0600)
|
||||
|
||||
128
litestream.go
128
litestream.go
@@ -4,12 +4,14 @@ import (
|
||||
"database/sql"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/superfly/ltx"
|
||||
_ "modernc.org/sqlite"
|
||||
@@ -187,3 +189,129 @@ func assert(condition bool, message string) {
|
||||
panic("assertion failed: " + message)
|
||||
}
|
||||
}
|
||||
|
||||
// Legacy v0.3.x format types and helpers.
|
||||
// These are used for backward compatibility when restoring from v0.3.x backups.
|
||||
|
||||
// LegacyPos represents a position in a v0.3.x replica.
|
||||
// In v0.3.x, positions were tracked by generation, index, and offset within the WAL.
|
||||
type LegacyPos struct {
|
||||
Generation string // 16-character hex string
|
||||
Index int // WAL index
|
||||
Offset int64 // Offset within the WAL segment
|
||||
}
|
||||
|
||||
// IsZero returns true if the position is empty.
|
||||
func (p LegacyPos) IsZero() bool {
|
||||
return p.Generation == "" && p.Index == 0 && p.Offset == 0
|
||||
}
|
||||
|
||||
// LegacySnapshotInfo represents metadata for a v0.3.x snapshot file.
|
||||
type LegacySnapshotInfo struct {
|
||||
Generation string // Generation ID (16-char hex)
|
||||
Index int // Snapshot index
|
||||
Size int64 // File size in bytes
|
||||
CreatedAt time.Time // File creation time
|
||||
}
|
||||
|
||||
// LegacyWALSegmentInfo represents metadata for a v0.3.x WAL segment file.
|
||||
type LegacyWALSegmentInfo struct {
|
||||
Generation string // Generation ID (16-char hex)
|
||||
Index int // WAL index
|
||||
Offset int64 // Offset within this segment
|
||||
Size int64 // File size in bytes
|
||||
CreatedAt time.Time // File creation time
|
||||
}
|
||||
|
||||
// Legacy path constants
|
||||
const (
|
||||
LegacyGenerationsDir = "generations"
|
||||
LegacySnapshotsDir = "snapshots"
|
||||
LegacyWALDir = "wal"
|
||||
)
|
||||
|
||||
// LegacyGenerationPath returns the path to a generation directory.
|
||||
func LegacyGenerationPath(root, generation string) string {
|
||||
return path.Join(root, LegacyGenerationsDir, generation)
|
||||
}
|
||||
|
||||
// LegacySnapshotDir returns the path to a generation's snapshot directory.
|
||||
func LegacySnapshotDir(root, generation string) string {
|
||||
return path.Join(LegacyGenerationPath(root, generation), LegacySnapshotsDir)
|
||||
}
|
||||
|
||||
// LegacyWALDir returns the path to a generation's WAL directory.
|
||||
func LegacyWALDirPath(root, generation string) string {
|
||||
return path.Join(LegacyGenerationPath(root, generation), LegacyWALDir)
|
||||
}
|
||||
|
||||
// LegacySnapshotPath returns the path to a specific snapshot file.
|
||||
func LegacySnapshotPath(root, generation string, index int) string {
|
||||
return path.Join(LegacySnapshotDir(root, generation), FormatLegacySnapshotFilename(index))
|
||||
}
|
||||
|
||||
// LegacyWALSegmentPath returns the path to a specific WAL segment file.
|
||||
func LegacyWALSegmentPath(root, generation string, index int, offset int64) string {
|
||||
return path.Join(LegacyWALDirPath(root, generation), FormatLegacyWALSegmentFilename(index, offset))
|
||||
}
|
||||
|
||||
// FormatLegacySnapshotFilename returns the filename for a legacy snapshot.
|
||||
// Format: <index>.snapshot.lz4
|
||||
func FormatLegacySnapshotFilename(index int) string {
|
||||
return strconv.Itoa(index) + ".snapshot.lz4"
|
||||
}
|
||||
|
||||
// FormatLegacyWALSegmentFilename returns the filename for a legacy WAL segment.
|
||||
// Format: <index>-<offset>.wal.lz4
|
||||
func FormatLegacyWALSegmentFilename(index int, offset int64) string {
|
||||
return strconv.Itoa(index) + "-" + strconv.FormatInt(offset, 10) + ".wal.lz4"
|
||||
}
|
||||
|
||||
// ParseLegacySnapshotFilename parses a legacy snapshot filename.
|
||||
// Returns the index or an error if the filename is invalid.
|
||||
func ParseLegacySnapshotFilename(name string) (index int, err error) {
|
||||
if !strings.HasSuffix(name, ".snapshot.lz4") {
|
||||
return 0, errors.New("invalid legacy snapshot filename")
|
||||
}
|
||||
name = strings.TrimSuffix(name, ".snapshot.lz4")
|
||||
return strconv.Atoi(name)
|
||||
}
|
||||
|
||||
// ParseLegacyWALSegmentFilename parses a legacy WAL segment filename.
|
||||
// Returns the index and offset or an error if the filename is invalid.
|
||||
func ParseLegacyWALSegmentFilename(name string) (index int, offset int64, err error) {
|
||||
if !strings.HasSuffix(name, ".wal.lz4") {
|
||||
return 0, 0, errors.New("invalid legacy WAL segment filename")
|
||||
}
|
||||
name = strings.TrimSuffix(name, ".wal.lz4")
|
||||
|
||||
parts := strings.Split(name, "-")
|
||||
if len(parts) != 2 {
|
||||
return 0, 0, errors.New("invalid legacy WAL segment filename format")
|
||||
}
|
||||
|
||||
index, err = strconv.Atoi(parts[0])
|
||||
if err != nil {
|
||||
return 0, 0, fmt.Errorf("invalid index: %w", err)
|
||||
}
|
||||
|
||||
offset, err = strconv.ParseInt(parts[1], 10, 64)
|
||||
if err != nil {
|
||||
return 0, 0, fmt.Errorf("invalid offset: %w", err)
|
||||
}
|
||||
|
||||
return index, offset, nil
|
||||
}
|
||||
|
||||
// IsValidGenerationID returns true if s is a valid 16-character hex generation ID.
|
||||
func IsValidGenerationID(s string) bool {
|
||||
if len(s) != 16 {
|
||||
return false
|
||||
}
|
||||
for _, c := range s {
|
||||
if !((c >= '0' && c <= '9') || (c >= 'a' && c <= 'f')) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
285
replica.go
285
replica.go
@@ -2,6 +2,7 @@ package litestream
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
@@ -505,6 +506,18 @@ func (r *Replica) Restore(ctx context.Context, opt RestoreOptions) (err error) {
|
||||
return err
|
||||
}
|
||||
|
||||
// Check if this is a legacy v0.3.x format backup.
|
||||
if detector, ok := r.Client.(LegacyDetector); ok {
|
||||
isLegacy, err := detector.IsLegacyFormat(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("check legacy format: %w", err)
|
||||
}
|
||||
if isLegacy {
|
||||
r.Logger().Info("detected v0.3.x legacy backup format, using legacy restore")
|
||||
return r.restoreLegacy(ctx, detector, opt)
|
||||
}
|
||||
}
|
||||
|
||||
infos, err := CalcRestorePlan(ctx, r.Client, opt.TXID, opt.Timestamp, r.Logger())
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot calc restore plan: %w", err)
|
||||
@@ -670,3 +683,275 @@ func CalcRestorePlan(ctx context.Context, client ReplicaClient, txID ltx.TXID, t
|
||||
|
||||
return infos, nil
|
||||
}
|
||||
|
||||
// restoreLegacy restores a database from a v0.3.x format backup.
|
||||
// This handles the legacy format with generations/, snapshots/, and wal/ directories.
|
||||
func (r *Replica) restoreLegacy(ctx context.Context, detector LegacyDetector, opt RestoreOptions) error {
|
||||
// Find the latest generation with snapshots.
|
||||
generations, err := detector.LegacyGenerations(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("list generations: %w", err)
|
||||
}
|
||||
if len(generations) == 0 {
|
||||
return fmt.Errorf("no generations found in legacy backup")
|
||||
}
|
||||
|
||||
// Find the best generation and snapshot to restore from.
|
||||
var bestGen string
|
||||
var bestSnapshot *LegacySnapshotInfo
|
||||
|
||||
for i := len(generations) - 1; i >= 0; i-- {
|
||||
gen := generations[i]
|
||||
snapshots, err := detector.LegacySnapshots(ctx, gen)
|
||||
if err != nil {
|
||||
r.Logger().Warn("failed to list snapshots for generation", "generation", gen, "error", err)
|
||||
continue
|
||||
}
|
||||
if len(snapshots) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
// Find the latest snapshot before the target timestamp (if specified).
|
||||
for j := len(snapshots) - 1; j >= 0; j-- {
|
||||
snap := snapshots[j]
|
||||
if !opt.Timestamp.IsZero() && snap.CreatedAt.After(opt.Timestamp) {
|
||||
continue
|
||||
}
|
||||
bestGen = gen
|
||||
bestSnapshot = &snap
|
||||
break
|
||||
}
|
||||
|
||||
if bestSnapshot != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if bestSnapshot == nil {
|
||||
return fmt.Errorf("no suitable snapshot found for restore")
|
||||
}
|
||||
|
||||
r.Logger().Info("restoring from legacy backup",
|
||||
"generation", bestGen,
|
||||
"snapshot_index", bestSnapshot.Index,
|
||||
"snapshot_time", bestSnapshot.CreatedAt)
|
||||
|
||||
// Create parent directory if it doesn't exist.
|
||||
var dirInfo os.FileInfo
|
||||
if db := r.DB(); db != nil {
|
||||
dirInfo = db.dirInfo
|
||||
}
|
||||
if err := internal.MkdirAll(filepath.Dir(opt.OutputPath), dirInfo); err != nil {
|
||||
return fmt.Errorf("create parent directory: %w", err)
|
||||
}
|
||||
|
||||
// Create temp file for the restored database.
|
||||
tmpOutputPath := opt.OutputPath + ".tmp"
|
||||
defer os.Remove(tmpOutputPath)
|
||||
|
||||
// Download and decompress the snapshot.
|
||||
r.Logger().Debug("downloading legacy snapshot", "index", bestSnapshot.Index)
|
||||
if err := r.downloadLegacySnapshot(ctx, detector, bestGen, bestSnapshot.Index, tmpOutputPath); err != nil {
|
||||
return fmt.Errorf("download snapshot: %w", err)
|
||||
}
|
||||
|
||||
// Get WAL segments after the snapshot.
|
||||
walSegments, err := detector.LegacyWALSegments(ctx, bestGen)
|
||||
if err != nil {
|
||||
return fmt.Errorf("list wal segments: %w", err)
|
||||
}
|
||||
|
||||
// Filter WAL segments to only include those after the snapshot.
|
||||
var segmentsToApply []LegacyWALSegmentInfo
|
||||
for _, seg := range walSegments {
|
||||
if seg.Index < bestSnapshot.Index {
|
||||
continue
|
||||
}
|
||||
if !opt.Timestamp.IsZero() && seg.CreatedAt.After(opt.Timestamp) {
|
||||
continue
|
||||
}
|
||||
segmentsToApply = append(segmentsToApply, seg)
|
||||
}
|
||||
|
||||
if len(segmentsToApply) > 0 {
|
||||
r.Logger().Info("applying WAL segments", "count", len(segmentsToApply))
|
||||
if err := r.applyLegacyWALSegments(ctx, detector, bestGen, segmentsToApply, tmpOutputPath, opt.Parallelism); err != nil {
|
||||
return fmt.Errorf("apply wal segments: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Rename temp file to final path.
|
||||
r.Logger().Debug("renaming database from temporary location")
|
||||
return os.Rename(tmpOutputPath, opt.OutputPath)
|
||||
}
|
||||
|
||||
// downloadLegacySnapshot downloads and decompresses a legacy snapshot to the output path.
|
||||
func (r *Replica) downloadLegacySnapshot(ctx context.Context, detector LegacyDetector, generation string, index int, outputPath string) error {
|
||||
rc, err := detector.OpenLegacySnapshot(ctx, generation, index)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer rc.Close()
|
||||
|
||||
// Create output file.
|
||||
f, err := os.Create(outputPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
// Decompress LZ4 and write to file.
|
||||
lz4Reader := internal.NewLZ4Reader(rc)
|
||||
defer lz4Reader.Close()
|
||||
|
||||
if _, err := io.Copy(f, lz4Reader); err != nil {
|
||||
return fmt.Errorf("decompress snapshot: %w", err)
|
||||
}
|
||||
|
||||
if err := f.Sync(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return f.Close()
|
||||
}
|
||||
|
||||
// applyLegacyWALSegments downloads and applies WAL segments to the database.
|
||||
func (r *Replica) applyLegacyWALSegments(ctx context.Context, detector LegacyDetector, generation string, segments []LegacyWALSegmentInfo, dbPath string, parallelism int) error {
|
||||
if parallelism <= 0 {
|
||||
parallelism = DefaultRestoreParallelism
|
||||
}
|
||||
|
||||
// Create temp directory for WAL segments.
|
||||
tmpDir, err := os.MkdirTemp("", "litestream-legacy-wal-*")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer os.RemoveAll(tmpDir)
|
||||
|
||||
// Download WAL segments in parallel.
|
||||
type downloadResult struct {
|
||||
index int
|
||||
path string
|
||||
err error
|
||||
}
|
||||
|
||||
resultCh := make(chan downloadResult, len(segments))
|
||||
sem := make(chan struct{}, parallelism)
|
||||
|
||||
for i, seg := range segments {
|
||||
seg := seg
|
||||
i := i
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case sem <- struct{}{}:
|
||||
}
|
||||
|
||||
go func() {
|
||||
defer func() { <-sem }()
|
||||
|
||||
walPath := filepath.Join(tmpDir, fmt.Sprintf("%08d-%016x.wal", seg.Index, seg.Offset))
|
||||
err := r.downloadLegacyWALSegment(ctx, detector, generation, seg.Index, seg.Offset, walPath)
|
||||
resultCh <- downloadResult{index: i, path: walPath, err: err}
|
||||
}()
|
||||
}
|
||||
|
||||
// Collect results.
|
||||
walPaths := make([]string, len(segments))
|
||||
for range segments {
|
||||
result := <-resultCh
|
||||
if result.err != nil {
|
||||
return fmt.Errorf("download wal segment %d: %w", result.index, result.err)
|
||||
}
|
||||
walPaths[result.index] = result.path
|
||||
}
|
||||
|
||||
// Apply WAL segments sequentially using SQLite.
|
||||
for i, walPath := range walPaths {
|
||||
if err := r.applyLegacyWAL(ctx, dbPath, walPath); err != nil {
|
||||
return fmt.Errorf("apply wal segment %d: %w", i, err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// downloadLegacyWALSegment downloads and decompresses a legacy WAL segment.
|
||||
func (r *Replica) downloadLegacyWALSegment(ctx context.Context, detector LegacyDetector, generation string, index int, offset int64, outputPath string) error {
|
||||
rc, err := detector.OpenLegacyWALSegment(ctx, generation, index, offset)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer rc.Close()
|
||||
|
||||
f, err := os.Create(outputPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
// Decompress LZ4 and write to file.
|
||||
lz4Reader := internal.NewLZ4Reader(rc)
|
||||
defer lz4Reader.Close()
|
||||
|
||||
if _, err := io.Copy(f, lz4Reader); err != nil {
|
||||
return fmt.Errorf("decompress wal segment: %w", err)
|
||||
}
|
||||
|
||||
if err := f.Sync(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return f.Close()
|
||||
}
|
||||
|
||||
// applyLegacyWAL applies a single WAL file to the database using SQLite's recovery mechanism.
|
||||
func (r *Replica) applyLegacyWAL(ctx context.Context, dbPath, walPath string) error {
|
||||
// In v0.3.x, WAL files were applied by:
|
||||
// 1. Copying the WAL file to the database's WAL location
|
||||
// 2. Opening the database which triggers SQLite to replay the WAL
|
||||
// 3. Checkpointing to apply WAL to main database file
|
||||
|
||||
// Copy WAL file to expected location.
|
||||
dbWALPath := dbPath + "-wal"
|
||||
if err := copyFile(walPath, dbWALPath); err != nil {
|
||||
return fmt.Errorf("copy wal file: %w", err)
|
||||
}
|
||||
defer os.Remove(dbWALPath)
|
||||
|
||||
// Open the database which will trigger WAL recovery.
|
||||
db, err := sql.Open("sqlite", dbPath+"?_journal_mode=WAL")
|
||||
if err != nil {
|
||||
return fmt.Errorf("open database: %w", err)
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
// Force a checkpoint to apply WAL changes to main database.
|
||||
if _, err := db.ExecContext(ctx, "PRAGMA wal_checkpoint(TRUNCATE)"); err != nil {
|
||||
return fmt.Errorf("checkpoint: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// copyFile copies a file from src to dst.
|
||||
func copyFile(src, dst string) error {
|
||||
srcFile, err := os.Open(src)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer srcFile.Close()
|
||||
|
||||
dstFile, err := os.Create(dst)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer dstFile.Close()
|
||||
|
||||
if _, err := io.Copy(dstFile, srcFile); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return dstFile.Sync()
|
||||
}
|
||||
|
||||
@@ -154,3 +154,31 @@ func FetchPage(ctx context.Context, client ReplicaClient, level int, minTXID, ma
|
||||
}
|
||||
return ltx.DecodePageData(b)
|
||||
}
|
||||
|
||||
// LegacyDetector is an optional interface that ReplicaClient implementations
|
||||
// can implement to support restoring from v0.3.x format backups.
|
||||
type LegacyDetector interface {
|
||||
// IsLegacyFormat returns true if the replica contains v0.3.x format backups.
|
||||
// This is detected by checking for the presence of a "generations/" directory.
|
||||
IsLegacyFormat(ctx context.Context) (bool, error)
|
||||
|
||||
// LegacyGenerations returns a list of generation IDs (16-char hex strings)
|
||||
// found in the replica. Returns empty slice if no generations exist.
|
||||
LegacyGenerations(ctx context.Context) ([]string, error)
|
||||
|
||||
// LegacySnapshots returns a list of snapshots for a given generation.
|
||||
// Results are sorted by index in ascending order.
|
||||
LegacySnapshots(ctx context.Context, generation string) ([]LegacySnapshotInfo, error)
|
||||
|
||||
// LegacyWALSegments returns a list of WAL segments for a given generation.
|
||||
// Results are sorted by index and offset in ascending order.
|
||||
LegacyWALSegments(ctx context.Context, generation string) ([]LegacyWALSegmentInfo, error)
|
||||
|
||||
// OpenLegacySnapshot opens a reader for a legacy snapshot file.
|
||||
// The returned reader contains LZ4-compressed data.
|
||||
OpenLegacySnapshot(ctx context.Context, generation string, index int) (io.ReadCloser, error)
|
||||
|
||||
// OpenLegacyWALSegment opens a reader for a legacy WAL segment file.
|
||||
// The returned reader contains LZ4-compressed data.
|
||||
OpenLegacyWALSegment(ctx context.Context, generation string, index int, offset int64) (io.ReadCloser, error)
|
||||
}
|
||||
|
||||
@@ -16,6 +16,7 @@ import (
|
||||
"os"
|
||||
"path"
|
||||
"regexp"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -1441,3 +1442,225 @@ func parseS3DebugEnv() aws.ClientLogMode {
|
||||
}
|
||||
return logMode
|
||||
}
|
||||
|
||||
// Ensure ReplicaClient implements the LegacyDetector interface.
|
||||
var _ litestream.LegacyDetector = (*ReplicaClient)(nil)
|
||||
|
||||
// IsLegacyFormat returns true if the replica contains v0.3.x format backups.
|
||||
func (c *ReplicaClient) IsLegacyFormat(ctx context.Context) (bool, error) {
|
||||
if err := c.Init(ctx); err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
// Check for the presence of the generations/ directory by listing objects
|
||||
prefix := c.Path + "/" + litestream.LegacyGenerationsDir + "/"
|
||||
out, err := c.s3.ListObjectsV2(ctx, &s3.ListObjectsV2Input{
|
||||
Bucket: aws.String(c.Bucket),
|
||||
Prefix: aws.String(prefix),
|
||||
MaxKeys: aws.Int32(1),
|
||||
})
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("s3: check legacy format: %w", err)
|
||||
}
|
||||
|
||||
return len(out.Contents) > 0, nil
|
||||
}
|
||||
|
||||
// LegacyGenerations returns a list of generation IDs found in the replica.
|
||||
func (c *ReplicaClient) LegacyGenerations(ctx context.Context) ([]string, error) {
|
||||
if err := c.Init(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
prefix := c.Path + "/" + litestream.LegacyGenerationsDir + "/"
|
||||
|
||||
// Use CommonPrefixes with delimiter to get generation directories
|
||||
paginator := s3.NewListObjectsV2Paginator(c.s3, &s3.ListObjectsV2Input{
|
||||
Bucket: aws.String(c.Bucket),
|
||||
Prefix: aws.String(prefix),
|
||||
Delimiter: aws.String("/"),
|
||||
})
|
||||
|
||||
generationSet := make(map[string]struct{})
|
||||
for paginator.HasMorePages() {
|
||||
page, err := paginator.NextPage(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("s3: list generations: %w", err)
|
||||
}
|
||||
|
||||
for _, commonPrefix := range page.CommonPrefixes {
|
||||
p := aws.ToString(commonPrefix.Prefix)
|
||||
// Extract generation ID from prefix like "path/generations/abc123def456/"
|
||||
p = strings.TrimPrefix(p, prefix)
|
||||
p = strings.TrimSuffix(p, "/")
|
||||
if litestream.IsValidGenerationID(p) {
|
||||
generationSet[p] = struct{}{}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
generations := make([]string, 0, len(generationSet))
|
||||
for g := range generationSet {
|
||||
generations = append(generations, g)
|
||||
}
|
||||
sort.Strings(generations)
|
||||
return generations, nil
|
||||
}
|
||||
|
||||
// LegacySnapshots returns a list of snapshots for a given generation.
|
||||
func (c *ReplicaClient) LegacySnapshots(ctx context.Context, generation string) ([]litestream.LegacySnapshotInfo, error) {
|
||||
if err := c.Init(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
prefix := c.Path + "/" + litestream.LegacyGenerationsDir + "/" + generation + "/" + litestream.LegacySnapshotsDir + "/"
|
||||
|
||||
paginator := s3.NewListObjectsV2Paginator(c.s3, &s3.ListObjectsV2Input{
|
||||
Bucket: aws.String(c.Bucket),
|
||||
Prefix: aws.String(prefix),
|
||||
})
|
||||
|
||||
var infos []litestream.LegacySnapshotInfo
|
||||
for paginator.HasMorePages() {
|
||||
page, err := paginator.NextPage(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("s3: list snapshots: %w", err)
|
||||
}
|
||||
|
||||
for _, obj := range page.Contents {
|
||||
key := aws.ToString(obj.Key)
|
||||
filename := path.Base(key)
|
||||
|
||||
index, err := litestream.ParseLegacySnapshotFilename(filename)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
infos = append(infos, litestream.LegacySnapshotInfo{
|
||||
Generation: generation,
|
||||
Index: index,
|
||||
Size: aws.ToInt64(obj.Size),
|
||||
CreatedAt: aws.ToTime(obj.LastModified).UTC(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Sort by index ascending
|
||||
sort.Slice(infos, func(i, j int) bool {
|
||||
return infos[i].Index < infos[j].Index
|
||||
})
|
||||
|
||||
return infos, nil
|
||||
}
|
||||
|
||||
// LegacyWALSegments returns a list of WAL segments for a given generation.
|
||||
func (c *ReplicaClient) LegacyWALSegments(ctx context.Context, generation string) ([]litestream.LegacyWALSegmentInfo, error) {
|
||||
if err := c.Init(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
prefix := c.Path + "/" + litestream.LegacyGenerationsDir + "/" + generation + "/" + litestream.LegacyWALDir + "/"
|
||||
|
||||
paginator := s3.NewListObjectsV2Paginator(c.s3, &s3.ListObjectsV2Input{
|
||||
Bucket: aws.String(c.Bucket),
|
||||
Prefix: aws.String(prefix),
|
||||
})
|
||||
|
||||
var infos []litestream.LegacyWALSegmentInfo
|
||||
for paginator.HasMorePages() {
|
||||
page, err := paginator.NextPage(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("s3: list wal segments: %w", err)
|
||||
}
|
||||
|
||||
for _, obj := range page.Contents {
|
||||
key := aws.ToString(obj.Key)
|
||||
filename := path.Base(key)
|
||||
|
||||
index, offset, err := litestream.ParseLegacyWALSegmentFilename(filename)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
infos = append(infos, litestream.LegacyWALSegmentInfo{
|
||||
Generation: generation,
|
||||
Index: index,
|
||||
Offset: offset,
|
||||
Size: aws.ToInt64(obj.Size),
|
||||
CreatedAt: aws.ToTime(obj.LastModified).UTC(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Sort by index and offset ascending
|
||||
sort.Slice(infos, func(i, j int) bool {
|
||||
if infos[i].Index != infos[j].Index {
|
||||
return infos[i].Index < infos[j].Index
|
||||
}
|
||||
return infos[i].Offset < infos[j].Offset
|
||||
})
|
||||
|
||||
return infos, nil
|
||||
}
|
||||
|
||||
// OpenLegacySnapshot opens a reader for a legacy snapshot file.
|
||||
func (c *ReplicaClient) OpenLegacySnapshot(ctx context.Context, generation string, index int) (io.ReadCloser, error) {
|
||||
if err := c.Init(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
key := c.Path + "/" + litestream.LegacyGenerationsDir + "/" + generation + "/" +
|
||||
litestream.LegacySnapshotsDir + "/" + litestream.FormatLegacySnapshotFilename(index)
|
||||
|
||||
input := &s3.GetObjectInput{
|
||||
Bucket: aws.String(c.Bucket),
|
||||
Key: aws.String(key),
|
||||
}
|
||||
|
||||
// Add SSE-C parameters if configured
|
||||
if c.SSECustomerKey != "" {
|
||||
input.SSECustomerAlgorithm = aws.String(c.SSECustomerAlgorithm)
|
||||
input.SSECustomerKey = aws.String(c.SSECustomerKey)
|
||||
input.SSECustomerKeyMD5 = aws.String(c.SSECustomerKeyMD5)
|
||||
}
|
||||
|
||||
out, err := c.s3.GetObject(ctx, input)
|
||||
if err != nil {
|
||||
if isNotExists(err) {
|
||||
return nil, os.ErrNotExist
|
||||
}
|
||||
return nil, fmt.Errorf("s3: get legacy snapshot %s: %w", key, err)
|
||||
}
|
||||
return out.Body, nil
|
||||
}
|
||||
|
||||
// OpenLegacyWALSegment opens a reader for a legacy WAL segment file.
|
||||
func (c *ReplicaClient) OpenLegacyWALSegment(ctx context.Context, generation string, index int, offset int64) (io.ReadCloser, error) {
|
||||
if err := c.Init(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
key := c.Path + "/" + litestream.LegacyGenerationsDir + "/" + generation + "/" +
|
||||
litestream.LegacyWALDir + "/" + litestream.FormatLegacyWALSegmentFilename(index, offset)
|
||||
|
||||
input := &s3.GetObjectInput{
|
||||
Bucket: aws.String(c.Bucket),
|
||||
Key: aws.String(key),
|
||||
}
|
||||
|
||||
// Add SSE-C parameters if configured
|
||||
if c.SSECustomerKey != "" {
|
||||
input.SSECustomerAlgorithm = aws.String(c.SSECustomerAlgorithm)
|
||||
input.SSECustomerKey = aws.String(c.SSECustomerKey)
|
||||
input.SSECustomerKeyMD5 = aws.String(c.SSECustomerKeyMD5)
|
||||
}
|
||||
|
||||
out, err := c.s3.GetObject(ctx, input)
|
||||
if err != nil {
|
||||
if isNotExists(err) {
|
||||
return nil, os.ErrNotExist
|
||||
}
|
||||
return nil, fmt.Errorf("s3: get legacy wal segment %s: %w", key, err)
|
||||
}
|
||||
return out.Body, nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user