Record commit timestamp for CDC events

This commit is contained in:
Philip O'Toole
2025-09-01 21:00:54 -04:00
committed by GitHub
parent d606b00c57
commit f31d334fdb
6 changed files with 35 additions and 19 deletions

View File

@@ -15,6 +15,8 @@
- [PR #2286](https://github.com/rqlite/rqlite/pull/2286): Actually start CDC service.
- [PR #2287](https://github.com/rqlite/rqlite/pull/2287): Simple CDC end-to-end test.
- [PR #2288](https://github.com/rqlite/rqlite/pull/2288): Ensure sensible CDC defaults.
- [PR #2292](https://github.com/rqlite/rqlite/pull/2292): Improve end-to-end CDC testing.
- [PR #2297](https://github.com/rqlite/rqlite/pull/2297): Record commit timestamp for CDC events.
- [PR #2298](https://github.com/rqlite/rqlite/pull/2298), [PR #2299](https://github.com/rqlite/rqlite/pull/2299), [PR #2300](https://github.com/rqlite/rqlite/pull/2301), [PR #2301](https://github.com/rqlite/rqlite/pull/2301): CDC regex filtering for table names.
## v8.43.4 (August 27th 2025)

View File

@@ -2,7 +2,6 @@ package cdc
import (
"encoding/json"
"time"
"github.com/rqlite/rqlite/v8/command/proto"
)
@@ -12,13 +11,13 @@ type CDCMessagesEnvelope struct {
ServiceID string `json:"service_id,omitempty"`
NodeID string `json:"node_id"`
Payload []*CDCMessage `json:"payload"`
Timestamp int64 `json:"ts_ns,omitempty"`
}
// CDCMessage represents a single CDC message containing an index and a list of events.
type CDCMessage struct {
Index uint64 `json:"index"`
Events []*CDCMessageEvent `json:"events"`
Index uint64 `json:"index"`
Timestamp int64 `json:"ts_ns,omitempty"`
Events []*CDCMessageEvent `json:"events"`
}
// CDCMessageEvent represents a single CDC event within a CDC message.
@@ -42,15 +41,15 @@ func MarshalToEnvelopeJSON(serviceID, nodeID string, ts bool, evs []*proto.CDCIn
NodeID: nodeID,
Payload: make([]*CDCMessage, len(evs)),
}
if ts {
envelope.Timestamp = time.Now().UnixNano()
}
for i, ev := range evs {
envelope.Payload[i] = &CDCMessage{
Index: ev.Index,
Events: make([]*CDCMessageEvent, len(ev.Events)),
}
if ts {
envelope.Payload[i].Timestamp = ev.CommitTimestamp
}
for j, event := range ev.Events {
envelope.Payload[i].Events[j] = &CDCMessageEvent{

View File

@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.36.7
// protoc v3.21.12
// protoc-gen-go v1.36.8
// protoc v3.6.1
// source: message.proto
package proto

View File

@@ -1,6 +1,6 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.36.6
// protoc-gen-go v1.36.8
// protoc v3.6.1
// source: command.proto
@@ -1835,11 +1835,12 @@ func (x *CDCEvent) GetNewRow() *CDCRow {
}
type CDCIndexedEventGroup struct {
state protoimpl.MessageState `protogen:"open.v1"`
Index uint64 `protobuf:"varint,1,opt,name=index,proto3" json:"index,omitempty"`
Events []*CDCEvent `protobuf:"bytes,2,rep,name=events,proto3" json:"events,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
state protoimpl.MessageState `protogen:"open.v1"`
Index uint64 `protobuf:"varint,1,opt,name=index,proto3" json:"index,omitempty"`
CommitTimestamp int64 `protobuf:"varint,2,opt,name=commit_timestamp,json=commitTimestamp,proto3" json:"commit_timestamp,omitempty"`
Events []*CDCEvent `protobuf:"bytes,3,rep,name=events,proto3" json:"events,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *CDCIndexedEventGroup) Reset() {
@@ -1879,6 +1880,13 @@ func (x *CDCIndexedEventGroup) GetIndex() uint64 {
return 0
}
func (x *CDCIndexedEventGroup) GetCommitTimestamp() int64 {
if x != nil {
return x.CommitTimestamp
}
return 0
}
func (x *CDCIndexedEventGroup) GetEvents() []*CDCEvent {
if x != nil {
return x.Events
@@ -2187,10 +2195,11 @@ const file_command_proto_rawDesc = "" +
"\n" +
"\x06UPDATE\x10\x02\x12\n" +
"\n" +
"\x06DELETE\x10\x03\"W\n" +
"\x06DELETE\x10\x03\"\x82\x01\n" +
"\x14CDCIndexedEventGroup\x12\x14\n" +
"\x05index\x18\x01 \x01(\x04R\x05index\x12)\n" +
"\x06events\x18\x02 \x03(\v2\x11.command.CDCEventR\x06events\"T\n" +
"\x10commit_timestamp\x18\x02 \x01(\x03R\x0fcommitTimestamp\x12)\n" +
"\x06events\x18\x03 \x03(\v2\x11.command.CDCEventR\x06events\"T\n" +
"\x19CDCIndexedEventGroupBatch\x127\n" +
"\apayload\x18\x01 \x03(\v2\x1d.command.CDCIndexedEventGroupR\apayload\"\xc6\x01\n" +
"\x0fUpdateHookEvent\x12\x14\n" +

View File

@@ -182,7 +182,8 @@ message CDCEvent {
message CDCIndexedEventGroup {
uint64 index = 1;
repeated CDCEvent events = 2;
int64 commit_timestamp = 2;
repeated CDCEvent events = 3;
}
message CDCIndexedEventGroupBatch {

View File

@@ -1,6 +1,10 @@
package db
import command "github.com/rqlite/rqlite/v8/command/proto"
import (
"time"
command "github.com/rqlite/rqlite/v8/command/proto"
)
// CDCStreamer is a CDC streamer that collects events and sends them
// to a channel when the commit hook is called. It is used to stream
@@ -49,6 +53,7 @@ func (s *CDCStreamer) PreupdateHook(ev *command.CDCEvent) error {
// CommitHook is called after the transaction is committed. It sends the
// pending events to the out channel and clears the pending events.
func (s *CDCStreamer) CommitHook() bool {
s.pending.CommitTimestamp = time.Now().UnixNano()
select {
case s.out <- s.pending:
default: