Test all database CDC types

This commit is contained in:
Philip O'Toole
2025-09-11 09:02:22 -04:00
committed by GitHub
parent 09439d3813
commit dbafbeb13c
5 changed files with 384 additions and 71 deletions

View File

@@ -22,6 +22,7 @@
- [PR #2308](https://github.com/rqlite/rqlite/pull/2308): Correctly implement Batching Queue flushing.
- [PR #2309](https://github.com/rqlite/rqlite/pull/2309): Integrate Store Snapshotting and CDC Service.
- [PR #2310](https://github.com/rqlite/rqlite/pull/2310): Correct key deletion in BoltDB.
- [PR #2311](https://github.com/rqlite/rqlite/pull/2311): Test all CDC event types.
- [PR #2312](https://github.com/rqlite/rqlite/pull/2312): Upgrade Go dependencies.
- [PR #2313](https://github.com/rqlite/rqlite/pull/2313): Database layer supports querying just for a table's column types.

69
command/comparators.go Normal file
View File

@@ -0,0 +1,69 @@
package command
import (
"bytes"
"github.com/rqlite/rqlite/v8/command/proto"
)
// CDCValueEqual returns true if two CDCValue objects are equal.
func CDCValueEqual(v1, v2 *proto.CDCValue) bool {
if v1 == nil && v2 == nil {
return true
}
if v1 == nil || v2 == nil {
return false
}
if v1.GetValue() == nil && v2.GetValue() == nil {
return true
}
switch v1.GetValue().(type) {
case *proto.CDCValue_I:
v2i, ok := v2.GetValue().(*proto.CDCValue_I)
if !ok {
return false
}
return v1.GetI() == v2i.I
case *proto.CDCValue_D:
v2d, ok := v2.GetValue().(*proto.CDCValue_D)
if !ok {
return false
}
return v1.GetD() == v2d.D
case *proto.CDCValue_S:
v2s, ok := v2.GetValue().(*proto.CDCValue_S)
if !ok {
return false
}
return v1.GetS() == v2s.S
case *proto.CDCValue_B:
v2b, ok := v2.GetValue().(*proto.CDCValue_B)
if !ok {
return false
}
return v1.GetB() == v2b.B
case *proto.CDCValue_Y:
v2y, ok := v2.GetValue().(*proto.CDCValue_Y)
if !ok {
return false
}
return bytes.Equal(v1.GetY(), v2y.Y)
default:
return false
}
}
// CDCValuesEqual returns true if two slices of CDCValue objects are equal.
func CDCValuesEqual(v1, v2 []*proto.CDCValue) bool {
if len(v1) != len(v2) {
return false
}
for i := range v1 {
if !CDCValueEqual(v1[i], v2[i]) {
return false
}
}
return true
}

View File

@@ -0,0 +1,65 @@
package command
import (
"testing"
"github.com/rqlite/rqlite/v8/command/proto"
)
func Test_CDCValueComparator(t *testing.T) {
for i, tt := range []struct {
v1 *proto.CDCValue
v2 *proto.CDCValue
equal bool
}{
{v1: nil, v2: nil, equal: true},
{v1: nil, v2: &proto.CDCValue{}, equal: false},
{v1: &proto.CDCValue{}, v2: nil, equal: false},
{v1: &proto.CDCValue{Value: &proto.CDCValue_I{I: 1}}, v2: &proto.CDCValue{Value: &proto.CDCValue_I{I: 1}}, equal: true},
{v1: &proto.CDCValue{Value: &proto.CDCValue_I{I: 1}}, v2: &proto.CDCValue{Value: &proto.CDCValue_I{I: 2}}, equal: false},
{v1: &proto.CDCValue{Value: &proto.CDCValue_I{I: 1}}, v2: &proto.CDCValue{Value: &proto.CDCValue_D{D: 1.0}}, equal: false},
{v1: &proto.CDCValue{Value: &proto.CDCValue_D{D: 1.0}}, v2: &proto.CDCValue{Value: &proto.CDCValue_D{D: 1.0}}, equal: true},
{v1: &proto.CDCValue{Value: &proto.CDCValue_D{D: 1.0}}, v2: &proto.CDCValue{Value: &proto.CDCValue_D{D: 2.0}}, equal: false},
{v1: &proto.CDCValue{Value: &proto.CDCValue_D{D: 1.0}}, v2: &proto.CDCValue{Value: &proto.CDCValue_I{I: 1}}, equal: false},
{v1: &proto.CDCValue{Value: &proto.CDCValue_S{S: "foo"}}, v2: &proto.CDCValue{Value: &proto.CDCValue_S{S: "foo"}}, equal: true},
{v1: &proto.CDCValue{Value: &proto.CDCValue_S{S: "foo"}}, v2: &proto.CDCValue{Value: &proto.CDCValue_S{S: "bar"}}, equal: false},
{v1: &proto.CDCValue{Value: &proto.CDCValue_S{S: "foo"}}, v2: &proto.CDCValue{Value: &proto.CDCValue_I{I: 1}}, equal: false},
{v1: &proto.CDCValue{Value: &proto.CDCValue_B{B: true}}, v2: &proto.CDCValue{Value: &proto.CDCValue_B{B: true}}, equal: true},
{v1: &proto.CDCValue{Value: &proto.CDCValue_B{B: true}}, v2: &proto.CDCValue{Value: &proto.CDCValue_B{B: false}}, equal: false},
{v1: &proto.CDCValue{Value: &proto.CDCValue_B{B: true}}, v2: &proto.CDCValue{Value: &proto.CDCValue_I{I: 1}}, equal: false},
{v1: &proto.CDCValue{Value: &proto.CDCValue_Y{Y: []byte{0x01, 0x02}}}, v2: &proto.CDCValue{Value: &proto.CDCValue_Y{Y: []byte{0x01, 0x02}}}, equal: true},
{v1: &proto.CDCValue{Value: &proto.CDCValue_Y{Y: []byte{0x01, 0x02}}}, v2: &proto.CDCValue{Value: &proto.CDCValue_Y{Y: []byte{0x01, 0x03}}}, equal: false},
{v1: &proto.CDCValue{Value: &proto.CDCValue_Y{Y: []byte{0x01, 0x02}}}, v2: &proto.CDCValue{Value: &proto.CDCValue_I{I: 1}}, equal: false},
} {
eq := CDCValueEqual(tt.v1, tt.v2)
if eq != tt.equal {
t.Fatalf("case %d: unexpected equality result: got %v, want %v", i, eq, tt.equal)
}
}
}
func Test_CDCValuesComparator(t *testing.T) {
for i, tt := range []struct {
v1 []*proto.CDCValue
v2 []*proto.CDCValue
equal bool
}{
{v1: nil, v2: nil, equal: true},
{v1: nil, v2: []*proto.CDCValue{}, equal: true},
{v1: []*proto.CDCValue{}, v2: nil, equal: true},
{v1: []*proto.CDCValue{}, v2: []*proto.CDCValue{}, equal: true},
{v1: []*proto.CDCValue{{Value: &proto.CDCValue_I{I: 1}}}, v2: []*proto.CDCValue{{Value: &proto.CDCValue_I{I: 1}}}, equal: true},
{v1: []*proto.CDCValue{{Value: &proto.CDCValue_I{I: 1}}}, v2: []*proto.CDCValue{{Value: &proto.CDCValue_I{I: 2}}}, equal: false},
{v1: []*proto.CDCValue{{Value: &proto.CDCValue_I{I: 1}}}, v2: []*proto.CDCValue{{Value: &proto.CDCValue_D{D: 1.0}}}, equal: false},
{v1: []*proto.CDCValue{{Value: &proto.CDCValue_I{I: 1}}, {Value: &proto.CDCValue_S{S: "foo"}}}, v2: []*proto.CDCValue{{Value: &proto.CDCValue_I{I: 1}}, {Value: &proto.CDCValue_S{S: "foo"}}}, equal: true},
{v1: []*proto.CDCValue{{Value: &proto.CDCValue_I{I: 1}}, {Value: &proto.CDCValue_S{S: "foo"}}}, v2: []*proto.CDCValue{{Value: &proto.CDCValue_I{I: 1}}, {Value: &proto.CDCValue_S{S: "bar"}}}, equal: false},
{v1: []*proto.CDCValue{{Value: &proto.CDCValue_I{I: 1}}, {Value: &proto.CDCValue_S{S: "foo"}}}, v2: []*proto.CDCValue{{Value: &proto.CDCValue_I{I: 1}}, {Value: &proto.CDCValue_I{I: 1}}}, equal: false},
{v1: []*proto.CDCValue{{Value: &proto.CDCValue_I{I: 1}}, {Value: &proto.CDCValue_S{S: "foo"}}}, v2: []*proto.CDCValue{{Value: &proto.CDCValue_I{I: 1}}}, equal: false},
{v1: []*proto.CDCValue{{Value: &proto.CDCValue_I{I: 1}}}, v2: []*proto.CDCValue{{Value: &proto.CDCValue_I{I: 1}}, {Value: &proto.CDCValue_S{S: "foo"}}}, equal: false},
} {
eq := CDCValuesEqual(tt.v1, tt.v2)
if eq != tt.equal {
t.Fatalf("case %d: unexpected equality result: got %v, want %v", i, eq, tt.equal)
}
}
}

View File

@@ -1859,18 +1859,10 @@ func normalizeCDCValues(row []any) (*command.CDCRow, error) {
},
}
case []byte:
if true {
cdcRow.Values[i] = &command.CDCValue{
Value: &command.CDCValue_S{
S: string(val),
},
}
} else {
cdcRow.Values[i] = &command.CDCValue{
Value: &command.CDCValue_Y{
Y: val,
},
}
cdcRow.Values[i] = &command.CDCValue{
Value: &command.CDCValue_Y{
Y: val,
},
}
case time.Time:
rfc3339, err := val.MarshalText()
@@ -1883,7 +1875,9 @@ func normalizeCDCValues(row []any) (*command.CDCRow, error) {
},
}
case nil:
continue
cdcRow.Values[i] = &command.CDCValue{
Value: nil,
}
default:
return nil, fmt.Errorf("unhandled column type: %T %v", val, val)
}

View File

@@ -8,7 +8,8 @@ import (
"sync/atomic"
"testing"
command "github.com/rqlite/rqlite/v8/command/proto"
"github.com/rqlite/rqlite/v8/command"
"github.com/rqlite/rqlite/v8/command/proto"
)
// Test_Preupdate_Basic tests the basic functionality of the preupdate hook, ensuring
@@ -25,7 +26,7 @@ func Test_Preupdate_Basic(t *testing.T) {
mustExecute(db, "CREATE TABLE foo (id INTEGER PRIMARY KEY, name TEXT)")
count := &atomic.Int32{}
hook := func(ev *command.CDCEvent) error {
hook := func(ev *proto.CDCEvent) error {
count.Add(1)
return nil
}
@@ -88,6 +89,189 @@ func Test_Preupdate_Basic(t *testing.T) {
}
}
func Test_Preupdate_AllTypes(t *testing.T) {
path := mustTempPath()
defer os.Remove(path)
db, err := Open(path, false, false)
if err != nil {
t.Fatalf("error opening database")
}
defer db.Close()
mustExecute(db, `CREATE TABLE foo (
id INTEGER PRIMARY KEY,
name TEXT,
employer VARCHAR(255),
ssn CHAR(11),
age INT,
weight FLOAT,
dob DATE,
active BOOLEAN,
data BLOB)`)
for i, tt := range []struct {
sql string
ev *proto.CDCEvent
}{
{
sql: `INSERT INTO foo(id, name, employer, ssn, age, weight, dob, active, data) VALUES(
5,
"fiona",
"Acme",
NULL,
21,
167.3,
'1990-01-02',
true,
x'010203')`,
ev: &proto.CDCEvent{
Table: "foo",
Op: proto.CDCEvent_INSERT,
NewRowId: 5,
NewRow: &proto.CDCRow{
Values: []*proto.CDCValue{
{Value: &proto.CDCValue_I{I: 5}},
{Value: &proto.CDCValue_S{S: "fiona"}},
{Value: &proto.CDCValue_S{S: "Acme"}},
{Value: nil},
{Value: &proto.CDCValue_I{I: 21}},
{Value: &proto.CDCValue_D{D: 167.3}},
{Value: &proto.CDCValue_S{S: "1990-01-02"}},
{Value: &proto.CDCValue_I{I: 1}},
{Value: &proto.CDCValue_Y{Y: []byte{1, 2, 3}}},
},
},
},
},
{
sql: `UPDATE foo SET
name="fiona2",
employer="Acme2",
ssn="123-45-6789",
age=22,
weight=170.1,
dob='1991-02-03',
active=false,
data=x'040506'
WHERE id=5`,
ev: &proto.CDCEvent{
Table: "foo",
Op: proto.CDCEvent_UPDATE,
OldRowId: 5,
NewRowId: 5,
OldRow: &proto.CDCRow{
Values: []*proto.CDCValue{
{Value: &proto.CDCValue_I{I: 5}},
{Value: &proto.CDCValue_S{S: "fiona"}},
{Value: &proto.CDCValue_S{S: "Acme"}},
{Value: nil},
{Value: &proto.CDCValue_I{I: 21}},
{Value: &proto.CDCValue_D{D: 167.3}},
{Value: &proto.CDCValue_S{S: "1990-01-02"}},
{Value: &proto.CDCValue_I{I: 1}},
{Value: &proto.CDCValue_Y{Y: []byte{1, 2, 3}}},
},
},
NewRow: &proto.CDCRow{
Values: []*proto.CDCValue{
{Value: &proto.CDCValue_I{I: 5}},
{Value: &proto.CDCValue_S{S: "fiona2"}},
{Value: &proto.CDCValue_S{S: "Acme2"}},
{Value: &proto.CDCValue_S{S: "123-45-6789"}},
{Value: &proto.CDCValue_I{I: 22}},
{Value: &proto.CDCValue_D{D: 170.1}},
{Value: &proto.CDCValue_S{S: "1991-02-03"}},
{Value: &proto.CDCValue_I{I: 0}},
{Value: &proto.CDCValue_Y{Y: []byte{4, 5, 6}}},
},
},
},
},
{
sql: "DELETE FROM foo WHERE id=5",
ev: &proto.CDCEvent{
Table: "foo",
Op: proto.CDCEvent_DELETE,
OldRowId: 5,
OldRow: &proto.CDCRow{
Values: []*proto.CDCValue{
{Value: &proto.CDCValue_I{I: 5}},
{Value: &proto.CDCValue_S{S: "fiona2"}},
{Value: &proto.CDCValue_S{S: "Acme2"}},
{Value: &proto.CDCValue_S{S: "123-45-6789"}},
{Value: &proto.CDCValue_I{I: 22}},
{Value: &proto.CDCValue_D{D: 170.1}},
{Value: &proto.CDCValue_S{S: "1991-02-03"}},
{Value: &proto.CDCValue_I{I: 0}},
{Value: &proto.CDCValue_Y{Y: []byte{4, 5, 6}}},
},
},
},
},
} {
var wg sync.WaitGroup
if err := db.RegisterPreUpdateHook(func(ev *proto.CDCEvent) error {
defer wg.Done()
if exp, got := tt.ev.Table, ev.Table; exp != got {
t.Fatalf("test %d: expected table %s, got %s", i, exp, got)
}
if exp, got := tt.ev.Op, ev.Op; exp != got {
t.Fatalf("test %d: expected operation %s, got %s", i, exp, got)
}
// Old row checks.
if tt.ev.OldRowId != 0 {
if exp, got := tt.ev.OldRowId, ev.OldRowId; exp != got {
t.Fatalf("test %d: expected old Row ID %d, got %d", i, exp, got)
}
}
if tt.ev.OldRow != nil {
if ev.OldRow == nil {
t.Fatalf("test %d: exp non-nil new row, got nil new row", i)
}
if len(tt.ev.OldRow.Values) != len(ev.OldRow.Values) {
t.Fatalf("test %d: exp %d old values, got %d values", i, len(tt.ev.OldRow.Values), len(ev.OldRow.Values))
}
for i := range tt.ev.OldRow.Values {
if !command.CDCValueEqual(tt.ev.OldRow.Values[i], ev.OldRow.Values[i]) {
t.Fatalf("test %d: exp new value at index %d (%v) does not equal got new value at index %d (%v)",
i, i, tt.ev.OldRow.Values[i], i, ev.OldRow.Values[i])
}
}
}
// New row checks.
if tt.ev.NewRowId != 0 {
if exp, got := tt.ev.NewRowId, ev.NewRowId; exp != got {
t.Fatalf("test %d: expected new Row ID %d, got %d", i, exp, got)
}
}
if tt.ev.NewRow != nil {
if ev.NewRow == nil {
t.Fatalf("test %d: exp non-nil new row, got nil new row", i)
}
if len(tt.ev.NewRow.Values) != len(ev.NewRow.Values) {
t.Fatalf("test %d: exp %d new values, got %d values", i, len(tt.ev.NewRow.Values), len(ev.NewRow.Values))
}
for i := range tt.ev.NewRow.Values {
if !command.CDCValueEqual(tt.ev.NewRow.Values[i], ev.NewRow.Values[i]) {
t.Fatalf("test %d: exp new value at index %d (%v) does not equal got new value at index %d (%v)",
i, i, tt.ev.NewRow.Values[i], i, ev.NewRow.Values[i])
}
}
}
return nil
}, nil, false); err != nil {
t.Fatalf("error registering preupdate hook: %s", err)
}
wg.Add(1)
mustExecute(db, tt.sql)
wg.Wait()
}
}
func Test_Preupdate_Basic_Regex(t *testing.T) {
path := mustTempPath()
defer os.Remove(path)
@@ -100,7 +284,7 @@ func Test_Preupdate_Basic_Regex(t *testing.T) {
mustExecute(db, "CREATE TABLE foobar (id INTEGER PRIMARY KEY, name TEXT)")
count := &atomic.Int32{}
hook := func(ev *command.CDCEvent) error {
hook := func(ev *proto.CDCEvent) error {
count.Add(1)
return nil
}
@@ -188,7 +372,7 @@ func Test_Preupdate_Constraint(t *testing.T) {
mustExecute(db, "CREATE TABLE foo (id INTEGER PRIMARY KEY, name TEXT UNIQUE)")
count := &atomic.Int32{}
hook := func(ev *command.CDCEvent) error {
hook := func(ev *proto.CDCEvent) error {
count.Add(1)
return nil
}
@@ -233,12 +417,12 @@ func Test_Preupdate_RowIDs(t *testing.T) {
// Insert a row, with an explicit ID.
var wg sync.WaitGroup
hook := func(ev *command.CDCEvent) error {
hook := func(ev *proto.CDCEvent) error {
defer wg.Done()
if ev.Table != "foo" {
t.Fatalf("expected table foo, got %s", ev.Table)
}
if ev.Op != command.CDCEvent_INSERT {
if ev.Op != proto.CDCEvent_INSERT {
t.Fatalf("expected operation insert, got %s", ev.Op)
}
if exp, got := int64(5), ev.OldRowId; exp != got {
@@ -261,12 +445,12 @@ func Test_Preupdate_RowIDs(t *testing.T) {
wg.Wait()
// Update a row.
hook = func(ev *command.CDCEvent) error {
hook = func(ev *proto.CDCEvent) error {
defer wg.Done()
if ev.Table != "foo" {
t.Fatalf("expected table foo, got %s", ev.Table)
}
if ev.Op != command.CDCEvent_UPDATE {
if ev.Op != proto.CDCEvent_UPDATE {
t.Fatalf("expected operation update, got %s", ev.Op)
}
if exp, got := int64(5), ev.OldRowId; exp != got {
@@ -289,12 +473,12 @@ func Test_Preupdate_RowIDs(t *testing.T) {
wg.Wait()
// Delete a row.
hook = func(ev *command.CDCEvent) error {
hook = func(ev *proto.CDCEvent) error {
defer wg.Done()
if ev.Table != "foo" {
t.Fatalf("expected table foo, got %s", ev.Table)
}
if ev.Op != command.CDCEvent_DELETE {
if ev.Op != proto.CDCEvent_DELETE {
t.Fatalf("expected operation delete, got %s", ev.Op)
}
if exp, got := int64(5), ev.OldRowId; exp != got {
@@ -330,19 +514,19 @@ func Test_Preupdate_Data(t *testing.T) {
/////////////////////////////////////////////////////////////////
// Insert a row, with an explicit ID.
var wg sync.WaitGroup
hook := func(got *command.CDCEvent) error {
hook := func(got *proto.CDCEvent) error {
defer wg.Done()
exp := &command.CDCEvent{
exp := &proto.CDCEvent{
Table: "foo",
Op: command.CDCEvent_INSERT,
Op: proto.CDCEvent_INSERT,
OldRowId: 5,
NewRowId: 5,
OldRow: nil,
NewRow: &command.CDCRow{
Values: []*command.CDCValue{
{Value: &command.CDCValue_I{I: 5}},
{Value: &command.CDCValue_S{S: "fiona"}},
{Value: &command.CDCValue_D{D: 2.4}},
NewRow: &proto.CDCRow{
Values: []*proto.CDCValue{
{Value: &proto.CDCValue_I{I: 5}},
{Value: &proto.CDCValue_S{S: "fiona"}},
{Value: &proto.CDCValue_D{D: 2.4}},
},
},
}
@@ -358,19 +542,19 @@ func Test_Preupdate_Data(t *testing.T) {
/////////////////////////////////////////////////////////////////
// Insert a row with more complex data
hook = func(got *command.CDCEvent) error {
hook = func(got *proto.CDCEvent) error {
defer wg.Done()
exp := &command.CDCEvent{
exp := &proto.CDCEvent{
Table: "foo",
Op: command.CDCEvent_INSERT,
Op: proto.CDCEvent_INSERT,
OldRowId: 20,
NewRowId: 20,
OldRow: nil,
NewRow: &command.CDCRow{
Values: []*command.CDCValue{
{Value: &command.CDCValue_I{I: 20}},
{Value: &command.CDCValue_S{S: "😃💁 People 大鹿"}},
{Value: &command.CDCValue_D{D: 1.23}},
NewRow: &proto.CDCRow{
Values: []*proto.CDCValue{
{Value: &proto.CDCValue_I{I: 20}},
{Value: &proto.CDCValue_S{S: "😃💁 People 大鹿"}},
{Value: &proto.CDCValue_D{D: 1.23}},
},
},
}
@@ -386,19 +570,19 @@ func Test_Preupdate_Data(t *testing.T) {
/////////////////////////////////////////////////////////////////
// Insert a row, adding subset of columns
hook = func(got *command.CDCEvent) error {
hook = func(got *proto.CDCEvent) error {
defer wg.Done()
exp := &command.CDCEvent{
exp := &proto.CDCEvent{
Table: "foo",
Op: command.CDCEvent_INSERT,
Op: proto.CDCEvent_INSERT,
OldRowId: 6,
NewRowId: 6,
OldRow: nil,
NewRow: &command.CDCRow{
Values: []*command.CDCValue{
{Value: &command.CDCValue_I{I: 6}},
nil,
{Value: &command.CDCValue_D{D: 3.7}},
NewRow: &proto.CDCRow{
Values: []*proto.CDCValue{
{Value: &proto.CDCValue_I{I: 6}},
{Value: nil},
{Value: &proto.CDCValue_D{D: 3.7}},
},
},
}
@@ -414,25 +598,25 @@ func Test_Preupdate_Data(t *testing.T) {
/////////////////////////////////////////////////////////////////
// Update a row.
hook = func(got *command.CDCEvent) error {
hook = func(got *proto.CDCEvent) error {
defer wg.Done()
exp := &command.CDCEvent{
exp := &proto.CDCEvent{
Table: "foo",
Op: command.CDCEvent_UPDATE,
Op: proto.CDCEvent_UPDATE,
OldRowId: 5,
NewRowId: 5,
OldRow: &command.CDCRow{
Values: []*command.CDCValue{
{Value: &command.CDCValue_I{I: 5}},
{Value: &command.CDCValue_S{S: "fiona"}},
{Value: &command.CDCValue_D{D: 2.4}},
OldRow: &proto.CDCRow{
Values: []*proto.CDCValue{
{Value: &proto.CDCValue_I{I: 5}},
{Value: &proto.CDCValue_S{S: "fiona"}},
{Value: &proto.CDCValue_D{D: 2.4}},
},
},
NewRow: &command.CDCRow{
Values: []*command.CDCValue{
{Value: &command.CDCValue_I{I: 5}},
{Value: &command.CDCValue_S{S: "fiona2"}},
{Value: &command.CDCValue_D{D: 2.4}},
NewRow: &proto.CDCRow{
Values: []*proto.CDCValue{
{Value: &proto.CDCValue_I{I: 5}},
{Value: &proto.CDCValue_S{S: "fiona2"}},
{Value: &proto.CDCValue_D{D: 2.4}},
},
},
}
@@ -447,18 +631,18 @@ func Test_Preupdate_Data(t *testing.T) {
wg.Wait()
// Delete a row.
hook = func(got *command.CDCEvent) error {
hook = func(got *proto.CDCEvent) error {
defer wg.Done()
exp := &command.CDCEvent{
exp := &proto.CDCEvent{
Table: "foo",
Op: command.CDCEvent_DELETE,
Op: proto.CDCEvent_DELETE,
OldRowId: 5,
NewRowId: 5,
OldRow: &command.CDCRow{
Values: []*command.CDCValue{
{Value: &command.CDCValue_I{I: 5}},
{Value: &command.CDCValue_S{S: "fiona2"}},
{Value: &command.CDCValue_D{D: 2.4}},
OldRow: &proto.CDCRow{
Values: []*proto.CDCValue{
{Value: &proto.CDCValue_I{I: 5}},
{Value: &proto.CDCValue_S{S: "fiona2"}},
{Value: &proto.CDCValue_D{D: 2.4}},
},
},
NewRow: nil,
@@ -489,7 +673,7 @@ func Test_Preupdate_Multi(t *testing.T) {
mustExecute(db, "INSERT INTO foo(id, name) VALUES(2, 'fiona')")
var wg sync.WaitGroup
hook := func(got *command.CDCEvent) error {
hook := func(got *proto.CDCEvent) error {
defer wg.Done()
return nil
}
@@ -514,7 +698,7 @@ func Test_Preupdate_Tx(t *testing.T) {
mustExecute(db, "CREATE TABLE foo (id INTEGER PRIMARY KEY, name TEXT)")
var wg sync.WaitGroup
hook := func(got *command.CDCEvent) error {
hook := func(got *proto.CDCEvent) error {
defer wg.Done()
return nil
}
@@ -528,7 +712,7 @@ func Test_Preupdate_Tx(t *testing.T) {
wg.Wait()
}
func compareEvents(t *testing.T, exp, got *command.CDCEvent) {
func compareEvents(t *testing.T, exp, got *proto.CDCEvent) {
t.Helper()
if exp, got := exp.Table, got.Table; exp != got {
t.Fatalf("expected table %s, got %s", exp, got)