package queue

import (
	"errors"
	"expvar"
	"sync"
	"time"
)

// stats captures stats for the Queue.
var stats *expvar.Map

const (
	numObjectsRx = "objects_rx"
	numObjectsTx = "objects_tx"
	numTimeout   = "num_timeout"
	numFlush     = "num_flush"
)

func init() {
	stats = expvar.NewMap("queue")
	ResetStats()
}

// ResetStats resets the expvar stats for this module. Mostly for test purposes.
func ResetStats() {
	stats.Init()
	stats.Add(numObjectsRx, 0)
	stats.Add(numObjectsTx, 0)
	stats.Add(numTimeout, 0)
	stats.Add(numFlush, 0)
}

// FlushChannel is the type passed to the Queue if the caller wants
// to know when a specific set of objects has been processed.
type FlushChannel chan bool

// Request represents a batch of objects to be processed.
type Request[T any] struct {
	SequenceNumber int64
	Objects        []T
	flushChans     []FlushChannel
}

// Close closes a request, closing any associated flush channels.
func (r *Request[T]) Close() {
	for _, c := range r.flushChans {
		close(c)
	}
}

type queuedObjects[T any] struct {
	SequenceNumber int64
	Objects        []T
	flushChan      FlushChannel
}

// mergeQueued merges a set of queued objects into a single Request whose
// sequence number is the highest of the merged set. It returns nil if qs
// is empty.
func mergeQueued[T any](qs []*queuedObjects[T]) *Request[T] {
	var o *Request[T]
	if len(qs) > 0 {
		o = &Request[T]{
			SequenceNumber: qs[0].SequenceNumber,
			flushChans:     make([]FlushChannel, 0),
		}
	}
	for i := range qs {
		if o.SequenceNumber < qs[i].SequenceNumber {
			o.SequenceNumber = qs[i].SequenceNumber
		}
		o.Objects = append(o.Objects, qs[i].Objects...)
		if qs[i].flushChan != nil {
			o.flushChans = append(o.flushChans, qs[i].flushChan)
		}
	}
	return o
}

// Queue is a batching queue with a timeout.
type Queue[T any] struct {
	maxSize   int
	batchSize int
	timeout   time.Duration

	batchCh chan *queuedObjects[T]
	sendCh  chan *Request[T]
	C       <-chan *Request[T]

	done   chan struct{}
	closed chan struct{}
	flush  chan *flushReq
	reset  chan *resetReq

	seqMu  sync.Mutex
	seqNum int64

	// Whitebox unit-testing
	numTimeouts int
}

// New returns an instance of a Queue.
func New[T any](maxSize, batchSize int, t time.Duration) *Queue[T] {
	q := &Queue[T]{
		maxSize:   maxSize,
		batchSize: batchSize,
		timeout:   t,
		batchCh:   make(chan *queuedObjects[T], maxSize),
		sendCh:    make(chan *Request[T], 1),
		done:      make(chan struct{}),
		closed:    make(chan struct{}),
		flush:     make(chan *flushReq, 1),
		reset:     make(chan *resetReq, 1),
		seqNum:    time.Now().UnixNano(),
	}
	q.C = q.sendCh
	go q.run()
	return q
}

// Write queues a request, and returns a monotonically increasing
// sequence number associated with the slice of objects. A slice with
// a lower sequence number than a second slice will always be transmitted
// on the Queue's C channel before the second slice.
//
// c is an optional channel. If non-nil, it will be closed when the Request
// containing these objects is closed. Normally this close operation should
// be performed by whatever is processing the objects read from the queue,
// to indicate that the objects have been processed. The canonical use of
// this is to allow the caller to block until the objects are processed.
func (q *Queue[T]) Write(objects []T, c FlushChannel) (int64, error) {
	select {
	case <-q.done:
		return 0, errors.New("queue is closed")
	default:
	}

	// Take the lock and don't release it until the function returns.
	// This ensures that the incremented sequence number and the write to
	// the batch channel are synchronized.
	q.seqMu.Lock()
	defer q.seqMu.Unlock()
	q.seqNum++
	q.batchCh <- &queuedObjects[T]{
		SequenceNumber: q.seqNum,
		Objects:        objects,
		flushChan:      c,
	}
	stats.Add(numObjectsRx, int64(len(objects)))
	return q.seqNum, nil
}
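// The canonical Write-and-wait pattern is sketched below. This is an
// illustrative fragment, not part of the package: q and objects are assumed
// to exist in the caller's scope, and the receive unblocks once the consumer
// of q.C calls Close on the Request carrying these objects.
//
//	fc := make(FlushChannel)
//	if _, err := q.Write(objects, fc); err != nil {
//		// queue is closed; handle the error
//	}
//	<-fc // blocks until the objects have been processed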
// Flush flushes the queue, sending any queued objects on C. It blocks
// until the flush is complete.
func (q *Queue[T]) Flush() error {
	req := &flushReq{
		ch: make(chan struct{}),
	}
	q.flush <- req
	<-req.ch
	return nil
}

// Close closes the queue. A closed queue should not be used.
func (q *Queue[T]) Close() error {
	select {
	case <-q.done:
	default:
		close(q.done)
		<-q.closed
	}
	return nil
}

// Depth returns the number of queued write requests that have not yet been
// picked up by the batching loop. Requests which have been dequeued for
// batching, but not yet sent on C, are not counted in this number.
func (q *Queue[T]) Depth() int {
	return len(q.batchCh)
}

// Stats returns stats on this queue.
func (q *Queue[T]) Stats() (map[string]any, error) {
	return map[string]any{
		"max_size":   q.maxSize,
		"batch_size": q.batchSize,
		"timeout":    q.timeout.String(),
	}, nil
}

// Reset resets the queue to be empty. Any queued objects are dropped.
// This function should not be called concurrently with Write.
func (q *Queue[T]) Reset() error {
	req := &resetReq{
		ch: make(chan struct{}),
	}
	q.reset <- req
	<-req.ch
	return nil
}

func (q *Queue[T]) run() {
	defer close(q.closed)
	queuedStmts := make([]*queuedObjects[T], 0)

	// Create an initial timer, in the stopped state.
	timer := time.NewTimer(0)
	<-timer.C

	writeFn := func() {
		// mergeQueued returns a new object; ownership passes implicitly
		// to the receiver on sendCh.
		req := mergeQueued(queuedStmts)
		if req != nil {
			q.sendCh <- req
			stats.Add(numObjectsTx, int64(len(req.Objects)))
			queuedStmts = queuedStmts[:0] // Reuse the backing array to avoid reallocations.
		}
	}

	for {
		select {
		case s := <-q.batchCh:
			queuedStmts = append(queuedStmts, s)
			if len(queuedStmts) == 1 {
				// First item in the batch; start the timer so that if
				// a full batch never arrives, we'll still write.
				timer.Reset(q.timeout)
			}
			if len(queuedStmts) == q.batchSize {
				stopTimer(timer)
				writeFn()
			}
		case <-timer.C:
			stats.Add(numTimeout, 1)
			q.numTimeouts++
			writeFn()
		case req := <-q.flush:
			stats.Add(numFlush, 1)
			stopTimer(timer)
			// Drain everything pending in the batch channel, then write
			// the merged batch before signaling completion.
			for len(q.batchCh) > 0 {
				s := <-q.batchCh
				queuedStmts = append(queuedStmts, s)
			}
			writeFn()
			close(req.ch)
		case req := <-q.reset:
			stopTimer(timer)
			queuedStmts = queuedStmts[:0]
			for len(q.batchCh) > 0 {
				<-q.batchCh
			}
			close(req.ch)
		case <-q.done:
			stopTimer(timer)
			return
		}
	}
}

// stopTimer stops the timer and drains any pending expiry from its channel,
// so a stale tick cannot fire on a later select.
func stopTimer(timer *time.Timer) {
	if !timer.Stop() && len(timer.C) > 0 {
		<-timer.C
	}
}

type resetReq struct {
	ch chan struct{}
}

type flushReq struct {
	ch chan struct{}
}
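// A minimal consumer sketch for C is shown below, under the assumption that
// process is a caller-supplied handler; it is illustrative, not part of this
// package. Calling Close on each Request is what unblocks any writers waiting
// on their flush channels.
//
//	q := New[string](1024, 16, 100*time.Millisecond)
//	go func() {
//		for {
//			req := <-q.C
//			process(req.Objects) // hypothetical handler
//			req.Close()          // closes every flush channel in the batch
//		}
//	}()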