Move Queue to use Generics

I may have other uses for this Queue in the future.
This commit is contained in:
Philip O'Toole
2024-03-18 09:49:29 -04:00
parent 47dfcd9d56
commit cfce363e42
4 changed files with 102 additions and 99 deletions

View File

@@ -1,3 +1,7 @@
## 8.23.1 (unreleased)
### Implementation changes and bug fixes
- [PR #1731](https://github.com/rqlite/rqlite/pull/1731): Queue now uses generic types.
## 8.23.0 (March 14th 2024)
Upgrading to this release can be performed via a rolling restart if necessary. However, until all nodes in a cluster are running 8.23.0, writes to that cluster **might** return an error. Queries should operate without issue during any rolling restart.
### New features

View File

@@ -24,6 +24,7 @@ import (
clstrPB "github.com/rqlite/rqlite/v8/cluster/proto"
"github.com/rqlite/rqlite/v8/command/encoding"
"github.com/rqlite/rqlite/v8/command/proto"
command "github.com/rqlite/rqlite/v8/command/proto"
"github.com/rqlite/rqlite/v8/command/sql"
"github.com/rqlite/rqlite/v8/db"
"github.com/rqlite/rqlite/v8/queue"
@@ -310,7 +311,7 @@ type Service struct {
store Store // The Raft-backed database store.
queueDone chan struct{}
stmtQueue *queue.Queue // Queue for queued executes
stmtQueue *queue.Queue[*command.Statement] // Queue for queued executes
cluster Cluster // The Cluster service.
@@ -404,7 +405,7 @@ func (s *Service) Start() error {
s.closeCh = make(chan struct{})
s.queueDone = make(chan struct{})
s.stmtQueue = queue.New(s.DefaultQueueCap, s.DefaultQueueBatchSz, s.DefaultQueueTimeout)
s.stmtQueue = queue.New[*command.Statement](s.DefaultQueueCap, s.DefaultQueueBatchSz, s.DefaultQueueTimeout)
go s.runQueue()
s.logger.Printf("execute queue processing started with capacity %d, batch size %d, timeout %s",
s.DefaultQueueCap, s.DefaultQueueBatchSz, s.DefaultQueueTimeout.String())
@@ -1527,11 +1528,11 @@ func (s *Service) runQueue() {
case req := <-s.stmtQueue.C:
er := &proto.ExecuteRequest{
Request: &proto.Request{
Statements: req.Statements,
Statements: req.Objects,
Transaction: s.DefaultQueueTx,
},
}
stats.Add(numQueuedExecutionsStmtsRx, int64(len(req.Statements)))
stats.Add(numQueuedExecutionsStmtsRx, int64(len(req.Objects)))
// Nil statements are valid, as clients may want to just send
// a "checkpoint" through the queue.
@@ -1579,7 +1580,7 @@ func (s *Service) runQueue() {
s.seqNum = req.SequenceNumber
s.seqNumMu.Unlock()
req.Close()
stats.Add(numQueuedExecutionsStmtsTx, int64(len(req.Statements)))
stats.Add(numQueuedExecutionsStmtsTx, int64(len(req.Objects)))
stats.Add(numQueuedExecutionsOK, 1)
}
}

View File

@@ -5,18 +5,16 @@ import (
"expvar"
"sync"
"time"
command "github.com/rqlite/rqlite/v8/command/proto"
)
// stats captures stats for the Queue.
var stats *expvar.Map
const (
numStatementsRx = "statements_rx"
numStatementsTx = "statements_tx"
numTimeout = "num_timeout"
numFlush = "num_flush"
numObjectsRx = "objects_rx"
numObjectsTx = "objects_tx"
numTimeout = "num_timeout"
numFlush = "num_flush"
)
func init() {
@@ -27,40 +25,40 @@ func init() {
// ResetStats resets the expvar stats for this module. Mostly for test purposes.
func ResetStats() {
stats.Init()
stats.Add(numStatementsRx, 0)
stats.Add(numStatementsTx, 0)
stats.Add(numObjectsRx, 0)
stats.Add(numObjectsTx, 0)
stats.Add(numTimeout, 0)
stats.Add(numFlush, 0)
}
// FlushChannel is the type passed to the Queue, if caller wants
// to know when a specific set of statements has been processed.
// to know when a specific set of objects has been processed.
type FlushChannel chan bool
// Request represents a batch of statements to be processed.
type Request struct {
// Request represents a batch of objects to be processed.
type Request[T any] struct {
SequenceNumber int64
Statements []*command.Statement
Objects []T
flushChans []FlushChannel
}
// Close closes a request, closing any associated flush channels.
func (r *Request) Close() {
func (r *Request[T]) Close() {
for _, c := range r.flushChans {
close(c)
}
}
type queuedStatements struct {
type queuedObjects[T any] struct {
SequenceNumber int64
Statements []*command.Statement
Objects []T
flushChan FlushChannel
}
func mergeQueued(qs []*queuedStatements) *Request {
var o *Request
func mergeQueued[T any](qs []*queuedObjects[T]) *Request[T] {
var o *Request[T]
if len(qs) > 0 {
o = &Request{
o = &Request[T]{
SequenceNumber: qs[0].SequenceNumber,
flushChans: make([]FlushChannel, 0),
}
@@ -70,7 +68,7 @@ func mergeQueued(qs []*queuedStatements) *Request {
if o.SequenceNumber < qs[i].SequenceNumber {
o.SequenceNumber = qs[i].SequenceNumber
}
o.Statements = append(o.Statements, qs[i].Statements...)
o.Objects = append(o.Objects, qs[i].Objects...)
if qs[i].flushChan != nil {
o.flushChans = append(o.flushChans, qs[i].flushChan)
}
@@ -79,15 +77,15 @@ func mergeQueued(qs []*queuedStatements) *Request {
}
// Queue is a batching queue with a timeout.
type Queue struct {
type Queue[T any] struct {
maxSize int
batchSize int
timeout time.Duration
batchCh chan *queuedStatements
batchCh chan *queuedObjects[T]
sendCh chan *Request
C <-chan *Request
sendCh chan *Request[T]
C <-chan *Request[T]
done chan struct{}
closed chan struct{}
@@ -101,13 +99,13 @@ type Queue struct {
}
// New returns a instance of a Queue
func New(maxSize, batchSize int, t time.Duration) *Queue {
q := &Queue{
func New[T any](maxSize, batchSize int, t time.Duration) *Queue[T] {
q := &Queue[T]{
maxSize: maxSize,
batchSize: batchSize,
timeout: t,
batchCh: make(chan *queuedStatements, maxSize),
sendCh: make(chan *Request, 1),
batchCh: make(chan *queuedObjects[T], maxSize),
sendCh: make(chan *Request[T], 1),
done: make(chan struct{}),
closed: make(chan struct{}),
flush: make(chan struct{}),
@@ -120,13 +118,13 @@ func New(maxSize, batchSize int, t time.Duration) *Queue {
}
// Write queues a request, and returns a monotonically incrementing
// sequence number associated with the slice of statements. If one
// sequence number associated with the slice of objects. If one
// slice has a larger sequence number than a number, the former slice
// will always be committed to Raft before the latter slice.
//
// c is an optional channel. If non-nil, it will be closed when the Request
// containing these statements is closed.
func (q *Queue) Write(stmts []*command.Statement, c FlushChannel) (int64, error) {
func (q *Queue[T]) Write(objects []T, c FlushChannel) (int64, error) {
select {
case <-q.done:
return 0, errors.New("queue is closed")
@@ -137,23 +135,23 @@ func (q *Queue) Write(stmts []*command.Statement, c FlushChannel) (int64, error)
defer q.seqMu.Unlock()
q.seqNum++
q.batchCh <- &queuedStatements{
q.batchCh <- &queuedObjects[T]{
SequenceNumber: q.seqNum,
Statements: stmts,
Objects: objects,
flushChan: c,
}
stats.Add(numStatementsRx, int64(len(stmts)))
stats.Add(numObjectsRx, int64(len(objects)))
return q.seqNum, nil
}
// Flush flushes the queue
func (q *Queue) Flush() error {
func (q *Queue[T]) Flush() error {
q.flush <- struct{}{}
return nil
}
// Close closes the queue. A closed queue should not be used.
func (q *Queue) Close() error {
func (q *Queue[T]) Close() error {
select {
case <-q.done:
default:
@@ -164,12 +162,12 @@ func (q *Queue) Close() error {
}
// Depth returns the number of queued requests
func (q *Queue) Depth() int {
func (q *Queue[T]) Depth() int {
return len(q.batchCh)
}
// Stats returns stats on this queue.
func (q *Queue) Stats() (map[string]interface{}, error) {
func (q *Queue[T]) Stats() (map[string]interface{}, error) {
return map[string]interface{}{
"max_size": q.maxSize,
"batch_size": q.batchSize,
@@ -177,10 +175,10 @@ func (q *Queue) Stats() (map[string]interface{}, error) {
}, nil
}
func (q *Queue) run() {
func (q *Queue[T]) run() {
defer close(q.closed)
queuedStmts := make([]*queuedStatements, 0)
queuedStmts := make([]*queuedObjects[T], 0)
// Create an initial timer, in the stopped state.
timer := time.NewTimer(0)
<-timer.C
@@ -190,7 +188,7 @@ func (q *Queue) run() {
// implicitly to the other side of sendCh.
req := mergeQueued(queuedStmts)
q.sendCh <- req
stats.Add(numStatementsTx, int64(len(req.Statements)))
stats.Add(numObjectsTx, int64(len(req.Objects)))
queuedStmts = queuedStmts[:0] // Better on the GC than setting to nil.
}

View File

@@ -19,68 +19,68 @@ var (
flushChan2 = make(FlushChannel)
)
func Test_MergeQueuedStatements(t *testing.T) {
if mergeQueued(nil) != nil {
func Test_MergeQueuedObjects(t *testing.T) {
if mergeQueued[*command.Statement](nil) != nil {
t.Fatalf("merging of nil failed")
}
tests := []struct {
qs []*queuedStatements
exp *Request
qs []*queuedObjects[*command.Statement]
exp *Request[*command.Statement]
}{
{
qs: []*queuedStatements{
qs: []*queuedObjects[*command.Statement]{
{1, nil, flushChan1},
},
exp: &Request{1, nil, []FlushChannel{flushChan1}},
exp: &Request[*command.Statement]{1, nil, []FlushChannel{flushChan1}},
},
{
qs: []*queuedStatements{
qs: []*queuedObjects[*command.Statement]{
{1, nil, flushChan1},
{2, testStmtsFoo, nil},
},
exp: &Request{2, testStmtsFoo, []FlushChannel{flushChan1}},
exp: &Request[*command.Statement]{2, testStmtsFoo, []FlushChannel{flushChan1}},
},
{
qs: []*queuedStatements{
qs: []*queuedObjects[*command.Statement]{
{1, testStmtsFoo, nil},
},
exp: &Request{1, testStmtsFoo, nil},
exp: &Request[*command.Statement]{1, testStmtsFoo, nil},
},
{
qs: []*queuedStatements{
qs: []*queuedObjects[*command.Statement]{
{1, testStmtsFoo, nil},
{2, testStmtsBar, nil},
},
exp: &Request{2, testStmtsFooBar, nil},
exp: &Request[*command.Statement]{2, testStmtsFooBar, nil},
},
{
qs: []*queuedStatements{
qs: []*queuedObjects[*command.Statement]{
{1, testStmtsFooBar, nil},
{2, testStmtsFoo, nil},
},
exp: &Request{2, testStmtsFooBarFoo, nil},
exp: &Request[*command.Statement]{2, testStmtsFooBarFoo, nil},
},
{
qs: []*queuedStatements{
qs: []*queuedObjects[*command.Statement]{
{1, testStmtsFooBar, flushChan1},
{2, testStmtsFoo, flushChan2},
},
exp: &Request{2, testStmtsFooBarFoo, []FlushChannel{flushChan1, flushChan2}},
exp: &Request[*command.Statement]{2, testStmtsFooBarFoo, []FlushChannel{flushChan1, flushChan2}},
},
{
qs: []*queuedStatements{
qs: []*queuedObjects[*command.Statement]{
{1, testStmtsFooBar, nil},
{2, testStmtsFoo, flushChan2},
},
exp: &Request{2, testStmtsFooBarFoo, []FlushChannel{flushChan2}},
exp: &Request[*command.Statement]{2, testStmtsFooBarFoo, []FlushChannel{flushChan2}},
},
{
qs: []*queuedStatements{
qs: []*queuedObjects[*command.Statement]{
{2, testStmtsFooBar, nil},
{1, testStmtsFoo, flushChan2},
},
exp: &Request{2, testStmtsFooBarFoo, []FlushChannel{flushChan2}},
exp: &Request[*command.Statement]{2, testStmtsFooBarFoo, []FlushChannel{flushChan2}},
},
}
@@ -89,7 +89,7 @@ func Test_MergeQueuedStatements(t *testing.T) {
if got, exp := r.SequenceNumber, tt.exp.SequenceNumber; got != exp {
t.Fatalf("incorrect sequence number for test %d, exp %d, got %d", i, exp, got)
}
if !reflect.DeepEqual(r.Statements, tt.exp.Statements) {
if !reflect.DeepEqual(r.Objects, tt.exp.Objects) {
t.Fatalf("statements don't match for test %d", i)
}
if len(r.flushChans) != len(tt.exp.flushChans) {
@@ -104,7 +104,7 @@ func Test_MergeQueuedStatements(t *testing.T) {
}
func Test_NewQueue(t *testing.T) {
q := New(1, 1, 100*time.Millisecond)
q := New[*command.Statement](1, 1, 100*time.Millisecond)
if q == nil {
t.Fatalf("failed to create new Queue")
}
@@ -112,7 +112,7 @@ func Test_NewQueue(t *testing.T) {
}
func Test_NewQueueClosedWrite(t *testing.T) {
q := New(1, 1, 100*time.Millisecond)
q := New[*command.Statement](1, 1, 100*time.Millisecond)
if q == nil {
t.Fatalf("failed to create new Queue")
}
@@ -123,7 +123,7 @@ func Test_NewQueueClosedWrite(t *testing.T) {
}
func Test_NewQueueWriteNil(t *testing.T) {
q := New(1, 1, 60*time.Second)
q := New[*command.Statement](1, 1, 60*time.Second)
defer q.Close()
if _, err := q.Write(nil, nil); err != nil {
@@ -132,7 +132,7 @@ func Test_NewQueueWriteNil(t *testing.T) {
}
func Test_NewQueueWriteBatchSizeSingle(t *testing.T) {
q := New(1024, 1, 60*time.Second)
q := New[*command.Statement](1024, 1, 60*time.Second)
defer q.Close()
if _, err := q.Write(testStmtsFoo, nil); err != nil {
@@ -141,10 +141,10 @@ func Test_NewQueueWriteBatchSizeSingle(t *testing.T) {
select {
case req := <-q.C:
if exp, got := 1, len(req.Statements); exp != got {
if exp, got := 1, len(req.Objects); exp != got {
t.Fatalf("received wrong length slice, exp %d, got %d", exp, got)
}
if req.Statements[0].Sql != "SELECT * FROM foo" {
if req.Objects[0].Sql != "SELECT * FROM foo" {
t.Fatalf("received wrong SQL")
}
case <-time.After(5 * time.Second):
@@ -153,7 +153,7 @@ func Test_NewQueueWriteBatchSizeSingle(t *testing.T) {
}
func Test_NewQueueWriteBatchSizeDouble(t *testing.T) {
q := New(1024, 1, 60*time.Second)
q := New[*command.Statement](1024, 1, 60*time.Second)
defer q.Close()
if _, err := q.Write(testStmtsFoo, nil); err != nil {
@@ -166,10 +166,10 @@ func Test_NewQueueWriteBatchSizeDouble(t *testing.T) {
// Just test that I get a batch size, each time.
select {
case req := <-q.C:
if exp, got := 1, len(req.Statements); exp != got {
if exp, got := 1, len(req.Objects); exp != got {
t.Fatalf("received wrong length slice, exp %d, got %d", exp, got)
}
if req.Statements[0].Sql != "SELECT * FROM foo" {
if req.Objects[0].Sql != "SELECT * FROM foo" {
t.Fatalf("received wrong SQL")
}
case <-time.After(5 * time.Second):
@@ -177,10 +177,10 @@ func Test_NewQueueWriteBatchSizeDouble(t *testing.T) {
}
select {
case req := <-q.C:
if exp, got := 1, len(req.Statements); exp != got {
if exp, got := 1, len(req.Objects); exp != got {
t.Fatalf("received wrong length slice, exp %d, got %d", exp, got)
}
if req.Statements[0].Sql != "SELECT * FROM bar" {
if req.Objects[0].Sql != "SELECT * FROM bar" {
t.Fatalf("received wrong SQL")
}
case <-time.After(5 * time.Second):
@@ -189,7 +189,7 @@ func Test_NewQueueWriteBatchSizeDouble(t *testing.T) {
}
func Test_NewQueueWriteNilAndOne(t *testing.T) {
q := New(1024, 2, 60*time.Second)
q := New[*command.Statement](1024, 2, 60*time.Second)
defer q.Close()
if _, err := q.Write(nil, nil); err != nil {
@@ -201,7 +201,7 @@ func Test_NewQueueWriteNilAndOne(t *testing.T) {
select {
case req := <-q.C:
if exp, got := 1, len(req.Statements); exp != got {
if exp, got := 1, len(req.Objects); exp != got {
t.Fatalf("received wrong length slice, exp %d, got %d", exp, got)
}
req.Close()
@@ -211,7 +211,7 @@ func Test_NewQueueWriteNilAndOne(t *testing.T) {
}
func Test_NewQueueWriteBatchSizeSingleChan(t *testing.T) {
q := New(1024, 1, 60*time.Second)
q := New[*command.Statement](1024, 1, 60*time.Second)
defer q.Close()
fc := make(FlushChannel)
@@ -221,10 +221,10 @@ func Test_NewQueueWriteBatchSizeSingleChan(t *testing.T) {
select {
case req := <-q.C:
if exp, got := 1, len(req.Statements); exp != got {
if exp, got := 1, len(req.Objects); exp != got {
t.Fatalf("received wrong length slice, exp %d, got %d", exp, got)
}
if req.Statements[0].Sql != "SELECT * FROM foo" {
if req.Objects[0].Sql != "SELECT * FROM foo" {
t.Fatalf("received wrong SQL")
}
req.Close()
@@ -242,7 +242,7 @@ func Test_NewQueueWriteBatchSizeSingleChan(t *testing.T) {
}
func Test_NewQueueWriteNilSingleChan(t *testing.T) {
q := New(1024, 1, 60*time.Second)
q := New[*command.Statement](1024, 1, 60*time.Second)
defer q.Close()
fc := make(FlushChannel)
@@ -252,7 +252,7 @@ func Test_NewQueueWriteNilSingleChan(t *testing.T) {
select {
case req := <-q.C:
if req.Statements != nil {
if req.Objects != nil {
t.Fatalf("statements slice is not nil")
}
if len(req.flushChans) != 1 && req.flushChans[0] != fc {
@@ -273,7 +273,7 @@ func Test_NewQueueWriteNilSingleChan(t *testing.T) {
}
func Test_NewQueueWriteBatchSizeMulti(t *testing.T) {
q := New(1024, 5, 60*time.Second)
q := New[*command.Statement](1024, 5, 60*time.Second)
defer q.Close()
// Write a batch size and wait for it.
@@ -284,7 +284,7 @@ func Test_NewQueueWriteBatchSizeMulti(t *testing.T) {
}
select {
case req := <-q.C:
if len(req.Statements) != 5 {
if len(req.Objects) != 5 {
t.Fatalf("received wrong length slice")
}
if q.numTimeouts != 0 {
@@ -302,7 +302,7 @@ func Test_NewQueueWriteBatchSizeMulti(t *testing.T) {
}
select {
case req := <-q.C:
if len(req.Statements) < 5 {
if len(req.Objects) < 5 {
t.Fatalf("received too-short slice")
}
if q.numTimeouts != 0 {
@@ -314,7 +314,7 @@ func Test_NewQueueWriteBatchSizeMulti(t *testing.T) {
}
func Test_NewQueueWriteTimeout(t *testing.T) {
q := New(1024, 10, 1*time.Second)
q := New[*command.Statement](1024, 10, 1*time.Second)
defer q.Close()
if _, err := q.Write(testStmtsFoo, nil); err != nil {
@@ -323,10 +323,10 @@ func Test_NewQueueWriteTimeout(t *testing.T) {
select {
case req := <-q.C:
if len(req.Statements) != 1 {
if len(req.Objects) != 1 {
t.Fatalf("received wrong length slice")
}
if req.Statements[0].Sql != "SELECT * FROM foo" {
if req.Objects[0].Sql != "SELECT * FROM foo" {
t.Fatalf("received wrong SQL")
}
if q.numTimeouts != 1 {
@@ -340,7 +340,7 @@ func Test_NewQueueWriteTimeout(t *testing.T) {
// Test_NewQueueWriteTimeoutMulti ensures that timer expiring
// twice in a row works fine.
func Test_NewQueueWriteTimeoutMulti(t *testing.T) {
q := New(1024, 10, 1*time.Second)
q := New[*command.Statement](1024, 10, 1*time.Second)
defer q.Close()
if _, err := q.Write(testStmtsFoo, nil); err != nil {
@@ -348,10 +348,10 @@ func Test_NewQueueWriteTimeoutMulti(t *testing.T) {
}
select {
case req := <-q.C:
if len(req.Statements) != 1 {
if len(req.Objects) != 1 {
t.Fatalf("received wrong length slice")
}
if req.Statements[0].Sql != "SELECT * FROM foo" {
if req.Objects[0].Sql != "SELECT * FROM foo" {
t.Fatalf("received wrong SQL")
}
if q.numTimeouts != 1 {
@@ -366,10 +366,10 @@ func Test_NewQueueWriteTimeoutMulti(t *testing.T) {
}
select {
case req := <-q.C:
if len(req.Statements) != 1 {
if len(req.Objects) != 1 {
t.Fatalf("received wrong length slice")
}
if req.Statements[0].Sql != "SELECT * FROM foo" {
if req.Objects[0].Sql != "SELECT * FROM foo" {
t.Fatalf("received wrong SQL")
}
if q.numTimeouts != 2 {
@@ -383,7 +383,7 @@ func Test_NewQueueWriteTimeoutMulti(t *testing.T) {
// Test_NewQueueWriteTimeoutBatch ensures that timer expiring
// followed by a batch, works fine.
func Test_NewQueueWriteTimeoutBatch(t *testing.T) {
q := New(1024, 2, 1*time.Second)
q := New[*command.Statement](1024, 2, 1*time.Second)
defer q.Close()
if _, err := q.Write(testStmtsFoo, nil); err != nil {
@@ -392,10 +392,10 @@ func Test_NewQueueWriteTimeoutBatch(t *testing.T) {
select {
case req := <-q.C:
if len(req.Statements) != 1 {
if len(req.Objects) != 1 {
t.Fatalf("received wrong length slice")
}
if req.Statements[0].Sql != "SELECT * FROM foo" {
if req.Objects[0].Sql != "SELECT * FROM foo" {
t.Fatalf("received wrong SQL")
}
if q.numTimeouts != 1 {
@@ -414,10 +414,10 @@ func Test_NewQueueWriteTimeoutBatch(t *testing.T) {
select {
case req := <-q.C:
// Should happen before the timeout expires.
if len(req.Statements) != 2 {
if len(req.Objects) != 2 {
t.Fatalf("received wrong length slice")
}
if req.Statements[0].Sql != "SELECT * FROM foo" {
if req.Objects[0].Sql != "SELECT * FROM foo" {
t.Fatalf("received wrong SQL")
}
if q.numTimeouts != 1 {