From 4626585aca1ac8cdd80cf51a72c0e159abf9f847 Mon Sep 17 00:00:00 2001 From: Copilot <198982749+Copilot@users.noreply.github.com> Date: Tue, 19 Aug 2025 13:55:41 -0400 Subject: [PATCH] Implement HighwaterMarkUpdate broadcast --- CHANGELOG.md | 1 + cluster/client.go | 109 ++++++++++++++ cluster/client_test.go | 150 +++++++++++++++++++ cluster/proto/message.pb.go | 285 ++++++++++++++++++++++++++---------- cluster/proto/message.proto | 11 ++ cluster/service.go | 60 ++++++-- cluster/service_test.go | 85 +++++++++++ 7 files changed, 611 insertions(+), 90 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6bce60a9..1673af2d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ - [PR #2220](https://github.com/rqlite/rqlite/pull/2220): Allow `AppendEntriesRequest` to carry application-specific messages. - [PR #2221](https://github.com/rqlite/rqlite/pull/2221): Improve the CDC HTTP POST event envelope. - [PR #2226](https://github.com/rqlite/rqlite/pull/2226), [PR #2228](https://github.com/rqlite/rqlite/pull/2228): Initial integration between Store and CDC Service. +- [PR #2231](https://github.com/rqlite/rqlite/pull/2231): Implement cluster-wide broadcast of CDC HWM, fixes issue [#2230](https://github.com/rqlite/rqlite/issues/2230). ## v8.43.3 (August 14th 2025) ### Implementation changes and bug fixes diff --git a/cluster/client.go b/cluster/client.go index ad90d146..513e71ec 100644 --- a/cluster/client.go +++ b/cluster/client.go @@ -522,6 +522,115 @@ func (c *Client) Join(jr *command.JoinRequest, nodeAddr string, creds *proto.Cre } } +// BroadcastHWM performs a broadcast to all specified nodes. 
+//
+// hwm is sent, tagged with this node's local address, to every address in
+// nodeAddr in parallel, with up to retries+1 attempts per node (a negative
+// retries is treated as 0). timeout is passed to each network write and
+// read, and also bounds the total time spent waiting for results. The
+// returned map holds, for each address, either that node's response or a
+// response whose Error field describes the failure ("timeout" if no result
+// arrived in time). The returned error is always nil.
+func (c *Client) BroadcastHWM(hwm uint64, retries int, timeout time.Duration, nodeAddr ...string) (map[string]*proto.HighwaterMarkUpdateResponse, error) {
+	if len(nodeAddr) == 0 {
+		return map[string]*proto.HighwaterMarkUpdateResponse{}, nil
+	}
+	if retries < 0 {
+		// Always make at least one attempt, so that every result carries
+		// either a response or an error, never neither.
+		retries = 0
+	}
+
+	c.localMu.RLock()
+	localAddr := c.localNodeAddr
+	c.localMu.RUnlock()
+	br := &proto.HighwaterMarkUpdateRequest{
+		NodeId:        localAddr,
+		HighwaterMark: hwm,
+	}
+
+	type result struct {
+		resp *proto.HighwaterMarkUpdateResponse
+		addr string
+		err  error
+	}
+	// Buffered to len(nodeAddr) so late senders never block after a timeout.
+	resultChan := make(chan result, len(nodeAddr))
+
+	for _, addr := range nodeAddr {
+		go func(nodeAddress string) {
+			var lastErr error
+			for attempt := 0; attempt <= retries; attempt++ {
+				resp, err := c.sendHWMUpdate(br, nodeAddress, timeout)
+				if err == nil {
+					resultChan <- result{resp: resp, addr: nodeAddress}
+					return
+				}
+				lastErr = err
+			}
+			resultChan <- result{addr: nodeAddress, err: lastErr}
+		}(addr)
+	}
+
+	responses := make(map[string]*proto.HighwaterMarkUpdateResponse, len(nodeAddr))
+	timer := time.NewTimer(timeout)
+	defer timer.Stop()
+	for collected := 0; collected < len(nodeAddr); collected++ {
+		select {
+		case res := <-resultChan:
+			if res.err != nil {
+				responses[res.addr] = &proto.HighwaterMarkUpdateResponse{Error: res.err.Error()}
+			} else {
+				responses[res.addr] = res.resp
+			}
+		case <-timer.C:
+			// Deadline reached: report every node still outstanding as timed out.
+			for _, addr := range nodeAddr {
+				if _, ok := responses[addr]; !ok {
+					responses[addr] = &proto.HighwaterMarkUpdateResponse{Error: "timeout"}
+				}
+			}
+			return responses, nil
+		}
+	}
+	return responses, nil
+}
+
+// sendHWMUpdate makes one attempt to deliver br to the node at nodeAddr and
+// returns that node's response. timeout is passed to both the command write
+// and the response read.
+func (c *Client) sendHWMUpdate(br *proto.HighwaterMarkUpdateRequest, nodeAddr string, timeout time.Duration) (*proto.HighwaterMarkUpdateResponse, error) {
+	conn, err := c.dial(nodeAddr)
+	if err != nil {
+		return nil, err
+	}
+	// Deferred so that, on failure, Close runs after handleConnError.
+	// NOTE(review): presumes Close acts on state handleConnError sets; confirm.
+	defer conn.Close()
+
+	cmd := &proto.Command{
+		Type: proto.Command_COMMAND_TYPE_HIGHWATER_MARK_UPDATE,
+		Request: &proto.Command_HighwaterMarkUpdateRequest{
+			HighwaterMarkUpdateRequest: br,
+		},
+	}
+	if err := writeCommand(conn, cmd, timeout); err != nil {
+		handleConnError(conn)
+		return nil, err
+	}
+	p, err := readResponse(conn, timeout)
+	if err != nil {
+		handleConnError(conn)
+		return nil, err
+	}
+
+	resp := &proto.HighwaterMarkUpdateResponse{}
+	if err := pb.Unmarshal(p, resp); err != nil {
+		return nil, err
+	}
+	return resp, nil
+}
+
 // Stats returns stats on the Client instance
 func (c *Client) Stats() (map[string]any, error) {
 	c.poolMu.RLock()
diff --git a/cluster/client_test.go b/cluster/client_test.go
index 50ec31f2..06213579 100644
--- a/cluster/client_test.go
+++ b/cluster/client_test.go
@@ -368,3 +368,153 @@ func (s *simpleDialer) Dial(address string, timeout time.Duration) (net.Conn, er
 	}
 	return conn, nil
 }
+
+func Test_ClientBroadcast(t *testing.T) {
+	srv := servicetest.NewService()
+	srv.Handler = func(conn net.Conn) {
+		var p []byte
+		var err error
+		c := readCommand(conn)
+		if c == nil {
+			// Error on connection, so give up, as normal
+			// test exit can cause that too.
+ return + } + if c.Type != proto.Command_COMMAND_TYPE_HIGHWATER_MARK_UPDATE { + t.Fatalf("unexpected command type: %d", c.Type) + } + br := c.GetHighwaterMarkUpdateRequest() + if br == nil { + t.Fatal("expected highwater mark update request, got nil") + } + if br.NodeId != "node1" { + t.Fatalf("unexpected node_id, got %s", br.NodeId) + } + if br.HighwaterMark != 12345 { + t.Fatalf("unexpected highwater_mark, got %d", br.HighwaterMark) + } + + p, err = pb.Marshal(&proto.HighwaterMarkUpdateResponse{}) + if err != nil { + conn.Close() + } + writeBytesWithLength(conn, p) + } + srv.Start() + defer srv.Close() + + c := NewClient(&simpleDialer{}, 0) + c.SetLocal("node1", nil) // Set local node address to match test expectation + responses, err := c.BroadcastHWM(12345, 0, time.Second, srv.Addr()) + if err != nil { + t.Fatal(err) + } + if len(responses) != 1 { + t.Fatalf("expected 1 response, got %d", len(responses)) + } + resp, exists := responses[srv.Addr()] + if !exists { + t.Fatalf("response for %s not found", srv.Addr()) + } + if resp.Error != "" { + t.Fatalf("unexpected error in response: %s", resp.Error) + } +} + +func Test_ClientBroadcast_MultipleNodes(t *testing.T) { + // Create multiple test services + srv1 := servicetest.NewService() + srv2 := servicetest.NewService() + + handler := func(conn net.Conn) { + var p []byte + var err error + c := readCommand(conn) + if c == nil { + return + } + if c.Type != proto.Command_COMMAND_TYPE_HIGHWATER_MARK_UPDATE { + t.Fatalf("unexpected command type: %d", c.Type) + } + + p, err = pb.Marshal(&proto.HighwaterMarkUpdateResponse{}) + if err != nil { + conn.Close() + } + writeBytesWithLength(conn, p) + } + + srv1.Handler = handler + srv2.Handler = handler + + srv1.Start() + srv2.Start() + defer srv1.Close() + defer srv2.Close() + + c := NewClient(&simpleDialer{}, 0) + c.SetLocal("test-node", nil) // Set local node address to match test expectation + responses, err := c.BroadcastHWM(999, 0, time.Second, srv1.Addr(), srv2.Addr()) 
+ if err != nil { + t.Fatal(err) + } + if len(responses) != 2 { + t.Fatalf("expected 2 responses, got %d", len(responses)) + } + for addr, resp := range responses { + if resp.Error != "" { + t.Fatalf("unexpected error in response from %s: %s", addr, resp.Error) + } + } +} + +func Test_ClientBroadcast_EmptyNodeList(t *testing.T) { + c := NewClient(&simpleDialer{}, 0) + responses, err := c.BroadcastHWM(1, 0, time.Second) + if err != nil { + t.Fatal(err) + } + if len(responses) != 0 { + t.Fatalf("expected 0 responses, got %d", len(responses)) + } +} + +func Test_ClientBroadcast_WithError(t *testing.T) { + srv := servicetest.NewService() + srv.Handler = func(conn net.Conn) { + var p []byte + var err error + c := readCommand(conn) + if c == nil { + return + } + if c.Type != proto.Command_COMMAND_TYPE_HIGHWATER_MARK_UPDATE { + t.Fatalf("unexpected command type: %d", c.Type) + } + + p, err = pb.Marshal(&proto.HighwaterMarkUpdateResponse{Error: "test error"}) + if err != nil { + conn.Close() + } + writeBytesWithLength(conn, p) + } + srv.Start() + defer srv.Close() + + c := NewClient(&simpleDialer{}, 0) + c.SetLocal("node1", nil) // Set local node address to match test expectation + responses, err := c.BroadcastHWM(12345, 0, time.Second, srv.Addr()) + if err != nil { + t.Fatal(err) + } + if len(responses) != 1 { + t.Fatalf("expected 1 response, got %d", len(responses)) + } + resp, exists := responses[srv.Addr()] + if !exists { + t.Fatalf("response for %s not found", srv.Addr()) + } + if resp.Error != "test error" { + t.Fatalf("expected 'test error', got '%s'", resp.Error) + } +} diff --git a/cluster/proto/message.pb.go b/cluster/proto/message.pb.go index c67eb95e..39c50b9c 100644 --- a/cluster/proto/message.pb.go +++ b/cluster/proto/message.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. 
// versions: -// protoc-gen-go v1.36.6 -// protoc v3.6.1 +// protoc-gen-go v1.36.7 +// protoc v3.21.12 // source: message.proto package proto @@ -25,19 +25,20 @@ const ( type Command_Type int32 const ( - Command_COMMAND_TYPE_UNKNOWN Command_Type = 0 - Command_COMMAND_TYPE_GET_NODE_META Command_Type = 1 - Command_COMMAND_TYPE_EXECUTE Command_Type = 2 - Command_COMMAND_TYPE_QUERY Command_Type = 3 - Command_COMMAND_TYPE_BACKUP Command_Type = 4 - Command_COMMAND_TYPE_LOAD Command_Type = 5 - Command_COMMAND_TYPE_REMOVE_NODE Command_Type = 6 - Command_COMMAND_TYPE_NOTIFY Command_Type = 7 - Command_COMMAND_TYPE_JOIN Command_Type = 8 - Command_COMMAND_TYPE_REQUEST Command_Type = 9 - Command_COMMAND_TYPE_LOAD_CHUNK Command_Type = 10 - Command_COMMAND_TYPE_BACKUP_STREAM Command_Type = 11 - Command_COMMAND_TYPE_STEPDOWN Command_Type = 12 + Command_COMMAND_TYPE_UNKNOWN Command_Type = 0 + Command_COMMAND_TYPE_GET_NODE_META Command_Type = 1 + Command_COMMAND_TYPE_EXECUTE Command_Type = 2 + Command_COMMAND_TYPE_QUERY Command_Type = 3 + Command_COMMAND_TYPE_BACKUP Command_Type = 4 + Command_COMMAND_TYPE_LOAD Command_Type = 5 + Command_COMMAND_TYPE_REMOVE_NODE Command_Type = 6 + Command_COMMAND_TYPE_NOTIFY Command_Type = 7 + Command_COMMAND_TYPE_JOIN Command_Type = 8 + Command_COMMAND_TYPE_REQUEST Command_Type = 9 + Command_COMMAND_TYPE_LOAD_CHUNK Command_Type = 10 + Command_COMMAND_TYPE_BACKUP_STREAM Command_Type = 11 + Command_COMMAND_TYPE_STEPDOWN Command_Type = 12 + Command_COMMAND_TYPE_HIGHWATER_MARK_UPDATE Command_Type = 13 ) // Enum value maps for Command_Type. 
@@ -56,21 +57,23 @@ var ( 10: "COMMAND_TYPE_LOAD_CHUNK", 11: "COMMAND_TYPE_BACKUP_STREAM", 12: "COMMAND_TYPE_STEPDOWN", + 13: "COMMAND_TYPE_HIGHWATER_MARK_UPDATE", } Command_Type_value = map[string]int32{ - "COMMAND_TYPE_UNKNOWN": 0, - "COMMAND_TYPE_GET_NODE_META": 1, - "COMMAND_TYPE_EXECUTE": 2, - "COMMAND_TYPE_QUERY": 3, - "COMMAND_TYPE_BACKUP": 4, - "COMMAND_TYPE_LOAD": 5, - "COMMAND_TYPE_REMOVE_NODE": 6, - "COMMAND_TYPE_NOTIFY": 7, - "COMMAND_TYPE_JOIN": 8, - "COMMAND_TYPE_REQUEST": 9, - "COMMAND_TYPE_LOAD_CHUNK": 10, - "COMMAND_TYPE_BACKUP_STREAM": 11, - "COMMAND_TYPE_STEPDOWN": 12, + "COMMAND_TYPE_UNKNOWN": 0, + "COMMAND_TYPE_GET_NODE_META": 1, + "COMMAND_TYPE_EXECUTE": 2, + "COMMAND_TYPE_QUERY": 3, + "COMMAND_TYPE_BACKUP": 4, + "COMMAND_TYPE_LOAD": 5, + "COMMAND_TYPE_REMOVE_NODE": 6, + "COMMAND_TYPE_NOTIFY": 7, + "COMMAND_TYPE_JOIN": 8, + "COMMAND_TYPE_REQUEST": 9, + "COMMAND_TYPE_LOAD_CHUNK": 10, + "COMMAND_TYPE_BACKUP_STREAM": 11, + "COMMAND_TYPE_STEPDOWN": 12, + "COMMAND_TYPE_HIGHWATER_MARK_UPDATE": 13, } ) @@ -228,6 +231,7 @@ type Command struct { // *Command_ExecuteQueryRequest // *Command_LoadChunkRequest // *Command_StepdownRequest + // *Command_HighwaterMarkUpdateRequest Request isCommand_Request `protobuf_oneof:"request"` Credentials *Credentials `protobuf:"bytes,4,opt,name=credentials,proto3" json:"credentials,omitempty"` unknownFields protoimpl.UnknownFields @@ -368,6 +372,15 @@ func (x *Command) GetStepdownRequest() *proto.StepdownRequest { return nil } +func (x *Command) GetHighwaterMarkUpdateRequest() *HighwaterMarkUpdateRequest { + if x != nil { + if x, ok := x.Request.(*Command_HighwaterMarkUpdateRequest); ok { + return x.HighwaterMarkUpdateRequest + } + } + return nil +} + func (x *Command) GetCredentials() *Credentials { if x != nil { return x.Credentials @@ -419,6 +432,10 @@ type Command_StepdownRequest struct { StepdownRequest *proto.StepdownRequest `protobuf:"bytes,12,opt,name=stepdown_request,json=stepdownRequest,proto3,oneof"` } +type 
Command_HighwaterMarkUpdateRequest struct { + HighwaterMarkUpdateRequest *HighwaterMarkUpdateRequest `protobuf:"bytes,13,opt,name=highwater_mark_update_request,json=highwaterMarkUpdateRequest,proto3,oneof"` +} + func (*Command_ExecuteRequest) isCommand_Request() {} func (*Command_QueryRequest) isCommand_Request() {} @@ -439,6 +456,8 @@ func (*Command_LoadChunkRequest) isCommand_Request() {} func (*Command_StepdownRequest) isCommand_Request() {} +func (*Command_HighwaterMarkUpdateRequest) isCommand_Request() {} + type CommandExecuteResponse struct { state protoimpl.MessageState `protogen:"open.v1"` Error string `protobuf:"bytes,1,opt,name=error,proto3" json:"error,omitempty"` @@ -943,6 +962,102 @@ func (x *CommandStepdownResponse) GetError() string { return "" } +type HighwaterMarkUpdateRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + NodeId string `protobuf:"bytes,1,opt,name=node_id,json=nodeId,proto3" json:"node_id,omitempty"` + HighwaterMark uint64 `protobuf:"varint,2,opt,name=highwater_mark,json=highwaterMark,proto3" json:"highwater_mark,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *HighwaterMarkUpdateRequest) Reset() { + *x = HighwaterMarkUpdateRequest{} + mi := &file_message_proto_msgTypes[13] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *HighwaterMarkUpdateRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*HighwaterMarkUpdateRequest) ProtoMessage() {} + +func (x *HighwaterMarkUpdateRequest) ProtoReflect() protoreflect.Message { + mi := &file_message_proto_msgTypes[13] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use HighwaterMarkUpdateRequest.ProtoReflect.Descriptor instead. 
+func (*HighwaterMarkUpdateRequest) Descriptor() ([]byte, []int) { + return file_message_proto_rawDescGZIP(), []int{13} +} + +func (x *HighwaterMarkUpdateRequest) GetNodeId() string { + if x != nil { + return x.NodeId + } + return "" +} + +func (x *HighwaterMarkUpdateRequest) GetHighwaterMark() uint64 { + if x != nil { + return x.HighwaterMark + } + return 0 +} + +type HighwaterMarkUpdateResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + Error string `protobuf:"bytes,1,opt,name=error,proto3" json:"error,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *HighwaterMarkUpdateResponse) Reset() { + *x = HighwaterMarkUpdateResponse{} + mi := &file_message_proto_msgTypes[14] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *HighwaterMarkUpdateResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*HighwaterMarkUpdateResponse) ProtoMessage() {} + +func (x *HighwaterMarkUpdateResponse) ProtoReflect() protoreflect.Message { + mi := &file_message_proto_msgTypes[14] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use HighwaterMarkUpdateResponse.ProtoReflect.Descriptor instead. 
+func (*HighwaterMarkUpdateResponse) Descriptor() ([]byte, []int) { + return file_message_proto_rawDescGZIP(), []int{14} +} + +func (x *HighwaterMarkUpdateResponse) GetError() string { + if x != nil { + return x.Error + } + return "" +} + var File_message_proto protoreflect.FileDescriptor const file_message_proto_rawDesc = "" + @@ -954,7 +1069,8 @@ const file_message_proto_rawDesc = "" + "\bNodeMeta\x12\x10\n" + "\x03url\x18\x01 \x01(\tR\x03url\x12!\n" + "\fcommit_index\x18\x02 \x01(\x04R\vcommitIndex\x12\x18\n" + - "\aversion\x18\x03 \x01(\tR\aversion\"\x8a\t\n" + + "\aversion\x18\x03 \x01(\tR\aversion\"\x9c\n" + + "\n" + "\aCommand\x12)\n" + "\x04type\x18\x01 \x01(\x0e2\x15.cluster.Command.TypeR\x04type\x12B\n" + "\x0fexecute_request\x18\x02 \x01(\v2\x17.command.ExecuteRequestH\x00R\x0eexecuteRequest\x12<\n" + @@ -967,8 +1083,9 @@ const file_message_proto_rawDesc = "" + "\x15execute_query_request\x18\n" + " \x01(\v2\x1c.command.ExecuteQueryRequestH\x00R\x13executeQueryRequest\x12I\n" + "\x12load_chunk_request\x18\v \x01(\v2\x19.command.LoadChunkRequestH\x00R\x10loadChunkRequest\x12E\n" + - "\x10stepdown_request\x18\f \x01(\v2\x18.command.StepdownRequestH\x00R\x0fstepdownRequest\x126\n" + - "\vcredentials\x18\x04 \x01(\v2\x14.cluster.CredentialsR\vcredentials\"\xe2\x02\n" + + "\x10stepdown_request\x18\f \x01(\v2\x18.command.StepdownRequestH\x00R\x0fstepdownRequest\x12h\n" + + "\x1dhighwater_mark_update_request\x18\r \x01(\v2#.cluster.HighwaterMarkUpdateRequestH\x00R\x1ahighwaterMarkUpdateRequest\x126\n" + + "\vcredentials\x18\x04 \x01(\v2\x14.cluster.CredentialsR\vcredentials\"\x8a\x03\n" + "\x04Type\x12\x18\n" + "\x14COMMAND_TYPE_UNKNOWN\x10\x00\x12\x1e\n" + "\x1aCOMMAND_TYPE_GET_NODE_META\x10\x01\x12\x18\n" + @@ -983,7 +1100,8 @@ const file_message_proto_rawDesc = "" + "\x17COMMAND_TYPE_LOAD_CHUNK\x10\n" + "\x12\x1e\n" + "\x1aCOMMAND_TYPE_BACKUP_STREAM\x10\v\x12\x19\n" + - "\x15COMMAND_TYPE_STEPDOWN\x10\fB\t\n" + + "\x15COMMAND_TYPE_STEPDOWN\x10\f\x12&\n" + + 
"\"COMMAND_TYPE_HIGHWATER_MARK_UPDATE\x10\rB\t\n" + "\arequest\"\x87\x01\n" + "\x16CommandExecuteResponse\x12\x14\n" + "\x05error\x18\x01 \x01(\tR\x05error\x129\n" + @@ -1012,6 +1130,11 @@ const file_message_proto_rawDesc = "" + "\x05error\x18\x01 \x01(\tR\x05error\x12\x16\n" + "\x06leader\x18\x02 \x01(\tR\x06leader\"/\n" + "\x17CommandStepdownResponse\x12\x14\n" + + "\x05error\x18\x01 \x01(\tR\x05error\"\\\n" + + "\x1aHighwaterMarkUpdateRequest\x12\x17\n" + + "\anode_id\x18\x01 \x01(\tR\x06nodeId\x12%\n" + + "\x0ehighwater_mark\x18\x02 \x01(\x04R\rhighwaterMark\"3\n" + + "\x1bHighwaterMarkUpdateResponse\x12\x14\n" + "\x05error\x18\x01 \x01(\tR\x05errorB+Z)github.com/rqlite/rqlite/v8/cluster/protob\x06proto3" var ( @@ -1027,56 +1150,59 @@ func file_message_proto_rawDescGZIP() []byte { } var file_message_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_message_proto_msgTypes = make([]protoimpl.MessageInfo, 13) +var file_message_proto_msgTypes = make([]protoimpl.MessageInfo, 15) var file_message_proto_goTypes = []any{ - (Command_Type)(0), // 0: cluster.Command.Type - (*Credentials)(nil), // 1: cluster.Credentials - (*NodeMeta)(nil), // 2: cluster.NodeMeta - (*Command)(nil), // 3: cluster.Command - (*CommandExecuteResponse)(nil), // 4: cluster.CommandExecuteResponse - (*CommandQueryResponse)(nil), // 5: cluster.CommandQueryResponse - (*CommandRequestResponse)(nil), // 6: cluster.CommandRequestResponse - (*CommandBackupResponse)(nil), // 7: cluster.CommandBackupResponse - (*CommandLoadResponse)(nil), // 8: cluster.CommandLoadResponse - (*CommandLoadChunkResponse)(nil), // 9: cluster.CommandLoadChunkResponse - (*CommandRemoveNodeResponse)(nil), // 10: cluster.CommandRemoveNodeResponse - (*CommandNotifyResponse)(nil), // 11: cluster.CommandNotifyResponse - (*CommandJoinResponse)(nil), // 12: cluster.CommandJoinResponse - (*CommandStepdownResponse)(nil), // 13: cluster.CommandStepdownResponse - (*proto.ExecuteRequest)(nil), // 14: command.ExecuteRequest - 
(*proto.QueryRequest)(nil), // 15: command.QueryRequest - (*proto.BackupRequest)(nil), // 16: command.BackupRequest - (*proto.LoadRequest)(nil), // 17: command.LoadRequest - (*proto.RemoveNodeRequest)(nil), // 18: command.RemoveNodeRequest - (*proto.NotifyRequest)(nil), // 19: command.NotifyRequest - (*proto.JoinRequest)(nil), // 20: command.JoinRequest - (*proto.ExecuteQueryRequest)(nil), // 21: command.ExecuteQueryRequest - (*proto.LoadChunkRequest)(nil), // 22: command.LoadChunkRequest - (*proto.StepdownRequest)(nil), // 23: command.StepdownRequest - (*proto.ExecuteQueryResponse)(nil), // 24: command.ExecuteQueryResponse - (*proto.QueryRows)(nil), // 25: command.QueryRows + (Command_Type)(0), // 0: cluster.Command.Type + (*Credentials)(nil), // 1: cluster.Credentials + (*NodeMeta)(nil), // 2: cluster.NodeMeta + (*Command)(nil), // 3: cluster.Command + (*CommandExecuteResponse)(nil), // 4: cluster.CommandExecuteResponse + (*CommandQueryResponse)(nil), // 5: cluster.CommandQueryResponse + (*CommandRequestResponse)(nil), // 6: cluster.CommandRequestResponse + (*CommandBackupResponse)(nil), // 7: cluster.CommandBackupResponse + (*CommandLoadResponse)(nil), // 8: cluster.CommandLoadResponse + (*CommandLoadChunkResponse)(nil), // 9: cluster.CommandLoadChunkResponse + (*CommandRemoveNodeResponse)(nil), // 10: cluster.CommandRemoveNodeResponse + (*CommandNotifyResponse)(nil), // 11: cluster.CommandNotifyResponse + (*CommandJoinResponse)(nil), // 12: cluster.CommandJoinResponse + (*CommandStepdownResponse)(nil), // 13: cluster.CommandStepdownResponse + (*HighwaterMarkUpdateRequest)(nil), // 14: cluster.HighwaterMarkUpdateRequest + (*HighwaterMarkUpdateResponse)(nil), // 15: cluster.HighwaterMarkUpdateResponse + (*proto.ExecuteRequest)(nil), // 16: command.ExecuteRequest + (*proto.QueryRequest)(nil), // 17: command.QueryRequest + (*proto.BackupRequest)(nil), // 18: command.BackupRequest + (*proto.LoadRequest)(nil), // 19: command.LoadRequest + 
(*proto.RemoveNodeRequest)(nil), // 20: command.RemoveNodeRequest + (*proto.NotifyRequest)(nil), // 21: command.NotifyRequest + (*proto.JoinRequest)(nil), // 22: command.JoinRequest + (*proto.ExecuteQueryRequest)(nil), // 23: command.ExecuteQueryRequest + (*proto.LoadChunkRequest)(nil), // 24: command.LoadChunkRequest + (*proto.StepdownRequest)(nil), // 25: command.StepdownRequest + (*proto.ExecuteQueryResponse)(nil), // 26: command.ExecuteQueryResponse + (*proto.QueryRows)(nil), // 27: command.QueryRows } var file_message_proto_depIdxs = []int32{ 0, // 0: cluster.Command.type:type_name -> cluster.Command.Type - 14, // 1: cluster.Command.execute_request:type_name -> command.ExecuteRequest - 15, // 2: cluster.Command.query_request:type_name -> command.QueryRequest - 16, // 3: cluster.Command.backup_request:type_name -> command.BackupRequest - 17, // 4: cluster.Command.load_request:type_name -> command.LoadRequest - 18, // 5: cluster.Command.remove_node_request:type_name -> command.RemoveNodeRequest - 19, // 6: cluster.Command.notify_request:type_name -> command.NotifyRequest - 20, // 7: cluster.Command.join_request:type_name -> command.JoinRequest - 21, // 8: cluster.Command.execute_query_request:type_name -> command.ExecuteQueryRequest - 22, // 9: cluster.Command.load_chunk_request:type_name -> command.LoadChunkRequest - 23, // 10: cluster.Command.stepdown_request:type_name -> command.StepdownRequest - 1, // 11: cluster.Command.credentials:type_name -> cluster.Credentials - 24, // 12: cluster.CommandExecuteResponse.response:type_name -> command.ExecuteQueryResponse - 25, // 13: cluster.CommandQueryResponse.rows:type_name -> command.QueryRows - 24, // 14: cluster.CommandRequestResponse.response:type_name -> command.ExecuteQueryResponse - 15, // [15:15] is the sub-list for method output_type - 15, // [15:15] is the sub-list for method input_type - 15, // [15:15] is the sub-list for extension type_name - 15, // [15:15] is the sub-list for extension extendee - 0, // 
[0:15] is the sub-list for field type_name + 16, // 1: cluster.Command.execute_request:type_name -> command.ExecuteRequest + 17, // 2: cluster.Command.query_request:type_name -> command.QueryRequest + 18, // 3: cluster.Command.backup_request:type_name -> command.BackupRequest + 19, // 4: cluster.Command.load_request:type_name -> command.LoadRequest + 20, // 5: cluster.Command.remove_node_request:type_name -> command.RemoveNodeRequest + 21, // 6: cluster.Command.notify_request:type_name -> command.NotifyRequest + 22, // 7: cluster.Command.join_request:type_name -> command.JoinRequest + 23, // 8: cluster.Command.execute_query_request:type_name -> command.ExecuteQueryRequest + 24, // 9: cluster.Command.load_chunk_request:type_name -> command.LoadChunkRequest + 25, // 10: cluster.Command.stepdown_request:type_name -> command.StepdownRequest + 14, // 11: cluster.Command.highwater_mark_update_request:type_name -> cluster.HighwaterMarkUpdateRequest + 1, // 12: cluster.Command.credentials:type_name -> cluster.Credentials + 26, // 13: cluster.CommandExecuteResponse.response:type_name -> command.ExecuteQueryResponse + 27, // 14: cluster.CommandQueryResponse.rows:type_name -> command.QueryRows + 26, // 15: cluster.CommandRequestResponse.response:type_name -> command.ExecuteQueryResponse + 16, // [16:16] is the sub-list for method output_type + 16, // [16:16] is the sub-list for method input_type + 16, // [16:16] is the sub-list for extension type_name + 16, // [16:16] is the sub-list for extension extendee + 0, // [0:16] is the sub-list for field type_name } func init() { file_message_proto_init() } @@ -1095,6 +1221,7 @@ func file_message_proto_init() { (*Command_ExecuteQueryRequest)(nil), (*Command_LoadChunkRequest)(nil), (*Command_StepdownRequest)(nil), + (*Command_HighwaterMarkUpdateRequest)(nil), } type x struct{} out := protoimpl.TypeBuilder{ @@ -1102,7 +1229,7 @@ func file_message_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: 
unsafe.Slice(unsafe.StringData(file_message_proto_rawDesc), len(file_message_proto_rawDesc)), NumEnums: 1, - NumMessages: 13, + NumMessages: 15, NumExtensions: 0, NumServices: 0, }, diff --git a/cluster/proto/message.proto b/cluster/proto/message.proto index be2ad074..3afa508a 100644 --- a/cluster/proto/message.proto +++ b/cluster/proto/message.proto @@ -31,6 +31,7 @@ message Command { COMMAND_TYPE_LOAD_CHUNK = 10; COMMAND_TYPE_BACKUP_STREAM = 11; COMMAND_TYPE_STEPDOWN = 12; + COMMAND_TYPE_HIGHWATER_MARK_UPDATE = 13; } Type type = 1; @@ -45,6 +46,7 @@ message Command { command.ExecuteQueryRequest execute_query_request = 10; command.LoadChunkRequest load_chunk_request = 11; command.StepdownRequest stepdown_request = 12; + HighwaterMarkUpdateRequest highwater_mark_update_request = 13; } Credentials credentials = 4; @@ -97,3 +99,12 @@ message CommandJoinResponse { message CommandStepdownResponse { string error = 1; } + +message HighwaterMarkUpdateRequest { + string node_id = 1; + uint64 highwater_mark = 2; +} + +message HighwaterMarkUpdateResponse { + string error = 1; +} diff --git a/cluster/service.go b/cluster/service.go index a0eaf9ca..dd7315ac 100644 --- a/cluster/service.go +++ b/cluster/service.go @@ -24,17 +24,19 @@ import ( var stats *expvar.Map const ( - numGetNodeAPIRequest = "num_get_node_api_req" - numGetNodeAPIResponse = "num_get_node_api_resp" - numExecuteRequest = "num_execute_req" - numQueryRequest = "num_query_req" - numRequestRequest = "num_request_req" - numBackupRequest = "num_backup_req" - numLoadRequest = "num_load_req" - numRemoveNodeRequest = "num_remove_node_req" - numNotifyRequest = "num_notify_req" - numJoinRequest = "num_join_req" - numStepdownRequest = "num_stepdown_req" + numGetNodeAPIRequest = "num_get_node_api_req" + numGetNodeAPIResponse = "num_get_node_api_resp" + numExecuteRequest = "num_execute_req" + numQueryRequest = "num_query_req" + numRequestRequest = "num_request_req" + numBackupRequest = "num_backup_req" + numLoadRequest = 
"num_load_req" + numRemoveNodeRequest = "num_remove_node_req" + numNotifyRequest = "num_notify_req" + numJoinRequest = "num_join_req" + numStepdownRequest = "num_stepdown_req" + numBroadcastHWMRequest = "num_broadcast_hwm_req" + numHWMUpdateDropped = "num_hwm_update_dropped" numClientRetries = "num_client_retries" numGetNodeAPIRequestRetries = "num_get_node_api_req_retries" @@ -71,6 +73,8 @@ func init() { stats.Add(numNotifyRequest, 0) stats.Add(numJoinRequest, 0) stats.Add(numStepdownRequest, 0) + stats.Add(numBroadcastHWMRequest, 0) + stats.Add(numHWMUpdateDropped, 0) stats.Add(numClientRetries, 0) stats.Add(numGetNodeAPIRequestRetries, 0) stats.Add(numClientLoadRetries, 0) @@ -151,6 +155,9 @@ type Service struct { apiAddr string // host:port this node serves the HTTP API. version string // Version of software this node is running. + hwmMu sync.RWMutex + hwmUpdateC chan<- uint64 // Channel for HWM updates + logger *log.Logger } @@ -179,6 +186,13 @@ func (s *Service) Close() error { return nil } +// RegisterHWMUpdate registers a channel to receive highwater mark update requests. +func (s *Service) RegisterHWMUpdate(c chan<- uint64) { + s.hwmMu.Lock() + defer s.hwmMu.Unlock() + s.hwmUpdateC = c +} + // Addr returns the address the service is listening on. 
func (s *Service) Addr() string { return s.addr.String() @@ -557,6 +571,30 @@ func (s *Service) handleConn(conn net.Conn) { if err := marshalAndWrite(conn, resp); err != nil { return } + + case proto.Command_COMMAND_TYPE_HIGHWATER_MARK_UPDATE: + stats.Add(numBroadcastHWMRequest, 1) + resp := &proto.HighwaterMarkUpdateResponse{} + + br := c.GetHighwaterMarkUpdateRequest() + if br == nil { + resp.Error = "HighwaterMarkUpdateRequest is nil" + } else { + // Send to registered channel if available + s.hwmMu.RLock() + if s.hwmUpdateC != nil { + select { + case s.hwmUpdateC <- br.HighwaterMark: + default: + // Channel is full, don't block + stats.Add(numHWMUpdateDropped, 1) + } + } + s.hwmMu.RUnlock() + } + if err := marshalAndWrite(conn, resp); err != nil { + return + } } } } diff --git a/cluster/service_test.go b/cluster/service_test.go index a1c6fc20..8a83e9d8 100644 --- a/cluster/service_test.go +++ b/cluster/service_test.go @@ -424,6 +424,91 @@ func Test_NewServiceJoin(t *testing.T) { } } +func Test_ServiceHandleHighwaterMarkUpdate(t *testing.T) { + ml := mustNewMockTransport() + mgr := mustNewMockManager() + s := New(ml, mustNewMockDatabase(), mgr, mustNewMockCredentialStore()) + if s == nil { + t.Fatalf("failed to create cluster service") + } + + if err := s.Open(); err != nil { + t.Fatalf("failed to open cluster service") + } + defer s.Close() + + // Create a client and send highwater mark update + c := NewClient(ml, 30*time.Second) + c.SetLocal("test-node", nil) + + // Use the client to send a highwater mark update + responses, err := c.BroadcastHWM(987654, 0, 5*time.Second, s.Addr()) + if err != nil { + t.Fatalf("failed to broadcast highwater mark update: %s", err) + } + + // Check that we got a response for the service address + resp, ok := responses[s.Addr()] + if !ok { + t.Fatalf("expected response for address %s", s.Addr()) + } + + // Check response has no error + if resp.Error != "" { + t.Fatalf("expected no error, got: %s", resp.Error) + } +} + +func 
Test_ServiceRegisterHWMUpdate(t *testing.T) { + ml := mustNewMockTransport() + mgr := mustNewMockManager() + s := New(ml, mustNewMockDatabase(), mgr, mustNewMockCredentialStore()) + if s == nil { + t.Fatalf("failed to create cluster service") + } + + if err := s.Open(); err != nil { + t.Fatalf("failed to open cluster service") + } + defer s.Close() + + // Create a channel to receive highwater mark updates + hwmCh := make(chan uint64, 1) + s.RegisterHWMUpdate(hwmCh) + + // Create a client and send highwater mark update + c := NewClient(ml, 30*time.Second) + c.SetLocal("test-node", nil) + + // Use the client to send a highwater mark update + testHWM := uint64(123456) + responses, err := c.BroadcastHWM(testHWM, 0, 5*time.Second, s.Addr()) + if err != nil { + t.Fatalf("failed to broadcast highwater mark update: %s", err) + } + + // Check that we got a response for the service address + resp, ok := responses[s.Addr()] + if !ok { + t.Fatalf("expected response for address %s", s.Addr()) + } + + // Check response has no error + if resp.Error != "" { + t.Fatalf("expected no error, got: %s", resp.Error) + } + + // Check that we received the update on the channel + select { + case hwmUpdate := <-hwmCh: + if hwmUpdate != testHWM { + t.Fatalf("expected highwater_mark to be %d, got: %d", testHWM, hwmUpdate) + } + case <-time.After(5 * time.Second): + t.Fatalf("timeout waiting for highwater mark update on channel") + } +} + type mockTransport struct { tn net.Listener remoteEncrypted bool