Add HTTP API endpoint for leader stepdown functionality

This commit is contained in:
Copilot
2025-07-11 20:07:39 -04:00
committed by GitHub
parent 7cba6fdfcf
commit b4e92616f5
6 changed files with 411 additions and 0 deletions

View File

@@ -35,6 +35,8 @@ const (
PermLoad = "load" PermLoad = "load"
// PermSnapshot means user can request a snapshot. // PermSnapshot means user can request a snapshot.
PermSnapshot = "snapshot" PermSnapshot = "snapshot"
// PermLeaderOps means user can perform leader-related operations.
PermLeaderOps = "leader-ops"
) )
// BasicAuther is the interface an object must support to return basic auth information. // BasicAuther is the interface an object must support to return basic auth information.

View File

@@ -97,6 +97,10 @@ type Store interface {
// the Raft system. It then triggers a Raft snapshot, which will then make // the Raft system. It then triggers a Raft snapshot, which will then make
// Raft aware of the new data. // Raft aware of the new data.
ReadFrom(r io.Reader) (int64, error) ReadFrom(r io.Reader) (int64, error)
// Stepdown forces this node to relinquish leadership to another node in
// the cluster.
Stepdown(wait bool) error
} }
// GetNodeMetaer is the interface that wraps the GetNodeMeta method. // GetNodeMetaer is the interface that wraps the GetNodeMeta method.
@@ -527,6 +531,8 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.handleStatus(w, r, params) s.handleStatus(w, r, params)
case strings.HasPrefix(r.URL.Path, "/nodes"): case strings.HasPrefix(r.URL.Path, "/nodes"):
s.handleNodes(w, r, params) s.handleNodes(w, r, params)
case r.URL.Path == "/leader":
s.handleLeader(w, r, params)
case strings.HasPrefix(r.URL.Path, "/readyz"): case strings.HasPrefix(r.URL.Path, "/readyz"):
stats.Add(numReadyz, 1) stats.Add(numReadyz, 1)
s.handleReadyz(w, r, params) s.handleReadyz(w, r, params)
@@ -1026,6 +1032,63 @@ func (s *Service) handleNodes(w http.ResponseWriter, r *http.Request, qp QueryPa
} }
} }
// handleLeader serves the /leader endpoint. A GET writes the leader's Raft
// address and HTTP API address as a JSON object, a DELETE asks the store to
// step down, and any other method is answered with 405. Requests that fail
// the auth.PermLeaderOps check are answered with 401.
func (s *Service) handleLeader(w http.ResponseWriter, r *http.Request, qp QueryParams) {
	w.Header().Set("Content-Type", "application/json; charset=utf-8")

	if !s.CheckRequestPerm(r, auth.PermLeaderOps) {
		w.WriteHeader(http.StatusUnauthorized)
		return
	}

	if r.Method != http.MethodGet && r.Method != http.MethodDelete {
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}

	if r.Method == http.MethodDelete {
		// The wait flag comes straight from the query parameters.
		if err := s.store.Stepdown(qp.Wait()); err != nil {
			code := http.StatusInternalServerError
			switch err {
			case store.ErrNotOpen, store.ErrNotLeader:
				// Both conditions are reported as "service unavailable".
				code = http.StatusServiceUnavailable
			}
			http.Error(w, fmt.Sprintf("stepdown: %s", err.Error()), code)
			return
		}
		w.WriteHeader(http.StatusOK)
		return
	}

	// GET: report who the leader is.
	raftAddr, err := s.store.LeaderAddr()
	if err != nil {
		code := http.StatusInternalServerError
		if err == store.ErrNotOpen {
			code = http.StatusServiceUnavailable
		}
		http.Error(w, fmt.Sprintf("leader address: %s", err.Error()), code)
		return
	}
	apiAddr := s.LeaderAPIAddr()

	payload := map[string]string{
		"addr":     raftAddr,
		"api_addr": apiAddr,
	}
	encoder := json.NewEncoder(w)
	if qp.Pretty() {
		encoder.SetIndent("", " ")
	}
	if err := encoder.Encode(payload); err != nil {
		http.Error(w, fmt.Sprintf("JSON encode: %s", err.Error()),
			http.StatusInternalServerError)
	}
}
// handleReadyz returns whether the node is ready. // handleReadyz returns whether the node is ready.
func (s *Service) handleReadyz(w http.ResponseWriter, r *http.Request, qp QueryParams) { func (s *Service) handleReadyz(w http.ResponseWriter, r *http.Request, qp QueryParams) {
if !s.CheckRequestPerm(r, auth.PermReady) { if !s.CheckRequestPerm(r, auth.PermReady) {

View File

@@ -414,6 +414,7 @@ func Test_401Routes_NoBasicAuth(t *testing.T) {
"/remove", "/remove",
"/status", "/status",
"/nodes", "/nodes",
"/leader",
"/readyz", "/readyz",
"/debug/vars", "/debug/vars",
"/debug/pprof/cmdline", "/debug/pprof/cmdline",
@@ -452,6 +453,7 @@ func Test_401Routes_BasicAuthBadPassword(t *testing.T) {
"/boot", "/boot",
"/status", "/status",
"/nodes", "/nodes",
"/leader",
"/readyz", "/readyz",
"/debug/vars", "/debug/vars",
"/debug/pprof/cmdline", "/debug/pprof/cmdline",
@@ -498,6 +500,7 @@ func Test_401Routes_BasicAuthBadPerm(t *testing.T) {
"/snapshot", "/snapshot",
"/status", "/status",
"/nodes", "/nodes",
"/leader",
"/readyz", "/readyz",
"/debug/vars", "/debug/vars",
"/debug/pprof/cmdline", "/debug/pprof/cmdline",
@@ -1515,6 +1518,7 @@ type MockStore struct {
snapshotFn func(n uint64) error snapshotFn func(n uint64) error
readFromFn func(r io.Reader) (int64, error) readFromFn func(r io.Reader) (int64, error)
committedFn func(timeout time.Duration) (uint64, error) committedFn func(timeout time.Duration) (uint64, error)
stepdownFn func(wait bool) error
leaderAddr string leaderAddr string
notReady bool // Default value is true, easier to test. notReady bool // Default value is true, easier to test.
} }
@@ -1603,6 +1607,13 @@ func (m *MockStore) ReadFrom(r io.Reader) (int64, error) {
return 0, nil return 0, nil
} }
// Stepdown forwards to stepdownFn when a test has installed one; with no
// hook set it reports success.
func (m *MockStore) Stepdown(wait bool) error {
	if m.stepdownFn == nil {
		return nil
	}
	return m.stepdownFn(wait)
}
type mockClusterService struct { type mockClusterService struct {
apiAddr string apiAddr string
executeFn func(er *command.ExecuteRequest, addr string, t time.Duration) ([]*command.ExecuteQueryResponse, uint64, error) executeFn func(er *command.ExecuteRequest, addr string, t time.Duration) ([]*command.ExecuteQueryResponse, uint64, error)
@@ -1661,6 +1672,140 @@ func (m *mockClusterService) RemoveNode(rn *command.RemoveNodeRequest, addr stri
return nil return nil
} }
func Test_LeaderGET(t *testing.T) {
store := &MockStore{leaderAddr: "127.0.0.1:8001"}
cluster := &mockClusterService{apiAddr: "http://127.0.0.1:4001"}
cred := &mockCredentialStore{HasPermOK: true}
s := New("127.0.0.1:4001", store, cluster, cred)
// Test GET request
req, err := http.NewRequest("GET", "/leader", nil)
if err != nil {
t.Fatalf("failed to create GET request: %s", err.Error())
}
rr := httptest.NewRecorder()
s.ServeHTTP(rr, req)
if rr.Code != http.StatusOK {
t.Fatalf("expected 200, got %d", rr.Code)
}
// Check response body
expected := `{"addr":"127.0.0.1:8001","api_addr":"http://127.0.0.1:4001"}`
actual := strings.TrimSpace(rr.Body.String())
if actual != expected {
t.Fatalf("expected %s, got %s", expected, actual)
}
}
// Test_LeaderDELETE issues DELETE requests to /leader, first without and then
// with the "wait" query parameter, and checks that each one returns 200 and
// reaches the store's Stepdown with the matching wait flag.
func Test_LeaderDELETE(t *testing.T) {
	var called, gotWait bool
	ms := &MockStore{
		leaderAddr: "127.0.0.1:8001",
		stepdownFn: func(wait bool) error {
			called, gotWait = true, wait
			return nil
		},
	}
	svc := New(
		"127.0.0.1:4001",
		ms,
		&mockClusterService{apiAddr: "http://127.0.0.1:4001"},
		&mockCredentialStore{HasPermOK: true},
	)

	// serve clears the recorded state, dispatches req, and verifies the
	// checks common to both scenarios.
	serve := func(req *http.Request) {
		called, gotWait = false, false
		rec := httptest.NewRecorder()
		svc.ServeHTTP(rec, req)
		if rec.Code != http.StatusOK {
			t.Fatalf("expected 200, got %d", rec.Code)
		}
		if !called {
			t.Fatalf("expected stepdown to be called")
		}
	}

	// Scenario 1: no wait parameter.
	plainReq, err := http.NewRequest("DELETE", "/leader", nil)
	if err != nil {
		t.Fatalf("failed to create DELETE request: %s", err.Error())
	}
	serve(plainReq)
	if gotWait {
		t.Fatalf("expected stepdown to be called with wait=false")
	}

	// Scenario 2: wait parameter present.
	waitReq, err := http.NewRequest("DELETE", "/leader?wait", nil)
	if err != nil {
		t.Fatalf("failed to create DELETE request with wait: %s", err.Error())
	}
	serve(waitReq)
	if !gotWait {
		t.Fatalf("expected stepdown to be called with wait=true")
	}
}
// Test_LeaderDELETE_Error makes the store's Stepdown fail with
// store.ErrNotLeader and expects the handler to answer 503 with a body that
// mentions "stepdown".
func Test_LeaderDELETE_Error(t *testing.T) {
	// The mock is deliberately not named "store" so the package reference
	// below is unambiguous to readers.
	ms := &MockStore{
		leaderAddr: "127.0.0.1:8001",
		stepdownFn: func(bool) error { return store.ErrNotLeader },
	}
	svc := New(
		"127.0.0.1:4001",
		ms,
		&mockClusterService{apiAddr: "http://127.0.0.1:4001"},
		&mockCredentialStore{HasPermOK: true},
	)

	request, err := http.NewRequest("DELETE", "/leader", nil)
	if err != nil {
		t.Fatalf("failed to create DELETE request: %s", err.Error())
	}

	recorder := httptest.NewRecorder()
	svc.ServeHTTP(recorder, request)

	if recorder.Code != http.StatusServiceUnavailable {
		t.Fatalf("expected 503, got %d", recorder.Code)
	}
	if !strings.Contains(recorder.Body.String(), "stepdown") {
		t.Fatalf("expected error message to contain 'stepdown', got %s", recorder.Body.String())
	}
}
func Test_LeaderMethodNotAllowed(t *testing.T) {
store := &MockStore{leaderAddr: "127.0.0.1:8001"}
cluster := &mockClusterService{apiAddr: "http://127.0.0.1:4001"}
cred := &mockCredentialStore{HasPermOK: true}
s := New("127.0.0.1:4001", store, cluster, cred)
req, err := http.NewRequest("POST", "/leader", nil)
if err != nil {
t.Fatalf("failed to create POST request: %s", err.Error())
}
rr := httptest.NewRecorder()
s.ServeHTTP(rr, req)
if rr.Code != http.StatusMethodNotAllowed {
t.Fatalf("expected 405, got %d", rr.Code)
}
}
type mockCredentialStore struct { type mockCredentialStore struct {
HasPermOK bool HasPermOK bool
aaFunc func(username, password, perm string) bool aaFunc func(username, password, perm string) bool

View File

@@ -2,6 +2,7 @@ package system
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
"net" "net"
"os" "os"
@@ -2262,6 +2263,113 @@ func Test_MultiNodeCluster_Boot(t *testing.T) {
} }
} }
// Test_ClusterLeader_GET joins a second node to a leader, queries the leader
// endpoint on both nodes, and checks that the leader's answer has non-empty
// "addr" and "api_addr" fields and that both nodes report the same "addr".
func Test_ClusterLeader_GET(t *testing.T) {
	leaderNode := mustNewLeaderNode("leader1")
	defer leaderNode.Deprovision()

	followerNode := mustNewNode("follower1", false)
	defer followerNode.Deprovision()

	if err := followerNode.Join(leaderNode); err != nil {
		t.Fatalf("failed to join node: %s", err.Error())
	}

	// Ask the leader itself.
	rawFromLeader, err := leaderNode.Leader()
	if err != nil {
		t.Fatalf("failed to get leader info from leader: %s", err.Error())
	}
	var fromLeader map[string]string
	if err := json.Unmarshal([]byte(rawFromLeader), &fromLeader); err != nil {
		t.Fatalf("failed to parse leader response: %s", err.Error())
	}
	if fromLeader["addr"] == "" {
		t.Fatalf("leader addr is empty")
	}
	if fromLeader["api_addr"] == "" {
		t.Fatalf("leader api_addr is empty")
	}

	// Ask the follower; it is expected to name the same leader.
	rawFromFollower, err := followerNode.Leader()
	if err != nil {
		t.Fatalf("failed to get leader info from follower: %s", err.Error())
	}
	var fromFollower map[string]string
	if err := json.Unmarshal([]byte(rawFromFollower), &fromFollower); err != nil {
		t.Fatalf("failed to parse follower leader response: %s", err.Error())
	}

	if fromLeader["addr"] != fromFollower["addr"] {
		t.Fatalf("leader and follower report different leader addresses: %s vs %s",
			fromLeader["addr"], fromFollower["addr"])
	}
}
// Test_ClusterLeader_Stepdown builds a two-node cluster, records the leader
// address, requests a stepdown (with wait) on the original leader, and then
// checks via the other node that a leader exists and that its address differs
// from the one recorded before the stepdown.
func Test_ClusterLeader_Stepdown(t *testing.T) {
	leaderNode := mustNewLeaderNode("leader1")
	defer leaderNode.Deprovision()

	followerNode := mustNewNode("follower1", false)
	defer followerNode.Deprovision()

	if err := followerNode.Join(leaderNode); err != nil {
		t.Fatalf("failed to join node: %s", err.Error())
	}

	// Let the cluster settle before sampling the leader.
	if _, err := leaderNode.WaitForLeader(); err != nil {
		t.Fatalf("failed waiting for leader: %s", err.Error())
	}

	rawBefore, err := leaderNode.Leader()
	if err != nil {
		t.Fatalf("failed to get leader info before stepdown: %s", err.Error())
	}
	var before map[string]string
	if err := json.Unmarshal([]byte(rawBefore), &before); err != nil {
		t.Fatalf("failed to parse leader response before stepdown: %s", err.Error())
	}

	// Step down with wait=true.
	if err = leaderNode.Stepdown(true); err != nil {
		t.Fatalf("failed to trigger stepdown on leader: %s", err.Error())
	}

	// Allow time for the leadership transition.
	time.Sleep(2 * time.Second)

	// The cluster must still have a leader, as seen from the other node.
	if _, err := followerNode.WaitForLeader(); err != nil {
		t.Fatalf("cluster has no leader after stepdown: %s", err.Error())
	}

	rawAfter, err := followerNode.Leader()
	if err != nil {
		t.Fatalf("failed to get leader info after stepdown: %s", err.Error())
	}
	var after map[string]string
	if err := json.Unmarshal([]byte(rawAfter), &after); err != nil {
		t.Fatalf("failed to parse leader response after stepdown: %s", err.Error())
	}

	if before["addr"] == after["addr"] {
		t.Fatalf("leader did not change after stepdown: %s", before["addr"])
	}
}
func sleepForSecond() { func sleepForSecond() {
time.Sleep(mustParseDuration("1s")) time.Sleep(mustParseDuration("1s"))
} }

View File

@@ -360,6 +360,52 @@ func (n *Node) Status() (string, error) {
return string(body), nil return string(body), nil
} }
// Leader fetches the node's /leader endpoint and returns the response body
// as a string. It returns an error if the endpoint URL cannot be parsed, the
// request fails, the body cannot be read, or the status is anything other
// than 200 (in which case the status and body are included in the error).
func (n *Node) Leader() (string, error) {
	// A malformed APIAddr makes url.Parse return a nil URL; surface that as
	// an error instead of dereferencing it below.
	v, err := url.Parse("http://" + n.APIAddr + "/leader")
	if err != nil {
		return "", fmt.Errorf("failed to parse leader URL: %w", err)
	}

	resp, err := http.Get(v.String())
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return "", fmt.Errorf("failed to read leader response: %w", err)
	}
	if resp.StatusCode != 200 {
		return "", fmt.Errorf("leader endpoint returned: %s (%s)", resp.Status,
			strings.TrimSuffix(string(body), "\n"))
	}
	return string(body), nil
}
// Stepdown sends a DELETE to the node's /leader endpoint, appending the
// "wait" query parameter when wait is true. It returns nil on a 200 response;
// any other status is turned into an error carrying the status line and the
// response body.
func (n *Node) Stepdown(wait bool) error {
	target := "http://" + n.APIAddr + "/leader"
	if wait {
		target += "?wait"
	}

	request, err := http.NewRequest(http.MethodDelete, target, nil)
	if err != nil {
		return err
	}

	response, err := http.DefaultClient.Do(request)
	if err != nil {
		return err
	}
	defer response.Body.Close()

	if response.StatusCode == http.StatusOK {
		return nil
	}

	// Best effort: the body only enriches the error text, so a read failure
	// is tolerated here.
	payload, _ := io.ReadAll(response.Body)
	return fmt.Errorf("stepdown endpoint returned: %s (%s)", response.Status,
		strings.TrimSuffix(string(payload), "\n"))
}
// IsVoter returns whether the node is a voter or not. // IsVoter returns whether the node is a voter or not.
func (n *Node) IsVoter() (bool, error) { func (n *Node) IsVoter() (bool, error) {
statusJSON, err := n.Status() statusJSON, err := n.Status()

View File

@@ -5,6 +5,7 @@ package system
import ( import (
"compress/gzip" "compress/gzip"
"encoding/json"
"fmt" "fmt"
"io" "io"
"os" "os"
@@ -1851,3 +1852,49 @@ func Test_SingleNodeLoad_OK(t *testing.T) {
t.Fatalf("test received wrong result got %s", r) t.Fatalf("test received wrong result got %s", r)
} }
} }
// Test_SingleNodeLeader_GET queries the leader endpoint of a single leader
// node and checks that the JSON answer has non-empty "addr" and "api_addr"
// fields.
func Test_SingleNodeLeader_GET(t *testing.T) {
	n := mustNewLeaderNode("leader1")
	defer n.Deprovision()

	raw, err := n.Leader()
	if err != nil {
		t.Fatalf("failed to get leader: %s", err.Error())
	}

	var fields map[string]string
	if err := json.Unmarshal([]byte(raw), &fields); err != nil {
		t.Fatalf("failed to parse leader response: %s", err.Error())
	}

	if fields["addr"] == "" {
		t.Fatalf("leader addr is empty")
	}
	if fields["api_addr"] == "" {
		t.Fatalf("leader api_addr is empty")
	}
}
// Test_SingleNodeLeader_Stepdown exercises stepdown on a single leader node:
// the request without wait is expected to succeed, while the request with
// wait is expected to fail with an error mentioning "cannot find peer".
func Test_SingleNodeLeader_Stepdown(t *testing.T) {
	n := mustNewLeaderNode("leader1")
	defer n.Deprovision()

	// Fire-and-forget stepdown: only the request itself must succeed.
	if err := n.Stepdown(false); err != nil {
		t.Fatalf("failed to trigger stepdown: %s", err.Error())
	}

	// Waiting stepdown: with no other node to hand leadership to, an error
	// is expected.
	waitErr := n.Stepdown(true)
	if waitErr == nil {
		t.Fatalf("expected stepdown with wait to fail in single-node cluster")
	}
	if !strings.Contains(waitErr.Error(), "cannot find peer") {
		t.Fatalf("unexpected error message: %s", waitErr.Error())
	}
}