Allow the stepdown HTTP API call to specify target node for new leader

This commit is contained in:
Copilot
2025-07-21 17:00:07 -04:00
committed by GitHub
parent dfd1eb8979
commit 42179e7358
8 changed files with 334 additions and 24 deletions

View File

@@ -125,8 +125,9 @@ type Manager interface {
Join(n *command.JoinRequest) error
// Stepdown forces this node to relinquish leadership to another node in
// the cluster.
Stepdown(wait bool) error
// the cluster. If id is non-empty, leadership will be transferred to the
// node with the given ID.
Stepdown(wait bool, id string) error
}
// CredentialStore is the interface credential stores must support.
@@ -549,7 +550,7 @@ func (s *Service) handleConn(conn net.Conn) {
} else if !s.checkCommandPerm(c, auth.PermLeaderOps) {
resp.Error = "unauthorized"
} else {
if err := s.mgr.Stepdown(sr.Wait); err != nil {
if err := s.mgr.Stepdown(sr.Wait, sr.Id); err != nil {
resp.Error = err.Error()
}
}

View File

@@ -528,7 +528,7 @@ type MockManager struct {
notifyFn func(n *command.NotifyRequest) error
joinFn func(j *command.JoinRequest) error
leaderAddrFn func() (string, error)
stepdownFn func(wait bool) error
stepdownFn func(wait bool, id string) error
commitIndex uint64
}
@@ -553,11 +553,11 @@ func (m *MockManager) Join(j *command.JoinRequest) error {
return m.joinFn(j)
}
func (m *MockManager) Stepdown(wait bool) error {
func (m *MockManager) Stepdown(wait bool, id string) error {
if m.stepdownFn == nil {
return nil
}
return m.stepdownFn(wait)
return m.stepdownFn(wait, id)
}
func (m *MockManager) LeaderAddr() (string, error) {

View File

@@ -238,7 +238,7 @@ func main() {
log.Printf("stepping down as Leader before shutdown")
}
// Perform a stepdown, ignore any errors.
str.Stepdown(true)
str.Stepdown(true, "")
}
muxLn.Close()
defer mux.Close()

View File

@@ -99,8 +99,9 @@ type Store interface {
ReadFrom(r io.Reader) (int64, error)
// Stepdown forces this node to relinquish leadership to another node in
// the cluster.
Stepdown(wait bool) error
// the cluster. If id is non-empty, leadership will be transferred to the
// node with the given ID.
Stepdown(wait bool, id string) error
}
// GetNodeMetaer is the interface that wraps the GetNodeMeta method.
@@ -1091,7 +1092,21 @@ func (s *Service) handleLeader(w http.ResponseWriter, r *http.Request, qp QueryP
case "POST":
// Trigger leader stepdown
wait := qp.Wait()
if err := s.store.Stepdown(wait); err != nil {
// Parse optional JSON body for target node ID
var nodeID string
if r.Header.Get("Content-Type") == "application/json" && r.ContentLength > 0 {
var reqBody struct {
ID string `json:"id"`
}
if err := json.NewDecoder(r.Body).Decode(&reqBody); err != nil {
http.Error(w, fmt.Sprintf("invalid JSON: %s", err.Error()), http.StatusBadRequest)
return
}
nodeID = reqBody.ID
}
if err := s.store.Stepdown(wait, nodeID); err != nil {
if err == store.ErrNotLeader {
if s.DoRedirect(w, r, qp) {
return
@@ -1110,6 +1125,7 @@ func (s *Service) handleLeader(w http.ResponseWriter, r *http.Request, qp QueryP
w.Header().Add(ServedByHTTPHeader, addr)
sr := &command.StepdownRequest{
Id: nodeID,
Wait: wait,
}
stepdownErr := s.cluster.Stepdown(sr, addr, makeCredentials(r), qp.Timeout(defaultTimeout))

View File

@@ -1519,7 +1519,7 @@ type MockStore struct {
snapshotFn func(n uint64) error
readFromFn func(r io.Reader) (int64, error)
committedFn func(timeout time.Duration) (uint64, error)
stepdownFn func(wait bool) error
stepdownFn func(wait bool, id string) error
leaderAddr string
notReady bool // Default value is true, easier to test.
}
@@ -1610,9 +1610,9 @@ func (m *MockStore) ReadFrom(r io.Reader) (int64, error) {
return 0, nil
}
func (m *MockStore) Stepdown(wait bool) error {
func (m *MockStore) Stepdown(wait bool, id string) error {
if m.stepdownFn != nil {
return m.stepdownFn(wait)
return m.stepdownFn(wait, id)
}
return nil
}
@@ -1714,7 +1714,7 @@ func Test_Leader_POST(t *testing.T) {
stepdownWait := false
store := &MockStore{
leaderAddr: "127.0.0.1:8001",
stepdownFn: func(wait bool) error {
stepdownFn: func(wait bool, id string) error {
stepdownCalled = true
stepdownWait = wait
return nil
@@ -1773,7 +1773,7 @@ func Test_LeaderPOST_ForwardToLeader(t *testing.T) {
stepdownCalled := false
store := &MockStore{
leaderAddr: "127.0.0.1:8001",
stepdownFn: func(wait bool) error {
stepdownFn: func(wait bool, id string) error {
return store.ErrNotLeader
},
}
@@ -1807,7 +1807,7 @@ func Test_LeaderPOST_ForwardToLeader(t *testing.T) {
func Test_LeaderPOST_ForwardError(t *testing.T) {
store := &MockStore{
leaderAddr: "127.0.0.1:8001",
stepdownFn: func(wait bool) error {
stepdownFn: func(wait bool, id string) error {
return store.ErrNotLeader
},
}
@@ -1856,6 +1856,168 @@ func Test_LeaderMethodNotAllowed(t *testing.T) {
}
}
// Test_Leader_POST_JSON_TargetNode verifies that a POST to /leader?wait
// carrying a JSON body with an "id" field results in the store's Stepdown
// being called with wait=true and that ID.
func Test_Leader_POST_JSON_TargetNode(t *testing.T) {
	// Values captured from the mocked Stepdown call.
	var (
		called  bool
		gotWait bool
		gotID   string
	)
	ms := &MockStore{
		leaderAddr: "127.0.0.1:8001",
		stepdownFn: func(wait bool, id string) error {
			called, gotWait, gotID = true, wait, id
			return nil
		},
	}
	svc := New(
		"127.0.0.1:4001",
		ms,
		&mockClusterService{apiAddr: "http://127.0.0.1:4001"},
		&mockCredentialStore{HasPermOK: true},
	)

	// The JSON body names the node that should receive leadership.
	req, err := http.NewRequest("POST", "/leader?wait", strings.NewReader(`{"id": "node1"}`))
	if err != nil {
		t.Fatalf("failed to create POST request: %s", err.Error())
	}
	req.Header.Set("Content-Type", "application/json")

	rec := httptest.NewRecorder()
	svc.ServeHTTP(rec, req)

	// Cases are evaluated top to bottom, so the first failing expectation
	// is the one reported.
	switch {
	case rec.Code != http.StatusOK:
		t.Fatalf("expected 200, got %d", rec.Code)
	case !called:
		t.Fatalf("expected stepdown to be called")
	case !gotWait:
		t.Fatalf("expected stepdown to be called with wait=true")
	case gotID != "node1":
		t.Fatalf("expected stepdown to be called with id='node1', got '%s'", gotID)
	}
}
// Test_Leader_POST_JSON_EmptyID verifies that a POST to /leader whose JSON
// body carries an empty "id" still triggers Stepdown, and that the ID passed
// to the store is the empty string.
func Test_Leader_POST_JSON_EmptyID(t *testing.T) {
	// Values captured from the mocked Stepdown call.
	var (
		called bool
		gotID  string
	)
	ms := &MockStore{
		leaderAddr: "127.0.0.1:8001",
		stepdownFn: func(wait bool, id string) error {
			called, gotID = true, id
			return nil
		},
	}
	svc := New(
		"127.0.0.1:4001",
		ms,
		&mockClusterService{apiAddr: "http://127.0.0.1:4001"},
		&mockCredentialStore{HasPermOK: true},
	)

	// A JSON body is present, but the ID in it is empty.
	req, err := http.NewRequest("POST", "/leader", strings.NewReader(`{"id": ""}`))
	if err != nil {
		t.Fatalf("failed to create POST request: %s", err.Error())
	}
	req.Header.Set("Content-Type", "application/json")

	rec := httptest.NewRecorder()
	svc.ServeHTTP(rec, req)

	// Cases are evaluated top to bottom, so the first failing expectation
	// is the one reported.
	switch {
	case rec.Code != http.StatusOK:
		t.Fatalf("expected 200, got %d", rec.Code)
	case !called:
		t.Fatalf("expected stepdown to be called")
	case gotID != "":
		t.Fatalf("expected stepdown to be called with empty id, got '%s'", gotID)
	}
}
func Test_Leader_POST_JSON_InvalidJSON(t *testing.T) {
store := &MockStore{leaderAddr: "127.0.0.1:8001"}
cluster := &mockClusterService{apiAddr: "http://127.0.0.1:4001"}
cred := &mockCredentialStore{HasPermOK: true}
s := New("127.0.0.1:4001", store, cluster, cred)
// Test with invalid JSON
reqBody := `{"id": "node1"`
req, err := http.NewRequest("POST", "/leader", strings.NewReader(reqBody))
if err != nil {
t.Fatalf("failed to create POST request: %s", err.Error())
}
req.Header.Set("Content-Type", "application/json")
rr := httptest.NewRecorder()
s.ServeHTTP(rr, req)
if rr.Code != http.StatusBadRequest {
t.Fatalf("expected 400, got %d", rr.Code)
}
if !strings.Contains(rr.Body.String(), "invalid JSON") {
t.Fatalf("expected error message to contain 'invalid JSON', got %s", rr.Body.String())
}
}
// Test_Leader_POST_JSON_ForwardToLeader verifies that when the local store
// reports ErrNotLeader, the stepdown request handed to the cluster service
// carries both the target node ID from the JSON body and the wait flag.
func Test_Leader_POST_JSON_ForwardToLeader(t *testing.T) {
	// Values captured from the mocked cluster Stepdown call.
	var (
		forwardedCalled bool
		forwarded       *command.StepdownRequest
	)
	ms := &MockStore{
		leaderAddr: "127.0.0.1:8001",
		// Always claim not to be the leader so the handler takes the
		// forwarding path.
		stepdownFn: func(wait bool, id string) error {
			return store.ErrNotLeader
		},
	}
	mc := &mockClusterService{
		apiAddr: "http://127.0.0.1:4001",
		stepdownFn: func(sr *command.StepdownRequest, nodeAddr string, timeout time.Duration) error {
			forwardedCalled, forwarded = true, sr
			return nil
		},
	}
	svc := New("127.0.0.1:4001", ms, mc, &mockCredentialStore{HasPermOK: true})

	// The JSON body names the node that should receive leadership.
	req, err := http.NewRequest("POST", "/leader?wait", strings.NewReader(`{"id": "node1"}`))
	if err != nil {
		t.Fatalf("failed to create POST request: %s", err.Error())
	}
	req.Header.Set("Content-Type", "application/json")

	rec := httptest.NewRecorder()
	svc.ServeHTTP(rec, req)

	// Cases are evaluated top to bottom; the nil check precedes every
	// dereference of the captured request.
	switch {
	case rec.Code != http.StatusOK:
		t.Fatalf("expected 200, got %d", rec.Code)
	case !forwardedCalled:
		t.Fatalf("expected stepdown to be forwarded to leader")
	case forwarded == nil:
		t.Fatalf("expected stepdown request to be forwarded")
	case forwarded.Id != "node1":
		t.Fatalf("expected forwarded request to have id='node1', got '%s'", forwarded.Id)
	case !forwarded.Wait:
		t.Fatalf("expected forwarded request to have wait=true")
	}
}
type mockCredentialStore struct {
HasPermOK bool
aaFunc func(username, password, perm string) bool

View File

@@ -96,6 +96,9 @@ var (
// ErrNothingNewToSnapshot is returned when a snapshot is requested but there
// are no new log entries to snapshot.
ErrNothingNewToSnapshot = errors.New("nothing new to snapshot")
// ErrNodeNotFound is returned when a node with a given ID is not found in the cluster.
ErrNodeNotFound = errors.New("node not found in cluster")
)
const (
@@ -669,14 +672,48 @@ func (s *Store) Bootstrap(servers ...*Server) error {
return fut.Error()
}
// getServerAddressByID looks up the address of the server whose ID equals id
// in the configuration obtained from s.raft.GetConfiguration. If retrieving
// the configuration fails, that error is returned unchanged; if no server in
// the configuration has the given ID, ErrNodeNotFound is returned.
func (s *Store) getServerAddressByID(id string) (raft.ServerAddress, error) {
	cfgFuture := s.raft.GetConfiguration()
	if err := cfgFuture.Error(); err != nil {
		return "", err
	}

	want := raft.ServerID(id)
	for _, srv := range cfgFuture.Configuration().Servers {
		if srv.ID == want {
			return srv.Address, nil
		}
	}
	return "", ErrNodeNotFound
}
// Stepdown forces this node to relinquish leadership to another node in
// the cluster. If this node is not the leader, and 'wait' is true, an error
// will be returned.
func (s *Store) Stepdown(wait bool) error {
// will be returned. If id is non-empty, leadership will be transferred to the
// node with the given ID.
func (s *Store) Stepdown(wait bool, id string) error {
if !s.open.Is() {
return ErrNotOpen
}
f := s.raft.LeadershipTransfer()
var f raft.Future
if id == "" {
// Transfer leadership to any available node
f = s.raft.LeadershipTransfer()
} else {
// Transfer leadership to specific node
targetAddr, err := s.getServerAddressByID(id)
if err != nil {
return err
}
// Transfer leadership to the specific node
f = s.raft.LeadershipTransferToServer(raft.ServerID(id), targetAddr)
}
if !wait {
return nil
}

View File

@@ -755,12 +755,12 @@ func Test_MultiNodeStepdown(t *testing.T) {
}
// Telling a follower to stepdown should fail.
if err := s1.Stepdown(true); err != ErrNotLeader {
if err := s1.Stepdown(true, ""); err != ErrNotLeader {
t.Fatalf("expected ErrNotLeader when follower is told to step down")
}
// Tell leader to step down. After this finishes there should be a new Leader.
if err := s0.Stepdown(true); err != nil {
if err := s0.Stepdown(true, ""); err != nil {
t.Fatalf("leader failed to step down: %s", err.Error())
}
@@ -774,6 +774,75 @@ func Test_MultiNodeStepdown(t *testing.T) {
testPoll(t, check, 250*time.Millisecond, 10*time.Second)
}
// Test_MultiNodeStepdownTargetNode builds a three-node cluster led by s0 and
// verifies two things: asking the leader to hand leadership to an unknown
// node ID fails with ErrNodeNotFound, and asking it to hand leadership to s1
// results in s1 becoming the leader.
func Test_MultiNodeStepdownTargetNode(t *testing.T) {
	s0, ln0 := mustNewStore(t)
	defer ln0.Close()
	if err := s0.Open(); err != nil {
		t.Fatalf("failed to open single-node store: %s", err.Error())
	}
	defer s0.Close(true)

	s1, ln1 := mustNewStore(t)
	defer ln1.Close()
	if err := s1.Open(); err != nil {
		t.Fatalf("failed to open single-node store: %s", err.Error())
	}
	defer s1.Close(true)

	s2, ln2 := mustNewStore(t)
	defer ln2.Close()
	if err := s2.Open(); err != nil {
		t.Fatalf("failed to open single-node store: %s", err.Error())
	}
	defer s2.Close(true)

	// Bootstrap s0 as a single-node cluster and wait for it to elect itself.
	if err := s0.Bootstrap(NewServer(s0.ID(), s0.Addr(), true)); err != nil {
		t.Fatalf("failed to bootstrap single-node store: %s", err.Error())
	}
	if _, err := s0.WaitForLeader(10 * time.Second); err != nil {
		t.Fatalf("Error waiting for leader: %s", err)
	}

	// Join the second node to the cluster.
	if err := s0.Join(joinRequest(s1.ID(), s1.Addr(), true)); err != nil {
		t.Fatalf("failed to join to node at %s: %s", s0.Addr(), err.Error())
	}
	if _, err := s1.WaitForLeader(10 * time.Second); err != nil {
		t.Fatalf("Error waiting for leader: %s", err)
	}

	// Join the third node to the cluster.
	if err := s0.Join(joinRequest(s2.ID(), s2.Addr(), true)); err != nil {
		t.Fatalf("failed to join to node at %s: %s", s0.Addr(), err.Error())
	}
	if _, err := s2.WaitForLeader(10 * time.Second); err != nil {
		t.Fatalf("Error waiting for leader: %s", err)
	}

	// Stepdown to an unknown node ID must fail with the ErrNodeNotFound
	// sentinel. Compare against the sentinel rather than matching on the
	// error text, so rewording the message cannot silently break this check.
	if err := s0.Stepdown(true, "nonexistent-node"); err != ErrNodeNotFound {
		t.Fatalf("expected ErrNodeNotFound when stepping down to nonexistent node, got: %v", err)
	}

	// Stepdown naming s1 as the target must succeed.
	if err := s0.Stepdown(true, s1.ID()); err != nil {
		t.Fatalf("leader failed to step down to specific node: %s", err.Error())
	}

	// Poll until s1 reports itself as the leader.
	check := func() bool {
		leader, err := s1.WaitForLeader(10 * time.Second)
		return err == nil && leader == s1.Addr()
	}
	testPoll(t, check, 250*time.Millisecond, 10*time.Second)
}
func Test_MultiNodeStoreNotifyBootstrap(t *testing.T) {
s0, ln0 := mustNewStore(t)
defer ln0.Close()

View File

@@ -26,7 +26,7 @@ func Test_NonOpenStore(t *testing.T) {
defer s.Close(true)
defer ln.Close()
if err := s.Stepdown(false); err != ErrNotOpen {
if err := s.Stepdown(false, ""); err != ErrNotOpen {
t.Fatalf("wrong error received for non-open store: %s", err)
}
if s.IsLeader() {
@@ -2459,11 +2459,36 @@ func Test_SingleNodeStepdown(t *testing.T) {
}
// Tell leader to step down. Should fail as there is no other node available.
if err := s.Stepdown(true); err == nil {
if err := s.Stepdown(true, ""); err == nil {
t.Fatalf("single node stepped down OK")
}
}
// Test_SingleNodeStepdownInvalidID verifies that asking a bootstrapped
// single-node leader to transfer leadership to an unknown node ID fails with
// an error matching ErrNodeNotFound.
func Test_SingleNodeStepdownInvalidID(t *testing.T) {
	st, listener := mustNewStore(t)
	defer listener.Close()

	if err := st.Open(); err != nil {
		t.Fatalf("failed to open single-node store: %s", err.Error())
	}
	defer st.Close(true)

	if err := st.Bootstrap(NewServer(st.ID(), st.Addr(), true)); err != nil {
		t.Fatalf("failed to bootstrap single-node store: %s", err.Error())
	}
	if _, err := st.WaitForLeader(10 * time.Second); err != nil {
		t.Fatalf("Error waiting for leader: %s", err)
	}

	// This ID is not part of the cluster, so the stepdown must be rejected
	// with the dedicated sentinel error.
	switch err := st.Stepdown(true, "nonexistent-node"); {
	case err == nil:
		t.Fatalf("stepdown with invalid node ID should have failed")
	case !errors.Is(err, ErrNodeNotFound):
		t.Fatalf("expected ErrNodeNotFound, got %v", err)
	}
}
func Test_SingleNodeStepdownNoWaitOK(t *testing.T) {
s, ln := mustNewStore(t)
defer ln.Close()
@@ -2479,7 +2504,7 @@ func Test_SingleNodeStepdownNoWaitOK(t *testing.T) {
}
// Tell leader to step down without waiting.
if err := s.Stepdown(false); err != nil {
if err := s.Stepdown(false, ""); err != nil {
t.Fatalf("single node reported error stepping down even when told not to wait")
}
}