Add initial CDC Service skeleton
@@ -1,3 +1,7 @@
## v8.38.3 (unreleased)

### Implementation changes and bug fixes

- [PR #2106](https://github.com/rqlite/rqlite/pull/2106): Add initial CDC Service implementation.

## v8.38.2 (June 23rd 2025)

### Implementation changes and bug fixes

- [PR #2119](https://github.com/rqlite/rqlite/pull/2119): CLI supports `RQLITE_HOST` as connection environment variable. Fixes issue [#2088](https://github.com/rqlite/rqlite/issues/2088). Thanks @m04f
cdc/DESIGN.md (new file, 32 lines)

@@ -0,0 +1,32 @@
# CDC Service Design

Change-data-capture has been added to the database module of rqlite. This allows one to register two callbacks -- one called when rows are about to be updated, and one called when the updates are actually committed. The changes have been modeled using protobufs, and the events are sent to a Go channel.
The next task is to design the "CDC Service", which will read this channel and transmit the events to a configurable webhook. rqlite is written in Go.

The events contain both "before" and "after" data for the changed rows, as well as the index of the Raft log entry containing the SQL statement that triggered the changes.
# General structure

The CDC Service (CDCS) will be instantiated with:

- a URL. Only HTTP and HTTPS will be supported as schemes for now, but other schemes may be supported in the future.
- a tls.Config object, allowing control over such things as mutual TLS. This object may be nil.
- a retry policy, including the number of times to retry POSTing a given event. The CDCS can be configured to drop events after the retry limit is reached, or to keep retrying forever (though this will result in a growing data-storage need -- see later).
- a channel from which to read events.
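
For illustration, these constructor parameters could be grouped as follows. This is only a sketch: the struct and its field names are hypothetical, and the skeleton below actually passes them as individual arguments to NewService.

```go
package cdc

import (
	"crypto/tls"

	"github.com/rqlite/rqlite/v8/command/proto"
)

// Config is a hypothetical grouping of the CDCS constructor parameters
// described above. Field names are illustrative only.
type Config struct {
	Endpoint     string                  // HTTP or HTTPS webhook URL
	TLSConfig    *tls.Config             // may be nil; enables e.g. mutual TLS
	MaxRetries   int                     // times to retry POSTing a given event
	RetryForever bool                    // if true, never drop events after the retry limit
	In           <-chan *proto.CDCEvents // channel from which events are read
}
```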
When launched, the first thing the CDCS will do is create a table called "_rqlite_cdc_state" in the SQLite database, via Raft consensus. This table is used by the CDCS for its own state management. The CDCS will start a goroutine to read the channel. As events are read they are added to a disk-backed, or BoltDB-backed, FIFO queue. The reason for this queue will be explained in a moment.
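
The skeleton below uses an in-memory queue; the persistent queue is future work. A sketch of the operations such a disk-backed FIFO would need, under the assumption that it is ordered by Raft index:

```go
package cdc

import "github.com/rqlite/rqlite/v8/command/proto"

// persistentFIFO sketches the disk-backed (or BoltDB-backed) queue the
// design calls for. This interface is an assumption, not part of the commit.
type persistentFIFO interface {
	// Append durably stores events at the tail of the queue.
	Append(evs *proto.CDCEvents) error
	// First returns the events at the head of the queue without removing
	// them, or nil if the queue is empty.
	First() (*proto.CDCEvents, error)
	// DeleteRange removes all queued events with Raft index <= idx.
	DeleteRange(idx uint64) error
}
```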
A second goroutine reads from the FIFO queue. It reads as many events as are available (up to a configurable limit, so there is a bound on the delay before CDC events are sent), creates a batch, and then checks if it is the Leader. If it is the Leader it performs an HTTP POST to the endpoint with the body containing the JSON-marshaled batch, implementing any retries as needed. If it is not the Leader it does not perform the POST, because it assumes the actual Leader will transmit the events. It then returns to reading from the queue.
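
For illustration, the POSTed body might look like the following. This is a sketch using protojson defaults against the message definitions added later in this commit; note that proto3 JSON encodes uint64 fields as strings.

```go
package main

import (
	"fmt"

	"github.com/rqlite/rqlite/v8/command/proto"
	"google.golang.org/protobuf/encoding/protojson"
)

func main() {
	batch := &proto.CDCEventsBatch{
		Payload: []*proto.CDCEvents{{
			Index: 1,
			Events: []*proto.CDCEvent{{
				Op:       proto.CDCEvent_INSERT,
				Table:    "foo",
				NewRowId: 2,
			}},
		}},
	}
	b, _ := protojson.Marshal(batch)
	fmt.Println(string(b))
	// Approximately:
	// {"payload":[{"index":"1","events":[{"op":"INSERT","table":"foo","newRowId":"2"}]}]}
}
```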
Assuming it is the Leader, it records in memory the highest Raft index corresponding to events it has successfully transmitted to the webhook -- call this the CDC high watermark. Concurrently, a third goroutine (launched at CDCS startup) periodically writes this high watermark to the _rqlite_cdc_state table in the SQLite database, via Raft consensus. In this manner each node learns which events have been transmitted successfully, and this minimizes duplicate events to the webhook.
This means that in addition to checking whether it (the CDCS) is Leader, it also checks whether it's reading events with a Raft index lower than the high watermark. If it is, it doesn't just skip sending those events; it actually deletes them from the queue.
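
A sketch of that check on the consume side, reusing the hypothetical persistentFIFO from above. Events at or below the high watermark were already delivered by the cluster, so they are removed rather than merely skipped:

```go
// pruneDelivered removes queued events already covered by the high
// watermark. Hypothetical helper; the skeleton does not yet implement this.
func pruneDelivered(q persistentFIFO, highWatermark uint64) error {
	for {
		evs, err := q.First()
		if err != nil {
			return err
		}
		if evs == nil || evs.Index > highWatermark {
			return nil // queue empty, or head not yet delivered
		}
		// Already transmitted by the cluster: delete, don't just skip.
		if err := q.DeleteRange(evs.Index); err != nil {
			return err
		}
	}
}
```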
# Leadership changes

The CDCS will listen for leadership changes -- this functionality is already offered by the Raft subsystem. When a node becomes Leader it will start reading the queue from the start, implementing the scheme above. This ensures that every event is transmitted at least once. Careful reading of the above will show that there is a race: a node could check whether it is Leader and determine it isn't, but gain leadership between the leader check and the decision not to transmit. So when leadership changes, a node must resend all events that have not been positively recorded via the high watermark. This means that leadership changes may result in duplicates.
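
A sketch of what the leadership-change handler could look like. The channel registration already exists in the skeleton's Start method below; the replay itself (replayFromStart here) is a hypothetical method representing future work:

```go
// leaderLoop reacts to leadership-change signals. Sketch only: the
// skeleton registers the channel but does not yet consume it, and
// replayFromStart is hypothetical.
func (s *Service) leaderLoop(leaderCh <-chan struct{}) {
	for {
		select {
		case <-leaderCh:
			if s.clstr.IsLeader() {
				// Resend everything not covered by the high watermark.
				// Delivery is at-least-once; duplicates are possible.
				s.replayFromStart()
			}
		case <-s.done:
			return
		}
	}
}
```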
# Snapshot xfer issues

See https://github.com/rqlite/rqlite/pull/2106/files#r2143667669
cdc/service.go (new file, 283 lines)

@@ -0,0 +1,283 @@
package cdc

import (
	"bytes"
	"crypto/tls"
	"expvar"
	"fmt"
	"log"
	"net/http"
	"os"
	"sync"
	"sync/atomic"
	"time"

	"github.com/rqlite/rqlite/v8/command/proto"
	"github.com/rqlite/rqlite/v8/queue"
	"google.golang.org/protobuf/encoding/protojson"
)

const (
	highWatermarkKey = "high_watermark"
	leaderChanLen    = 5 // Support any fast back-to-back leadership changes.
)

const (
	numDroppedNotLeader    = "dropped_not_leader"
	numDroppedFailedToSend = "dropped_failed_to_send"
	numSent                = "sent_events"
)

// stats captures stats for the CDC Service.
var stats *expvar.Map

func init() {
	stats = expvar.NewMap("cdc_service")
	ResetStats()
}

// ResetStats resets the expvar stats for this module. Mostly for test purposes.
func ResetStats() {
	stats.Init()
	stats.Add(numDroppedNotLeader, 0)
	stats.Add(numDroppedFailedToSend, 0)
	stats.Add(numSent, 0)
}

// Cluster is an interface that defines methods for cluster management.
type Cluster interface {
	// IsLeader returns true if the node is the leader of the cluster.
	IsLeader() bool

	// RegisterLeaderChange registers the given channel which will receive
	// a signal when the node detects that the Leader changes.
	RegisterLeaderChange(c chan<- struct{})
}

// Store is an interface that defines methods for executing commands and querying
// the state of the store. It is used by the CDC service to read and write its own state.
type Store interface {
	// Execute allows us to write state to the store.
	Execute(er *proto.ExecuteRequest) ([]*proto.ExecuteQueryResponse, error)

	// Query allows us to read state from the store.
	Query(qr *proto.QueryRequest) ([]*proto.QueryRows, error)
}

// Service is a CDC service that reads events from a channel and processes them.
// It is used to stream changes to an HTTP endpoint.
type Service struct {
	clstr Cluster
	str   Store

	in        <-chan *proto.CDCEvents
	tlsConfig *tls.Config

	// endpoint is the HTTP endpoint to which the CDC events are sent.
	endpoint string

	// httpClient is the HTTP client used to send requests to the endpoint.
	httpClient *http.Client

	// maxBatchSz is the maximum number of events to send in a single batch to the endpoint.
	maxBatchSz int

	// maxBatchDelay is the maximum delay before sending a batch of events, regardless
	// of the number of events ready for sending. This is used to ensure that
	// we don't wait too long for a batch to fill up.
	maxBatchDelay time.Duration

	// queue is a queue of events to be sent to the webhook. It implements the
	// batching and timeout logic.
	queue *queue.Queue[*proto.CDCEvents]

	// highWatermark is the index of the last event that was successfully sent to the webhook
	// by the cluster (which is not necessarily the same thing as this node).
	highWatermark atomic.Uint64

	batchMarshaler protojson.MarshalOptions

	// For CDC shutdown.
	wg   sync.WaitGroup
	done chan struct{}

	logger *log.Logger
}

// NewService creates a new CDC service.
func NewService(clstr Cluster, str Store, in <-chan *proto.CDCEvents, endpoint string, tlsConfig *tls.Config, maxBatchSz int, maxBatchDelay time.Duration) *Service {
	httpClient := &http.Client{
		Transport: &http.Transport{
			TLSClientConfig: tlsConfig,
		},
		Timeout: 5 * time.Second,
	}

	return &Service{
		clstr:          clstr,
		str:            str,
		in:             in,
		tlsConfig:      tlsConfig,
		endpoint:       endpoint,
		httpClient:     httpClient,
		maxBatchSz:     maxBatchSz,
		maxBatchDelay:  maxBatchDelay,
		queue:          queue.New[*proto.CDCEvents](maxBatchSz, maxBatchSz, maxBatchDelay),
		done:           make(chan struct{}),
		batchMarshaler: protojson.MarshalOptions{},
		logger:         log.New(os.Stdout, "[cdc service] ", log.LstdFlags),
	}
}

// Start starts the CDC service.
func (s *Service) Start() error {
	if err := s.createStateTable(); err != nil {
		return err
	}
	s.wg.Add(3)
	go s.readEvents()
	go s.postEvents()
	go s.writeHighWatermarkLoop()

	// Register for leadership-change notifications. Nothing consumes this
	// channel yet in this skeleton; see DESIGN.md for the planned
	// replay-on-leadership-change behavior.
	obCh := make(chan struct{}, leaderChanLen)
	s.clstr.RegisterLeaderChange(obCh)
	return nil
}

// Stop stops the CDC service.
func (s *Service) Stop() {
	close(s.done)
	s.wg.Wait()
}

// HighWatermark returns the high watermark of the CDC service. This
// is the index of the last event that was successfully sent to the webhook.
func (s *Service) HighWatermark() uint64 {
	return s.highWatermark.Load()
}

func (s *Service) readEvents() {
	defer s.wg.Done()
	for {
		select {
		case o := <-s.in:
			// Right now just write the event to the queue. Events should be
			// persisted to a disk-based queue for replay on Leader change.
			s.queue.Write([]*proto.CDCEvents{o}, nil)
		case <-s.done:
			return
		}
	}
}

func (s *Service) postEvents() {
	defer s.wg.Done()
	for {
		select {
		case batch := <-s.queue.C:
			if batch == nil || len(batch.Objects) == 0 {
				continue
			}

			// Only the Leader actually sends events.
			if !s.clstr.IsLeader() {
				stats.Add(numDroppedNotLeader, int64(len(batch.Objects)))
				continue
			}

			batchMsg := &proto.CDCEventsBatch{Payload: batch.Objects}
			b, err := s.batchMarshaler.Marshal(batchMsg)
			if err != nil {
				s.logger.Printf("error marshalling batch: %v", err)
				continue
			}

			maxRetries := 5
			nAttempts := 0
			retryDelay := 500 * time.Millisecond
			for {
				nAttempts++
				// Create the request afresh for each attempt, since the
				// body reader is consumed by each send.
				req, err := http.NewRequest("POST", s.endpoint, bytes.NewReader(b))
				if err != nil {
					s.logger.Printf("error creating HTTP request for endpoint: %v", err)
					break
				}
				req.Header.Set("Content-Type", "application/json")

				resp, err := s.httpClient.Do(req)
				if err == nil {
					sentOK := resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusAccepted
					resp.Body.Close()
					if sentOK {
						s.highWatermark.Store(batch.Objects[len(batch.Objects)-1].Index)
						stats.Add(numSent, int64(len(batch.Objects)))
						break
					}
					err = fmt.Errorf("unexpected HTTP status %s from endpoint", resp.Status)
				}
				if nAttempts >= maxRetries {
					s.logger.Printf("failed to send batch to endpoint after %d attempts, last error: %v", nAttempts, err)
					stats.Add(numDroppedFailedToSend, int64(len(batch.Objects)))
					break
				}
				retryDelay *= 2
				time.Sleep(retryDelay)
			}
		case <-s.done:
			return
		}
	}
}

func (s *Service) writeHighWatermarkLoop() {
	defer s.wg.Done()
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			if err := s.writeHighWatermark(s.highWatermark.Load()); err != nil {
				s.logger.Printf("error writing high watermark to store: %v", err)
				continue
			}
		case <-s.done:
			return
		}
	}
}

func (s *Service) createStateTable() error {
	er := executeRequestFromString(`
CREATE TABLE IF NOT EXISTS _rqlite_cdc_state (
	k TEXT PRIMARY KEY,
	v_blob BLOB,
	v_text TEXT,
	v_int INTEGER
)`)
	_, err := s.str.Execute(er)
	return err
}

func (s *Service) writeHighWatermark(value uint64) error {
	// Building the SQL via Sprintf is safe here: the key is a package-level
	// constant and the value is numeric.
	sql := fmt.Sprintf(`INSERT OR REPLACE INTO _rqlite_cdc_state(k, v_int) VALUES ('%s', %d)`, highWatermarkKey, value)
	er := executeRequestFromString(sql)
	_, err := s.str.Execute(er)
	return err
}

func executeRequestFromString(s string) *proto.ExecuteRequest {
	return executeRequestFromStrings([]string{s}, false, false)
}

// executeRequestFromStrings converts a slice of strings into a proto.ExecuteRequest.
func executeRequestFromStrings(s []string, timings, tx bool) *proto.ExecuteRequest {
	stmts := make([]*proto.Statement, len(s))
	for i := range s {
		stmts[i] = &proto.Statement{
			Sql: s[i],
		}
	}
	return &proto.ExecuteRequest{
		Request: &proto.Request{
			Statements:  stmts,
			Transaction: tx,
		},
		Timings: timings,
	}
}
cdc/service_test.go (new file, 352 lines)

@@ -0,0 +1,352 @@
package cdc

import (
	"expvar"
	"io"
	"net/http"
	"net/http/httptest"
	"reflect"
	"sync/atomic"
	"testing"
	"time"

	"github.com/rqlite/rqlite/v8/command/proto"
	"google.golang.org/protobuf/encoding/protojson"
)

func Test_ServiceSingleEvent(t *testing.T) {
	ResetStats()

	// Channel for the service to receive events.
	eventsCh := make(chan *proto.CDCEvents, 1)

	bodyCh := make(chan []byte, 1)
	testSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		defer r.Body.Close()
		b, _ := io.ReadAll(r.Body)
		bodyCh <- b
		w.WriteHeader(http.StatusOK)
	}))
	defer testSrv.Close()

	// Mock cluster -- this node is always leader.
	cl := &mockCluster{}
	cl.leader.Store(true)

	// Construct and start the service.
	svc := NewService(
		cl,
		&mockStore{},
		eventsCh,
		testSrv.URL,
		nil,                 // no TLS
		1,                   // maxBatchSz -- flush immediately
		50*time.Millisecond, // maxBatchDelay -- short for test
	)
	if err := svc.Start(); err != nil {
		t.Fatalf("failed to start service: %v", err)
	}
	defer svc.Stop()

	// Send one dummy event to the service.
	ev := &proto.CDCEvent{
		Op:       proto.CDCEvent_INSERT,
		Table:    "foo",
		NewRowId: 2,
	}
	evs := &proto.CDCEvents{
		Index:  1,
		Events: []*proto.CDCEvent{ev},
	}
	eventsCh <- evs

	// Wait for the service to forward the batch.
	select {
	case got := <-bodyCh:
		var batch proto.CDCEventsBatch
		if err := protojson.Unmarshal(got, &batch); err != nil {
			t.Fatalf("invalid JSON received: %v", err)
		}
		if len(batch.Payload) != 1 || batch.Payload[0].Index != evs.Index {
			t.Fatalf("unexpected payload: %v", batch.Payload)
		}
		if len(batch.Payload[0].Events) != 1 {
			t.Fatalf("unexpected number of events in payload: %d", len(batch.Payload[0].Events))
		}
		if !reflect.DeepEqual(batch.Payload[0].Events[0], evs.Events[0]) {
			t.Fatalf("unexpected events in payload: %v", batch.Payload[0].Events)
		}
	case <-time.After(2 * time.Second):
		t.Fatalf("timeout waiting for HTTP POST")
	}

	testPoll(t, func() bool {
		return svc.HighWatermark() == evs.Index
	}, 2*time.Second)

	// Next emulate CDC not running on the Leader.
	cl.leader.Store(false)
	eventsCh <- evs
	pollExpvarUntil(t, numDroppedNotLeader, 1, 2*time.Second)
}

func Test_ServiceMultiEvent(t *testing.T) {
	ResetStats()

	// Channel for the service to receive events.
	eventsCh := make(chan *proto.CDCEvents, 1)

	bodyCh := make(chan []byte, 1)
	testSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		defer r.Body.Close()
		b, _ := io.ReadAll(r.Body)
		bodyCh <- b
		w.WriteHeader(http.StatusOK)
	}))
	defer testSrv.Close()

	// Mock cluster -- this node is always leader.
	cl := &mockCluster{}
	cl.leader.Store(true)

	// Construct and start the service.
	svc := NewService(
		cl,
		&mockStore{},
		eventsCh,
		testSrv.URL,
		nil, // no TLS
		2,   // maxBatchSz -- flush after 2 events
		1*time.Second,
	)
	if err := svc.Start(); err != nil {
		t.Fatalf("failed to start service: %v", err)
	}
	defer svc.Stop()

	// Create the Events and send them.
	ev1 := &proto.CDCEvent{
		Op:       proto.CDCEvent_INSERT,
		Table:    "foo",
		NewRowId: 10,
	}
	evs1 := &proto.CDCEvents{
		Index:  1,
		Events: []*proto.CDCEvent{ev1},
	}
	ev2 := &proto.CDCEvent{
		Op:       proto.CDCEvent_UPDATE,
		Table:    "baz",
		OldRowId: 20,
		NewRowId: 30,
	}
	evs2 := &proto.CDCEvents{
		Index:  2,
		Events: []*proto.CDCEvent{ev2},
	}
	eventsCh <- evs1
	eventsCh <- evs2

	// Wait for the service to forward the batch.
	select {
	case got := <-bodyCh:
		var batch proto.CDCEventsBatch
		if err := protojson.Unmarshal(got, &batch); err != nil {
			t.Fatalf("invalid JSON received: %v", err)
		}
		if len(batch.Payload) != 2 {
			t.Fatalf("unexpected payload length: %d", len(batch.Payload))
		}
		if batch.Payload[0].Index != evs1.Index || batch.Payload[1].Index != evs2.Index {
			t.Fatalf("unexpected payload indices: %d, %d", batch.Payload[0].Index, batch.Payload[1].Index)
		}
		if len(batch.Payload[0].Events) != 1 || len(batch.Payload[1].Events) != 1 {
			t.Fatalf("unexpected number of events in payload: %d, %d", len(batch.Payload[0].Events), len(batch.Payload[1].Events))
		}
		if !reflect.DeepEqual(batch.Payload[0].Events[0], evs1.Events[0]) || !reflect.DeepEqual(batch.Payload[1].Events[0], evs2.Events[0]) {
			t.Fatalf("unexpected events in payload: %v, %v", batch.Payload[0].Events[0], batch.Payload[1].Events[0])
		}
	case <-time.After(2 * time.Second):
		t.Fatalf("timeout waiting for HTTP POST")
	}

	testPoll(t, func() bool {
		return svc.HighWatermark() == evs2.Index
	}, 2*time.Second)
}

func Test_ServiceMultiEvent_Batch(t *testing.T) {
	ResetStats()

	// Channel for the service to receive events.
	eventsCh := make(chan *proto.CDCEvents, 1)

	bodyCh := make(chan []byte, 1)
	testSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		defer r.Body.Close()
		b, _ := io.ReadAll(r.Body)
		bodyCh <- b
		w.WriteHeader(http.StatusOK)
	}))
	defer testSrv.Close()

	// Mock cluster -- this node is always leader.
	cl := &mockCluster{}
	cl.leader.Store(true)

	// Construct and start the service.
	svc := NewService(
		cl,
		&mockStore{},
		eventsCh,
		testSrv.URL,
		nil, // no TLS
		2,   // maxBatchSz -- flush after 2 events
		100*time.Millisecond,
	)
	if err := svc.Start(); err != nil {
		t.Fatalf("failed to start service: %v", err)
	}
	defer svc.Stop()

	// Create the Events and send them.
	ev1 := &proto.CDCEvent{
		Op:       proto.CDCEvent_INSERT,
		Table:    "foo",
		NewRowId: 10,
	}
	evs1 := &proto.CDCEvents{
		Index:  1,
		Events: []*proto.CDCEvent{ev1},
	}
	ev2 := &proto.CDCEvent{
		Op:       proto.CDCEvent_UPDATE,
		Table:    "baz",
		OldRowId: 20,
		NewRowId: 30,
	}
	evs2 := &proto.CDCEvents{
		Index:  2,
		Events: []*proto.CDCEvent{ev2},
	}
	ev3 := &proto.CDCEvent{
		Op:       proto.CDCEvent_DELETE,
		Table:    "qux",
		OldRowId: 40,
	}
	evs3 := &proto.CDCEvents{
		Index:  3,
		Events: []*proto.CDCEvent{ev3},
	}
	eventsCh <- evs1
	eventsCh <- evs2
	eventsCh <- evs3

	// Wait for the service to forward the first batch.
	select {
	case got := <-bodyCh:
		var batch proto.CDCEventsBatch
		if err := protojson.Unmarshal(got, &batch); err != nil {
			t.Fatalf("invalid JSON received: %v", err)
		}
		if len(batch.Payload) != 2 {
			t.Fatalf("unexpected payload length: %d", len(batch.Payload))
		}
		if batch.Payload[0].Index != evs1.Index || batch.Payload[1].Index != evs2.Index {
			t.Fatalf("unexpected payload indices: %d, %d", batch.Payload[0].Index, batch.Payload[1].Index)
		}
		if len(batch.Payload[0].Events) != 1 || len(batch.Payload[1].Events) != 1 {
			t.Fatalf("unexpected number of events in payload: %d, %d", len(batch.Payload[0].Events), len(batch.Payload[1].Events))
		}
		if !reflect.DeepEqual(batch.Payload[0].Events[0], evs1.Events[0]) || !reflect.DeepEqual(batch.Payload[1].Events[0], evs2.Events[0]) {
			t.Fatalf("unexpected events in payload: %v, %v", batch.Payload[0].Events[0], batch.Payload[1].Events[0])
		}
	case <-time.After(2 * time.Second):
		t.Fatalf("timeout waiting for HTTP POST")
	}

	// Wait for the service to forward the second batch, which will be kicked out due to a timeout.
	select {
	case got := <-bodyCh:
		var batch proto.CDCEventsBatch
		if err := protojson.Unmarshal(got, &batch); err != nil {
			t.Fatalf("invalid JSON received: %v", err)
		}
		if len(batch.Payload) != 1 {
			t.Fatalf("unexpected payload length: %d", len(batch.Payload))
		}
		if batch.Payload[0].Index != evs3.Index {
			t.Fatalf("unexpected payload index: %d", batch.Payload[0].Index)
		}
		if len(batch.Payload[0].Events) != 1 {
			t.Fatalf("unexpected number of events in payload: %d", len(batch.Payload[0].Events))
		}
		if !reflect.DeepEqual(batch.Payload[0].Events[0], evs3.Events[0]) {
			t.Fatalf("unexpected events in payload: %v", batch.Payload[0].Events[0])
		}
	case <-time.After(2 * time.Second):
		t.Fatalf("timeout waiting for HTTP POST")
	}

	testPoll(t, func() bool {
		return svc.HighWatermark() == evs3.Index
	}, 2*time.Second)
}

type mockCluster struct {
	leader atomic.Bool
}

func (m *mockCluster) IsLeader() bool { return m.leader.Load() }

func (m *mockCluster) RegisterLeaderChange(chan<- struct{}) {
	// Not needed for this simple test.
}

type mockStore struct{}

func (m *mockStore) Execute(*proto.ExecuteRequest) ([]*proto.ExecuteQueryResponse, error) {
	return nil, nil
}
func (m *mockStore) Query(*proto.QueryRequest) ([]*proto.QueryRows, error) { return nil, nil }

func pollExpvarUntil(t *testing.T, name string, expected int64, timeout time.Duration) {
	t.Helper()
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	for {
		select {
		case <-ticker.C:
			val := stats.Get(name)
			if val == nil {
				t.Fatalf("expvar %s not found", name)
			}
			if i, ok := val.(*expvar.Int); ok && i.Value() == expected {
				return
			}
		case <-timer.C:
			t.Fatalf("timed out waiting for expvar %s to reach %d", name, expected)
		}
	}
}

func testPoll(t *testing.T, condition func() bool, timeout time.Duration) {
	t.Helper()
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	for {
		select {
		case <-ticker.C:
			if condition() {
				return
			}
		case <-timer.C:
			t.Fatalf("timed out waiting for condition")
		}
	}
}
@@ -290,7 +290,7 @@ func (x UpdateHookEvent_Operation) Number() protoreflect.EnumNumber {
 
 // Deprecated: Use UpdateHookEvent_Operation.Descriptor instead.
 func (UpdateHookEvent_Operation) EnumDescriptor() ([]byte, []int) {
-	return file_command_proto_rawDescGZIP(), []int{22, 0}
+	return file_command_proto_rawDescGZIP(), []int{23, 0}
 }
 
 type Parameter struct {
@@ -1765,7 +1765,7 @@ func (x *CDCEvent) GetNewRow() *CDCRow {
 
 type CDCEvents struct {
 	state  protoimpl.MessageState `protogen:"open.v1"`
-	K      uint64                 `protobuf:"varint,1,opt,name=k,proto3" json:"k,omitempty"`
+	Index  uint64                 `protobuf:"varint,1,opt,name=index,proto3" json:"index,omitempty"`
 	Events []*CDCEvent            `protobuf:"bytes,2,rep,name=events,proto3" json:"events,omitempty"`
 	unknownFields protoimpl.UnknownFields
 	sizeCache     protoimpl.SizeCache
@@ -1801,9 +1801,9 @@ func (*CDCEvents) Descriptor() ([]byte, []int) {
 	return file_command_proto_rawDescGZIP(), []int{21}
 }
 
-func (x *CDCEvents) GetK() uint64 {
+func (x *CDCEvents) GetIndex() uint64 {
 	if x != nil {
-		return x.K
+		return x.Index
 	}
 	return 0
 }
@@ -1815,6 +1815,50 @@ func (x *CDCEvents) GetEvents() []*CDCEvent {
 	return nil
 }
 
+type CDCEventsBatch struct {
+	state         protoimpl.MessageState `protogen:"open.v1"`
+	Payload       []*CDCEvents           `protobuf:"bytes,1,rep,name=payload,proto3" json:"payload,omitempty"`
+	unknownFields protoimpl.UnknownFields
+	sizeCache     protoimpl.SizeCache
+}
+
+func (x *CDCEventsBatch) Reset() {
+	*x = CDCEventsBatch{}
+	mi := &file_command_proto_msgTypes[22]
+	ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+	ms.StoreMessageInfo(mi)
+}
+
+func (x *CDCEventsBatch) String() string {
+	return protoimpl.X.MessageStringOf(x)
+}
+
+func (*CDCEventsBatch) ProtoMessage() {}
+
+func (x *CDCEventsBatch) ProtoReflect() protoreflect.Message {
+	mi := &file_command_proto_msgTypes[22]
+	if x != nil {
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		if ms.LoadMessageInfo() == nil {
+			ms.StoreMessageInfo(mi)
+		}
+		return ms
+	}
+	return mi.MessageOf(x)
+}
+
+// Deprecated: Use CDCEventsBatch.ProtoReflect.Descriptor instead.
+func (*CDCEventsBatch) Descriptor() ([]byte, []int) {
+	return file_command_proto_rawDescGZIP(), []int{22}
+}
+
+func (x *CDCEventsBatch) GetPayload() []*CDCEvents {
+	if x != nil {
+		return x.Payload
+	}
+	return nil
+}
+
 type UpdateHookEvent struct {
 	state protoimpl.MessageState `protogen:"open.v1"`
 	Error string                 `protobuf:"bytes,1,opt,name=error,proto3" json:"error,omitempty"`
@@ -1827,7 +1871,7 @@ type UpdateHookEvent struct {
 
 func (x *UpdateHookEvent) Reset() {
 	*x = UpdateHookEvent{}
-	mi := &file_command_proto_msgTypes[22]
+	mi := &file_command_proto_msgTypes[23]
 	ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
 	ms.StoreMessageInfo(mi)
 }
@@ -1839,7 +1883,7 @@ func (x *UpdateHookEvent) String() string {
 func (*UpdateHookEvent) ProtoMessage() {}
 
 func (x *UpdateHookEvent) ProtoReflect() protoreflect.Message {
-	mi := &file_command_proto_msgTypes[22]
+	mi := &file_command_proto_msgTypes[23]
 	if x != nil {
 		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
 		if ms.LoadMessageInfo() == nil {
@@ -1852,7 +1896,7 @@ func (x *UpdateHookEvent) ProtoReflect() protoreflect.Message {
 
 // Deprecated: Use UpdateHookEvent.ProtoReflect.Descriptor instead.
 func (*UpdateHookEvent) Descriptor() ([]byte, []int) {
-	return file_command_proto_rawDescGZIP(), []int{22}
+	return file_command_proto_rawDescGZIP(), []int{23}
 }
 
 func (x *UpdateHookEvent) GetError() string {
@@ -2022,10 +2066,12 @@ const file_command_proto_rawDesc = "" +
 	"\n" +
 	"\x06UPDATE\x10\x02\x12\n" +
 	"\n" +
-	"\x06DELETE\x10\x03\"D\n" +
-	"\tCDCEvents\x12\f\n" +
-	"\x01k\x18\x01 \x01(\x04R\x01k\x12)\n" +
-	"\x06events\x18\x02 \x03(\v2\x11.command.CDCEventR\x06events\"\xc6\x01\n" +
+	"\x06DELETE\x10\x03\"L\n" +
+	"\tCDCEvents\x12\x14\n" +
+	"\x05index\x18\x01 \x01(\x04R\x05index\x12)\n" +
+	"\x06events\x18\x02 \x03(\v2\x11.command.CDCEventR\x06events\">\n" +
+	"\x0eCDCEventsBatch\x12,\n" +
+	"\apayload\x18\x01 \x03(\v2\x12.command.CDCEventsR\apayload\"\xc6\x01\n" +
 	"\x0fUpdateHookEvent\x12\x14\n" +
 	"\x05error\x18\x01 \x01(\tR\x05error\x122\n" +
 	"\x02op\x18\x02 \x01(\x0e2\".command.UpdateHookEvent.OperationR\x02op\x12\x14\n" +
@@ -2053,7 +2099,7 @@ func file_command_proto_rawDescGZIP() []byte {
 }
 
 var file_command_proto_enumTypes = make([]protoimpl.EnumInfo, 5)
-var file_command_proto_msgTypes = make([]protoimpl.MessageInfo, 23)
+var file_command_proto_msgTypes = make([]protoimpl.MessageInfo, 24)
 var file_command_proto_goTypes = []any{
 	(QueryRequest_Level)(0),   // 0: command.QueryRequest.Level
 	(BackupRequest_Format)(0), // 1: command.BackupRequest.Format
@@ -2082,7 +2128,8 @@ var file_command_proto_goTypes = []any{
 	(*CDCRow)(nil),          // 24: command.CDCRow
 	(*CDCEvent)(nil),        // 25: command.CDCEvent
 	(*CDCEvents)(nil),       // 26: command.CDCEvents
-	(*UpdateHookEvent)(nil), // 27: command.UpdateHookEvent
+	(*CDCEventsBatch)(nil),  // 27: command.CDCEventsBatch
+	(*UpdateHookEvent)(nil), // 28: command.UpdateHookEvent
 }
 var file_command_proto_depIdxs = []int32{
 	5, // 0: command.Statement.parameters:type_name -> command.Parameter
@@ -2103,12 +2150,13 @@ var file_command_proto_depIdxs = []int32{
 	24, // 15: command.CDCEvent.old_row:type_name -> command.CDCRow
 	24, // 16: command.CDCEvent.new_row:type_name -> command.CDCRow
 	25, // 17: command.CDCEvents.events:type_name -> command.CDCEvent
-	4,  // 18: command.UpdateHookEvent.op:type_name -> command.UpdateHookEvent.Operation
-	19, // [19:19] is the sub-list for method output_type
-	19, // [19:19] is the sub-list for method input_type
-	19, // [19:19] is the sub-list for extension type_name
-	19, // [19:19] is the sub-list for extension extendee
-	0,  // [0:19] is the sub-list for field type_name
+	26, // 18: command.CDCEventsBatch.payload:type_name -> command.CDCEvents
+	4,  // 19: command.UpdateHookEvent.op:type_name -> command.UpdateHookEvent.Operation
+	20, // [20:20] is the sub-list for method output_type
+	20, // [20:20] is the sub-list for method input_type
+	20, // [20:20] is the sub-list for extension type_name
+	20, // [20:20] is the sub-list for extension extendee
+	0,  // [0:20] is the sub-list for field type_name
 }
 
 func init() { file_command_proto_init() }
@@ -2141,7 +2189,7 @@ func file_command_proto_init() {
 		GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
 		RawDescriptor: unsafe.Slice(unsafe.StringData(file_command_proto_rawDesc), len(file_command_proto_rawDesc)),
 		NumEnums:      5,
-		NumMessages:   23,
+		NumMessages:   24,
 		NumExtensions: 0,
 		NumServices:   0,
 	},
@@ -173,10 +173,14 @@ message CDCEvent {
 }
 
 message CDCEvents {
-  uint64 k = 1;
+  uint64 index = 1;
   repeated CDCEvent events = 2;
 }
 
+message CDCEventsBatch {
+  repeated CDCEvents payload = 1;
+}
+
 message UpdateHookEvent {
   enum Operation {
     UNKNOWN = 0;
@@ -29,7 +29,7 @@ func NewCDCStreamer(out chan<- *command.CDCEvents) *CDCStreamer {
 func (s *CDCStreamer) Reset(k uint64) {
 	s.pending = &command.CDCEvents{
 		Events: make([]*command.CDCEvent, 0),
-		K:      k,
+		Index:  k,
 	}
 }
@@ -29,8 +29,8 @@ func Test_NewCDCStreamer_CommitEmpty(t *testing.T) {
 	}
 
 	events := <-ch
-	if events.K != 1234 {
-		t.Fatalf("expected K value to be 1234, got %d", events.K)
+	if events.Index != 1234 {
+		t.Fatalf("expected index value to be 1234, got %d", events.Index)
 	}
 	if len(events.Events) != 0 {
 		t.Fatalf("expected no events, got %d", len(events.Events))
@@ -61,8 +61,8 @@ func Test_NewCDCStreamer_CommitOne(t *testing.T) {
 	}
 
 	ev := <-ch
-	if ev.K != 5678 {
-		t.Fatalf("expected K value to be 5678, got %d", ev.K)
+	if ev.Index != 5678 {
+		t.Fatalf("expected index value to be 5678, got %d", ev.Index)
 	}
 	if len(ev.Events) != 1 {
 		t.Fatalf("expected 1 event, got %d", len(ev.Events))
@@ -103,8 +103,8 @@ func Test_NewCDCStreamer_CommitTwo(t *testing.T) {
 	}
 
 	ev := <-ch
-	if ev.K != 9012 {
-		t.Fatalf("expected K value to be 9012, got %d", ev.K)
+	if ev.Index != 9012 {
+		t.Fatalf("expected index value to be 9012, got %d", ev.GetIndex())
 	}
 	if len(ev.Events) != 2 {
 		t.Fatalf("expected 2 events, got %d", len(ev.Events))
@@ -158,8 +158,8 @@ func Test_NewCDCStreamer_ResetThenPreupdate(t *testing.T) {
 	}
 
 	ev := <-ch
-	if ev.K != 5678 {
-		t.Fatalf("expected K value to be 5678, got %d", ev.K)
+	if ev.Index != 5678 {
+		t.Fatalf("expected index value to be 5678, got %d", ev.Index)
 	}
 	if len(ev.Events) != 1 {
 		t.Fatalf("expected 1 event, got %d", len(ev.Events))