Handle all dequeueing in one place

This commit is contained in:
Philip O'Toole
2025-07-01 23:31:18 -04:00
committed by GitHub
parent 8902d30995
commit 40450e7e0b
2 changed files with 32 additions and 65 deletions

View File

@@ -4,7 +4,7 @@
- [PR #2128](https://github.com/rqlite/rqlite/pull/2128): Support CDC highwatermarking control.
- [PR #2129](https://github.com/rqlite/rqlite/pull/2129): CDC Service improvements and refactoring.
- [PR #2131](https://github.com/rqlite/rqlite/pull/2131): FIFO queue for CDC service.
- [PR #2132](https://github.com/rqlite/rqlite/pull/2132): Move to channel-based FIFO service.
- [PR #2132](https://github.com/rqlite/rqlite/pull/2132), [PR #2133](https://github.com/rqlite/rqlite/pull/2133): Move to channel-based FIFO service.
## v8.38.2 (June 23rd 2025)
### Implementation changes and bug fixes

View File

@@ -49,7 +49,7 @@ type Queue struct {
// NewQueue creates or opens a new persistent queue at the given file path.
func NewQueue(path string) (*Queue, error) {
db, err := bbolt.Open(path, 0600, &bbolt.Options{Timeout: time.Second})
db, err := bbolt.Open(path, 0600, &bbolt.Options{Timeout: time.Second, NoFreelistSync: true})
if err != nil {
return nil, fmt.Errorf("failed to open boltdb: %w", err)
}
@@ -113,13 +113,6 @@ func (q *Queue) run(nextKey []byte, highestKey uint64) {
var waitingDequeues []dequeueReq // A list of callers waiting for an item.
for {
// If the queue is empty, we can't process a dequeue request.
// So we only listen on the dequeueChan if there are items.
var activeDequeueChan chan dequeueReq
if len(waitingDequeues) > 0 || nextKey != nil {
activeDequeueChan = q.dequeueChan
}
select {
case req := <-q.enqueueChan:
if req.idx <= highestKey {
@@ -149,64 +142,10 @@ func (q *Queue) run(nextKey []byte, highestKey uint64) {
nextKey = key
}
// Fulfill any waiting dequeue request immediately.
for len(waitingDequeues) > 0 && nextKey != nil {
waiter := waitingDequeues[0]
waitingDequeues = waitingDequeues[1:] // Pop from waitlist
var resp dequeueResp
err := q.db.View(func(tx *bbolt.Tx) error {
c := tx.Bucket(bucketName).Cursor()
_, val := c.Seek(nextKey)
if val == nil {
return fmt.Errorf("item not found for key %x", nextKey)
}
resp.idx = btouint64(nextKey)
resp.val = make([]byte, len(val))
copy(resp.val, val)
nk, _ := c.Next()
if nk != nil {
copy(nextKey, nk)
} else {
nextKey = nil // No more items available
}
return nil
})
resp.err = err
waiter.respChan <- resp
}
req.respChan <- enqueueResp{err: err}
case req := <-activeDequeueChan:
// If a request arrived but nextKey is nil it means there are no items available.
if nextKey == nil {
waitingDequeues = append(waitingDequeues, req)
continue
}
var resp dequeueResp
err := q.db.View(func(tx *bbolt.Tx) error {
c := tx.Bucket(bucketName).Cursor()
_, val := c.Seek(nextKey)
if val == nil {
return fmt.Errorf("item not found for key %x", nextKey)
}
resp.idx = btouint64(nextKey)
resp.val = make([]byte, len(val))
copy(resp.val, val)
nk, _ := c.Next()
if nk != nil {
copy(nextKey, nk)
} else {
nextKey = nil // No more items available
}
return nil
})
resp.err = err
req.respChan <- resp
case req := <-q.dequeueChan:
waitingDequeues = append(waitingDequeues, req)
case req := <-q.deleteRangeChan:
err := q.db.Update(func(tx *bbolt.Tx) error {
@@ -255,6 +194,34 @@ func (q *Queue) run(nextKey []byte, highestKey uint64) {
}
return
}
// Fulfill any waiting dequeue requests.
for len(waitingDequeues) > 0 && nextKey != nil {
waiter := waitingDequeues[0]
waitingDequeues = waitingDequeues[1:] // Pop from waitlist
var resp dequeueResp
err := q.db.View(func(tx *bbolt.Tx) error {
c := tx.Bucket(bucketName).Cursor()
_, val := c.Seek(nextKey)
if val == nil {
return fmt.Errorf("item not found for key %x", nextKey)
}
resp.idx = btouint64(nextKey)
resp.val = make([]byte, len(val))
copy(resp.val, val)
nk, _ := c.Next()
if nk != nil {
copy(nextKey, nk)
} else {
nextKey = nil // No more items available
}
return nil
})
resp.err = err
waiter.respChan <- resp
}
}
}