Correct batching queue flush

This commit is contained in:
Philip O'Toole
2025-09-08 20:46:46 -04:00
committed by GitHub
parent e8e7b8a5e7
commit f600d0f48b
3 changed files with 61 additions and 19 deletions

View File

@@ -19,6 +19,7 @@
- [PR #2297](https://github.com/rqlite/rqlite/pull/2297): Record commit timestamp for CDC events.
- [PR #2298](https://github.com/rqlite/rqlite/pull/2298), [PR #2299](https://github.com/rqlite/rqlite/pull/2299), [PR #2300](https://github.com/rqlite/rqlite/pull/2300), [PR #2301](https://github.com/rqlite/rqlite/pull/2301): CDC regex filtering for table names.
- [PR #2307](https://github.com/rqlite/rqlite/pull/2307): Add `SyncChannels` synchronization primitive.
- [PR #2308](https://github.com/rqlite/rqlite/pull/2308): Correctly implement Batching Queue flushing.
## v8.43.4 (August 27th 2025)
### Implementation changes and bug fixes

View File

@@ -56,24 +56,25 @@ type queuedObjects[T any] struct {
}
// mergeQueued merges a slice of queuedObjects into a single Request,
// keeping the highest sequence number seen, concatenating all Objects,
// and collecting every non-nil flush channel.
//
// NOTE(review): this span is a rendered diff hunk that has lost its +/-
// markers — the pre-change lines (using variable "o") and post-change
// lines (using "req") both appear below. It is not compilable as shown;
// only the "req" lines exist after the commit.
func mergeQueued[T any](qs []*queuedObjects[T]) *Request[T] {
// (pre-change) old declaration and "len(qs) > 0" guard:
var o *Request[T]
if len(qs) > 0 {
o = &Request[T]{
// (post-change) early return for an empty input slice.
if len(qs) == 0 {
return nil
}
var req *Request[T]
req = &Request[T]{
SequenceNumber: qs[0].SequenceNumber,
flushChans: make([]FlushChannel, 0),
}
}
for i := range qs {
// Track the maximum sequence number across all queued objects.
if o.SequenceNumber < qs[i].SequenceNumber {
o.SequenceNumber = qs[i].SequenceNumber
if req.SequenceNumber < qs[i].SequenceNumber {
req.SequenceNumber = qs[i].SequenceNumber
}
o.Objects = append(o.Objects, qs[i].Objects...)
req.Objects = append(req.Objects, qs[i].Objects...)
// Only callers that supplied a flush channel are notified later.
if qs[i].flushChan != nil {
o.flushChans = append(o.flushChans, qs[i].flushChan)
req.flushChans = append(req.flushChans, qs[i].flushChan)
}
}
return o
return req
}
// Queue is a batching queue with a timeout.
@@ -89,7 +90,6 @@ type Queue[T any] struct {
done chan struct{}
closed chan struct{}
flush chan struct{}
seqMu sync.Mutex
seqNum int64
@@ -111,7 +111,6 @@ func New[T any](maxSize, batchSize int, t time.Duration) *Queue[T] {
sendCh: make(chan *Request[T], 1),
done: make(chan struct{}),
closed: make(chan struct{}),
flush: make(chan struct{}),
seqNum: time.Now().UnixNano(),
}
@@ -161,7 +160,7 @@ func (q *Queue[T]) WriteOne(object T, c FlushChannel) (int64, error) {
// Flush flushes the queue.
// NOTE(review): rendered diff with lost +/- markers — both the removed
// send on the dedicated q.flush channel and the added nil "flush marker"
// send on q.batchCh appear below; only the q.batchCh line exists
// post-change. Sending nil in-band on batchCh (instead of on a separate
// channel) is what lets run() flush in FIFO order with pending writes.
func (q *Queue[T]) Flush() error {
q.flush <- struct{}{}
q.batchCh <- nil
return nil
}
@@ -212,6 +211,13 @@ func (q *Queue[T]) run() {
for {
select {
case s := <-q.batchCh:
if s == nil { // flush marker
stats.Add(numFlush, 1)
stopTimer(timer)
writeFn()
break
}
qObjs = append(qObjs, s)
if len(qObjs) == 1 {
if q.timeout != 0 {
@@ -228,10 +234,6 @@ func (q *Queue[T]) run() {
stats.Add(numTimeout, 1)
q.numTimeouts++
writeFn()
case <-q.flush:
stats.Add(numFlush, 1)
stopTimer(timer)
writeFn()
case <-q.done:
stopTimer(timer)
return

View File

@@ -152,6 +152,46 @@ func Test_QueueWriteOne(t *testing.T) {
}
}
// Test_QueueWriteOne_FlushEmpty verifies that flushing a queue to which
// nothing has been written emits no request on the queue's channel.
func Test_QueueWriteOne_FlushEmpty(t *testing.T) {
	q := New[*command.Statement](1024, 2, 60*time.Second)
	defer q.Close()

	err := q.Flush()
	if err != nil {
		t.Fatalf("failed to flush: %s", err.Error())
	}

	// Nothing was queued, so the channel should stay silent for the
	// entire grace period.
	select {
	case <-time.After(100 * time.Millisecond):
		// Expected, nothing to receive.
	case req := <-q.C:
		t.Fatalf("received unexpected request on empty flush: %v", req)
	}
}
// Test_QueueWriteOne_Flush checks that one queued statement is delivered
// once Flush is called — long before the 60-second batch timeout and the
// batch size of 2 could trigger delivery on their own.
func Test_QueueWriteOne_Flush(t *testing.T) {
	q := New[*command.Statement](1024, 2, 60*time.Second)
	defer q.Close()

	_, err := q.WriteOne(testStmtFoo, nil)
	if err != nil {
		t.Fatalf("failed to write: %s", err.Error())
	}
	if err = q.Flush(); err != nil {
		t.Fatalf("failed to flush: %s", err.Error())
	}

	select {
	case req := <-q.C:
		// The flushed batch must contain exactly the one statement
		// written above.
		if exp, got := 1, len(req.Objects); exp != got {
			t.Fatalf("received wrong length slice, exp %d, got %d", exp, got)
		}
		if !reflect.DeepEqual(req.Objects[0], testStmtFoo) {
			t.Fatalf("received wrong statement, got: %v, want: %v", req.Objects[0], testStmtFoo)
		}
	case <-time.After(time.Second):
		t.Fatalf("timed out waiting for statement")
	}
}
func Test_QueueWriteBatchSizeSingle(t *testing.T) {
q := New[*command.Statement](1024, 1, 60*time.Second)
defer q.Close()
@@ -495,5 +535,4 @@ func Test_QueueOrdering(t *testing.T) {
case <-time.After(testTimeout):
t.Fatalf("timed out waiting for all indexes")
}
}