Fix bug in queuer

This commit is contained in:
Philip O'Toole
2022-05-17 10:16:45 -04:00
parent 523f11ac0f
commit d52946aac1
3 changed files with 88 additions and 7 deletions

View File

@@ -76,7 +76,7 @@ func (q *Queue) run() {
timer := time.NewTimer(q.timeout)
timer.Stop()
writeFn := func(stmts []*command.Statement) {
writeFn := func() {
newStmts := make([]*command.Statement, len(stmts))
copy(newStmts, stmts)
q.sendCh <- newStmts
@@ -93,12 +93,12 @@ func (q *Queue) run() {
timer.Reset(q.timeout)
}
if len(stmts) >= q.batchSize {
writeFn(stmts)
writeFn()
}
case <-timer.C:
writeFn(stmts)
writeFn()
case <-q.flush:
writeFn(stmts)
writeFn()
case <-q.done:
timer.Stop()
return

View File

@@ -104,3 +104,82 @@ func Test_NewQueueWriteTimeout(t *testing.T) {
t.Fatalf("timed out waiting for statement")
}
}
// Test_NewQueueWriteTimeoutMulti ensures that timer expiring
// twice in a row works fine.
func Test_NewQueueWriteTimeoutMulti(t *testing.T) {
q := New(1024, 10, 1*time.Second)
defer q.Close()
if err := q.Write(testStmt); err != nil {
t.Fatalf("failed to write: %s", err.Error())
}
select {
case stmts := <-q.C:
if len(stmts) != 1 {
t.Fatalf("received wrong length slice")
}
if stmts[0].Sql != "SELECT * FROM foo" {
t.Fatalf("received wrong SQL")
}
case <-time.NewTimer(5 * time.Second).C:
t.Fatalf("timed out waiting for first statement")
}
if err := q.Write(testStmt); err != nil {
t.Fatalf("failed to write: %s", err.Error())
}
select {
case stmts := <-q.C:
if len(stmts) != 1 {
t.Fatalf("received wrong length slice")
}
if stmts[0].Sql != "SELECT * FROM foo" {
t.Fatalf("received wrong SQL")
}
case <-time.NewTimer(5 * time.Second).C:
t.Fatalf("timed out waiting for second statement")
}
}
// Test_NewQueueWriteTimeoutBatch ensures that timer expiring
// followed by a batch, works fine.
func Test_NewQueueWriteTimeoutBatch(t *testing.T) {
q := New(1024, 2, 2*time.Second)
defer q.Close()
if err := q.Write(testStmt); err != nil {
t.Fatalf("failed to write: %s", err.Error())
}
select {
case stmts := <-q.C:
if len(stmts) != 1 {
t.Fatalf("received wrong length slice")
}
if stmts[0].Sql != "SELECT * FROM foo" {
t.Fatalf("received wrong SQL")
}
case <-time.NewTimer(5 * time.Second).C:
t.Fatalf("timed out waiting for statement")
}
if err := q.Write(testStmt); err != nil {
t.Fatalf("failed to write: %s", err.Error())
}
if err := q.Write(testStmt); err != nil {
t.Fatalf("failed to write: %s", err.Error())
}
select {
case stmts := <-q.C:
if len(stmts) != 2 {
t.Fatalf("received wrong length slice")
}
if stmts[0].Sql != "SELECT * FROM foo" {
t.Fatalf("received wrong SQL")
}
case <-time.NewTimer(500 * time.Millisecond).C:
// Should happen before the timeout expires.
t.Fatalf("timed out waiting for statement")
}
}