Initial Store-CDCService integration

This commit is contained in:
Philip O'Toole
2025-08-16 16:26:43 -04:00
committed by GitHub
parent 11b8b90868
commit 6c8721c03a
7 changed files with 178 additions and 11 deletions

View File

@@ -2,6 +2,7 @@
### Implementation changes and bug fixes
- [PR #2220](https://github.com/rqlite/rqlite/pull/2220): Allow `AppendEntriesRequest` to carry application-specific messages.
- [PR #2221](https://github.com/rqlite/rqlite/pull/2221): Improve the CDC HTTP POST event envelope.
- [PR #2226](https://github.com/rqlite/rqlite/pull/2226): Initial integration between Store and CDC Service.
## v8.43.3 (August 14th 2025)
### Implementation changes and bug fixes

View File

@@ -56,9 +56,6 @@ type Cluster interface {
// SetHighWatermark sets the high watermark across the cluster.
SetHighWatermark(value uint64) error
// GetHighWatermark retrieves the high watermark for the cluster.
GetHighWatermark() (uint64, error)
}
// Service is a CDC service that reads events from a channel and processes them.

View File

@@ -531,11 +531,6 @@ func (m *mockCluster) SetHighWatermark(value uint64) error {
return nil
}
func (m *mockCluster) GetHighWatermark() (uint64, error) {
// Mock implementation does nothing.
return 0, nil
}
func pollExpvarUntil(t *testing.T, name string, expected int64, timeout time.Duration) {
t.Helper()
ticker := time.NewTicker(10 * time.Millisecond)

View File

@@ -172,6 +172,18 @@ func UnmarshalLoadChunkRequest(b []byte, lr *proto.LoadChunkRequest) error {
return pb.Unmarshal(b, lr)
}
// MarshalAppendEntriesExtension marshals an AppendEntriesExtension
// object into a byte slice.
func MarshalAppendEntriesExtension(ext *proto.AppendEntriesExtension) ([]byte, error) {
return pb.Marshal(ext)
}
// UnmarshalAppendEntriesExtension unmarshals a byte slice into an
// AppendEntriesExtension object.
func UnmarshalAppendEntriesExtension(b []byte, ext *proto.AppendEntriesExtension) error {
return pb.Unmarshal(b, ext)
}
// UnmarshalSubCommand unmarshals a sub command m. It assumes that
// m is the correct type.
func UnmarshalSubCommand(c *proto.Command, m pb.Message) error {

View File

@@ -1998,6 +1998,50 @@ func (x *UpdateHookEvent) GetRowId() int64 {
return 0
}
type AppendEntriesExtension struct {
state protoimpl.MessageState `protogen:"open.v1"`
CdcHWM uint64 `protobuf:"varint,1,opt,name=cdcHWM,proto3" json:"cdcHWM,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *AppendEntriesExtension) Reset() {
*x = AppendEntriesExtension{}
mi := &file_command_proto_msgTypes[25]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *AppendEntriesExtension) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*AppendEntriesExtension) ProtoMessage() {}
func (x *AppendEntriesExtension) ProtoReflect() protoreflect.Message {
mi := &file_command_proto_msgTypes[25]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use AppendEntriesExtension.ProtoReflect.Descriptor instead.
func (*AppendEntriesExtension) Descriptor() ([]byte, []int) {
return file_command_proto_rawDescGZIP(), []int{25}
}
func (x *AppendEntriesExtension) GetCdcHWM() uint64 {
if x != nil {
return x.CdcHWM
}
return 0
}
var File_command_proto protoreflect.FileDescriptor
const file_command_proto_rawDesc = "" +
@@ -2161,7 +2205,9 @@ const file_command_proto_rawDesc = "" +
"\n" +
"\x06UPDATE\x10\x02\x12\n" +
"\n" +
"\x06DELETE\x10\x03B+Z)github.com/rqlite/rqlite/v8/command/protob\x06proto3"
"\x06DELETE\x10\x03\"0\n" +
"\x16AppendEntriesExtension\x12\x16\n" +
"\x06cdcHWM\x18\x01 \x01(\x04R\x06cdcHWMB+Z)github.com/rqlite/rqlite/v8/command/protob\x06proto3"
var (
file_command_proto_rawDescOnce sync.Once
@@ -2176,7 +2222,7 @@ func file_command_proto_rawDescGZIP() []byte {
}
var file_command_proto_enumTypes = make([]protoimpl.EnumInfo, 5)
var file_command_proto_msgTypes = make([]protoimpl.MessageInfo, 25)
var file_command_proto_msgTypes = make([]protoimpl.MessageInfo, 26)
var file_command_proto_goTypes = []any{
(QueryRequest_Level)(0), // 0: command.QueryRequest.Level
(BackupRequest_Format)(0), // 1: command.BackupRequest.Format
@@ -2208,6 +2254,7 @@ var file_command_proto_goTypes = []any{
(*CDCIndexedEventGroup)(nil), // 27: command.CDCIndexedEventGroup
(*CDCIndexedEventGroupBatch)(nil), // 28: command.CDCIndexedEventGroupBatch
(*UpdateHookEvent)(nil), // 29: command.UpdateHookEvent
(*AppendEntriesExtension)(nil), // 30: command.AppendEntriesExtension
}
var file_command_proto_depIdxs = []int32{
5, // 0: command.Statement.parameters:type_name -> command.Parameter
@@ -2267,7 +2314,7 @@ func file_command_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: unsafe.Slice(unsafe.StringData(file_command_proto_rawDesc), len(file_command_proto_rawDesc)),
NumEnums: 5,
NumMessages: 25,
NumMessages: 26,
NumExtensions: 0,
NumServices: 0,
},

View File

@@ -201,3 +201,7 @@ message UpdateHookEvent {
string table = 3;
int64 row_id = 4;
}
message AppendEntriesExtension {
uint64 cdcHWM = 1;
}

111
store/cdc_cluster.go Normal file
View File

@@ -0,0 +1,111 @@
package store
import (
"sync"
"sync/atomic"
"github.com/hashicorp/raft"
"github.com/rqlite/rqlite/v8/command"
"github.com/rqlite/rqlite/v8/command/proto"
)
// CDCCluster is a struct that wraps a Store and implements the functionality
// needed by the CDC service.
type CDCCluster struct {
str *Store
hwm atomic.Uint64
prevHWM atomic.Uint64
hwmObserversMu sync.RWMutex
hwmObservers []chan<- uint64
}
// NewCDCCluster creates a new CDCCluster instance.
func NewCDCCluster(s *Store) *CDCCluster {
c := &CDCCluster{
str: s,
hwm: atomic.Uint64{},
hwmObservers: make([]chan<- uint64, 0),
}
s.raftTn.SetAppendEntriesTxHandler(c.appendEntriesTxHandler)
s.raftTn.SetAppendEntriesRxHandler(c.appendEntriesRxHandler)
return c
}
// SetHighWatermark sets the high watermark for the CDC service.
func (c *CDCCluster) SetHighWatermark(value uint64) error {
c.hwm.Store(value)
return nil
}
// RegisterLeaderChange registers a channel to receive notifications
// when the leader changes in the cluster.
func (c *CDCCluster) RegisterLeaderChange(ch chan<- bool) {
c.str.RegisterLeaderChange(ch)
}
// RegisterHWMChange registers the given channel which will
// receive a notification when the high watermark changes.
func (c *CDCCluster) RegisterHWMChange(ch chan<- uint64) {
c.hwmObserversMu.Lock()
defer c.hwmObserversMu.Unlock()
c.hwmObservers = append(c.hwmObservers, ch)
}
func (c *CDCCluster) appendEntriesTxHandler(req *raft.AppendEntriesRequest) (retErr error) {
hwm := c.hwm.Load()
if c.prevHWM.Load() == hwm {
return nil
}
defer func() {
if retErr == nil {
c.prevHWM.Store(hwm)
}
}()
ex := proto.AppendEntriesExtension{
CdcHWM: hwm,
}
b, err := command.MarshalAppendEntriesExtension(&ex)
if err != nil {
return err
}
if len(req.Entries) > 0 {
req.Entries[len(req.Entries)-1].Extensions = b
} else {
req.Entries = append(req.Entries, &raft.Log{
Extensions: b,
})
}
return nil
}
func (c *CDCCluster) appendEntriesRxHandler(req *raft.AppendEntriesRequest) error {
if len(req.Entries) == 0 {
return nil
}
// Only the last log will carry an extension.
lastLog := req.Entries[len(req.Entries)-1]
if lastLog.Extensions == nil {
return nil
}
var ex proto.AppendEntriesExtension
if err := command.UnmarshalAppendEntriesExtension(lastLog.Extensions, &ex); err != nil {
return err
}
c.hwmObserversMu.RLock()
for _, ch := range c.hwmObservers {
select {
case ch <- ex.CdcHWM:
default:
// Avoid blocking the AppendEntries handler.
}
}
c.hwmObserversMu.RUnlock()
return nil
}