Start Store-Sink testing

This commit is contained in:
Philip O'Toole
2026-01-11 22:39:04 -05:00
parent f1deb2c3f5
commit f970939036
12 changed files with 399 additions and 32 deletions

View File

@@ -140,10 +140,12 @@ func (s *Sink) Cancel() error {
return nil
}
s.opened = false
if err := s.sinkW.Close(); err != nil {
return err
if s.sinkW != nil {
if err := s.sinkW.Close(); err != nil {
return err
}
s.sinkW = nil
}
s.sinkW = nil
return os.RemoveAll(s.snapTmpDirPath)
}

View File

@@ -13,8 +13,8 @@ func Test_DB_WAL_Sinks(t *testing.T) {
"DBSink",
func(dir string, m *proto.SnapshotDBFile) sinker {
return NewDBSink(dir, m)
}, "testdata/test.db", func() *proto.SnapshotDBFile {
return mustCreateSnapshotDBFileFromFile(t, "testdata/test.db", true)
}, "testdata/db-and-wals/full2.db", func() *proto.SnapshotDBFile {
return mustCreateSnapshotDBFileFromFile(t, "testdata/db-and-wals/full2.db", true)
})
test_SnapshotSinkDBTests(
@@ -22,15 +22,15 @@ func Test_DB_WAL_Sinks(t *testing.T) {
"WALSink",
func(dir string, m *proto.SnapshotWALFile) sinker {
return NewWALSink(dir, m)
}, "testdata/wal", func() *proto.SnapshotWALFile {
return mustCreateSnapshotWALFileFromFile(t, "testdata/wal", true)
}, "testdata/db-and-wals/wal-00", func() *proto.SnapshotWALFile {
return mustCreateSnapshotWALFileFromFile(t, "testdata/db-and-wals/wal-00", true)
})
}
func Test_NewDBSink(t *testing.T) {
dir := t.TempDir()
m := mustCreateSnapshotDBFileFromFile(t, "testdata/test.db", false)
m := mustCreateSnapshotDBFileFromFile(t, "testdata/db-and-wals/full2.db", false)
sink := NewDBSink(dir, m)
if sink == nil {
t.Fatalf("expected non-nil sink")
@@ -40,7 +40,7 @@ func Test_NewDBSink(t *testing.T) {
func Test_NewWALSink(t *testing.T) {
dir := t.TempDir()
m := mustCreateSnapshotWALFileFromFile(t, "testdata/wal", false)
m := mustCreateSnapshotWALFileFromFile(t, "testdata/db-and-wals/wal-00", false)
sink := NewWALSink(dir, m)
if sink == nil {
t.Fatalf("expected non-nil sink")

View File

@@ -2,6 +2,7 @@ package snapshot2
import (
"encoding/json"
"expvar"
"fmt"
"io"
"log"
@@ -15,10 +16,48 @@ import (
)
const (
metaFileName = "meta.json"
tmpSuffix = ".tmp"
metaFileName = "meta.json"
tmpSuffix = ".tmp"
fullNeededFile = "FULL_NEEDED"
)
const (
persistSize = "latest_persist_size"
persistDuration = "latest_persist_duration"
upgradeOk = "upgrade_ok"
upgradeFail = "upgrade_fail"
snapshotsReaped = "snapshots_reaped"
snapshotsReapedFail = "snapshots_reaped_failed"
snapshotCreateMRSWFail = "snapshot_create_mrsw_fail"
snapshotOpenMRSWFail = "snapshot_open_mrsw_fail"
)
var (
// ErrSnapshotNotFound is returned when a snapshot cannot be found.
ErrSnapshotNotFound = fmt.Errorf("snapshot not found")
)
// stats captures stats for the Store.
var stats *expvar.Map
func init() {
stats = expvar.NewMap("snapshot")
ResetStats()
}
// ResetStats resets the expvar stats for this module. Mostly for test purposes.
func ResetStats() {
stats.Init()
stats.Add(persistSize, 0)
stats.Add(persistDuration, 0)
stats.Add(upgradeOk, 0)
stats.Add(upgradeFail, 0)
stats.Add(snapshotsReaped, 0)
stats.Add(snapshotsReapedFail, 0)
stats.Add(snapshotCreateMRSWFail, 0)
stats.Add(snapshotOpenMRSWFail, 0)
}
type SnapshotMetaType int
// SnapshotMetaType is an enum
@@ -37,8 +76,12 @@ type SnapshotMeta struct {
// Store stores snapshots in the Raft system.
type Store struct {
dir string
logger *log.Logger
dir string
fullNeededPath string
logger *log.Logger
reapDisabled bool
LogReaping bool
}
// NewStore creates a new store.
@@ -48,8 +91,9 @@ func NewStore(dir string) (*Store, error) {
}
str := &Store{
dir: dir,
logger: log.New(os.Stderr, "[snapshot-store] ", log.LstdFlags),
dir: dir,
fullNeededPath: filepath.Join(dir, fullNeededFile),
logger: log.New(os.Stderr, "[snapshot-store] ", log.LstdFlags),
}
str.logger.Printf("store initialized using %s", dir)
@@ -92,9 +136,124 @@ func (s *Store) List() ([]*raft.SnapshotMeta, error) {
return snapMetaSlice(ms).RaftMetaSlice(), nil
}
// Len returns the number of snapshots in the Store.
func (s *Store) Len() int {
snapshots, err := s.getSnapshots()
if err != nil {
return 0
}
return len(snapshots)
}
// LatestIndexTerm returns the index and term of the most recent
// snapshot in the Store.
func (s *Store) LatestIndexTerm() (uint64, uint64, error) {
snapshots, err := s.getSnapshots()
if err != nil {
return 0, 0, err
}
if len(snapshots) == 0 {
return 0, 0, nil
}
latest := snapshots[len(snapshots)-1]
return latest.Index, latest.Term, nil
}
// Dir returns the directory where the snapshots are stored.
func (s *Store) Dir() string {
return s.dir
}
// Open opens the snapshot with the given ID for reading.
func (s *Store) Open(id string) (*raft.SnapshotMeta, io.ReadCloser, error) {
return nil, nil, nil
if !dirExists(filepath.Join(s.dir, id)) {
return nil, nil, ErrSnapshotNotFound
}
meta, err := readMeta(filepath.Join(s.dir, id))
if err != nil {
return nil, nil, err
}
fd, err := os.Open(filepath.Join(s.dir, id+".db"))
if err != nil {
return nil, nil, err
}
return meta.SnapshotMeta, fd, nil
}
// Reap reaps all snapshots, except the most recent one. Returns the number of
// snapshots reaped.
func (s *Store) Reap() (retN int, retErr error) {
defer func() {
if retErr != nil {
stats.Add(snapshotsReapedFail, 1)
} else {
stats.Add(snapshotsReaped, int64(retN))
}
}()
if s.reapDisabled {
return 0, nil
}
snapshots, err := s.getSnapshots()
if err != nil {
return 0, err
}
if len(snapshots) <= 1 {
return 0, nil
}
// Remove all snapshots, and all associated data, except the newest one.
n := 0
for _, snap := range snapshots[:len(snapshots)-1] {
if err := removeAllPrefix(s.dir, snap.ID); err != nil {
return n, err
}
if s.LogReaping {
s.logger.Printf("reaped snapshot %s", snap.ID)
}
n++
}
return n, nil
}
// FullNeeded returns true if a full snapshot is needed.
func (s *Store) FullNeeded() (bool, error) {
if fileExists(s.fullNeededPath) {
return true, nil
}
snaps, err := s.getSnapshots()
if err != nil {
return false, err
}
return len(snaps) == 0, nil
}
// SetFullNeeded sets the flag that indicates a full snapshot is needed.
// This flag will be cleared when a snapshot is successfully persisted.
func (s *Store) SetFullNeeded() error {
f, err := os.Create(s.fullNeededPath)
if err != nil {
return err
}
return f.Close()
}
// Stats returns stats about the Snapshot Store. This function may return
// an error if the Store is in an inconsistent state. In that case the stats
// returned may be incomplete or invalid.
func (s *Store) Stats() (map[string]any, error) {
snapshots, err := s.getSnapshots()
if err != nil {
return nil, err
}
snapsAsIDs := make([]string, len(snapshots))
for i, snap := range snapshots {
snapsAsIDs[i] = snap.ID
}
return map[string]any{
"dir": s.dir,
"snapshots": snapsAsIDs,
}, nil
}
// check checks the Store for any inconsistencies, and repairs
@@ -104,8 +263,12 @@ func (s *Store) check() error {
return nil
}
func (s *Store) getSnapshots() ([]*SnapshotMeta, error) {
return getSnapshots(s.dir)
}
// getSnapshots returns the list of snapshots in the given directory,
// sorted from newest to oldest.
// sorted from oldest to new.
func getSnapshots(dir string) ([]*SnapshotMeta, error) {
// Get the eligible snapshots
snapshots, err := os.ReadDir(dir)
@@ -147,18 +310,6 @@ func getSnapshots(dir string) ([]*SnapshotMeta, error) {
return snapMeta, nil
}
type cmpSnapshotMeta SnapshotMeta
func (c *cmpSnapshotMeta) Less(other *cmpSnapshotMeta) bool {
if c.Term != other.Term {
return c.Term < other.Term
}
if c.Index != other.Index {
return c.Index < other.Index
}
return c.ID < other.ID
}
type snapMetaSlice []*SnapshotMeta
// Len implements the sort interface for snapMetaSlice.
@@ -168,9 +319,15 @@ func (s snapMetaSlice) Len() int {
// Less implements the sort interface for snapMetaSlice.
func (s snapMetaSlice) Less(i, j int) bool {
si := (*cmpSnapshotMeta)(s[i])
sj := (*cmpSnapshotMeta)(s[j])
return si.Less(sj)
si := s[i]
sj := s[j]
if si.Term != sj.Term {
return si.Term < sj.Term
}
if si.Index != sj.Index {
return si.Index < sj.Index
}
return si.ID < sj.ID
}
// Swap implements the sort interface for snapMetaSlice.

174
snapshot2/store_test.go Normal file
View File

@@ -0,0 +1,174 @@
package snapshot2
import (
"os"
"sort"
"testing"
"github.com/hashicorp/raft"
)
func Test_SnapshotMetaSort(t *testing.T) {
metas := []*SnapshotMeta{
{
SnapshotMeta: &raft.SnapshotMeta{
ID: "2-1017-1704807719996",
Index: 1017,
Term: 2,
},
Filename: "db.sqlite",
Type: SnapshotMetaTypeFull,
},
{
SnapshotMeta: &raft.SnapshotMeta{
ID: "2-1131-1704807720976",
Index: 1131,
Term: 2,
},
Filename: "wal",
Type: SnapshotMetaTypeIncremental,
},
}
sort.Sort(snapMetaSlice(metas))
if metas[0].ID != "2-1017-1704807719996" {
t.Errorf("Expected first snapshot ID to be 2-1017-1704807719996, got %s", metas[0].ID)
}
if metas[1].ID != "2-1131-1704807720976" {
t.Errorf("Expected second snapshot ID to be 2-1131-1704807720976, got %s", metas[1].ID)
}
sort.Sort(sort.Reverse(snapMetaSlice(metas)))
if metas[0].ID != "2-1131-1704807720976" {
t.Errorf("Expected first snapshot ID to be 2-1131-1704807720976, got %s", metas[0].ID)
}
if metas[1].ID != "2-1017-1704807719996" {
t.Errorf("Expected second snapshot ID to be 2-1017-1704807719996, got %s", metas[1].ID)
}
}
func Test_NewStore(t *testing.T) {
dir := t.TempDir()
store, err := NewStore(dir)
if err != nil {
t.Fatalf("Failed to create new store: %v", err)
}
if store.Dir() != dir {
t.Errorf("Expected store directory to be %s, got %s", dir, store.Dir())
}
if store.Len() != 0 {
t.Errorf("Expected store to have 0 snapshots, got %d", store.Len())
}
}
func Test_StoreEmpty(t *testing.T) {
dir := t.TempDir()
store, _ := NewStore(dir)
snaps, err := store.List()
if err != nil {
t.Fatalf("Failed to list snapshots: %v", err)
}
if len(snaps) != 0 {
t.Errorf("Expected no snapshots, got %d", len(snaps))
}
if fn, err := store.FullNeeded(); err != nil {
t.Fatalf("Failed to check if full snapshot needed: %v", err)
} else if !fn {
t.Errorf("Expected full snapshot needed, but it is not")
}
_, _, err = store.Open("nonexistent")
if err != ErrSnapshotNotFound {
t.Fatalf("Expected ErrSnapshotNotFound, got %v", err)
}
n, err := store.Reap()
if err != nil {
t.Fatalf("Failed to reap snapshots from empty store: %v", err)
}
if n != 0 {
t.Errorf("Expected no snapshots reaped, got %d", n)
}
if _, err := store.Stats(); err != nil {
t.Fatalf("Failed to get stats from empty store: %v", err)
}
li, tm, err := store.LatestIndexTerm()
if err != nil {
t.Fatalf("Failed to get latest index and term from empty store: %v", err)
}
if li != 0 {
t.Fatalf("Expected latest index to be 0, got %d", li)
}
if tm != 0 {
t.Fatalf("Expected latest term to be 0, got %d", tm)
}
if store.Len() != 0 {
t.Errorf("Expected store to have 0 snapshots, got %d", store.Len())
}
}
func Test_StoreCreateCancel(t *testing.T) {
dir := t.TempDir()
store, err := NewStore(dir)
if err != nil {
t.Fatalf("Failed to create new store: %v", err)
}
sink, err := store.Create(1, 2, 3, makeTestConfiguration("1", "localhost:1"), 1, nil)
if err != nil {
t.Fatalf("Failed to create sink: %v", err)
}
if sink.ID() == "" {
t.Errorf("Expected sink ID to not be empty, got empty string")
}
tmpSnapDir := dir + "/" + sink.ID() + tmpSuffix
// Should be a tmp directory with the name of the sink ID
if !pathExists(tmpSnapDir) {
t.Errorf("Expected directory with name %s, but it does not exist", sink.ID())
}
// Test writing to the sink
if n, err := sink.Write([]byte("hello")); err != nil {
t.Fatalf("Failed to write to sink: %v", err)
} else if n != 5 {
t.Errorf("Expected 5 bytes written, got %d", n)
}
// Test canceling the sink
if err := sink.Cancel(); err != nil {
t.Fatalf("Failed to cancel sink: %v", err)
}
// Should not be a tmp directory with the name of the sink ID
if pathExists(tmpSnapDir) {
t.Errorf("Expected directory with name %s to not exist, but it does", sink.ID())
}
if store.Len() != 0 {
t.Errorf("Expected store to have 0 snapshots, got %d", store.Len())
}
}
func makeTestConfiguration(i, a string) raft.Configuration {
return raft.Configuration{
Servers: []raft.Server{
{
ID: raft.ServerID(i),
Address: raft.ServerAddress(a),
},
},
}
}
func pathExists(path string) bool {
_, err := os.Stat(path)
return err == nil
}

View File

@@ -0,0 +1,34 @@
import sqlite3
import shutil
import os
# Database file
db_file = 'mydatabase.db'
# Open a connection to SQLite database
conn = sqlite3.connect(db_file)
cursor = conn.cursor()
# Enable WAL mode and disable automatic checkpointing
cursor.execute("PRAGMA journal_mode=WAL;")
cursor.execute("PRAGMA wal_autocheckpoint=0;")
cursor.execute("CREATE TABLE foo (id INTEGER PRIMARY KEY, value TEXT);")
conn.commit()
# Checkpoint the WAL file so we've got just a SQLite file
conn.execute("PRAGMA wal_checkpoint(TRUNCATE);")
shutil.copy(db_file, 'backup.db')
for i in range(0, 4):
# Write a new row
cursor.execute(f"INSERT INTO foo (value) VALUES ('Row {i}');")
conn.commit()
# Copy the newly-created WAL
shutil.copy(db_file + '-wal', f'wal-{i:02}')
# Checkpoint the WAL file
conn.execute("PRAGMA wal_checkpoint(TRUNCATE);")
conn.commit()
conn.close()

View File

BIN
snapshot2/testdata/db-and-wals/wal-00 vendored Normal file

Binary file not shown.

BIN
snapshot2/testdata/db-and-wals/wal-01 vendored Normal file

Binary file not shown.

BIN
snapshot2/testdata/db-and-wals/wal-02 vendored Normal file

Binary file not shown.

BIN
snapshot2/testdata/db-and-wals/wal-03 vendored Normal file

Binary file not shown.