mirror of
https://github.com/rqlite/rqlite.git
synced 2026-01-25 04:16:26 +00:00
Support SQL-format loading via Follower
This commit is contained in:
@@ -2,6 +2,7 @@
|
||||
### Implementation changes and bug fixes
|
||||
- [PR #2056](https://github.com/rqlite/rqlite/pull/2056): Minor refactoring of HTTP credentials handling.
|
||||
- [PR #2054](https://github.com/rqlite/rqlite/pull/2054): Bump golang.org/x/net from 0.35.0 to 0.36.0.
|
||||
- [PR #2055](https://github.com/rqlite/rqlite/pull/2055): Support SQL-format loading via Follower. Fixes issue [#2053](https://github.com/rqlite/rqlite/issues/2053).
|
||||
|
||||
## v8.36.13 (March 11th 2025)
|
||||
### Implementation changes and bug fixes
|
||||
|
||||
@@ -687,6 +687,27 @@ func (s *Service) handleLoad(w http.ResponseWriter, r *http.Request, qp QueryPar
|
||||
return
|
||||
}
|
||||
|
||||
// Determine some perhaps-needed details.
|
||||
ldrAddr, err := s.store.LeaderAddr()
|
||||
if err != nil {
|
||||
http.Error(w, fmt.Sprintf("leader address: %s", err.Error()),
|
||||
http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
if ldrAddr == "" {
|
||||
stats.Add(numLeaderNotFound, 1)
|
||||
http.Error(w, ErrLeaderNotFound.Error(), http.StatusServiceUnavailable)
|
||||
return
|
||||
}
|
||||
|
||||
handleRemoteErr := func(err error) {
|
||||
if err.Error() == "unauthorized" {
|
||||
http.Error(w, "remote load not authorized", http.StatusUnauthorized)
|
||||
} else {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
}
|
||||
}
|
||||
|
||||
resp := NewResponse()
|
||||
b, err := io.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
@@ -710,27 +731,10 @@ func (s *Service) handleLoad(w http.ResponseWriter, r *http.Request, qp QueryPar
|
||||
return
|
||||
}
|
||||
|
||||
addr, err := s.store.LeaderAddr()
|
||||
if err != nil {
|
||||
http.Error(w, fmt.Sprintf("leader address: %s", err.Error()),
|
||||
http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
if addr == "" {
|
||||
stats.Add(numLeaderNotFound, 1)
|
||||
http.Error(w, ErrLeaderNotFound.Error(), http.StatusServiceUnavailable)
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Add(ServedByHTTPHeader, addr)
|
||||
loadErr := s.cluster.Load(lr, addr, makeCredentials(r),
|
||||
qp.Timeout(defaultTimeout), qp.Retries(0))
|
||||
w.Header().Add(ServedByHTTPHeader, ldrAddr)
|
||||
loadErr := s.cluster.Load(lr, ldrAddr, makeCredentials(r), qp.Timeout(defaultTimeout), qp.Retries(0))
|
||||
if loadErr != nil {
|
||||
if loadErr.Error() == "unauthorized" {
|
||||
http.Error(w, "remote load not authorized", http.StatusUnauthorized)
|
||||
} else {
|
||||
http.Error(w, loadErr.Error(), http.StatusInternalServerError)
|
||||
}
|
||||
handleRemoteErr(loadErr)
|
||||
return
|
||||
}
|
||||
stats.Add(numRemoteLoads, 1)
|
||||
@@ -738,7 +742,7 @@ func (s *Service) handleLoad(w http.ResponseWriter, r *http.Request, qp QueryPar
|
||||
// forwarding was put in place.
|
||||
}
|
||||
} else {
|
||||
// No JSON structure expected for this API.
|
||||
// No JSON structure expected for this API, just a bunch of SQL statements.
|
||||
queries := []string{string(b)}
|
||||
er := executeRequestFromStrings(queries, qp.Timings(), false)
|
||||
|
||||
@@ -748,9 +752,24 @@ func (s *Service) handleLoad(w http.ResponseWriter, r *http.Request, qp QueryPar
|
||||
if s.DoRedirect(w, r, qp) {
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Add(ServedByHTTPHeader, ldrAddr)
|
||||
var exErr error
|
||||
response, exErr = s.cluster.Execute(er, ldrAddr, makeCredentials(r),
|
||||
qp.Timeout(defaultTimeout), qp.Retries(0))
|
||||
if exErr != nil {
|
||||
handleRemoteErr(exErr)
|
||||
return
|
||||
}
|
||||
resp.Results.ExecuteQueryResponse = response
|
||||
stats.Add(numRemoteLoads, 1)
|
||||
} else {
|
||||
// Local execute failed for some reason other than not
|
||||
// being the leader. Nothing we can do here.
|
||||
resp.Error = err.Error()
|
||||
}
|
||||
resp.Error = err.Error()
|
||||
} else {
|
||||
// Successful local execute.
|
||||
resp.Results.ExecuteQueryResponse = response
|
||||
}
|
||||
resp.end = time.Now()
|
||||
|
||||
@@ -709,7 +709,9 @@ func Test_BackupFlagsInvalid(t *testing.T) {
|
||||
}
|
||||
|
||||
func Test_LoadOK(t *testing.T) {
|
||||
m := &MockStore{}
|
||||
m := &MockStore{
|
||||
leaderAddr: "foo:1234",
|
||||
}
|
||||
c := &mockClusterService{}
|
||||
s := New("127.0.0.1:0", m, c, nil)
|
||||
if err := s.Start(); err != nil {
|
||||
@@ -725,7 +727,7 @@ func Test_LoadOK(t *testing.T) {
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
t.Fatalf("failed to get expected StatusOK for load, got %d", resp.StatusCode)
|
||||
t.Fatalf("failed to get expected StatusOK for load, got %d, %s", resp.StatusCode, mustReadBody(t, resp))
|
||||
}
|
||||
if exp, got := `{"results":[]}`, mustReadBody(t, resp); exp != got {
|
||||
t.Fatalf("incorrect response body, exp: %s, got %s", exp, got)
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
@@ -1456,6 +1457,39 @@ COMMIT;
|
||||
}
|
||||
}
|
||||
|
||||
func Test_MultiNodeCluster_FollowerLoad_SQL(t *testing.T) {
|
||||
node1 := mustNewLeaderNode("leader1")
|
||||
defer node1.Deprovision()
|
||||
|
||||
node2 := mustNewNode("node2", false)
|
||||
defer node2.Deprovision()
|
||||
if err := node2.Join(node1); err != nil {
|
||||
t.Fatalf("node failed to join leader: %s", err.Error())
|
||||
}
|
||||
_, err := node2.WaitForLeader()
|
||||
if err != nil {
|
||||
t.Fatalf("failed waiting for leader: %s", err.Error())
|
||||
}
|
||||
|
||||
// Get a follower, make sure Load works via it.
|
||||
c := Cluster{node1, node2}
|
||||
followers, err := c.Followers()
|
||||
if err != nil {
|
||||
t.Fatalf("failed to get followers: %s", err.Error())
|
||||
}
|
||||
if _, err := followers[0].Load(filepath.Join("testdata", "auto-restore.sql")); err != nil {
|
||||
t.Fatalf("failed to load via follower: %s", err.Error())
|
||||
}
|
||||
|
||||
r, err := node1.QueryStrongConsistency("SELECT * FROM foo WHERE id=2")
|
||||
if err != nil {
|
||||
t.Fatalf("failed to execute query: %s", err.Error())
|
||||
}
|
||||
if r != `{"results":[{"columns":["id","name"],"types":["integer","text"],"values":[[2,"fiona"]]}]}` {
|
||||
t.Fatalf("test received wrong result got %s", r)
|
||||
}
|
||||
}
|
||||
|
||||
// Test_MultiNodeClusterWithNonVoter tests formation of a 4-node cluster, one of which is
|
||||
// a non-voter. This test also checks that if the Leader changes the non-voter is still in
|
||||
// the cluster and gets updates from the new leader.
|
||||
|
||||
@@ -267,7 +267,7 @@ func (n *Node) Backup(filename string, compress bool, format string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// Load loads a SQLite database file into the node.
|
||||
// Boot boots a node using a SQLite database file.
|
||||
func (n *Node) Boot(filename string) (string, error) {
|
||||
return n.postFile("/boot", filename)
|
||||
}
|
||||
|
||||
@@ -1823,3 +1823,21 @@ func Test_SingleNodeBoot_FailNotLeader(t *testing.T) {
|
||||
t.Fatalf("expected error loading data")
|
||||
}
|
||||
}
|
||||
|
||||
func Test_SingleNodeLoad_OK(t *testing.T) {
|
||||
node := mustNewLeaderNode("leader1")
|
||||
defer node.Deprovision()
|
||||
|
||||
_, err := node.Load(filepath.Join("testdata", "auto-restore.sql"))
|
||||
if err != nil {
|
||||
t.Fatalf("failed to load data: %s", err.Error())
|
||||
}
|
||||
|
||||
r, err := node.Query("SELECT * FROM foo WHERE id=2")
|
||||
if err != nil {
|
||||
t.Fatalf("failed to execute query: %s", err.Error())
|
||||
}
|
||||
if r != `{"results":[{"columns":["id","name"],"types":["integer","text"],"values":[[2,"fiona"]]}]}` {
|
||||
t.Fatalf("test received wrong result got %s", r)
|
||||
}
|
||||
}
|
||||
|
||||
7
system_test/testdata/auto-restore.sql
vendored
Normal file
7
system_test/testdata/auto-restore.sql
vendored
Normal file
@@ -0,0 +1,7 @@
|
||||
PRAGMA foreign_keys=OFF;
|
||||
BEGIN TRANSACTION;
|
||||
CREATE TABLE foo (id integer not null primary key, name text);
|
||||
INSERT INTO foo VALUES(1,'fiona');
|
||||
INSERT INTO foo VALUES(2,'fiona');
|
||||
INSERT INTO foo VALUES(3,'fiona');
|
||||
COMMIT;
|
||||
Reference in New Issue
Block a user