Don't generate CDC events for COMMIT-only operations

This commit is contained in:
Philip O'Toole
2025-09-11 10:17:30 -04:00
committed by GitHub
parent dbafbeb13c
commit 72533950b2
5 changed files with 81 additions and 78 deletions

View File

@@ -25,6 +25,7 @@
- [PR #2311](https://github.com/rqlite/rqlite/pull/2311): Test all CDC event types.
- [PR #2312](https://github.com/rqlite/rqlite/pull/2312): Upgrade Go dependencies.
- [PR #2313](https://github.com/rqlite/rqlite/pull/2313): Database layer supports querying just for a table's column types.
- [PR #2314](https://github.com/rqlite/rqlite/pull/2314): Don't generate CDC events for COMMIT-only operations.
## v8.43.4 (August 27th 2025)
### Implementation changes and bug fixes

View File

@@ -53,6 +53,13 @@ func (s *CDCStreamer) PreupdateHook(ev *command.CDCEvent) error {
// CommitHook is called after the transaction is committed. It sends the
// pending events to the out channel and clears the pending events.
func (s *CDCStreamer) CommitHook() bool {
if len(s.pending.Events) == 0 {
// No CDC events to send, but let the transaction proceed.
// CREATE TABLE statements, for example, result in a COMMIT
// but do not generate CDC events.
return true
}
s.pending.CommitTimestamp = time.Now().UnixNano()
select {
case s.out <- s.pending:

View File

@@ -3,11 +3,12 @@ package db
import (
"reflect"
"testing"
"time"
command "github.com/rqlite/rqlite/v8/command/proto"
)
func Test_NewCDCStreamer(t *testing.T) {
func Test_CDCStreamer_New(t *testing.T) {
ch := make(chan *command.CDCIndexedEventGroup, 10)
streamer := NewCDCStreamer(ch)
if streamer == nil {
@@ -21,33 +22,7 @@ func Test_NewCDCStreamer(t *testing.T) {
}
}
func Test_NewCDCStreamer_CommitEmpty(t *testing.T) {
ch := make(chan *command.CDCIndexedEventGroup, 10)
streamer := NewCDCStreamer(ch)
streamer.Reset(1234)
streamer.CommitHook()
if len(streamer.pending.Events) != 0 {
t.Fatalf("expected no pending events after commit, got %d", len(streamer.pending.Events))
}
events := <-ch
if events.Index != 1234 {
t.Fatalf("expected index value to be 1234, got %d", events.Index)
}
if len(events.Events) != 0 {
t.Fatalf("expected no events, got %d", len(events.Events))
}
if len(ch) != 0 {
t.Fatalf("expected channel to be empty after commit, got %d", len(ch))
}
if err := streamer.Close(); err != nil {
t.Fatalf("expected no error on close, got %v", err)
}
}
func Test_NewCDCStreamer_CommitOne(t *testing.T) {
func Test_CDCStreamer_CommitOne(t *testing.T) {
ch := make(chan *command.CDCIndexedEventGroup, 10)
streamer := NewCDCStreamer(ch)
@@ -67,15 +42,19 @@ func Test_NewCDCStreamer_CommitOne(t *testing.T) {
t.Fatalf("expected no pending events after commit, got %d", len(streamer.pending.Events))
}
ev := <-ch
if ev.Index != 5678 {
t.Fatalf("expected index value to be 5678, got %d", ev.Index)
}
if len(ev.Events) != 1 {
t.Fatalf("expected 1 event, got %d", len(ev.Events))
}
if !reflect.DeepEqual(change, ev.Events[0]) {
t.Fatalf("received event does not match sent event: expected %v, got %v", change, ev.Events[0])
select {
case ev := <-ch:
if ev.Index != 5678 {
t.Fatalf("expected index value to be 5678, got %d", ev.Index)
}
if len(ev.Events) != 1 {
t.Fatalf("expected 1 event, got %d", len(ev.Events))
}
if !reflect.DeepEqual(change, ev.Events[0]) {
t.Fatalf("received event does not match sent event: expected %v, got %v", change, ev.Events[0])
}
case <-time.After(1 * time.Second):
t.Fatalf("timeout waiting for event on channel")
}
if err := streamer.Close(); err != nil {
@@ -83,7 +62,7 @@ func Test_NewCDCStreamer_CommitOne(t *testing.T) {
}
}
func Test_NewCDCStreamer_CommitTwo(t *testing.T) {
func Test_CDCStreamer_CommitTwo(t *testing.T) {
ch := make(chan *command.CDCIndexedEventGroup, 10)
streamer := NewCDCStreamer(ch)
@@ -113,18 +92,22 @@ func Test_NewCDCStreamer_CommitTwo(t *testing.T) {
t.Fatalf("expected no pending events after commit, got %d", len(streamer.pending.Events))
}
ev := <-ch
if ev.Index != 9012 {
t.Fatalf("expected index value to be 9012, got %d", ev.GetIndex())
}
if len(ev.Events) != 2 {
t.Fatalf("expected 2 events, got %d", len(ev.Events))
}
if !reflect.DeepEqual(change1, ev.Events[0]) {
t.Fatalf("received first event does not match sent event: expected %v, got %v", change1, ev.Events[0])
}
if !reflect.DeepEqual(change2, ev.Events[1]) {
t.Fatalf("received second event does not match sent event: expected %v, got %v", change2, ev.Events[1])
select {
case ev := <-ch:
if ev.Index != 9012 {
t.Fatalf("expected index value to be 9012, got %d", ev.GetIndex())
}
if len(ev.Events) != 2 {
t.Fatalf("expected 2 events, got %d", len(ev.Events))
}
if !reflect.DeepEqual(change1, ev.Events[0]) {
t.Fatalf("received first event does not match sent event: expected %v, got %v", change1, ev.Events[0])
}
if !reflect.DeepEqual(change2, ev.Events[1]) {
t.Fatalf("received second event does not match sent event: expected %v, got %v", change2, ev.Events[1])
}
case <-time.After(1 * time.Second):
t.Fatalf("timeout waiting for event on channel")
}
if err := streamer.Close(); err != nil {
@@ -135,7 +118,7 @@ func Test_NewCDCStreamer_CommitTwo(t *testing.T) {
// Test_NewCDCStreamer_ResetThenPreupdate tests the behavior of the CDCStreamer
// when predupdate is called followed by a reset. It ensures that the reset
// clears out any pending events.
func Test_NewCDCStreamer_ResetThenPreupdate(t *testing.T) {
func Test_CDCStreamer_ResetThenPreupdate(t *testing.T) {
ch := make(chan *command.CDCIndexedEventGroup, 10)
streamer := NewCDCStreamer(ch)
@@ -172,15 +155,19 @@ func Test_NewCDCStreamer_ResetThenPreupdate(t *testing.T) {
t.Fatalf("expected no pending events after commit, got %d", len(streamer.pending.Events))
}
ev := <-ch
if ev.Index != 5678 {
t.Fatalf("expected K value to be 5678, got %d", ev.Index)
}
if len(ev.Events) != 1 {
t.Fatalf("expected 1 event, got %d", len(ev.Events))
}
if !reflect.DeepEqual(change2, ev.Events[0]) {
t.Fatalf("received event does not match sent event: expected %v, got %v", change2, ev.Events[0])
select {
case ev := <-ch:
if ev.Index != 5678 {
t.Fatalf("expected K value to be 5678, got %d", ev.Index)
}
if len(ev.Events) != 1 {
t.Fatalf("expected 1 event, got %d", len(ev.Events))
}
if !reflect.DeepEqual(change2, ev.Events[0]) {
t.Fatalf("received event does not match sent event: expected %v, got %v", change2, ev.Events[0])
}
case <-time.After(1 * time.Second):
t.Fatalf("timeout waiting for event on channel")
}
if err := streamer.Close(); err != nil {

View File

@@ -50,8 +50,8 @@ func Test_CDC_SingleNode(t *testing.T) {
}
testPoll(t, func() (bool, error) {
// 1 create, 1 insert, 1 update, 1 delete
return testEndpoint.GetMessageCount() == 4, nil
// 1 insert, 1 update, 1 delete
return testEndpoint.GetMessageCount() == 3, nil
}, 100*time.Millisecond, 10*time.Second)
}
@@ -112,8 +112,8 @@ func Test_CDC_SingleNode_Snapshot(t *testing.T) {
}
testPoll(t, func() (bool, error) {
// 1 create, 1 insert, 1 update, 1 delete
return testEndpoint.GetMessageCount() == 4, nil
// 1 insert, 1 update, 1 delete
return testEndpoint.GetMessageCount() == 3, nil
}, 100*time.Millisecond, 10*time.Second)
}
@@ -163,8 +163,8 @@ func Test_CDC_SingleNode_LaterStart(t *testing.T) {
defer testEndpoint.Close()
testPoll(t, func() (bool, error) {
// 1 create, 1 insert, 1 update, 1 delete
return testEndpoint.GetMessageCount() == 4, nil
// 1 insert, 1 update, 1 delete
return testEndpoint.GetMessageCount() == 3, nil
}, 100*time.Millisecond, 5*time.Second)
}
@@ -209,8 +209,8 @@ func Test_CDC_SingleNode_PostLoadBoot(t *testing.T) {
defer testEndpoint.Close()
testPoll(t, func() (bool, error) {
// 1 create, 1 insert, 1 update, 1 delete
return testEndpoint.GetMessageCount() == 4, nil
// 1 insert, 1 update, 1 delete
return testEndpoint.GetMessageCount() == 3, nil
}, 100*time.Millisecond, 5*time.Second)
// Load the node, and ensure CDC continues to work.
@@ -221,8 +221,12 @@ func Test_CDC_SingleNode_PostLoadBoot(t *testing.T) {
if err != nil {
t.Fatalf("failed to create table: %v", err)
}
_, err = node.Execute(`INSERT INTO qux1 (id, name) VALUES (100, 'Declan')`)
if err != nil {
t.Fatalf("failed to insert data: %v", err)
}
testPoll(t, func() (bool, error) {
return testEndpoint.GetMessageCount() == 5, nil
return testEndpoint.GetMessageCount() == 4, nil
}, 100*time.Millisecond, 5*time.Second)
// Boot the node, and ensure CDC continues to work.
@@ -233,8 +237,12 @@ func Test_CDC_SingleNode_PostLoadBoot(t *testing.T) {
if err != nil {
t.Fatalf("failed to create table: %v", err)
}
_, err = node.Execute(`INSERT INTO qux2 (id, name) VALUES (100, 'Carol')`)
if err != nil {
t.Fatalf("failed to insert data: %v", err)
}
testPoll(t, func() (bool, error) {
return testEndpoint.GetMessageCount() == 6, nil
return testEndpoint.GetMessageCount() == 5, nil
}, 100*time.Millisecond, 5*time.Second)
}
@@ -299,8 +307,8 @@ func Test_CDC_MultiNode(t *testing.T) {
}
testPoll(t, func() (bool, error) {
// 1 create, 1 insert, 1 update, 1 delete
return testEndpoint.GetMessageCount() == 4, nil
// 1 insert, 1 update, 1 delete
return testEndpoint.GetMessageCount() == 3, nil
}, 100*time.Millisecond, 10*time.Second)
hi := testEndpoint.GetHighestMessageIndex()

View File

@@ -27,7 +27,7 @@ class TestSingleNode_CDC(unittest.TestCase):
j = n.execute('INSERT INTO bar(name) VALUES("fiona")')
self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}"))
server.wait_message_count(2)
server.wait_message_count(1)
deprovision_node(n)
@@ -62,8 +62,8 @@ class TestMultiNode_CDC(unittest.TestCase):
j = leader.execute('INSERT INTO foo(name) VALUES("bob")')
self.assertEqual(j, d_("{'results': [{'last_insert_id': 2, 'rows_affected': 1}]}"))
# Ensure just the right number of events are sent (3: CREATE TABLE + 2 INSERTs)
server.wait_message_count(3)
# Ensure just the right number of events are sent (2: 2 INSERTs)
server.wait_message_count(2)
# Clean up
cluster.deprovision()
@@ -100,7 +100,7 @@ class TestMultiNode_CDC(unittest.TestCase):
self.assertEqual(j, d_("{'results': [{'last_insert_id': 2, 'rows_affected': 1}]}"))
# Wait for initial events
server.wait_message_count(3)
server.wait_message_count(2)
# Stop the current leader
leader.stop()
@@ -112,8 +112,8 @@ class TestMultiNode_CDC(unittest.TestCase):
j = new_leader.execute('INSERT INTO foo(name) VALUES("charlie")')
self.assertEqual(j, d_("{'results': [{'last_insert_id': 3, 'rows_affected': 1}]}"))
# Ensure just that event is sent (total should now be 4)
server.wait_message_count(4)
# Ensure just that event is sent
server.wait_message_count(3)
# Clean up
cluster.deprovision()