WIP queue improvements

This commit is contained in:
Philip O'Toole
2025-07-26 10:47:27 -04:00
parent 7c8a6ffb50
commit c7c83fa45a
2 changed files with 28 additions and 1 deletions

View File

@@ -173,7 +173,9 @@ func (q *Queue[T]) Close() error {
return nil return nil
} }
// Depth returns the number of queued requests // Depth returns the number of queued requests. Requests which have
// been dequeued but not yet sent to C are not counted
// in this number. XXX THIS IS NOT CLEARL DEFINED ACTUALLY.
func (q *Queue[T]) Depth() int { func (q *Queue[T]) Depth() int {
return len(q.batchCh) return len(q.batchCh)
} }
@@ -220,6 +222,8 @@ func (q *Queue[T]) run() {
for { for {
select { select {
case s := <-q.batchCh: case s := <-q.batchCh:
// Not sure I like this. Maybe we should just
// block on the channel until it reaches the batch size? XXX
queuedStmts = append(queuedStmts, s) queuedStmts = append(queuedStmts, s)
if len(queuedStmts) == 1 { if len(queuedStmts) == 1 {
// First item in queue, start the timer so that if // First item in queue, start the timer so that if

View File

@@ -474,7 +474,30 @@ func Test_Queue_FlushReset(t *testing.T) {
t.Fatalf("failed to write: %s", err.Error()) t.Fatalf("failed to write: %s", err.Error())
} }
testPoll(t, func() bool {
return q.Depth() == 1
}, time.Second)
if err := q.Reset(); err != nil { if err := q.Reset(); err != nil {
t.Fatalf("failed to reset queue: %s", err.Error()) t.Fatalf("failed to reset queue: %s", err.Error())
} }
} }
func testPoll(t *testing.T, fn func() bool, timeout time.Duration) {
t.Helper()
timer := time.NewTimer(timeout)
defer timer.Stop()
ticker := time.NewTicker(50 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-timer.C:
t.Fatalf("timed out waiting for condition")
case <-ticker.C:
if fn() {
return
}
}
}
}