From f600d0f48bd6e19270321c259f3591735f69dc8f Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Mon, 8 Sep 2025 20:46:46 -0400 Subject: [PATCH] Correct batching queue flush --- CHANGELOG.md | 1 + queue/queue.go | 38 ++++++++++++++++++++------------------ queue/queue_test.go | 41 ++++++++++++++++++++++++++++++++++++++++- 3 files changed, 61 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ed9adb6f..9c808902 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ - [PR #2297](https://github.com/rqlite/rqlite/pull/2297): Record commit timestamp for CDC events. - [PR #2298](https://github.com/rqlite/rqlite/pull/2298), [PR #2299](https://github.com/rqlite/rqlite/pull/2299), [PR #2300](https://github.com/rqlite/rqlite/pull/2301), [PR #2301](https://github.com/rqlite/rqlite/pull/2301): CDC regex filtering for table names. - [PR #2307](https://github.com/rqlite/rqlite/pull/2307): Add `SyncChannels` synchronization primitive. +- [PR #2308](https://github.com/rqlite/rqlite/pull/2308): Correctly implement Batching Queue flushing. ## v8.43.4 (August 27th 2025) ### Implementation changes and bug fixes diff --git a/queue/queue.go b/queue/queue.go index bfce064b..b23a9434 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -56,24 +56,25 @@ type queuedObjects[T any] struct { } func mergeQueued[T any](qs []*queuedObjects[T]) *Request[T] { - var o *Request[T] - if len(qs) > 0 { - o = &Request[T]{ - SequenceNumber: qs[0].SequenceNumber, - flushChans: make([]FlushChannel, 0), - } + if len(qs) == 0 { + return nil + } + var req *Request[T] + req = &Request[T]{ + SequenceNumber: qs[0].SequenceNumber, + flushChans: make([]FlushChannel, 0), } for i := range qs { - if o.SequenceNumber < qs[i].SequenceNumber { - o.SequenceNumber = qs[i].SequenceNumber + if req.SequenceNumber < qs[i].SequenceNumber { + req.SequenceNumber = qs[i].SequenceNumber } - o.Objects = append(o.Objects, qs[i].Objects...) + req.Objects = append(req.Objects, qs[i].Objects...) if qs[i].flushChan != nil { - o.flushChans = append(o.flushChans, qs[i].flushChan) + req.flushChans = append(req.flushChans, qs[i].flushChan) } } - return o + return req } // Queue is a batching queue with a timeout. @@ -89,7 +90,6 @@ type Queue[T any] struct { done chan struct{} closed chan struct{} - flush chan struct{} seqMu sync.Mutex seqNum int64 @@ -111,7 +111,6 @@ func New[T any](maxSize, batchSize int, t time.Duration) *Queue[T] { sendCh: make(chan *Request[T], 1), done: make(chan struct{}), closed: make(chan struct{}), - flush: make(chan struct{}), seqNum: time.Now().UnixNano(), } @@ -161,7 +160,7 @@ func (q *Queue[T]) WriteOne(object T, c FlushChannel) (int64, error) { // Flush flushes the queue func (q *Queue[T]) Flush() error { - q.flush <- struct{}{} + q.batchCh <- nil return nil } @@ -212,6 +211,13 @@ func (q *Queue[T]) run() { for { select { case s := <-q.batchCh: + if s == nil { // flush marker + stats.Add(numFlush, 1) + stopTimer(timer) + writeFn() + break + } + qObjs = append(qObjs, s) if len(qObjs) == 1 { if q.timeout != 0 { @@ -228,10 +234,6 @@ func (q *Queue[T]) run() { stats.Add(numTimeout, 1) q.numTimeouts++ writeFn() - case <-q.flush: - stats.Add(numFlush, 1) - stopTimer(timer) - writeFn() case <-q.done: stopTimer(timer) return diff --git a/queue/queue_test.go b/queue/queue_test.go index 89ee589a..0f2bf6b6 100644 --- a/queue/queue_test.go +++ b/queue/queue_test.go @@ -152,6 +152,46 @@ func Test_QueueWriteOne(t *testing.T) { } } +func Test_QueueWriteOne_FlushEmpty(t *testing.T) { + q := New[*command.Statement](1024, 2, 60*time.Second) + defer q.Close() + + if err := q.Flush(); err != nil { + t.Fatalf("failed to flush: %s", err.Error()) + } + + select { + case req := <-q.C: + t.Fatalf("received unexpected request on empty flush: %v", req) + case <-time.After(100 * time.Millisecond): + // Expected, nothing to receive. + } +} + +func Test_QueueWriteOne_Flush(t *testing.T) { + q := New[*command.Statement](1024, 2, 60*time.Second) + defer q.Close() + + if _, err := q.WriteOne(testStmtFoo, nil); err != nil { + t.Fatalf("failed to write: %s", err.Error()) + } + if err := q.Flush(); err != nil { + t.Fatalf("failed to flush: %s", err.Error()) + } + + select { + case req := <-q.C: + if exp, got := 1, len(req.Objects); exp != got { + t.Fatalf("received wrong length slice, exp %d, got %d", exp, got) + } + if !reflect.DeepEqual(req.Objects[0], testStmtFoo) { + t.Fatalf("received wrong statement, got: %v, want: %v", req.Objects[0], testStmtFoo) + } + case <-time.After(time.Second): + t.Fatalf("timed out waiting for statement") + } +} + func Test_QueueWriteBatchSizeSingle(t *testing.T) { q := New[*command.Statement](1024, 1, 60*time.Second) defer q.Close() @@ -495,5 +535,4 @@ func Test_QueueOrdering(t *testing.T) { case <-time.After(testTimeout): t.Fatalf("timed out waiting for all indexes") } - }