Implement Exclude commands in gRPC (#12603)

This commit is contained in:
Vishesh Yadav
2026-01-12 11:38:36 -08:00
committed by GitHub
parent 2c06c99f1d
commit af732673f1
6 changed files with 471 additions and 50 deletions

View File

@@ -23,10 +23,6 @@
#include <chrono>
#include "fdbctl/ControlService.h"
#include "fdbclient/ClientKnobs.h"
#include "fdbclient/Knobs.h"
#include "flow/ThreadHelper.actor.h"
#include "flow/genericactors.actor.h"
namespace fdbctl {
@@ -34,45 +30,5 @@ using namespace std::chrono_literals;
ControlServiceImpl::ControlServiceImpl(Reference<IDatabase> db) : Service(), db_(db) {}
// Runs the Flow handler `h` for one gRPC request and converts its outcome into a grpc::Status.
//
// The handler is bounded by a timeout: the time remaining until the deadline reported by
// `context`, or CLIENT_KNOBS->GRPC_CTL_SERVICE_DEFAULT_TIMEOUT when the context reports no
// deadline (time_point::max()).
//
// Returns:
//   - the handler's own status when it finishes in time,
//   - DEADLINE_EXCEEDED when the deadline has already passed or the handler times out,
//   - INTERNAL for any other Error thrown by the handler.
template <class Handler, class Request, class Reply>
Future<grpc::Status> grpcHandlerWrapper(Reference<IDatabase> db,
                                        Handler* h,
                                        const Request* req,
                                        Reply* rep,
                                        grpc::ServerContext* context) {
	try {
		double timeout = CLIENT_KNOBS->GRPC_CTL_SERVICE_DEFAULT_TIMEOUT; // Default timeout when no deadline is set
		const auto deadline = context->deadline();

		// TODO: Should use FDBs now(), but in this world we are breaking determinism anyway at this
		// point.
		if (deadline != std::chrono::system_clock::time_point::max()) {
			const auto now = std::chrono::system_clock::now();
			// Keep the fractional part: truncating to whole seconds would turn any deadline
			// less than one second away into 0 and reject the request outright.
			timeout = std::chrono::duration<double>(deadline - now).count();
		}

		if (timeout <= 0) {
			co_return grpc::Status(grpc::StatusCode::DEADLINE_EXCEEDED, "Request deadline already exceeded");
		}

		co_return co_await timeoutError((*h)(db, req, rep), timeout);
	} catch (Error& e) {
		if (e.code() == error_code_timed_out) {
			co_return grpc::Status(grpc::StatusCode::DEADLINE_EXCEEDED, "Operation timed out");
		}
		co_return grpc::Status(grpc::StatusCode::INTERNAL, fmt::format("Unknown error '{}'", e.name()));
	}
}
// Bridge from a gRPC server thread into Flow: hands the wrapped handler to onMainThread()
// and blocks the calling thread on the resulting future to obtain the final status.
template <class Handler, class Request, class Reply>
grpc::Status ControlServiceImpl::handleRequestOnMainThread(Handler* h,
                                                           const Request* req,
                                                           Reply* rep,
                                                           grpc::ServerContext* context) {
	// Captures are spelled out; `this` is needed for db_.
	auto runHandler = [this, h, req, rep, context]() { return grpcHandlerWrapper(db_, h, req, rep, context); };
	return onMainThread(runHandler).getBlocking();
}
} // namespace fdbctl
#endif // FLOW_GRPC_ENABLED

422
fdbctl/ExcludeCommand.cpp Normal file
View File

@@ -0,0 +1,422 @@
/*
* ExcludeCommand.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2025 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifdef FLOW_GRPC_ENABLED
#include "fdbctl/ControlCommands.h"
#include "fdbclient/IClientApi.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbclient/CoordinationInterface.h"
#include "fdbclient/Schemas.h"
#include "fdbclient/StorageServerInterface.h"
#include "flow/genericactors.actor.h"
#include "fdbctl/ControlCommands.h"
#include "fmt/format.h"
#include <boost/algorithm/string.hpp>
#include <map>
namespace fdbctl {
namespace utils {
// Reads every key in the excluded-servers special key range and returns each key with the
// range prefix stripped, as a string.
//
// Errors other than cancellation are traced and passed to the transaction's onError(); the
// read is attempted again once that future completes. Cancellation is rethrown.
Future<std::vector<std::string>> getExcludedServers(Reference<IDatabase> db) {
	Reference<ITransaction> txn = db->createTransaction();
	loop {
		// The error is carried out of the handler so that onError() can be awaited after it.
		Error caught;
		try {
			ThreadFuture<RangeResult> pending =
			    txn->getRange(special_keys::excludedServersSpecialKeyRange, CLIENT_KNOBS->TOO_MANY);
			RangeResult rows = co_await safeThreadFutureToFuture(pending);
			ASSERT(!rows.more && rows.size() < CLIENT_KNOBS->TOO_MANY);

			std::vector<std::string> addresses;
			for (const auto& row : rows) {
				addresses.push_back(
				    row.key.removePrefix(special_keys::excludedServersSpecialKeyRange.begin).toString());
			}
			co_return addresses;
		} catch (Error& e) {
			if (e.code() == error_code_actor_cancelled) {
				throw e;
			}
			TraceEvent(SevWarn, "GetExcludedServersError").error(e);
			caught = e;
		}
		co_await safeThreadFutureToFuture(txn->onError(caught));
	}
}
// Reads every key in the failed-servers special key range and returns each key with the
// range prefix stripped, as a string.
//
// Errors other than cancellation are traced and passed to the transaction's onError(); the
// read is attempted again once that future completes. Cancellation is rethrown.
Future<std::vector<std::string>> getFailedServers(Reference<IDatabase> db) {
	Reference<ITransaction> tr = db->createTransaction();
	loop {
		// The error is carried out of the handler so that onError() can be awaited after it.
		Error err;
		try {
			ThreadFuture<RangeResult> resultFuture =
			    tr->getRange(special_keys::failedServersSpecialKeyRange, CLIENT_KNOBS->TOO_MANY);
			RangeResult r = co_await safeThreadFutureToFuture(resultFuture);
			ASSERT(!r.more && r.size() < CLIENT_KNOBS->TOO_MANY);

			std::vector<std::string> failedServers;
			for (const auto& i : r) {
				failedServers.push_back(
				    i.key.removePrefix(special_keys::failedServersSpecialKeyRange.begin).toString());
			}
			co_return failedServers;
		} catch (Error& e) {
			if (e.code() == error_code_actor_cancelled) {
				throw e;
			}
			// Use this function's own event name so failures here are not attributed to
			// getExcludedServers in the trace logs.
			TraceEvent(SevWarn, "GetFailedServersError").error(e);
			err = e;
		}
		co_await safeThreadFutureToFuture(tr->onError(err));
	}
}
// Reads every key in the excluded-locality special key range and returns each key with the
// range prefix stripped, as a string.
//
// Errors other than cancellation are traced and passed to the transaction's onError(); the
// read is attempted again once that future completes. Cancellation is rethrown.
Future<std::vector<std::string>> getExcludedLocalities(Reference<IDatabase> db) {
	Reference<ITransaction> txn = db->createTransaction();
	loop {
		// The error is carried out of the handler so that onError() can be awaited after it.
		Error caught;
		try {
			ThreadFuture<RangeResult> pending =
			    txn->getRange(special_keys::excludedLocalitySpecialKeyRange, CLIENT_KNOBS->TOO_MANY);
			RangeResult rows = co_await safeThreadFutureToFuture(pending);
			ASSERT(!rows.more && rows.size() < CLIENT_KNOBS->TOO_MANY);

			std::vector<std::string> localities;
			for (const auto& row : rows) {
				localities.push_back(
				    row.key.removePrefix(special_keys::excludedLocalitySpecialKeyRange.begin).toString());
			}
			co_return localities;
		} catch (Error& e) {
			if (e.code() == error_code_actor_cancelled) {
				throw e;
			}
			TraceEvent(SevWarn, "GetExcludedLocalitiesError").error(e);
			caught = e;
		}
		co_await safeThreadFutureToFuture(txn->onError(caught));
	}
}
// Reads the exclusion-in-progress special key range through `tr` and returns the set of
// addresses parsed from the keys (each key with the range prefix stripped).
//
// This performs a single read with no retry loop; any error from the read or from
// NetworkAddress::parse propagates to the caller.
Future<std::set<NetworkAddress>> getInProgressExclusion(Reference<ITransaction> tr) {
	ThreadFuture<RangeResult> pending =
	    tr->getRange(fdbctl::special_keys::exclusionInProgressSpecialKeyRange, CLIENT_KNOBS->TOO_MANY);
	RangeResult rows = co_await safeThreadFutureToFuture(pending);
	ASSERT(!rows.more && rows.size() < CLIENT_KNOBS->TOO_MANY);

	std::set<NetworkAddress> addresses;
	for (const auto& row : rows) {
		const std::string text =
		    row.key.removePrefix(fdbctl::special_keys::exclusionInProgressSpecialKeyRange.begin).toString();
		addresses.insert(NetworkAddress::parse(text));
	}
	co_return addresses;
}
// Reads every key in the failed-locality special key range and returns each key with the
// range prefix stripped, as a string.
//
// Errors other than cancellation are traced and passed to the transaction's onError(); the
// read is attempted again once that future completes. Cancellation is rethrown.
Future<std::vector<std::string>> getFailedLocalities(Reference<IDatabase> db) {
	Reference<ITransaction> txn = db->createTransaction();
	loop {
		// The error is carried out of the handler so that onError() can be awaited after it.
		Error caught;
		try {
			ThreadFuture<RangeResult> pending =
			    txn->getRange(special_keys::failedLocalitySpecialKeyRange, CLIENT_KNOBS->TOO_MANY);
			RangeResult rows = co_await safeThreadFutureToFuture(pending);
			ASSERT(!rows.more && rows.size() < CLIENT_KNOBS->TOO_MANY);

			std::vector<std::string> failedLocalities;
			for (const auto& row : rows) {
				failedLocalities.push_back(
				    row.key.removePrefix(special_keys::failedLocalitySpecialKeyRange.begin).toString());
			}
			co_return failedLocalities;
		} catch (Error& e) {
			if (e.code() == error_code_actor_cancelled) {
				throw e;
			}
			TraceEvent(SevWarn, "GetFailedLocalitiesError").error(e);
			caught = e;
		}
		co_await safeThreadFutureToFuture(txn->onError(caught));
	}
}
} // namespace utils
// Writes the requested exclusions through the special key space and commits them.
//
// Parameters:
//   servers    - addresses to write under the excluded (or failed) servers range.
//   localities - locality strings to write under the excluded (or failed) locality range.
//   failed     - selects the "failed" ranges and force keys instead of the "excluded" ones.
//   force      - additionally sets the matching force-option key for each non-empty input.
//
// Error handling:
//   - actor_cancelled is rethrown immediately.
//   - special_keys_api_failure is not retried: the last line of the failure message is traced
//     and the error is rethrown to the caller.
//   - any other error is traced and passed to the transaction's onError(); the write is
//     attempted again once that future completes.
// Each failure is traced exactly once, always under "ExcludeServersAndLocalitiesError".
Future<Void> excludeServersAndLocalities(Reference<IDatabase> db,
                                         std::vector<AddressExclusion> servers,
                                         std::unordered_set<std::string> localities,
                                         bool failed,
                                         bool force) {
	Reference<ITransaction> tr = db->createTransaction();
	loop {
		// The error is carried out of the handler so that futures can be awaited after it.
		Error err;
		tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
		tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
		try {
			if (force && !servers.empty()) {
				tr->set(failed ? special_keys::failedForceOptionSpecialKey
				               : special_keys::excludedForceOptionSpecialKey,
				        ValueRef());
			}
			for (const auto& s : servers) {
				Key addr = failed ? special_keys::failedServersSpecialKeyRange.begin.withSuffix(s.toString())
				                  : special_keys::excludedServersSpecialKeyRange.begin.withSuffix(s.toString());
				tr->set(addr, ValueRef());
			}

			if (force && !localities.empty()) {
				tr->set(failed ? special_keys::failedLocalityForceOptionSpecialKey
				               : special_keys::excludedLocalityForceOptionSpecialKey,
				        ValueRef());
			}
			for (const auto& l : localities) {
				Key addr = failed ? special_keys::failedLocalitySpecialKeyRange.begin.withSuffix(l)
				                  : special_keys::excludedLocalitySpecialKeyRange.begin.withSuffix(l);
				tr->set(addr, ValueRef());
			}

			co_await safeThreadFutureToFuture(tr->commit());
			co_return;
		} catch (Error& e) {
			if (e.code() == error_code_actor_cancelled) {
				throw e;
			}
			// Tracing happens below, once the branch taken is known, so that a failure is
			// not recorded twice.
			err = e;
		}

		if (err.code() == error_code_special_keys_api_failure) {
			std::string errorMsgStr = co_await utils::getSpecialKeysFailureErrorMessage(tr);
			// last character is \n, so start the search just before it to find the newline
			// that precedes the final line.
			auto pos = errorMsgStr.find_last_of("\n", errorMsgStr.size() - 2);
			auto last_line = errorMsgStr.substr(pos + 1);
			TraceEvent(SevWarn, "ExcludeServersAndLocalitiesError").error(err).detail("Message", last_line);
			throw err;
		}

		TraceEvent(SevWarn, "ExcludeServersAndLocalitiesError").error(err);
		co_await safeThreadFutureToFuture(tr->onError(err));
	}
}
// Reports which addresses still have an exclusion in progress.
//
// Returns an empty set when nothing is in progress, or when none of the in-progress addresses
// is covered by `exclusions`. Otherwise:
//   - waitForAllExcluded == false: returns the current in-progress set immediately.
//   - waitForAllExcluded == true : polls again after a jittered ~1s delay until one of the
//     empty-set conditions above holds.
//
// Errors other than cancellation are traced and passed to the transaction's onError() before
// the next attempt. Cancellation is rethrown.
Future<std::set<NetworkAddress>> checkForExcludingServers(Reference<IDatabase> db,
                                                          std::set<AddressExclusion> exclusions,
                                                          bool waitForAllExcluded) {
	std::set<NetworkAddress> inProgressExclusion;
	Reference<ITransaction> tr = db->createTransaction();
	loop {
		// The error is carried out of the handler so that onError() can be awaited after it.
		Error err;
		bool caughtError = false;
		inProgressExclusion.clear();
		try {
			tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
			tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);

			inProgressExclusion = co_await utils::getInProgressExclusion(tr);
			if (inProgressExclusion.empty())
				co_return inProgressExclusion;

			// Check if all of the specified exclusions are done.
			bool allExcluded = true;
			for (const auto& inProgressAddr : inProgressExclusion) {
				for (const auto& exclusion : exclusions) {
					// We found an exclusion that is still in progress
					if (exclusion.excludes(inProgressAddr)) {
						allExcluded = false;
						break;
					}
				}
				if (!allExcluded) {
					break;
				}
			}

			if (allExcluded) {
				inProgressExclusion.clear();
				co_return inProgressExclusion;
			}

			if (!waitForAllExcluded)
				break;

			// NOTE(review): the same transaction is reused for the next poll; confirm that
			// reads of the in-progress range observe fresh state without a reset.
			co_await delayJittered(1.0); // SOMEDAY: watches!
		} catch (Error& e) {
			if (e.code() == error_code_actor_cancelled) {
				throw;
			}
			TraceEvent(SevWarn, "CheckForExcludingServersError").error(e);
			err = e;
			caughtError = true;
		}

		// Only consult onError() when an error was actually caught. A successful poll that is
		// merely waiting for completion must go straight to the next iteration; passing a
		// default-constructed Error to onError() would fail the whole wait.
		if (caughtError) {
			co_await safeThreadFutureToFuture(tr->onError(err));
		}
	}
	co_return inProgressExclusion;
}
// Handles the Exclude RPC.
//
// Steps:
//   1. Fetches worker process data and storage server interfaces.
//   2. Resolves each requested locality to the addresses matching it among workers and storage
//      servers, and parses each requested process address.
//   3. Writes the exclusions (marking them failed and/or forcing them per the request).
//   4. Checks, or waits unless `no_wait` is set, for the exclusions to finish.
//   5. Fills the reply: excluded_addresses, data_movement_complete, and absent_addresses
//      (exclusions whose IP, or IP and port when a port is given, matches no known worker).
//
// Returns INVALID_ARGUMENT for a malformed locality or address, or when nothing to exclude
// was supplied; INTERNAL when any step throws; OK otherwise. Localities that match no address
// are still written as exclusions and are reported in a warning trace event.
Future<grpc::Status> exclude(Reference<IDatabase> db, const ExcludeRequest* req, ExcludeReply* rep) {
	try {
		std::vector<ProcessData> workers;
		std::map<std::string, StorageServerInterface> server_interfaces;
		// Start both lookups before waiting on either of them.
		Future<bool> future_workers = utils::getWorkersProcessData(db, &workers);
		Future<Void> future_server_interfaces = utils::getStorageServerInterfaces(db, &server_interfaces);
		co_await success(future_workers);
		co_await future_server_interfaces;

		const bool force = req->force();
		const bool waitForAllExcluded = !req->no_wait();
		const bool markFailed = req->failed();

		std::set<AddressExclusion> exclusionSet;
		std::unordered_set<std::string> exclusionLocalities;
		std::vector<std::string> noMatchLocalities;
		const std::string localityPrefix = LocalityData::ExcludeLocalityPrefix.toString();
		for (const auto& loc : req->localities()) {
			// Localities come straight from the client, so a malformed one is a bad request,
			// not an internal invariant violation.
			if (!loc.starts_with(localityPrefix) || loc.find(':') == std::string::npos) {
				co_return grpc::Status(
				    grpc::StatusCode::INVALID_ARGUMENT,
				    fmt::format("'{}' is not a valid locality: it must start with '{}' and contain ':'",
				                loc,
				                localityPrefix));
			}
			exclusionLocalities.insert(loc);

			auto localityAddresses = getAddressesByLocality(workers, loc);
			auto localityServerAddresses = getServerAddressesByLocality(server_interfaces, loc);
			if (localityAddresses.empty() && localityServerAddresses.empty()) {
				noMatchLocalities.push_back(loc);
			}
			exclusionSet.insert(localityAddresses.begin(), localityAddresses.end());
			exclusionSet.insert(localityServerAddresses.begin(), localityServerAddresses.end());
		}

		std::vector<AddressExclusion> exclusionAddresses;
		for (const auto& addr : req->processes()) {
			auto a = AddressExclusion::parse(addr);
			if (!a.isValid()) {
				co_return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
				                       fmt::format("'{}' is not a valid network endpoint address", addr));
			}
			exclusionSet.insert(a);
			exclusionAddresses.push_back(a);
		}

		// The validation if a locality or address has no match is done below and will result in a warning. If we abort
		// here the provided locality and/or address will not be excluded.
		if (exclusionAddresses.empty() && exclusionLocalities.empty()) {
			co_return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
			                       "At least one valid network endpoint address or a locality must be provided.");
		}

		try {
			co_await excludeServersAndLocalities(db, exclusionAddresses, exclusionLocalities, markFailed, force);
		} catch (Error& e) {
			// The placeholder is required; without it the error name is silently dropped.
			co_return grpc::Status(grpc::StatusCode::INTERNAL, fmt::format("error: {}", e.name()));
		}

		std::set<NetworkAddress> notExcludedServers =
		    co_await checkForExcludingServers(db, exclusionSet, waitForAllExcluded);

		// Determine if data movement is complete
		rep->set_data_movement_complete(notExcludedServers.empty());

		// Populate the list of excluded addresses
		for (const auto& addr : exclusionSet) {
			rep->add_excluded_addresses(addr.toString());
		}

		// Build a map of worker addresses for quick lookup
		std::map<IPAddress, std::set<uint16_t>> workerPorts;
		for (const auto& worker : workers) {
			workerPorts[worker.address.ip].insert(worker.address.port);
		}

		// Find all excluded addresses that don't have a corresponding worker
		std::set<AddressExclusion> absentExclusions;
		for (const auto& addr : exclusionSet) {
			auto worker = workerPorts.find(addr.ip);
			if (worker == workerPorts.end()) {
				absentExclusions.insert(addr);
			} else if (addr.port > 0 && worker->second.count(addr.port) == 0) {
				// A port of 0 is treated as covering the whole IP, so only a specific port can be absent.
				absentExclusions.insert(addr);
			}
		}

		// Populate absent_addresses field
		for (const auto& addr : absentExclusions) {
			rep->add_absent_addresses(addr.toString());
		}

		// Report warnings for localities with no matches
		if (!noMatchLocalities.empty()) {
			TraceEvent(SevWarn, "ExcludeLocalitiesNoMatch")
			    .detail("Count", noMatchLocalities.size())
			    .detail("Localities", boost::algorithm::join(noMatchLocalities, ", "));
		}

		co_return grpc::Status::OK;
	} catch (const Error& e) {
		co_return grpc::Status(grpc::StatusCode::INTERNAL,
		                       fmt::format("Error getting worker information: {}", e.what()));
	}
}
// Handles the ExcludeStatus RPC.
//
// Fills the reply with the currently excluded addresses, excluded localities, failed
// addresses, failed localities, and the addresses whose exclusion is still in progress.
// The in-progress read uses a fresh transaction and is attempted once, with no retry.
//
// Returns OK on success, or INTERNAL with the error description when any read throws.
Future<grpc::Status> excludeStatus(Reference<IDatabase> db, const ExcludeStatusRequest* req, ExcludeStatusReply* rep) {
	try {
		std::vector<std::string> excludedAddresses = co_await fdbctl::utils::getExcludedServers(db);
		std::vector<std::string> excludedLocalities = co_await fdbctl::utils::getExcludedLocalities(db);
		std::vector<std::string> failedAddresses = co_await fdbctl::utils::getFailedServers(db);
		std::vector<std::string> failedLocalities = co_await fdbctl::utils::getFailedLocalities(db);

		for (const auto& e : excludedAddresses) {
			rep->add_excluded_addresses(e);
		}
		for (const auto& e : excludedLocalities) {
			rep->add_excluded_localities(e);
		}
		for (const auto& f : failedAddresses) {
			rep->add_failed_addresses(f);
		}
		for (const auto& f : failedLocalities) {
			rep->add_failed_localities(f);
		}

		Reference<ITransaction> tr = db->createTransaction();
		std::set<NetworkAddress> inProgressExclusion = co_await utils::getInProgressExclusion(tr);
		for (const auto& addr : inProgressExclusion) {
			rep->add_in_progress_excludes(addr.toString());
		}

		co_return grpc::Status::OK;
	} catch (const Error& e) {
		// This handler reads exclusion state, not worker information, so the message says so.
		co_return grpc::Status(grpc::StatusCode::INTERNAL,
		                       fmt::format("Error getting exclusion status: {}", e.what()));
	}
}
} // namespace fdbctl
#endif

View File

@@ -59,6 +59,7 @@ Future<grpc::Status> kill(Reference<IDatabase> db, const KillRequest* req, KillR
namespace utils {
Future<std::string> getSpecialKeysFailureErrorMessage(Reference<ITransaction> tr);
// Returns addresses of excluded/failed/in-progress processes.
Future<std::vector<std::string>> getExcludedServers(Reference<IDatabase> db);
Future<std::vector<std::string>> getFailedServers(Reference<IDatabase> db);
Future<std::vector<std::string>> getExcludedLocalities(Reference<IDatabase> db);

View File

@@ -25,7 +25,11 @@
#include <grpcpp/support/status.h>
#include "fdbctl/ControlCommands.h"
#include "fdbclient/ClientKnobs.h"
#include "fdbclient/IClientApi.h"
#include "fdbclient/Knobs.h"
#include "flow/ThreadHelper.actor.h"
#include "flow/genericactors.actor.h"
#define DEFINE_GRPC_HANDLER(rpcName, handlerName) \
grpc::Status rpcName(grpc::ServerContext* context, const rpcName##Request* req, rpcName##Reply* rep) override { \
@@ -33,6 +37,39 @@
}
namespace fdbctl {
// Runs the Flow handler `h` for one gRPC request and converts its outcome into a grpc::Status.
//
// The handler is bounded by a timeout: the time remaining until the deadline reported by
// `context`, or CLIENT_KNOBS->GRPC_CTL_SERVICE_DEFAULT_TIMEOUT when the context reports no
// deadline (time_point::max()).
//
// Returns:
//   - the handler's own status when it finishes in time,
//   - DEADLINE_EXCEEDED when the deadline has already passed or the handler times out,
//   - INTERNAL for any other Error thrown by the handler.
template <class Handler, class Request, class Reply>
Future<grpc::Status> grpcHandlerWrapper(Reference<IDatabase> db,
                                        Handler* h,
                                        const Request* req,
                                        Reply* rep,
                                        grpc::ServerContext* context) {
	try {
		double timeout = CLIENT_KNOBS->GRPC_CTL_SERVICE_DEFAULT_TIMEOUT; // Default timeout when no deadline is set
		const auto deadline = context->deadline();

		// TODO: Should use FDBs now(), but in this world we are breaking determinism anyway at this
		// point.
		if (deadline != std::chrono::system_clock::time_point::max()) {
			const auto now = std::chrono::system_clock::now();
			// Keep the fractional part: truncating to whole seconds would turn any deadline
			// less than one second away into 0 and reject the request outright.
			timeout = std::chrono::duration<double>(deadline - now).count();
		}

		if (timeout <= 0) {
			co_return grpc::Status(grpc::StatusCode::DEADLINE_EXCEEDED, "Request deadline already exceeded");
		}

		co_return co_await timeoutError((*h)(db, req, rep), timeout);
	} catch (Error& e) {
		if (e.code() == error_code_timed_out) {
			co_return grpc::Status(grpc::StatusCode::DEADLINE_EXCEEDED, "Operation timed out");
		}
		co_return grpc::Status(grpc::StatusCode::INTERNAL, fmt::format("Unknown error '{}'", e.name()));
	}
}
class ControlServiceImpl final : public fdbctl::ControlService::Service {
public:
ControlServiceImpl(Reference<IDatabase> db);
@@ -43,15 +80,17 @@ public:
DEFINE_GRPC_HANDLER(GetStatus, getStatus);
DEFINE_GRPC_HANDLER(GetWorkers, getWorkers);
DEFINE_GRPC_HANDLER(Include, include);
// DEFINE_GRPC_HANDLER(Exclude, exclude);
// DEFINE_GRPC_HANDLER(ExcludeStatus, excludeStatus);
DEFINE_GRPC_HANDLER(Exclude, exclude);
DEFINE_GRPC_HANDLER(ExcludeStatus, excludeStatus);
DEFINE_GRPC_HANDLER(Kill, kill);
private:
// Bridges flow with gRPC handlers. The RPC handlers are defined using `DEFINE_GRPC_HANDLER`
// which uses this method to invoke the actual handler written in Flow and runs it on the main thread.
template <class Handler, class Request, class Reply>
grpc::Status handleRequestOnMainThread(Handler* h, const Request* req, Reply* rep, grpc::ServerContext* context);
grpc::Status handleRequestOnMainThread(Handler* h, const Request* req, Reply* rep, grpc::ServerContext* context) {
// Hand the wrapped handler to onMainThread() and block the calling thread on the returned
// future; the lambda reaches db_ through the implicitly captured `this`.
return onMainThread([=]() { return grpcHandlerWrapper(db_, h, req, rep, context); }).getBlocking();
}
private:
Reference<IDatabase> db_;

View File

@@ -357,11 +357,14 @@ message ExcludeRequest {
// Response to an exclude operation.
message ExcludeReply {
// Number of workers excluded
optional int32 num_excluded = 1;
// List of excluded worker addresses (format: "ip:port").
// NOTE(review): the handler emits each exclusion's string form, and exclusions may cover a
// whole IP — confirm whether entries without a port can appear here.
repeated string excluded_addresses = 1;
// True if data movement is complete, i.e. the handler found no requested exclusion still
// in progress when it replied.
optional bool data_movement_complete = 2;
// List of excluded addresses that don't have a corresponding worker (no worker with that IP,
// or with that IP and port when a port was given).
repeated string absent_addresses = 3;
}
// Request to get the status of exclusions.

View File

@@ -2161,7 +2161,7 @@ ACTOR Future<Void> registerWorkerGrpcServices(UID id, Reference<IClusterConnecti
auto db = Database::createDatabase(ccr, ApiVersion::LATEST_VERSION);
Reference<IDatabase> idb = wait(safeThreadFutureToFuture(ThreadSafeDatabase::createFromExistingDatabase(db)));
auto services = GrpcServer::ServiceList{};
auto services = GrpcServer::ServiceList{ std::make_shared<fdbctl::ControlServiceImpl>(idb) };
GrpcServer::instance()->registerRoleServices(UID(), services);
TraceEvent("WorkerGrpcServerStart").detail("Address", GrpcServer::instance()->getAddress());
return Never();