Implement HighwaterMarkUpdate broadcast

This commit is contained in:
Copilot
2025-08-19 13:55:41 -04:00
committed by GitHub
parent 3bc13402f2
commit 4626585aca
7 changed files with 611 additions and 90 deletions

View File

@@ -3,6 +3,7 @@
- [PR #2220](https://github.com/rqlite/rqlite/pull/2220): Allow `AppendEntriesRequest` to carry application-specific messages.
- [PR #2221](https://github.com/rqlite/rqlite/pull/2221): Improve the CDC HTTP POST event envelope.
- [PR #2226](https://github.com/rqlite/rqlite/pull/2226), [PR #2228](https://github.com/rqlite/rqlite/pull/2228): Initial integration between Store and CDC Service.
- [PR #2231](https://github.com/rqlite/rqlite/pull/2231): Implement cluster-wide broadcast of CDC HWM, fixes issue [#2230](https://github.com/rqlite/rqlite/issues/2230).
## v8.43.3 (August 14th 2025)
### Implementation changes and bug fixes

View File

@@ -522,6 +522,115 @@ func (c *Client) Join(jr *command.JoinRequest, nodeAddr string, creds *proto.Cre
}
}
// BroadcastHWM performs a broadcast to all specified nodes.
func (c *Client) BroadcastHWM(hwm uint64, retries int, timeout time.Duration, nodeAddr ...string) (map[string]*proto.HighwaterMarkUpdateResponse, error) {
if len(nodeAddr) == 0 {
return map[string]*proto.HighwaterMarkUpdateResponse{}, nil
}
// Get local node address for the broadcast request
c.localMu.RLock()
localAddr := c.localNodeAddr
c.localMu.RUnlock()
// Create the broadcast request
br := &proto.HighwaterMarkUpdateRequest{
NodeId: localAddr,
HighwaterMark: hwm,
}
// Channel to collect results
type result struct {
resp *proto.HighwaterMarkUpdateResponse
addr string
err error
}
resultChan := make(chan result, len(nodeAddr))
// Launch goroutines for parallel requests
for _, addr := range nodeAddr {
go func(nodeAddress string) {
// Create the command
command := &proto.Command{
Type: proto.Command_COMMAND_TYPE_HIGHWATER_MARK_UPDATE,
Request: &proto.Command_HighwaterMarkUpdateRequest{
HighwaterMarkUpdateRequest: br,
},
}
// Attempt with retries
var lastErr error
for attempt := 0; attempt <= retries; attempt++ {
conn, err := c.dial(nodeAddress)
if err != nil {
lastErr = err
continue
}
// Write command
if err := writeCommand(conn, command, timeout); err != nil {
conn.Close()
handleConnError(conn)
lastErr = err
continue
}
// Read response
p, err := readResponse(conn, timeout)
conn.Close()
if err != nil {
handleConnError(conn)
lastErr = err
continue
}
// Parse response
resp := &proto.HighwaterMarkUpdateResponse{}
if err := pb.Unmarshal(p, resp); err != nil {
lastErr = err
continue
}
// Success
resultChan <- result{resp: resp, addr: nodeAddress, err: nil}
return
}
// All retries failed
resultChan <- result{resp: nil, addr: nodeAddress, err: lastErr}
}(addr)
}
// Collect results with timeout
responses := make(map[string]*proto.HighwaterMarkUpdateResponse)
collected := 0
timeoutChan := time.After(timeout)
for collected < len(nodeAddr) {
select {
case res := <-resultChan:
if res.err != nil {
responses[res.addr] = &proto.HighwaterMarkUpdateResponse{Error: res.err.Error()}
} else {
responses[res.addr] = res.resp
}
collected++
case <-timeoutChan:
// Timeout reached, fill remaining responses with timeout errors
for _, addr := range nodeAddr {
if _, exists := responses[addr]; !exists {
responses[addr] = &proto.HighwaterMarkUpdateResponse{Error: "timeout"}
}
}
collected = len(nodeAddr)
}
}
return responses, nil
}
// Stats returns stats on the Client instance
func (c *Client) Stats() (map[string]any, error) {
c.poolMu.RLock()

View File

@@ -368,3 +368,153 @@ func (s *simpleDialer) Dial(address string, timeout time.Duration) (net.Conn, er
}
return conn, nil
}
func Test_ClientBroadcast(t *testing.T) {
srv := servicetest.NewService()
srv.Handler = func(conn net.Conn) {
var p []byte
var err error
c := readCommand(conn)
if c == nil {
// Error on connection, so give up, as normal
// test exit can cause that too.
return
}
if c.Type != proto.Command_COMMAND_TYPE_HIGHWATER_MARK_UPDATE {
t.Fatalf("unexpected command type: %d", c.Type)
}
br := c.GetHighwaterMarkUpdateRequest()
if br == nil {
t.Fatal("expected highwater mark update request, got nil")
}
if br.NodeId != "node1" {
t.Fatalf("unexpected node_id, got %s", br.NodeId)
}
if br.HighwaterMark != 12345 {
t.Fatalf("unexpected highwater_mark, got %d", br.HighwaterMark)
}
p, err = pb.Marshal(&proto.HighwaterMarkUpdateResponse{})
if err != nil {
conn.Close()
}
writeBytesWithLength(conn, p)
}
srv.Start()
defer srv.Close()
c := NewClient(&simpleDialer{}, 0)
c.SetLocal("node1", nil) // Set local node address to match test expectation
responses, err := c.BroadcastHWM(12345, 0, time.Second, srv.Addr())
if err != nil {
t.Fatal(err)
}
if len(responses) != 1 {
t.Fatalf("expected 1 response, got %d", len(responses))
}
resp, exists := responses[srv.Addr()]
if !exists {
t.Fatalf("response for %s not found", srv.Addr())
}
if resp.Error != "" {
t.Fatalf("unexpected error in response: %s", resp.Error)
}
}
func Test_ClientBroadcast_MultipleNodes(t *testing.T) {
// Create multiple test services
srv1 := servicetest.NewService()
srv2 := servicetest.NewService()
handler := func(conn net.Conn) {
var p []byte
var err error
c := readCommand(conn)
if c == nil {
return
}
if c.Type != proto.Command_COMMAND_TYPE_HIGHWATER_MARK_UPDATE {
t.Fatalf("unexpected command type: %d", c.Type)
}
p, err = pb.Marshal(&proto.HighwaterMarkUpdateResponse{})
if err != nil {
conn.Close()
}
writeBytesWithLength(conn, p)
}
srv1.Handler = handler
srv2.Handler = handler
srv1.Start()
srv2.Start()
defer srv1.Close()
defer srv2.Close()
c := NewClient(&simpleDialer{}, 0)
c.SetLocal("test-node", nil) // Set local node address to match test expectation
responses, err := c.BroadcastHWM(999, 0, time.Second, srv1.Addr(), srv2.Addr())
if err != nil {
t.Fatal(err)
}
if len(responses) != 2 {
t.Fatalf("expected 2 responses, got %d", len(responses))
}
for addr, resp := range responses {
if resp.Error != "" {
t.Fatalf("unexpected error in response from %s: %s", addr, resp.Error)
}
}
}
func Test_ClientBroadcast_EmptyNodeList(t *testing.T) {
c := NewClient(&simpleDialer{}, 0)
responses, err := c.BroadcastHWM(1, 0, time.Second)
if err != nil {
t.Fatal(err)
}
if len(responses) != 0 {
t.Fatalf("expected 0 responses, got %d", len(responses))
}
}
func Test_ClientBroadcast_WithError(t *testing.T) {
srv := servicetest.NewService()
srv.Handler = func(conn net.Conn) {
var p []byte
var err error
c := readCommand(conn)
if c == nil {
return
}
if c.Type != proto.Command_COMMAND_TYPE_HIGHWATER_MARK_UPDATE {
t.Fatalf("unexpected command type: %d", c.Type)
}
p, err = pb.Marshal(&proto.HighwaterMarkUpdateResponse{Error: "test error"})
if err != nil {
conn.Close()
}
writeBytesWithLength(conn, p)
}
srv.Start()
defer srv.Close()
c := NewClient(&simpleDialer{}, 0)
c.SetLocal("node1", nil) // Set local node address to match test expectation
responses, err := c.BroadcastHWM(12345, 0, time.Second, srv.Addr())
if err != nil {
t.Fatal(err)
}
if len(responses) != 1 {
t.Fatalf("expected 1 response, got %d", len(responses))
}
resp, exists := responses[srv.Addr()]
if !exists {
t.Fatalf("response for %s not found", srv.Addr())
}
if resp.Error != "test error" {
t.Fatalf("expected 'test error', got '%s'", resp.Error)
}
}

View File

@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.36.6
// protoc v3.6.1
// protoc-gen-go v1.36.7
// protoc v3.21.12
// source: message.proto
package proto
@@ -25,19 +25,20 @@ const (
type Command_Type int32
const (
Command_COMMAND_TYPE_UNKNOWN Command_Type = 0
Command_COMMAND_TYPE_GET_NODE_META Command_Type = 1
Command_COMMAND_TYPE_EXECUTE Command_Type = 2
Command_COMMAND_TYPE_QUERY Command_Type = 3
Command_COMMAND_TYPE_BACKUP Command_Type = 4
Command_COMMAND_TYPE_LOAD Command_Type = 5
Command_COMMAND_TYPE_REMOVE_NODE Command_Type = 6
Command_COMMAND_TYPE_NOTIFY Command_Type = 7
Command_COMMAND_TYPE_JOIN Command_Type = 8
Command_COMMAND_TYPE_REQUEST Command_Type = 9
Command_COMMAND_TYPE_LOAD_CHUNK Command_Type = 10
Command_COMMAND_TYPE_BACKUP_STREAM Command_Type = 11
Command_COMMAND_TYPE_STEPDOWN Command_Type = 12
Command_COMMAND_TYPE_UNKNOWN Command_Type = 0
Command_COMMAND_TYPE_GET_NODE_META Command_Type = 1
Command_COMMAND_TYPE_EXECUTE Command_Type = 2
Command_COMMAND_TYPE_QUERY Command_Type = 3
Command_COMMAND_TYPE_BACKUP Command_Type = 4
Command_COMMAND_TYPE_LOAD Command_Type = 5
Command_COMMAND_TYPE_REMOVE_NODE Command_Type = 6
Command_COMMAND_TYPE_NOTIFY Command_Type = 7
Command_COMMAND_TYPE_JOIN Command_Type = 8
Command_COMMAND_TYPE_REQUEST Command_Type = 9
Command_COMMAND_TYPE_LOAD_CHUNK Command_Type = 10
Command_COMMAND_TYPE_BACKUP_STREAM Command_Type = 11
Command_COMMAND_TYPE_STEPDOWN Command_Type = 12
Command_COMMAND_TYPE_HIGHWATER_MARK_UPDATE Command_Type = 13
)
// Enum value maps for Command_Type.
@@ -56,21 +57,23 @@ var (
10: "COMMAND_TYPE_LOAD_CHUNK",
11: "COMMAND_TYPE_BACKUP_STREAM",
12: "COMMAND_TYPE_STEPDOWN",
13: "COMMAND_TYPE_HIGHWATER_MARK_UPDATE",
}
Command_Type_value = map[string]int32{
"COMMAND_TYPE_UNKNOWN": 0,
"COMMAND_TYPE_GET_NODE_META": 1,
"COMMAND_TYPE_EXECUTE": 2,
"COMMAND_TYPE_QUERY": 3,
"COMMAND_TYPE_BACKUP": 4,
"COMMAND_TYPE_LOAD": 5,
"COMMAND_TYPE_REMOVE_NODE": 6,
"COMMAND_TYPE_NOTIFY": 7,
"COMMAND_TYPE_JOIN": 8,
"COMMAND_TYPE_REQUEST": 9,
"COMMAND_TYPE_LOAD_CHUNK": 10,
"COMMAND_TYPE_BACKUP_STREAM": 11,
"COMMAND_TYPE_STEPDOWN": 12,
"COMMAND_TYPE_UNKNOWN": 0,
"COMMAND_TYPE_GET_NODE_META": 1,
"COMMAND_TYPE_EXECUTE": 2,
"COMMAND_TYPE_QUERY": 3,
"COMMAND_TYPE_BACKUP": 4,
"COMMAND_TYPE_LOAD": 5,
"COMMAND_TYPE_REMOVE_NODE": 6,
"COMMAND_TYPE_NOTIFY": 7,
"COMMAND_TYPE_JOIN": 8,
"COMMAND_TYPE_REQUEST": 9,
"COMMAND_TYPE_LOAD_CHUNK": 10,
"COMMAND_TYPE_BACKUP_STREAM": 11,
"COMMAND_TYPE_STEPDOWN": 12,
"COMMAND_TYPE_HIGHWATER_MARK_UPDATE": 13,
}
)
@@ -228,6 +231,7 @@ type Command struct {
// *Command_ExecuteQueryRequest
// *Command_LoadChunkRequest
// *Command_StepdownRequest
// *Command_HighwaterMarkUpdateRequest
Request isCommand_Request `protobuf_oneof:"request"`
Credentials *Credentials `protobuf:"bytes,4,opt,name=credentials,proto3" json:"credentials,omitempty"`
unknownFields protoimpl.UnknownFields
@@ -368,6 +372,15 @@ func (x *Command) GetStepdownRequest() *proto.StepdownRequest {
return nil
}
func (x *Command) GetHighwaterMarkUpdateRequest() *HighwaterMarkUpdateRequest {
if x != nil {
if x, ok := x.Request.(*Command_HighwaterMarkUpdateRequest); ok {
return x.HighwaterMarkUpdateRequest
}
}
return nil
}
func (x *Command) GetCredentials() *Credentials {
if x != nil {
return x.Credentials
@@ -419,6 +432,10 @@ type Command_StepdownRequest struct {
StepdownRequest *proto.StepdownRequest `protobuf:"bytes,12,opt,name=stepdown_request,json=stepdownRequest,proto3,oneof"`
}
type Command_HighwaterMarkUpdateRequest struct {
HighwaterMarkUpdateRequest *HighwaterMarkUpdateRequest `protobuf:"bytes,13,opt,name=highwater_mark_update_request,json=highwaterMarkUpdateRequest,proto3,oneof"`
}
func (*Command_ExecuteRequest) isCommand_Request() {}
func (*Command_QueryRequest) isCommand_Request() {}
@@ -439,6 +456,8 @@ func (*Command_LoadChunkRequest) isCommand_Request() {}
func (*Command_StepdownRequest) isCommand_Request() {}
func (*Command_HighwaterMarkUpdateRequest) isCommand_Request() {}
type CommandExecuteResponse struct {
state protoimpl.MessageState `protogen:"open.v1"`
Error string `protobuf:"bytes,1,opt,name=error,proto3" json:"error,omitempty"`
@@ -943,6 +962,102 @@ func (x *CommandStepdownResponse) GetError() string {
return ""
}
type HighwaterMarkUpdateRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
NodeId string `protobuf:"bytes,1,opt,name=node_id,json=nodeId,proto3" json:"node_id,omitempty"`
HighwaterMark uint64 `protobuf:"varint,2,opt,name=highwater_mark,json=highwaterMark,proto3" json:"highwater_mark,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *HighwaterMarkUpdateRequest) Reset() {
*x = HighwaterMarkUpdateRequest{}
mi := &file_message_proto_msgTypes[13]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *HighwaterMarkUpdateRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*HighwaterMarkUpdateRequest) ProtoMessage() {}
func (x *HighwaterMarkUpdateRequest) ProtoReflect() protoreflect.Message {
mi := &file_message_proto_msgTypes[13]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use HighwaterMarkUpdateRequest.ProtoReflect.Descriptor instead.
func (*HighwaterMarkUpdateRequest) Descriptor() ([]byte, []int) {
return file_message_proto_rawDescGZIP(), []int{13}
}
func (x *HighwaterMarkUpdateRequest) GetNodeId() string {
if x != nil {
return x.NodeId
}
return ""
}
func (x *HighwaterMarkUpdateRequest) GetHighwaterMark() uint64 {
if x != nil {
return x.HighwaterMark
}
return 0
}
type HighwaterMarkUpdateResponse struct {
state protoimpl.MessageState `protogen:"open.v1"`
Error string `protobuf:"bytes,1,opt,name=error,proto3" json:"error,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *HighwaterMarkUpdateResponse) Reset() {
*x = HighwaterMarkUpdateResponse{}
mi := &file_message_proto_msgTypes[14]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *HighwaterMarkUpdateResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*HighwaterMarkUpdateResponse) ProtoMessage() {}
func (x *HighwaterMarkUpdateResponse) ProtoReflect() protoreflect.Message {
mi := &file_message_proto_msgTypes[14]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use HighwaterMarkUpdateResponse.ProtoReflect.Descriptor instead.
func (*HighwaterMarkUpdateResponse) Descriptor() ([]byte, []int) {
return file_message_proto_rawDescGZIP(), []int{14}
}
func (x *HighwaterMarkUpdateResponse) GetError() string {
if x != nil {
return x.Error
}
return ""
}
var File_message_proto protoreflect.FileDescriptor
const file_message_proto_rawDesc = "" +
@@ -954,7 +1069,8 @@ const file_message_proto_rawDesc = "" +
"\bNodeMeta\x12\x10\n" +
"\x03url\x18\x01 \x01(\tR\x03url\x12!\n" +
"\fcommit_index\x18\x02 \x01(\x04R\vcommitIndex\x12\x18\n" +
"\aversion\x18\x03 \x01(\tR\aversion\"\x8a\t\n" +
"\aversion\x18\x03 \x01(\tR\aversion\"\x9c\n" +
"\n" +
"\aCommand\x12)\n" +
"\x04type\x18\x01 \x01(\x0e2\x15.cluster.Command.TypeR\x04type\x12B\n" +
"\x0fexecute_request\x18\x02 \x01(\v2\x17.command.ExecuteRequestH\x00R\x0eexecuteRequest\x12<\n" +
@@ -967,8 +1083,9 @@ const file_message_proto_rawDesc = "" +
"\x15execute_query_request\x18\n" +
" \x01(\v2\x1c.command.ExecuteQueryRequestH\x00R\x13executeQueryRequest\x12I\n" +
"\x12load_chunk_request\x18\v \x01(\v2\x19.command.LoadChunkRequestH\x00R\x10loadChunkRequest\x12E\n" +
"\x10stepdown_request\x18\f \x01(\v2\x18.command.StepdownRequestH\x00R\x0fstepdownRequest\x126\n" +
"\vcredentials\x18\x04 \x01(\v2\x14.cluster.CredentialsR\vcredentials\"\xe2\x02\n" +
"\x10stepdown_request\x18\f \x01(\v2\x18.command.StepdownRequestH\x00R\x0fstepdownRequest\x12h\n" +
"\x1dhighwater_mark_update_request\x18\r \x01(\v2#.cluster.HighwaterMarkUpdateRequestH\x00R\x1ahighwaterMarkUpdateRequest\x126\n" +
"\vcredentials\x18\x04 \x01(\v2\x14.cluster.CredentialsR\vcredentials\"\x8a\x03\n" +
"\x04Type\x12\x18\n" +
"\x14COMMAND_TYPE_UNKNOWN\x10\x00\x12\x1e\n" +
"\x1aCOMMAND_TYPE_GET_NODE_META\x10\x01\x12\x18\n" +
@@ -983,7 +1100,8 @@ const file_message_proto_rawDesc = "" +
"\x17COMMAND_TYPE_LOAD_CHUNK\x10\n" +
"\x12\x1e\n" +
"\x1aCOMMAND_TYPE_BACKUP_STREAM\x10\v\x12\x19\n" +
"\x15COMMAND_TYPE_STEPDOWN\x10\fB\t\n" +
"\x15COMMAND_TYPE_STEPDOWN\x10\f\x12&\n" +
"\"COMMAND_TYPE_HIGHWATER_MARK_UPDATE\x10\rB\t\n" +
"\arequest\"\x87\x01\n" +
"\x16CommandExecuteResponse\x12\x14\n" +
"\x05error\x18\x01 \x01(\tR\x05error\x129\n" +
@@ -1012,6 +1130,11 @@ const file_message_proto_rawDesc = "" +
"\x05error\x18\x01 \x01(\tR\x05error\x12\x16\n" +
"\x06leader\x18\x02 \x01(\tR\x06leader\"/\n" +
"\x17CommandStepdownResponse\x12\x14\n" +
"\x05error\x18\x01 \x01(\tR\x05error\"\\\n" +
"\x1aHighwaterMarkUpdateRequest\x12\x17\n" +
"\anode_id\x18\x01 \x01(\tR\x06nodeId\x12%\n" +
"\x0ehighwater_mark\x18\x02 \x01(\x04R\rhighwaterMark\"3\n" +
"\x1bHighwaterMarkUpdateResponse\x12\x14\n" +
"\x05error\x18\x01 \x01(\tR\x05errorB+Z)github.com/rqlite/rqlite/v8/cluster/protob\x06proto3"
var (
@@ -1027,56 +1150,59 @@ func file_message_proto_rawDescGZIP() []byte {
}
var file_message_proto_enumTypes = make([]protoimpl.EnumInfo, 1)
var file_message_proto_msgTypes = make([]protoimpl.MessageInfo, 13)
var file_message_proto_msgTypes = make([]protoimpl.MessageInfo, 15)
var file_message_proto_goTypes = []any{
(Command_Type)(0), // 0: cluster.Command.Type
(*Credentials)(nil), // 1: cluster.Credentials
(*NodeMeta)(nil), // 2: cluster.NodeMeta
(*Command)(nil), // 3: cluster.Command
(*CommandExecuteResponse)(nil), // 4: cluster.CommandExecuteResponse
(*CommandQueryResponse)(nil), // 5: cluster.CommandQueryResponse
(*CommandRequestResponse)(nil), // 6: cluster.CommandRequestResponse
(*CommandBackupResponse)(nil), // 7: cluster.CommandBackupResponse
(*CommandLoadResponse)(nil), // 8: cluster.CommandLoadResponse
(*CommandLoadChunkResponse)(nil), // 9: cluster.CommandLoadChunkResponse
(*CommandRemoveNodeResponse)(nil), // 10: cluster.CommandRemoveNodeResponse
(*CommandNotifyResponse)(nil), // 11: cluster.CommandNotifyResponse
(*CommandJoinResponse)(nil), // 12: cluster.CommandJoinResponse
(*CommandStepdownResponse)(nil), // 13: cluster.CommandStepdownResponse
(*proto.ExecuteRequest)(nil), // 14: command.ExecuteRequest
(*proto.QueryRequest)(nil), // 15: command.QueryRequest
(*proto.BackupRequest)(nil), // 16: command.BackupRequest
(*proto.LoadRequest)(nil), // 17: command.LoadRequest
(*proto.RemoveNodeRequest)(nil), // 18: command.RemoveNodeRequest
(*proto.NotifyRequest)(nil), // 19: command.NotifyRequest
(*proto.JoinRequest)(nil), // 20: command.JoinRequest
(*proto.ExecuteQueryRequest)(nil), // 21: command.ExecuteQueryRequest
(*proto.LoadChunkRequest)(nil), // 22: command.LoadChunkRequest
(*proto.StepdownRequest)(nil), // 23: command.StepdownRequest
(*proto.ExecuteQueryResponse)(nil), // 24: command.ExecuteQueryResponse
(*proto.QueryRows)(nil), // 25: command.QueryRows
(Command_Type)(0), // 0: cluster.Command.Type
(*Credentials)(nil), // 1: cluster.Credentials
(*NodeMeta)(nil), // 2: cluster.NodeMeta
(*Command)(nil), // 3: cluster.Command
(*CommandExecuteResponse)(nil), // 4: cluster.CommandExecuteResponse
(*CommandQueryResponse)(nil), // 5: cluster.CommandQueryResponse
(*CommandRequestResponse)(nil), // 6: cluster.CommandRequestResponse
(*CommandBackupResponse)(nil), // 7: cluster.CommandBackupResponse
(*CommandLoadResponse)(nil), // 8: cluster.CommandLoadResponse
(*CommandLoadChunkResponse)(nil), // 9: cluster.CommandLoadChunkResponse
(*CommandRemoveNodeResponse)(nil), // 10: cluster.CommandRemoveNodeResponse
(*CommandNotifyResponse)(nil), // 11: cluster.CommandNotifyResponse
(*CommandJoinResponse)(nil), // 12: cluster.CommandJoinResponse
(*CommandStepdownResponse)(nil), // 13: cluster.CommandStepdownResponse
(*HighwaterMarkUpdateRequest)(nil), // 14: cluster.HighwaterMarkUpdateRequest
(*HighwaterMarkUpdateResponse)(nil), // 15: cluster.HighwaterMarkUpdateResponse
(*proto.ExecuteRequest)(nil), // 16: command.ExecuteRequest
(*proto.QueryRequest)(nil), // 17: command.QueryRequest
(*proto.BackupRequest)(nil), // 18: command.BackupRequest
(*proto.LoadRequest)(nil), // 19: command.LoadRequest
(*proto.RemoveNodeRequest)(nil), // 20: command.RemoveNodeRequest
(*proto.NotifyRequest)(nil), // 21: command.NotifyRequest
(*proto.JoinRequest)(nil), // 22: command.JoinRequest
(*proto.ExecuteQueryRequest)(nil), // 23: command.ExecuteQueryRequest
(*proto.LoadChunkRequest)(nil), // 24: command.LoadChunkRequest
(*proto.StepdownRequest)(nil), // 25: command.StepdownRequest
(*proto.ExecuteQueryResponse)(nil), // 26: command.ExecuteQueryResponse
(*proto.QueryRows)(nil), // 27: command.QueryRows
}
var file_message_proto_depIdxs = []int32{
0, // 0: cluster.Command.type:type_name -> cluster.Command.Type
14, // 1: cluster.Command.execute_request:type_name -> command.ExecuteRequest
15, // 2: cluster.Command.query_request:type_name -> command.QueryRequest
16, // 3: cluster.Command.backup_request:type_name -> command.BackupRequest
17, // 4: cluster.Command.load_request:type_name -> command.LoadRequest
18, // 5: cluster.Command.remove_node_request:type_name -> command.RemoveNodeRequest
19, // 6: cluster.Command.notify_request:type_name -> command.NotifyRequest
20, // 7: cluster.Command.join_request:type_name -> command.JoinRequest
21, // 8: cluster.Command.execute_query_request:type_name -> command.ExecuteQueryRequest
22, // 9: cluster.Command.load_chunk_request:type_name -> command.LoadChunkRequest
23, // 10: cluster.Command.stepdown_request:type_name -> command.StepdownRequest
1, // 11: cluster.Command.credentials:type_name -> cluster.Credentials
24, // 12: cluster.CommandExecuteResponse.response:type_name -> command.ExecuteQueryResponse
25, // 13: cluster.CommandQueryResponse.rows:type_name -> command.QueryRows
24, // 14: cluster.CommandRequestResponse.response:type_name -> command.ExecuteQueryResponse
15, // [15:15] is the sub-list for method output_type
15, // [15:15] is the sub-list for method input_type
15, // [15:15] is the sub-list for extension type_name
15, // [15:15] is the sub-list for extension extendee
0, // [0:15] is the sub-list for field type_name
16, // 1: cluster.Command.execute_request:type_name -> command.ExecuteRequest
17, // 2: cluster.Command.query_request:type_name -> command.QueryRequest
18, // 3: cluster.Command.backup_request:type_name -> command.BackupRequest
19, // 4: cluster.Command.load_request:type_name -> command.LoadRequest
20, // 5: cluster.Command.remove_node_request:type_name -> command.RemoveNodeRequest
21, // 6: cluster.Command.notify_request:type_name -> command.NotifyRequest
22, // 7: cluster.Command.join_request:type_name -> command.JoinRequest
23, // 8: cluster.Command.execute_query_request:type_name -> command.ExecuteQueryRequest
24, // 9: cluster.Command.load_chunk_request:type_name -> command.LoadChunkRequest
25, // 10: cluster.Command.stepdown_request:type_name -> command.StepdownRequest
14, // 11: cluster.Command.highwater_mark_update_request:type_name -> cluster.HighwaterMarkUpdateRequest
1, // 12: cluster.Command.credentials:type_name -> cluster.Credentials
26, // 13: cluster.CommandExecuteResponse.response:type_name -> command.ExecuteQueryResponse
27, // 14: cluster.CommandQueryResponse.rows:type_name -> command.QueryRows
26, // 15: cluster.CommandRequestResponse.response:type_name -> command.ExecuteQueryResponse
16, // [16:16] is the sub-list for method output_type
16, // [16:16] is the sub-list for method input_type
16, // [16:16] is the sub-list for extension type_name
16, // [16:16] is the sub-list for extension extendee
0, // [0:16] is the sub-list for field type_name
}
func init() { file_message_proto_init() }
@@ -1095,6 +1221,7 @@ func file_message_proto_init() {
(*Command_ExecuteQueryRequest)(nil),
(*Command_LoadChunkRequest)(nil),
(*Command_StepdownRequest)(nil),
(*Command_HighwaterMarkUpdateRequest)(nil),
}
type x struct{}
out := protoimpl.TypeBuilder{
@@ -1102,7 +1229,7 @@ func file_message_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: unsafe.Slice(unsafe.StringData(file_message_proto_rawDesc), len(file_message_proto_rawDesc)),
NumEnums: 1,
NumMessages: 13,
NumMessages: 15,
NumExtensions: 0,
NumServices: 0,
},

View File

@@ -31,6 +31,7 @@ message Command {
COMMAND_TYPE_LOAD_CHUNK = 10;
COMMAND_TYPE_BACKUP_STREAM = 11;
COMMAND_TYPE_STEPDOWN = 12;
COMMAND_TYPE_HIGHWATER_MARK_UPDATE = 13;
}
Type type = 1;
@@ -45,6 +46,7 @@ message Command {
command.ExecuteQueryRequest execute_query_request = 10;
command.LoadChunkRequest load_chunk_request = 11;
command.StepdownRequest stepdown_request = 12;
HighwaterMarkUpdateRequest highwater_mark_update_request = 13;
}
Credentials credentials = 4;
@@ -97,3 +99,12 @@ message CommandJoinResponse {
message CommandStepdownResponse {
string error = 1;
}
message HighwaterMarkUpdateRequest {
string node_id = 1;
uint64 highwater_mark = 2;
}
message HighwaterMarkUpdateResponse {
string error = 1;
}

View File

@@ -24,17 +24,19 @@ import (
var stats *expvar.Map
const (
numGetNodeAPIRequest = "num_get_node_api_req"
numGetNodeAPIResponse = "num_get_node_api_resp"
numExecuteRequest = "num_execute_req"
numQueryRequest = "num_query_req"
numRequestRequest = "num_request_req"
numBackupRequest = "num_backup_req"
numLoadRequest = "num_load_req"
numRemoveNodeRequest = "num_remove_node_req"
numNotifyRequest = "num_notify_req"
numJoinRequest = "num_join_req"
numStepdownRequest = "num_stepdown_req"
numGetNodeAPIRequest = "num_get_node_api_req"
numGetNodeAPIResponse = "num_get_node_api_resp"
numExecuteRequest = "num_execute_req"
numQueryRequest = "num_query_req"
numRequestRequest = "num_request_req"
numBackupRequest = "num_backup_req"
numLoadRequest = "num_load_req"
numRemoveNodeRequest = "num_remove_node_req"
numNotifyRequest = "num_notify_req"
numJoinRequest = "num_join_req"
numStepdownRequest = "num_stepdown_req"
numBroadcastHWMRequest = "num_broadcast_hwm_req"
numHWMUpdateDropped = "num_hwm_update_dropped"
numClientRetries = "num_client_retries"
numGetNodeAPIRequestRetries = "num_get_node_api_req_retries"
@@ -71,6 +73,8 @@ func init() {
stats.Add(numNotifyRequest, 0)
stats.Add(numJoinRequest, 0)
stats.Add(numStepdownRequest, 0)
stats.Add(numBroadcastHWMRequest, 0)
stats.Add(numHWMUpdateDropped, 0)
stats.Add(numClientRetries, 0)
stats.Add(numGetNodeAPIRequestRetries, 0)
stats.Add(numClientLoadRetries, 0)
@@ -151,6 +155,9 @@ type Service struct {
apiAddr string // host:port this node serves the HTTP API.
version string // Version of software this node is running.
hwmMu sync.RWMutex
hwmUpdateC chan<- uint64 // Channel for HWM updates
logger *log.Logger
}
@@ -179,6 +186,13 @@ func (s *Service) Close() error {
return nil
}
// RegisterHWMUpdate registers a channel to receive highwater mark update requests.
func (s *Service) RegisterHWMUpdate(c chan<- uint64) {
s.hwmMu.Lock()
defer s.hwmMu.Unlock()
s.hwmUpdateC = c
}
// Addr returns the address the service is listening on.
func (s *Service) Addr() string {
return s.addr.String()
@@ -557,6 +571,30 @@ func (s *Service) handleConn(conn net.Conn) {
if err := marshalAndWrite(conn, resp); err != nil {
return
}
case proto.Command_COMMAND_TYPE_HIGHWATER_MARK_UPDATE:
stats.Add(numBroadcastHWMRequest, 1)
resp := &proto.HighwaterMarkUpdateResponse{}
br := c.GetHighwaterMarkUpdateRequest()
if br == nil {
resp.Error = "HighwaterMarkUpdateRequest is nil"
} else {
// Send to registered channel if available
s.hwmMu.RLock()
if s.hwmUpdateC != nil {
select {
case s.hwmUpdateC <- br.HighwaterMark:
default:
// Channel is full, don't block
stats.Add(numHWMUpdateDropped, 1)
}
}
s.hwmMu.RUnlock()
}
if err := marshalAndWrite(conn, resp); err != nil {
return
}
}
}
}

View File

@@ -424,6 +424,91 @@ func Test_NewServiceJoin(t *testing.T) {
}
}
func Test_ServiceHandleHighwaterMarkUpdate(t *testing.T) {
ml := mustNewMockTransport()
mgr := mustNewMockManager()
s := New(ml, mustNewMockDatabase(), mgr, mustNewMockCredentialStore())
if s == nil {
t.Fatalf("failed to create cluster service")
}
if err := s.Open(); err != nil {
t.Fatalf("failed to open cluster service")
}
defer s.Close()
// Create a client and send highwater mark update
c := NewClient(ml, 30*time.Second)
c.SetLocal("test-node", nil)
// Use the client to send a highwater mark update
responses, err := c.BroadcastHWM(987654, 0, 5*time.Second, s.Addr())
if err != nil {
t.Fatalf("failed to broadcast highwater mark update: %s", err)
}
// Check that we got a response for the service address
resp, ok := responses[s.Addr()]
if !ok {
t.Fatalf("expected response for address %s", s.Addr())
}
// Check response has no error
if resp.Error != "" {
t.Fatalf("expected no error, got: %s", resp.Error)
}
}
func Test_ServiceRegisterHWMUpdate(t *testing.T) {
ml := mustNewMockTransport()
mgr := mustNewMockManager()
s := New(ml, mustNewMockDatabase(), mgr, mustNewMockCredentialStore())
if s == nil {
t.Fatalf("failed to create cluster service")
}
if err := s.Open(); err != nil {
t.Fatalf("failed to open cluster service")
}
defer s.Close()
// Create a channel to receive highwater mark updates
hwmCh := make(chan uint64, 1)
s.RegisterHWMUpdate(hwmCh)
// Create a client and send highwater mark update
c := NewClient(ml, 30*time.Second)
c.SetLocal("test-node", nil)
// Use the client to send a highwater mark update
testHWM := uint64(123456)
responses, err := c.BroadcastHWM(testHWM, 0, 5*time.Second, s.Addr())
if err != nil {
t.Fatalf("failed to broadcast highwater mark update: %s", err)
}
// Check that we got a response for the service address
resp, ok := responses[s.Addr()]
if !ok {
t.Fatalf("expected response for address %s", s.Addr())
}
// Check response has no error
if resp.Error != "" {
t.Fatalf("expected no error, got: %s", resp.Error)
}
// Check that we received the update on the channel
select {
case hwmUpdate := <-hwmCh:
if hwmUpdate != testHWM {
t.Fatalf("expected highwater_mark to be %d, got: %d", testHWM, hwmUpdate)
}
case <-time.After(5 * time.Second):
t.Fatalf("timeout waiting for highwater mark update on channel")
}
}
type mockTransport struct {
tn net.Listener
remoteEncrypted bool