Support setting key-value pairs via Raft

This commit is contained in:
Philip O'Toole
2025-06-11 22:24:22 -04:00
committed by GitHub
parent 038fdaabb2
commit def0ff377c
8 changed files with 183 additions and 41 deletions

View File

@@ -2,6 +2,7 @@
### Implementation changes and bug fixes
- [PR #2099](https://github.com/rqlite/rqlite/pull/2099): Add basic CDC streamer to database module.
- [PR #2101](https://github.com/rqlite/rqlite/pull/2101): Add CDC support to Store component.
- [PR #2105](https://github.com/rqlite/rqlite/pull/2105): Support setting key-value pairs via Raft consensus.
## v8.37.4 (Jun 7th 2025)
### Implementation changes and bug fixes

View File

@@ -172,6 +172,20 @@ func UnmarshalLoadChunkRequest(b []byte, lr *proto.LoadChunkRequest) error {
return pb.Unmarshal(b, lr)
}
// MarshalSetKeyRequest marshals a SetKeyRequest command
func MarshalSetKeyRequest(skr *proto.SetKeyRequest) ([]byte, error) {
b, err := pb.Marshal(skr)
if err != nil {
return nil, err
}
return b, nil
}
// UnmarshalSetKeyRequest unmarshals a SetKeyRequest command
func UnmarshalSetKeyRequest(b []byte, skr *proto.SetKeyRequest) error {
return pb.Unmarshal(b, skr)
}
// UnmarshalSubCommand unmarshals a sub command m. It assumes that
// m is the correct type.
func UnmarshalSubCommand(c *proto.Command, m pb.Message) error {

View File

@@ -136,6 +136,7 @@ const (
Command_COMMAND_TYPE_JOIN Command_Type = 5
Command_COMMAND_TYPE_EXECUTE_QUERY Command_Type = 6
Command_COMMAND_TYPE_LOAD_CHUNK Command_Type = 7
Command_COMMAND_TYPE_SET_KEY Command_Type = 8
)
// Enum value maps for Command_Type.
@@ -149,6 +150,7 @@ var (
5: "COMMAND_TYPE_JOIN",
6: "COMMAND_TYPE_EXECUTE_QUERY",
7: "COMMAND_TYPE_LOAD_CHUNK",
8: "COMMAND_TYPE_SET_KEY",
}
Command_Type_value = map[string]int32{
"COMMAND_TYPE_UNKNOWN": 0,
@@ -159,6 +161,7 @@ var (
"COMMAND_TYPE_JOIN": 5,
"COMMAND_TYPE_EXECUTE_QUERY": 6,
"COMMAND_TYPE_LOAD_CHUNK": 7,
"COMMAND_TYPE_SET_KEY": 8,
}
)
@@ -186,7 +189,7 @@ func (x Command_Type) Number() protoreflect.EnumNumber {
// Deprecated: Use Command_Type.Descriptor instead.
func (Command_Type) EnumDescriptor() ([]byte, []int) {
return file_command_proto_rawDescGZIP(), []int{17, 0}
return file_command_proto_rawDescGZIP(), []int{18, 0}
}
type CDCEvent_Operation int32
@@ -238,7 +241,7 @@ func (x CDCEvent_Operation) Number() protoreflect.EnumNumber {
// Deprecated: Use CDCEvent_Operation.Descriptor instead.
func (CDCEvent_Operation) EnumDescriptor() ([]byte, []int) {
return file_command_proto_rawDescGZIP(), []int{20, 0}
return file_command_proto_rawDescGZIP(), []int{21, 0}
}
type UpdateHookEvent_Operation int32
@@ -290,7 +293,7 @@ func (x UpdateHookEvent_Operation) Number() protoreflect.EnumNumber {
// Deprecated: Use UpdateHookEvent_Operation.Descriptor instead.
func (UpdateHookEvent_Operation) EnumDescriptor() ([]byte, []int) {
return file_command_proto_rawDescGZIP(), []int{22, 0}
return file_command_proto_rawDescGZIP(), []int{23, 0}
}
type Parameter struct {
@@ -1437,6 +1440,58 @@ func (x *Noop) GetId() string {
return ""
}
type SetKeyRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
Key []byte `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *SetKeyRequest) Reset() {
*x = SetKeyRequest{}
mi := &file_command_proto_msgTypes[17]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *SetKeyRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*SetKeyRequest) ProtoMessage() {}
func (x *SetKeyRequest) ProtoReflect() protoreflect.Message {
mi := &file_command_proto_msgTypes[17]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use SetKeyRequest.ProtoReflect.Descriptor instead.
func (*SetKeyRequest) Descriptor() ([]byte, []int) {
return file_command_proto_rawDescGZIP(), []int{17}
}
func (x *SetKeyRequest) GetKey() []byte {
if x != nil {
return x.Key
}
return nil
}
func (x *SetKeyRequest) GetValue() []byte {
if x != nil {
return x.Value
}
return nil
}
type Command struct {
state protoimpl.MessageState `protogen:"open.v1"`
Type Command_Type `protobuf:"varint,1,opt,name=type,proto3,enum=command.Command_Type" json:"type,omitempty"`
@@ -1448,7 +1503,7 @@ type Command struct {
func (x *Command) Reset() {
*x = Command{}
mi := &file_command_proto_msgTypes[17]
mi := &file_command_proto_msgTypes[18]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -1460,7 +1515,7 @@ func (x *Command) String() string {
func (*Command) ProtoMessage() {}
func (x *Command) ProtoReflect() protoreflect.Message {
mi := &file_command_proto_msgTypes[17]
mi := &file_command_proto_msgTypes[18]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -1473,7 +1528,7 @@ func (x *Command) ProtoReflect() protoreflect.Message {
// Deprecated: Use Command.ProtoReflect.Descriptor instead.
func (*Command) Descriptor() ([]byte, []int) {
return file_command_proto_rawDescGZIP(), []int{17}
return file_command_proto_rawDescGZIP(), []int{18}
}
func (x *Command) GetType() Command_Type {
@@ -1513,7 +1568,7 @@ type CDCValue struct {
func (x *CDCValue) Reset() {
*x = CDCValue{}
mi := &file_command_proto_msgTypes[18]
mi := &file_command_proto_msgTypes[19]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -1525,7 +1580,7 @@ func (x *CDCValue) String() string {
func (*CDCValue) ProtoMessage() {}
func (x *CDCValue) ProtoReflect() protoreflect.Message {
mi := &file_command_proto_msgTypes[18]
mi := &file_command_proto_msgTypes[19]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -1538,7 +1593,7 @@ func (x *CDCValue) ProtoReflect() protoreflect.Message {
// Deprecated: Use CDCValue.ProtoReflect.Descriptor instead.
func (*CDCValue) Descriptor() ([]byte, []int) {
return file_command_proto_rawDescGZIP(), []int{18}
return file_command_proto_rawDescGZIP(), []int{19}
}
func (x *CDCValue) GetValue() isCDCValue_Value {
@@ -1636,7 +1691,7 @@ type CDCRow struct {
func (x *CDCRow) Reset() {
*x = CDCRow{}
mi := &file_command_proto_msgTypes[19]
mi := &file_command_proto_msgTypes[20]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -1648,7 +1703,7 @@ func (x *CDCRow) String() string {
func (*CDCRow) ProtoMessage() {}
func (x *CDCRow) ProtoReflect() protoreflect.Message {
mi := &file_command_proto_msgTypes[19]
mi := &file_command_proto_msgTypes[20]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -1661,7 +1716,7 @@ func (x *CDCRow) ProtoReflect() protoreflect.Message {
// Deprecated: Use CDCRow.ProtoReflect.Descriptor instead.
func (*CDCRow) Descriptor() ([]byte, []int) {
return file_command_proto_rawDescGZIP(), []int{19}
return file_command_proto_rawDescGZIP(), []int{20}
}
func (x *CDCRow) GetValues() []*CDCValue {
@@ -1686,7 +1741,7 @@ type CDCEvent struct {
func (x *CDCEvent) Reset() {
*x = CDCEvent{}
mi := &file_command_proto_msgTypes[20]
mi := &file_command_proto_msgTypes[21]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -1698,7 +1753,7 @@ func (x *CDCEvent) String() string {
func (*CDCEvent) ProtoMessage() {}
func (x *CDCEvent) ProtoReflect() protoreflect.Message {
mi := &file_command_proto_msgTypes[20]
mi := &file_command_proto_msgTypes[21]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -1711,7 +1766,7 @@ func (x *CDCEvent) ProtoReflect() protoreflect.Message {
// Deprecated: Use CDCEvent.ProtoReflect.Descriptor instead.
func (*CDCEvent) Descriptor() ([]byte, []int) {
return file_command_proto_rawDescGZIP(), []int{20}
return file_command_proto_rawDescGZIP(), []int{21}
}
func (x *CDCEvent) GetError() string {
@@ -1773,7 +1828,7 @@ type CDCEvents struct {
func (x *CDCEvents) Reset() {
*x = CDCEvents{}
mi := &file_command_proto_msgTypes[21]
mi := &file_command_proto_msgTypes[22]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -1785,7 +1840,7 @@ func (x *CDCEvents) String() string {
func (*CDCEvents) ProtoMessage() {}
func (x *CDCEvents) ProtoReflect() protoreflect.Message {
mi := &file_command_proto_msgTypes[21]
mi := &file_command_proto_msgTypes[22]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -1798,7 +1853,7 @@ func (x *CDCEvents) ProtoReflect() protoreflect.Message {
// Deprecated: Use CDCEvents.ProtoReflect.Descriptor instead.
func (*CDCEvents) Descriptor() ([]byte, []int) {
return file_command_proto_rawDescGZIP(), []int{21}
return file_command_proto_rawDescGZIP(), []int{22}
}
func (x *CDCEvents) GetK() uint64 {
@@ -1827,7 +1882,7 @@ type UpdateHookEvent struct {
func (x *UpdateHookEvent) Reset() {
*x = UpdateHookEvent{}
mi := &file_command_proto_msgTypes[22]
mi := &file_command_proto_msgTypes[23]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -1839,7 +1894,7 @@ func (x *UpdateHookEvent) String() string {
func (*UpdateHookEvent) ProtoMessage() {}
func (x *UpdateHookEvent) ProtoReflect() protoreflect.Message {
mi := &file_command_proto_msgTypes[22]
mi := &file_command_proto_msgTypes[23]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -1852,7 +1907,7 @@ func (x *UpdateHookEvent) ProtoReflect() protoreflect.Message {
// Deprecated: Use UpdateHookEvent.ProtoReflect.Descriptor instead.
func (*UpdateHookEvent) Descriptor() ([]byte, []int) {
return file_command_proto_rawDescGZIP(), []int{22}
return file_command_proto_rawDescGZIP(), []int{23}
}
func (x *UpdateHookEvent) GetError() string {
@@ -1979,14 +2034,17 @@ const file_command_proto_rawDesc = "" +
"\x11RemoveNodeRequest\x12\x0e\n" +
"\x02id\x18\x01 \x01(\tR\x02id\"\x16\n" +
"\x04Noop\x12\x0e\n" +
"\x02id\x18\x01 \x01(\tR\x02id\"\xcc\x02\n" +
"\x02id\x18\x01 \x01(\tR\x02id\"7\n" +
"\rSetKeyRequest\x12\x10\n" +
"\x03key\x18\x01 \x01(\fR\x03key\x12\x14\n" +
"\x05value\x18\x02 \x01(\fR\x05value\"\xe6\x02\n" +
"\aCommand\x12)\n" +
"\x04type\x18\x01 \x01(\x0e2\x15.command.Command.TypeR\x04type\x12\x1f\n" +
"\vsub_command\x18\x02 \x01(\fR\n" +
"subCommand\x12\x1e\n" +
"\n" +
"compressed\x18\x03 \x01(\bR\n" +
"compressed\"\xd4\x01\n" +
"compressed\"\xee\x01\n" +
"\x04Type\x12\x18\n" +
"\x14COMMAND_TYPE_UNKNOWN\x10\x00\x12\x16\n" +
"\x12COMMAND_TYPE_QUERY\x10\x01\x12\x18\n" +
@@ -1995,7 +2053,8 @@ const file_command_proto_rawDesc = "" +
"\x11COMMAND_TYPE_LOAD\x10\x04\x12\x15\n" +
"\x11COMMAND_TYPE_JOIN\x10\x05\x12\x1e\n" +
"\x1aCOMMAND_TYPE_EXECUTE_QUERY\x10\x06\x12\x1b\n" +
"\x17COMMAND_TYPE_LOAD_CHUNK\x10\a\"c\n" +
"\x17COMMAND_TYPE_LOAD_CHUNK\x10\a\x12\x18\n" +
"\x14COMMAND_TYPE_SET_KEY\x10\b\"c\n" +
"\bCDCValue\x12\x0e\n" +
"\x01i\x18\x01 \x01(\x12H\x00R\x01i\x12\x0e\n" +
"\x01d\x18\x02 \x01(\x01H\x00R\x01d\x12\x0e\n" +
@@ -2053,7 +2112,7 @@ func file_command_proto_rawDescGZIP() []byte {
}
var file_command_proto_enumTypes = make([]protoimpl.EnumInfo, 5)
var file_command_proto_msgTypes = make([]protoimpl.MessageInfo, 23)
var file_command_proto_msgTypes = make([]protoimpl.MessageInfo, 24)
var file_command_proto_goTypes = []any{
(QueryRequest_Level)(0), // 0: command.QueryRequest.Level
(BackupRequest_Format)(0), // 1: command.BackupRequest.Format
@@ -2077,12 +2136,13 @@ var file_command_proto_goTypes = []any{
(*NotifyRequest)(nil), // 19: command.NotifyRequest
(*RemoveNodeRequest)(nil), // 20: command.RemoveNodeRequest
(*Noop)(nil), // 21: command.Noop
(*Command)(nil), // 22: command.Command
(*CDCValue)(nil), // 23: command.CDCValue
(*CDCRow)(nil), // 24: command.CDCRow
(*CDCEvent)(nil), // 25: command.CDCEvent
(*CDCEvents)(nil), // 26: command.CDCEvents
(*UpdateHookEvent)(nil), // 27: command.UpdateHookEvent
(*SetKeyRequest)(nil), // 22: command.SetKeyRequest
(*Command)(nil), // 23: command.Command
(*CDCValue)(nil), // 24: command.CDCValue
(*CDCRow)(nil), // 25: command.CDCRow
(*CDCEvent)(nil), // 26: command.CDCEvent
(*CDCEvents)(nil), // 27: command.CDCEvents
(*UpdateHookEvent)(nil), // 28: command.UpdateHookEvent
}
var file_command_proto_depIdxs = []int32{
5, // 0: command.Statement.parameters:type_name -> command.Parameter
@@ -2098,11 +2158,11 @@ var file_command_proto_depIdxs = []int32{
12, // 10: command.ExecuteQueryResponse.e:type_name -> command.ExecuteResult
1, // 11: command.BackupRequest.format:type_name -> command.BackupRequest.Format
2, // 12: command.Command.type:type_name -> command.Command.Type
23, // 13: command.CDCRow.values:type_name -> command.CDCValue
24, // 13: command.CDCRow.values:type_name -> command.CDCValue
3, // 14: command.CDCEvent.op:type_name -> command.CDCEvent.Operation
24, // 15: command.CDCEvent.old_row:type_name -> command.CDCRow
24, // 16: command.CDCEvent.new_row:type_name -> command.CDCRow
25, // 17: command.CDCEvents.events:type_name -> command.CDCEvent
25, // 15: command.CDCEvent.old_row:type_name -> command.CDCRow
25, // 16: command.CDCEvent.new_row:type_name -> command.CDCRow
26, // 17: command.CDCEvents.events:type_name -> command.CDCEvent
4, // 18: command.UpdateHookEvent.op:type_name -> command.UpdateHookEvent.Operation
19, // [19:19] is the sub-list for method output_type
19, // [19:19] is the sub-list for method input_type
@@ -2128,7 +2188,7 @@ func file_command_proto_init() {
(*ExecuteQueryResponse_E)(nil),
(*ExecuteQueryResponse_Error)(nil),
}
file_command_proto_msgTypes[18].OneofWrappers = []any{
file_command_proto_msgTypes[19].OneofWrappers = []any{
(*CDCValue_I)(nil),
(*CDCValue_D)(nil),
(*CDCValue_B)(nil),
@@ -2141,7 +2201,7 @@ func file_command_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: unsafe.Slice(unsafe.StringData(file_command_proto_rawDesc), len(file_command_proto_rawDesc)),
NumEnums: 5,
NumMessages: 23,
NumMessages: 24,
NumExtensions: 0,
NumServices: 0,
},

View File

@@ -126,6 +126,11 @@ message Noop {
string id = 1;
}
message SetKeyRequest {
bytes key = 1;
bytes value = 2;
}
message Command {
enum Type {
COMMAND_TYPE_UNKNOWN = 0;
@@ -136,6 +141,7 @@ message Command {
COMMAND_TYPE_JOIN = 5;
COMMAND_TYPE_EXECUTE_QUERY = 6;
COMMAND_TYPE_LOAD_CHUNK = 7;
COMMAND_TYPE_SET_KEY = 8;
}
Type type = 1;
bytes sub_command = 2;

View File

@@ -15,6 +15,11 @@ import (
// ExecuteQueryResponses is a slice of ExecuteQueryResponse, which detects mutations.
type ExecuteQueryResponses []*proto.ExecuteQueryResponse
type KeyValueSetter interface {
// Set sets the value for the given key.
Set(key []byte, value []byte) error
}
// Mutation returns true if any of the responses mutated the database.
func (e ExecuteQueryResponses) Mutation() bool {
if len(e) == 0 {
@@ -41,8 +46,9 @@ func NewCommandProcessor(logger *log.Logger, dm *chunking.DechunkerManager) *Com
decMgmr: dm}
}
// Process processes the given command against the given database.
func (c *CommandProcessor) Process(data []byte, db *sql.SwappableDB) (*proto.Command, bool, any) {
// Process processes the given command against the given database or key-value store. If
// the database is actually changed, the third return value will be true, otherwise false.
func (c *CommandProcessor) Process(data []byte, db *sql.SwappableDB, kv KeyValueSetter) (*proto.Command, bool, any) {
cmd := &proto.Command{}
if err := command.Unmarshal(data, cmd); err != nil {
panic(fmt.Sprintf("failed to unmarshal cluster command: %s", err.Error()))
@@ -140,6 +146,15 @@ func (c *CommandProcessor) Process(data []byte, db *sql.SwappableDB) (*proto.Com
return cmd, true, &fsmGenericResponse{}
case proto.Command_COMMAND_TYPE_NOOP:
return cmd, false, &fsmGenericResponse{}
case proto.Command_COMMAND_TYPE_SET_KEY:
var skr proto.SetKeyRequest
if err := command.UnmarshalSetKeyRequest(cmd.SubCommand, &skr); err != nil {
panic(fmt.Sprintf("failed to unmarshal set-key subcommand: %s", err.Error()))
}
if err := kv.Set(skr.Key, skr.Value); err != nil {
return cmd, false, &fsmGenericResponse{error: fmt.Errorf("failed to set key: %s", err)}
}
return cmd, false, &fsmGenericResponse{}
default:
return cmd, false, &fsmGenericResponse{error: fmt.Errorf("unhandled command: %v", cmd.Type)}
}

View File

@@ -3,6 +3,7 @@ package store
import (
"testing"
"github.com/rqlite/rqlite/v8/command"
"github.com/rqlite/rqlite/v8/command/proto"
)
@@ -66,3 +67,48 @@ func Test_ExecuteQueryResponsesMutation_Check(t *testing.T) {
t.Fatalf("expected no mutations")
}
}
func Test_CommandProcessor_SetKey(t *testing.T) {
keyReq := &proto.SetKeyRequest{
Key: []byte("foo"),
Value: []byte("bar"),
}
// Marshal the SetKeyRequest into bytes
data, err := command.MarshalSetKeyRequest(keyReq)
if err != nil {
t.Fatalf("failed to marshal SetKeyRequest: %v", err)
}
req := &proto.Command{
Type: proto.Command_COMMAND_TYPE_SET_KEY,
SubCommand: data,
}
m, err := command.Marshal(req)
if err != nil {
t.Fatalf("failed to marshal command: %v", err)
}
kv := &mockKeyValueSetter{}
processor := NewCommandProcessor(nil, nil)
_, _, r := processor.Process(m, nil, kv)
if resp, ok := r.(*fsmGenericResponse); !ok {
t.Fatalf("expected fsmGenericResponse, got %T", r)
} else if resp.error != nil {
t.Fatalf("expected no error, got %v", resp.error)
}
if string(kv.key) != "foo" || string(kv.value) != "bar" {
t.Fatalf("expected key 'foo' and value 'bar', got key '%s' and value '%s'", kv.key, kv.value)
}
}
type mockKeyValueSetter struct {
key []byte
value []byte
}
func (m *mockKeyValueSetter) Set(key []byte, value []byte) error {
m.key = key
m.value = value
return nil
}

View File

@@ -198,7 +198,7 @@ func RecoverNode(dataDir string, extensions []string, logger *log.Logger, logs r
return fmt.Errorf("failed to get log at index %d: %v", index, err)
}
if entry.Type == raft.LogCommand {
cmdProc.Process(entry.Data, db)
cmdProc.Process(entry.Data, db, stable)
}
lastIndex = entry.Index
lastTerm = entry.Term

View File

@@ -2011,7 +2011,7 @@ func (s *Store) fsmApply(l *raft.Log) (e any) {
if s.cdcStreamer != nil {
s.cdcStreamer.Reset(l.Index)
}
return s.cmdProc.Process(l.Data, s.db)
return s.cmdProc.Process(l.Data, s.db, s.boltStore)
}()
if mutated {