Minor improvements to Queue handling

Philip O'Toole
2024-05-17 09:15:27 -04:00
parent 5d08af33c1
commit bb9f62a13d


@@ -118,9 +118,9 @@ func New[T any](maxSize, batchSize int, t time.Duration) *Queue[T] {
}
// Write queues a request, and returns a monotonically incrementing
-// sequence number associated with the slice of objects. If one
-// slice has a larger sequence number than a number, the former slice
-// will always be committed to Raft before the latter slice.
+// sequence number associated with the slice of objects. A slice with
+// a lower sequence number than a second slice will always be transmitted
+// on the Queue's C object before the second slice.
//
// c is an optional channel. If non-nil, it will be closed when the Request
// containing these statements is closed.
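
A caller might rely on this ordering guarantee roughly as in the sketch below. It uses only the shapes visible in this diff (the generic New constructor, Write, a C channel delivering requests with an Objects field); the import path, the exact channel and field types, and the flush behaviour are assumptions, not confirmed by this commit.

```go
package main

import (
	"fmt"
	"time"

	"github.com/rqlite/rqlite/v8/queue" // import path is an assumption
)

func main() {
	// Buffer at most 100 objects, batch up to 5, flush at least every 100ms.
	q := queue.New[string](100, 5, 100*time.Millisecond)

	// Write returns a monotonically incrementing sequence number per slice.
	seqA, _ := q.Write([]string{"a", "b"}, nil) // nil FlushChannel: no flush notification wanted
	seqB, _ := q.Write([]string{"c"}, nil)
	fmt.Println(seqA < seqB) // true

	// The slice written with the lower sequence number is transmitted on
	// q.C before (or merged ahead of) the slice with the higher one.
	req := <-q.C
	fmt.Println(req.Objects) // objects from the earlier Write appear first
}
```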
@@ -131,6 +131,9 @@ func (q *Queue[T]) Write(objects []T, c FlushChannel) (int64, error) {
default:
}
+// Take the lock and don't release it until the function returns.
+// This ensures that the incremented sequence number and write to
+// batch channel are synchronized.
q.seqMu.Lock()
defer q.seqMu.Unlock()
q.seqNum++
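
The synchronization being documented here can be shown with a small self-contained sketch (not the rqlite code itself): if the lock were released between incrementing the sequence number and sending to the batch channel, a concurrent writer could be assigned a higher number yet reach the channel first. Holding the lock across both steps keeps channel order aligned with sequence order.

```go
package main

import (
	"fmt"
	"sync"
)

type item struct {
	seq     int64
	objects []string
}

type toyQueue struct {
	mu     sync.Mutex
	seqNum int64
	batch  chan item
}

// write assigns a sequence number and queues the objects. Holding the
// mutex across both the increment and the channel send guarantees that
// items arrive on the batch channel in sequence-number order.
func (q *toyQueue) write(objects []string) int64 {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.seqNum++
	q.batch <- item{seq: q.seqNum, objects: objects}
	return q.seqNum
}

func main() {
	q := &toyQueue{batch: make(chan item, 8)}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			q.write([]string{fmt.Sprintf("obj-%d", n)})
		}(i)
	}
	wg.Wait()
	close(q.batch)
	for it := range q.batch {
		fmt.Println(it.seq, it.objects) // sequence numbers print in increasing order
	}
}
```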
@@ -187,9 +190,11 @@ func (q *Queue[T]) run() {
// mergeQueued returns a new object, ownership will pass
// implicitly to the other side of sendCh.
req := mergeQueued(queuedStmts)
-q.sendCh <- req
-stats.Add(numObjectsTx, int64(len(req.Objects)))
-queuedStmts = queuedStmts[:0] // Better on the GC than setting to nil.
+if req != nil {
+	q.sendCh <- req
+	stats.Add(numObjectsTx, int64(len(req.Objects)))
+	queuedStmts = queuedStmts[:0] // Better on the GC than setting to nil.
+}
}
for {
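
The guard added above matters because mergeQueued presumably returns nil when there is nothing to merge; without the check, a nil request would be sent on sendCh and len(req.Objects) would panic. Below is a rough, standalone sketch of the guarded flush step, with stand-in types for everything not shown in the diff.

```go
package main

import "fmt"

// Request is a stand-in for the queue's request type in the diff.
type Request struct {
	Objects []string
}

var objectsTransmitted int64 // stand-in for the numObjectsTx stat

// mergeQueued is a stand-in: it returns nil when nothing is queued,
// which is the case the new guard in run() protects against.
func mergeQueued(queued []string) *Request {
	if len(queued) == 0 {
		return nil
	}
	return &Request{Objects: append([]string(nil), queued...)}
}

// flush mirrors the guarded send: a nil request is neither sent on the
// channel nor counted, and the queued slice is only reset after a send.
func flush(queued []string, sendCh chan<- *Request) []string {
	req := mergeQueued(queued)
	if req != nil {
		sendCh <- req
		objectsTransmitted += int64(len(req.Objects))
		queued = queued[:0] // better on the GC than setting to nil
	}
	return queued
}

func main() {
	sendCh := make(chan *Request, 1)
	queued := []string{"a", "b"}
	queued = flush(queued, sendCh)
	fmt.Println(len(queued), objectsTransmitted) // 0 2
	queued = flush(queued, sendCh)               // nothing queued: no send, no panic
	fmt.Println(objectsTransmitted)              // still 2
}
```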