package queue

import (
	"errors"
	"expvar"
	"sync"
	"time"
)

// stats captures stats for the Queue.
var stats *expvar.Map

const (
	numObjectsRx = "objects_rx"
	numObjectsTx = "objects_tx"
	numTimeout   = "num_timeout"
	numFlush     = "num_flush"
)

func init() {
	stats = expvar.NewMap("queue")
	ResetStats()
}

// ResetStats resets the expvar stats for this module. Mostly for test purposes.
func ResetStats() {
	stats.Init()
	stats.Add(numObjectsRx, 0)
	stats.Add(numObjectsTx, 0)
	stats.Add(numTimeout, 0)
	stats.Add(numFlush, 0)
}

// FlushChannel is the type passed to the Queue, if the caller wants
// to know when a specific set of objects has been processed.
type FlushChannel chan bool

// Request represents a batch of objects to be processed.
type Request[T any] struct {
	SequenceNumber int64
	Objects        []T
	flushChans     []FlushChannel
}

// Close closes a request, closing any associated flush channels.
func (r *Request[T]) Close() {
	for _, c := range r.flushChans {
		close(c)
	}
}

type queuedObjects[T any] struct {
	SequenceNumber int64
	Objects        []T
	flushChan      FlushChannel
}

func mergeQueued[T any](qs []*queuedObjects[T]) *Request[T] {
	if len(qs) == 0 {
		return nil
	}

	req := &Request[T]{
		SequenceNumber: qs[0].SequenceNumber,
		flushChans:     make([]FlushChannel, 0),
	}
	for i := range qs {
		if req.SequenceNumber < qs[i].SequenceNumber {
			req.SequenceNumber = qs[i].SequenceNumber
		}
		req.Objects = append(req.Objects, qs[i].Objects...)
		if qs[i].flushChan != nil {
			req.flushChans = append(req.flushChans, qs[i].flushChan)
		}
	}
	return req
}

// Queue is a batching queue with a timeout.
type Queue[T any] struct {
	maxSize   int
	batchSize int
	timeout   time.Duration

	batchCh chan *queuedObjects[T]
	sendCh  chan *Request[T]
	C       <-chan *Request[T]

	done   chan struct{}
	closed chan struct{}

	seqMu  sync.Mutex
	seqNum int64

	// Whitebox unit-testing
	numTimeouts int
}

// New returns an instance of a Queue. t is the maximum time that
// objects can remain in the queue before being sent, even if
// the batch size has not been reached. If t is zero, there is no
// timeout, and batches are sent only when the batch size is reached.
func New[T any](maxSize, batchSize int, t time.Duration) *Queue[T] {
	q := &Queue[T]{
		maxSize:   maxSize,
		batchSize: batchSize,
		timeout:   t,
		batchCh:   make(chan *queuedObjects[T], maxSize),
		sendCh:    make(chan *Request[T], 1),
		done:      make(chan struct{}),
		closed:    make(chan struct{}),
		seqNum:    time.Now().UnixNano(),
	}
	q.C = q.sendCh
	go q.run()
	return q
}

// Write queues a request, and returns a monotonically increasing
// sequence number associated with the slice of objects. A slice with
// a lower sequence number than a second slice will always be transmitted
// on the Queue's C channel before the second slice.
//
// c is an optional channel. If non-nil, it will be closed when the Request
// containing these objects is closed. Normally this close operation should
// be performed by whatever is processing the objects read from the queue,
// to indicate that the objects have been processed. The canonical use of
// this is to allow the caller to block until the objects are processed.
func (q *Queue[T]) Write(objects []T, c FlushChannel) (int64, error) {
	select {
	case <-q.done:
		return 0, errors.New("queue is closed")
	default:
	}

	// Take the lock and don't release it until the function returns.
	// This ensures that the incremented sequence number and the write
	// to the batch channel are synchronized.
	q.seqMu.Lock()
	defer q.seqMu.Unlock()
	q.seqNum++
	q.batchCh <- &queuedObjects[T]{
		SequenceNumber: q.seqNum,
		Objects:        objects,
		flushChan:      c,
	}
	stats.Add(numObjectsRx, int64(len(objects)))
	return q.seqNum, nil
}

// WriteOne queues a single object, and returns a monotonically increasing
// sequence number associated with the object. See Write() for more details.
func (q *Queue[T]) WriteOne(object T, c FlushChannel) (int64, error) {
	return q.Write([]T{object}, c)
}

// Flush flushes the queue.
func (q *Queue[T]) Flush() error {
	q.batchCh <- nil
	return nil
}

// Close closes the queue. A closed queue should not be used.
func (q *Queue[T]) Close() error {
	select {
	case <-q.done:
	default:
		close(q.done)
		<-q.closed
	}
	return nil
}

// Depth returns the number of queued requests.
func (q *Queue[T]) Depth() int {
	return len(q.batchCh)
}

// Stats returns stats on this queue.
func (q *Queue[T]) Stats() (map[string]any, error) {
	return map[string]any{
		"max_size":   q.maxSize,
		"batch_size": q.batchSize,
		"timeout":    q.timeout.String(),
	}, nil
}

func (q *Queue[T]) run() {
	defer close(q.closed)
	qObjs := make([]*queuedObjects[T], 0)

	// Create an initial timer, and drain it so it starts out inert.
	timer := time.NewTimer(0)
	<-timer.C

	writeFn := func() {
		// mergeQueued returns a new object, ownership will pass
		// implicitly to the other side of sendCh.
		req := mergeQueued(qObjs)
		if req != nil {
			q.sendCh <- req
			stats.Add(numObjectsTx, int64(len(req.Objects)))
			qObjs = qObjs[:0] // Better on the GC than setting to nil.
		}
	}

	for {
		select {
		case s := <-q.batchCh:
			if s == nil {
				// nil is the flush marker sent by Flush().
				stats.Add(numFlush, 1)
				stopTimer(timer)
				writeFn()
				break
			}
			qObjs = append(qObjs, s)
			if len(qObjs) == 1 && q.timeout != 0 {
				// First item in the queue, so start the timer. Even if
				// a full batch never arrives, we'll still write.
				timer.Reset(q.timeout)
			}
			if len(qObjs) == q.batchSize {
				stopTimer(timer)
				writeFn()
			}
		case <-timer.C:
			stats.Add(numTimeout, 1)
			q.numTimeouts++
			writeFn()
		case <-q.done:
			stopTimer(timer)
			return
		}
	}
}

// stopTimer stops the timer, draining its channel if the timer had
// already fired, so that a subsequent Reset behaves as expected.
func stopTimer(timer *time.Timer) {
	if !timer.Stop() && len(timer.C) > 0 {
		<-timer.C
	}
}
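
// What follows is an illustrative usage sketch, not part of the package:
// it shows one way a caller might combine Write, the C channel, and a
// FlushChannel to block until a batch has been processed. In a real
// codebase it would live in a queue_test.go file as an Example function;
// the queue parameters (maxSize 1024, batchSize 16, 100ms timeout) are
// arbitrary values chosen for the sketch.
/*
func ExampleQueue() {
	q := New[string](1024, 16, 100*time.Millisecond)
	defer q.Close()

	// Queue a slice of objects, passing a flush channel so the caller
	// can block until the batch containing them has been processed.
	fc := make(FlushChannel)
	if _, err := q.Write([]string{"a", "b"}, fc); err != nil {
		panic(err)
	}

	// The batch size (16) is not reached, so after the 100ms timeout
	// fires, the merged Request is delivered on q.C.
	req := <-q.C
	req.Close() // closes fc, signalling the objects have been handled
	<-fc        // returns immediately, since fc is now closed
}
*/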