Snapshot Sink sets MSRW name

This commit is contained in:
Philip O'Toole
2026-01-03 15:12:03 -05:00
committed by GitHub
parent 08e2a081a0
commit eadf4d8e79
4 changed files with 39 additions and 28 deletions

View File

@@ -1,6 +1,7 @@
## v9.3.8 (unreleased) ## v9.3.8 (unreleased)
### Implementation changes and bug fixes ### Implementation changes and bug fixes
- [PR #2419](https://github.com/rqlite/rqlite/pull/2419): Improve MSRW Error structure. - [PR #2419](https://github.com/rqlite/rqlite/pull/2419): Improve MSRW Error structure.
- [PR #2420](https://github.com/rqlite/rqlite/pull/2420): Snapshot Sink sets owner when taking MSRW.
## v9.3.7 (January 2nd 2026) ## v9.3.7 (January 2nd 2026)
### Implementation changes and bug fixes ### Implementation changes and bug fixes

View File

@@ -1,6 +1,7 @@
package rsync package rsync
import ( import (
"fmt"
"sync" "sync"
) )
@@ -15,16 +16,16 @@ func (e *ErrMRSWConflict) Error() string {
} }
// NewErrMRSWConflict creates a new ErrMRSWConflict with the given message. // NewErrMRSWConflict creates a new ErrMRSWConflict with the given message.
func NewErrMRSWConflict(msg string) error { func NewErrMRSWConflict(m string) error {
return &ErrMRSWConflict{msg: msg} return &ErrMRSWConflict{msg: m}
} }
// MultiRSW is a simple concurrency control mechanism that allows // MultiRSW is a simple concurrency control mechanism that allows
// multiple readers or a single writer to execute a critical section at a time. // multiple readers or a single writer to execute a critical section at a time.
type MultiRSW struct { type MultiRSW struct {
writerActive bool owner string
numReaders int numReaders int
mu sync.Mutex mu sync.Mutex
} }
// NewMultiRSW creates a new MultiRSW instance. // NewMultiRSW creates a new MultiRSW instance.
@@ -36,8 +37,8 @@ func NewMultiRSW() *MultiRSW {
func (r *MultiRSW) BeginRead() error { func (r *MultiRSW) BeginRead() error {
r.mu.Lock() r.mu.Lock()
defer r.mu.Unlock() defer r.mu.Unlock()
if r.writerActive { if r.owner != "" {
return NewErrMRSWConflict("MSRW conflict") return NewErrMRSWConflict("MSRW conflict owner: " + r.owner)
} }
r.numReaders++ r.numReaders++
return nil return nil
@@ -54,13 +55,19 @@ func (r *MultiRSW) EndRead() {
} }
// BeginWrite attempts to enter the critical section as a writer. // BeginWrite attempts to enter the critical section as a writer.
func (r *MultiRSW) BeginWrite() error { func (r *MultiRSW) BeginWrite(owner string) error {
r.mu.Lock() r.mu.Lock()
defer r.mu.Unlock() defer r.mu.Unlock()
if r.writerActive || r.numReaders > 0 { if owner == "" {
return NewErrMRSWConflict("MSRW conflict") panic("owner cannot be empty")
} }
r.writerActive = true if r.owner != "" {
return NewErrMRSWConflict("MSRW conflict owner: " + r.owner)
}
if r.numReaders > 0 {
return NewErrMRSWConflict(fmt.Sprintf("MSRW conflict %d readers active", r.numReaders))
}
r.owner = owner
return nil return nil
} }
@@ -68,25 +75,28 @@ func (r *MultiRSW) BeginWrite() error {
func (r *MultiRSW) EndWrite() { func (r *MultiRSW) EndWrite() {
r.mu.Lock() r.mu.Lock()
defer r.mu.Unlock() defer r.mu.Unlock()
if !r.writerActive { if r.owner == "" {
panic("write done received but no write is active") panic("write done received but no write is active")
} }
r.writerActive = false r.owner = ""
} }
// UpgradeToWriter attempts to upgrade a read lock to a write lock. The // UpgradeToWriter attempts to upgrade a read lock to a write lock. The
// client must be the only reader in order to upgrade, and must already // client must be the only reader in order to upgrade, and must already
// be in a read lock. // be in a read lock.
func (r *MultiRSW) UpgradeToWriter() error { func (r *MultiRSW) UpgradeToWriter(owner string) error {
r.mu.Lock() r.mu.Lock()
defer r.mu.Unlock() defer r.mu.Unlock()
if r.writerActive || r.numReaders > 1 { if r.owner != "" {
return NewErrMRSWConflict("MSRW conflict") return NewErrMRSWConflict("MSRW conflict owner: " + r.owner)
}
if r.numReaders > 1 {
return NewErrMRSWConflict(fmt.Sprintf("MSRW conflict %d readers active", r.numReaders))
} }
if r.numReaders == 0 { if r.numReaders == 0 {
panic("upgrade attempted with no readers") panic("upgrade attempted with no readers")
} }
r.writerActive = true r.owner = owner
r.numReaders = 0 r.numReaders = 0
return nil return nil
} }

View File

@@ -14,20 +14,20 @@ func Test_MultiRSW(t *testing.T) {
r.EndRead() r.EndRead()
// Test successful write lock // Test successful write lock
if err := r.BeginWrite(); err != nil { if err := r.BeginWrite("owner1"); err != nil {
t.Fatalf("Failed to acquire write lock: %v", err) t.Fatalf("Failed to acquire write lock: %v", err)
} }
r.EndWrite() r.EndWrite()
// Test that a write blocks other writers and readers. // Test that a write blocks other writers and readers.
err := r.BeginWrite() err := r.BeginWrite("owner2")
if err != nil { if err != nil {
t.Fatalf("Failed to acquire write lock in goroutine: %v", err) t.Fatalf("Failed to acquire write lock in goroutine: %v", err)
} }
if err := r.BeginRead(); err == nil { if err := r.BeginRead(); err == nil {
t.Fatalf("Expected error when reading during active write, got none") t.Fatalf("Expected error when reading during active write, got none")
} }
if err := r.BeginWrite(); err == nil { if err := r.BeginWrite("owner3"); err == nil {
t.Fatalf("Expected error when writing during active write, got none") t.Fatalf("Expected error when writing during active write, got none")
} }
r.EndWrite() r.EndWrite()
@@ -37,7 +37,7 @@ func Test_MultiRSW(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("Failed to acquire read lock in goroutine: %v", err) t.Fatalf("Failed to acquire read lock in goroutine: %v", err)
} }
if err := r.BeginWrite(); err == nil { if err := r.BeginWrite("owner4"); err == nil {
t.Fatalf("Expected error when writing during active read, got none") t.Fatalf("Expected error when writing during active read, got none")
} }
r.EndRead() r.EndRead()
@@ -61,7 +61,7 @@ func Test_MultiRSW_Upgrade(t *testing.T) {
if err := r.BeginRead(); err != nil { if err := r.BeginRead(); err != nil {
t.Fatalf("Failed to acquire read lock: %v", err) t.Fatalf("Failed to acquire read lock: %v", err)
} }
if err := r.UpgradeToWriter(); err != nil { if err := r.UpgradeToWriter("owner11"); err != nil {
t.Fatalf("Failed to upgrade to write lock: %v", err) t.Fatalf("Failed to upgrade to write lock: %v", err)
} }
r.EndWrite() r.EndWrite()
@@ -73,7 +73,7 @@ func Test_MultiRSW_Upgrade(t *testing.T) {
if err := r.BeginRead(); err != nil { if err := r.BeginRead(); err != nil {
t.Fatalf("Failed to acquire read lock: %v", err) t.Fatalf("Failed to acquire read lock: %v", err)
} }
if err := r.UpgradeToWriter(); err == nil { if err := r.UpgradeToWriter("owner5"); err == nil {
t.Fatalf("Expected error when upgrading with multiple readers, got none") t.Fatalf("Expected error when upgrading with multiple readers, got none")
} }
r.EndRead() r.EndRead()
@@ -83,19 +83,19 @@ func Test_MultiRSW_Upgrade(t *testing.T) {
if err := r.BeginRead(); err != nil { if err := r.BeginRead(); err != nil {
t.Fatalf("Failed to acquire read lock: %v", err) t.Fatalf("Failed to acquire read lock: %v", err)
} }
if err := r.UpgradeToWriter(); err != nil { if err := r.UpgradeToWriter("owner6"); err != nil {
t.Fatalf("Failed to upgrade to write lock: %v", err) t.Fatalf("Failed to upgrade to write lock: %v", err)
} }
if err := r.UpgradeToWriter(); err == nil { if err := r.UpgradeToWriter("owner7"); err == nil {
t.Fatalf("Expected error when double-ugrading, got none") t.Fatalf("Expected error when double-ugrading, got none")
} }
r.EndWrite() r.EndWrite()
// Test that upgrades are blocked by other writers // Test that upgrades are blocked by other writers
if err := r.BeginWrite(); err != nil { if err := r.BeginWrite("owner8"); err != nil {
t.Fatalf("Failed to acquire write lock: %v", err) t.Fatalf("Failed to acquire write lock: %v", err)
} }
if err := r.UpgradeToWriter(); err == nil { if err := r.UpgradeToWriter("owner9"); err == nil {
t.Fatalf("Expected error when upgrading with an active writer, got none") t.Fatalf("Expected error when upgrading with an active writer, got none")
} }
r.EndWrite() r.EndWrite()

View File

@@ -171,7 +171,7 @@ func NewStore(dir string) (*Store, error) {
// be a problem, since snapshots are taken infrequently in one at a time. // be a problem, since snapshots are taken infrequently in one at a time.
func (s *Store) Create(version raft.SnapshotVersion, index, term uint64, configuration raft.Configuration, func (s *Store) Create(version raft.SnapshotVersion, index, term uint64, configuration raft.Configuration,
configurationIndex uint64, trans raft.Transport) (retSink raft.SnapshotSink, retErr error) { configurationIndex uint64, trans raft.Transport) (retSink raft.SnapshotSink, retErr error) {
if err := s.mrsw.BeginWrite(); err != nil { if err := s.mrsw.BeginWrite(fmt.Sprintf("snapshot-create-sink:%s", snapshotName(term, index))); err != nil {
stats.Add(snapshotCreateMRSWFail, 1) stats.Add(snapshotCreateMRSWFail, 1)
return nil, err return nil, err
} }