mirror of
https://github.com/rqlite/rqlite.git
synced 2026-01-25 04:16:26 +00:00
Queue orderering unit test
This commit is contained in:
@@ -103,7 +103,7 @@ func Test_MergeQueuedObjects(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func Test_NewQueue(t *testing.T) {
|
||||
func Test_QueueNew(t *testing.T) {
|
||||
q := New[*command.Statement](1, 1, 100*time.Millisecond)
|
||||
if q == nil {
|
||||
t.Fatalf("failed to create new Queue")
|
||||
@@ -111,7 +111,7 @@ func Test_NewQueue(t *testing.T) {
|
||||
defer q.Close()
|
||||
}
|
||||
|
||||
func Test_NewQueueClosedWrite(t *testing.T) {
|
||||
func Test_QueueClosedWrite(t *testing.T) {
|
||||
q := New[*command.Statement](1, 1, 100*time.Millisecond)
|
||||
if q == nil {
|
||||
t.Fatalf("failed to create new Queue")
|
||||
@@ -122,7 +122,7 @@ func Test_NewQueueClosedWrite(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func Test_NewQueueWriteNil(t *testing.T) {
|
||||
func Test_QueueWriteNil(t *testing.T) {
|
||||
q := New[*command.Statement](1, 1, 60*time.Second)
|
||||
defer q.Close()
|
||||
|
||||
@@ -131,7 +131,7 @@ func Test_NewQueueWriteNil(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func Test_NewQueueWriteOne(t *testing.T) {
|
||||
func Test_QueueWriteOne(t *testing.T) {
|
||||
q := New[*command.Statement](1024, 1, 60*time.Second)
|
||||
defer q.Close()
|
||||
|
||||
@@ -152,7 +152,7 @@ func Test_NewQueueWriteOne(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func Test_NewQueueWriteBatchSizeSingle(t *testing.T) {
|
||||
func Test_QueueWriteBatchSizeSingle(t *testing.T) {
|
||||
q := New[*command.Statement](1024, 1, 60*time.Second)
|
||||
defer q.Close()
|
||||
|
||||
@@ -173,7 +173,7 @@ func Test_NewQueueWriteBatchSizeSingle(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func Test_NewQueueWriteBatchSizeDouble(t *testing.T) {
|
||||
func Test_QueueWriteBatchSizeDouble(t *testing.T) {
|
||||
q := New[*command.Statement](1024, 1, 60*time.Second)
|
||||
defer q.Close()
|
||||
|
||||
@@ -209,7 +209,7 @@ func Test_NewQueueWriteBatchSizeDouble(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func Test_NewQueueWriteNilAndOne(t *testing.T) {
|
||||
func Test_QueueWriteNilAndOne(t *testing.T) {
|
||||
q := New[*command.Statement](1024, 2, 60*time.Second)
|
||||
defer q.Close()
|
||||
|
||||
@@ -231,7 +231,7 @@ func Test_NewQueueWriteNilAndOne(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func Test_NewQueueWriteBatchSizeSingleChan(t *testing.T) {
|
||||
func Test_QueueWriteBatchSizeSingleChan(t *testing.T) {
|
||||
q := New[*command.Statement](1024, 1, 60*time.Second)
|
||||
defer q.Close()
|
||||
|
||||
@@ -262,7 +262,7 @@ func Test_NewQueueWriteBatchSizeSingleChan(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func Test_NewQueueWriteNilSingleChan(t *testing.T) {
|
||||
func Test_QueueWriteNilSingleChan(t *testing.T) {
|
||||
q := New[*command.Statement](1024, 1, 60*time.Second)
|
||||
defer q.Close()
|
||||
|
||||
@@ -293,7 +293,7 @@ func Test_NewQueueWriteNilSingleChan(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func Test_NewQueueWriteBatchSizeMulti(t *testing.T) {
|
||||
func Test_QueueWriteBatchSizeMulti(t *testing.T) {
|
||||
q := New[*command.Statement](1024, 5, 60*time.Second)
|
||||
defer q.Close()
|
||||
|
||||
@@ -334,7 +334,7 @@ func Test_NewQueueWriteBatchSizeMulti(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func Test_NewQueueWriteTimeout(t *testing.T) {
|
||||
func Test_QueueWriteTimeout(t *testing.T) {
|
||||
q := New[*command.Statement](1024, 10, 1*time.Second)
|
||||
defer q.Close()
|
||||
|
||||
@@ -358,9 +358,9 @@ func Test_NewQueueWriteTimeout(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// Test_NewQueueWriteTimeoutMulti ensures that timer expiring
|
||||
// Test_QueueWriteTimeoutMulti ensures that timer expiring
|
||||
// twice in a row works fine.
|
||||
func Test_NewQueueWriteTimeoutMulti(t *testing.T) {
|
||||
func Test_QueueWriteTimeoutMulti(t *testing.T) {
|
||||
q := New[*command.Statement](1024, 10, 1*time.Second)
|
||||
defer q.Close()
|
||||
|
||||
@@ -401,9 +401,9 @@ func Test_NewQueueWriteTimeoutMulti(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// Test_NewQueueWriteTimeoutBatch ensures that timer expiring
|
||||
// Test_QueueWriteTimeoutBatch ensures that timer expiring
|
||||
// followed by a batch, works fine.
|
||||
func Test_NewQueueWriteTimeoutBatch(t *testing.T) {
|
||||
func Test_QueueWriteTimeoutBatch(t *testing.T) {
|
||||
q := New[*command.Statement](1024, 2, 1*time.Second)
|
||||
defer q.Close()
|
||||
|
||||
@@ -448,3 +448,52 @@ func Test_NewQueueWriteTimeoutBatch(t *testing.T) {
|
||||
t.Fatalf("timed out waiting for statement")
|
||||
}
|
||||
}
|
||||
|
||||
type testObj struct {
|
||||
id int
|
||||
}
|
||||
|
||||
// Test_QueueOrdering ensures that ordering is preserved in the queue.
|
||||
func Test_QueueOrdering(t *testing.T) {
|
||||
batchTimeout := 100 * time.Millisecond
|
||||
testTimeout := 3 * batchTimeout
|
||||
|
||||
q := New[*testObj](100, 13, batchTimeout)
|
||||
if q == nil {
|
||||
t.Fatalf("failed to create new Queue")
|
||||
}
|
||||
defer q.Close()
|
||||
|
||||
num := 1063
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
expIdx := 0
|
||||
for req := range q.C {
|
||||
for _, r := range req.Objects {
|
||||
if r.id != expIdx {
|
||||
t.Errorf("out of order: exp %d, got %d", expIdx, r.id)
|
||||
return
|
||||
}
|
||||
expIdx++
|
||||
}
|
||||
req.Close()
|
||||
if expIdx == num {
|
||||
close(done)
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
for i := range num {
|
||||
if _, err := q.Write([]*testObj{{i}}, nil); err != nil {
|
||||
t.Fatalf("failed to write: %s", err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(testTimeout):
|
||||
t.Fatalf("timed out waiting for all indexes")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user