/* * MockGlobalState.actor.cpp * * This source file is part of the FoundationDB open source project * * Copyright 2013-2024 Apple Inc. and the FoundationDB project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "fdbserver/MockGlobalState.h" #include "fdbserver/workloads/workloads.actor.h" #include "fdbserver/DataDistribution.actor.h" #include "fdbclient/FDBTypes.h" #include "flow/actorcompiler.h" class MockGlobalStateImpl { public: ACTOR static Future, int>> waitStorageMetrics(MockGlobalState* mgs, KeyRange keys, StorageMetrics min, StorageMetrics max, StorageMetrics permittedError, int shardLimit, int expectedShardCount) { state TenantInfo tenantInfo; state Version version = 0; loop { auto locations = mgs->getKeyRangeLocations(tenantInfo, keys, shardLimit, Reverse::False, SpanContext(), Optional(), UseProvisionalProxies::False, version) .get(); TraceEvent(SevDebug, "MGSWaitStorageMetrics") .detail("Phase", "GetLocation") .detail("KeyRange", keys.toString()) .detail("LocationsCount", locations.size()) .detail("ExpectedShardCount", expectedShardCount); // NOTE(xwang): in native API, there's code handling the non-equal situation, but in mock world it's // possible for split shards stay in the same location CODE_PROBE(expectedShardCount >= 0 && locations.size() != expectedShardCount, "Some shard is in the same location.", probe::decoration::rare); try { Optional res = wait( ::waitStorageMetricsWithLocation(tenantInfo, version, keys, locations, min, max, permittedError)); TraceEvent(SevDebug, "MGSWaitStorageMetrics") .detail("Phase", "GetStorageMetrics") .detail("KeyRange", keys.toString()) .detail("Present", res.present()); if (res.present()) { return std::make_pair(res, -1); } } catch (Error& e) { TraceEvent(SevDebug, "MGSWaitStorageMetricsHandleError").error(e); if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed) { wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, TaskPriority::DataDistribution)); } else if (e.code() == error_code_future_version) { wait(delay(CLIENT_KNOBS->FUTURE_VERSION_RETRY_DELAY, TaskPriority::DataDistribution)); } else { bool ok = e.code() == error_code_tenant_not_found; TraceEvent(ok ? SevInfo : SevError, "MGSWaitStorageMetricsError").error(e); throw; } } // Avoid busy spin wait(delay(0.1, TaskPriority::DataDistribution)); } } // SOMEDAY: reuse the NativeAPI implementation ACTOR static Future>> splitStorageMetrics(MockGlobalState* mgs, KeyRange keys, StorageMetrics limit, StorageMetrics estimated, Optional minSplitBytes) { state TenantInfo tenantInfo; loop { state std::vector locations = mgs->getKeyRangeLocations(tenantInfo, keys, CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT, Reverse::False, SpanContext(), Optional(), UseProvisionalProxies::False, 0) .get(); // Same solution to NativeAPI::splitStorageMetrics, wait some merge finished if (locations.size() == CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT) { wait(delay(CLIENT_KNOBS->STORAGE_METRICS_TOO_MANY_SHARDS_DELAY, TaskPriority::DataDistribution)); } Optional>> results = wait(splitStorageMetricsWithLocations(locations, keys, limit, estimated, minSplitBytes)); if (results.present()) { return results.get(); } wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, TaskPriority::DataDistribution)); } } }; class MockStorageServerImpl { public: ACTOR static Future waitMetricsTenantAware(MockStorageServer* self, WaitMetricsRequest req) { if (req.tenantInfo.hasTenant()) { // TODO(xwang) add support for tenant test, search for tenant entry Optional entry; Optional tenantPrefix = entry.map(&TenantMapEntry::prefix); if (tenantPrefix.present()) { UNREACHABLE(); // req.keys = req.keys.withPrefix(tenantPrefix.get(), req.arena); } } if (!self->isReadable(req.keys)) { self->sendErrorWithPenalty(req.reply, wrong_shard_server(), self->getPenalty()); } else { wait(self->metrics.waitMetrics(req, delayJittered(SERVER_KNOBS->STORAGE_METRIC_TIMEOUT))); } return Void(); } // Randomly generate keys and kv size between the fetch range, updating the byte sample. // Once the fetchKeys return, the shard status will become FETCHED. ACTOR static Future waitFetchKeysFinish(MockStorageServer* self, MockStorageServer::FetchKeysParams params) { state TraceInterval interval("MockFetchKeys"); // between each chunk delay for random time, and finally set the fetchComplete signal. ASSERT(params.totalRangeBytes > 0); state int chunkCount = std::ceil(params.totalRangeBytes * 1.0 / SERVER_KNOBS->FETCH_BLOCK_BYTES); state int64_t currentTotal = 0; state Key lastKey = params.keys.begin; TraceEvent(SevDebug, interval.begin(), self->id) .detail("Range", params.keys) .detail("ChunkCount", chunkCount) .detail("TotalBytes", params.totalRangeBytes); state int i = 0; for (; i < chunkCount && currentTotal < params.totalRangeBytes; ++i) { wait(delayJittered(0.1, TaskPriority::FetchKeys)); int remainedBytes = (chunkCount == 1 ? params.totalRangeBytes : SERVER_KNOBS->FETCH_BLOCK_BYTES); while (remainedBytes >= lastKey.size()) { Key nextKey; // try 10 times for (int j = 0; j < 10; j++) { nextKey = randomKeyBetween(KeyRangeRef(lastKey, params.keys.end)); if (nextKey < params.keys.end) break; } ++self->counters.kvFetched; // NOTE: in this case, we accumulate the bytes on lastKey on purpose (shall we?) if (nextKey == params.keys.end) { auto bytes = params.totalRangeBytes - currentTotal; self->counters.bytesFetched += bytes; self->byteSampleApplySet(lastKey, bytes); self->usedDiskSpace += bytes; currentTotal = params.totalRangeBytes; TraceEvent(SevWarn, "MockFetchKeysInaccurateSample", self->id) .detail("PairId", interval.pairID) .detail("LastKey", lastKey) .detail("Size", bytes); break; // break the most outside loop } int maxSize = std::min(remainedBytes, 130000) + 1; int randomSize = deterministicRandom()->randomInt(lastKey.size(), maxSize); self->counters.bytesFetched += randomSize; self->usedDiskSpace += randomSize; currentTotal += randomSize; self->byteSampleApplySet(lastKey, randomSize); remainedBytes -= randomSize; lastKey = nextKey; DisabledTraceEvent(SevDebug, "MockFetchKeys_SingleKey", self->id) .detail("LastKey", lastKey) .detail("RemainedBytes", remainedBytes); } } self->setShardStatus(params.keys, MockShardStatus::FETCHED); TraceEvent(SevDebug, interval.end(), self->id).log(); return Void(); } }; bool MockStorageServer::allShardStatusEqual(const KeyRangeRef& range, MockShardStatus status) const { auto ranges = serverKeys.intersectingRanges(range); ASSERT(!ranges.empty()); // at least the range is allKeys for (auto it = ranges.begin(); it != ranges.end(); ++it) { if (it->cvalue().status != status) return false; } return true; } bool MockStorageServer::allShardStatusIn(const KeyRangeRef& range, const std::set& status) const { auto ranges = serverKeys.intersectingRanges(range); ASSERT(!ranges.empty()); // at least the range is allKeys for (auto it = ranges.begin(); it != ranges.end(); ++it) { // fmt::print("allShardStatusIn: {}: {} \n", id.toString(), it->range().toString()); if (!status.contains(it->cvalue().status)) return false; } return true; } void MockStorageServer::setShardStatus(const KeyRangeRef& range, MockShardStatus status) { auto ranges = serverKeys.intersectingRanges(range); // ranges at least has allKeys ASSERT(!ranges.empty()); DisabledTraceEvent(SevDebug, "SetShardStatus", ssi.id()).detail("Range", range); // change the shard boundary if the status will change if (ranges.begin().begin() < range.begin && ranges.begin().end() > range.end && ranges.begin()->cvalue().status != status) { CODE_PROBE(true, "Implicitly split single shard to 3 pieces"); threeWayShardSplitting(ranges.begin().range(), range, ranges.begin().cvalue().shardSize); } else { if (ranges.begin().begin() < range.begin && ranges.begin()->cvalue().status != status) { CODE_PROBE(true, "Implicitly split begin range to 2 pieces"); twoWayShardSplitting(ranges.begin().range(), range.begin, ranges.begin().cvalue().shardSize); } if (ranges.end().begin() > range.end) { auto lastRange = ranges.end(); --lastRange; if (lastRange->cvalue().status != status) { CODE_PROBE(true, "Implicitly split end range to 2 pieces"); twoWayShardSplitting(lastRange.range(), range.end, lastRange.cvalue().shardSize); } } } ranges = serverKeys.containedRanges(range); for (auto it = ranges.begin(); it != ranges.end(); ++it) { auto oldStatus = it->cvalue().status; if (isStatusTransitionValid(oldStatus, status)) { it->value().status = status; } else if ((oldStatus == MockShardStatus::COMPLETED || oldStatus == MockShardStatus::FETCHED) && (status == MockShardStatus::INFLIGHT || status == MockShardStatus::FETCHED)) { CODE_PROBE(true, "Shard already on server"); } else { TraceEvent(SevError, "MockShardStatusTransitionError", id) .detail("From", oldStatus) .detail("To", status) .detail("KeyBegin", range.begin) .detail("KeyEnd", range.begin); ASSERT(false); } } } void MockStorageServer::coalesceCompletedRange(const KeyRangeRef& range) { auto ranges = serverKeys.intersectingRanges(range); // ranges at least has allKeys ASSERT(!ranges.empty()); auto allRanges = serverKeys.ranges(); auto left = ranges.begin(), right = ranges.end(); while (true) { if (left->cvalue().status != MockShardStatus::COMPLETED) { ASSERT(left != ranges.begin()); ++left; break; } if (left == allRanges.begin()) break; --left; } while (right != allRanges.end() && right->cvalue().status == MockShardStatus::COMPLETED) { ++right; } int newSize = 0; for (auto it = left; it != right; ++it) { ASSERT(it->cvalue().status == MockShardStatus::COMPLETED); newSize += it->cvalue().shardSize; it->value().shardSize = 0; } auto beginKey = left.begin(), endKey = right.begin(); serverKeys.coalesce(KeyRangeRef(beginKey, endKey)); serverKeys[beginKey].shardSize = newSize; } // split the out range [a, d) based on the inner range's boundary [b, c). The result would be [a,b), [b,c), [c,d). The // size of the new shards are randomly split from old size of [a, d) void MockStorageServer::threeWayShardSplitting(const KeyRangeRef& outerRange, const KeyRangeRef& innerRange, uint64_t outerRangeSize) { ASSERT(outerRange.contains(innerRange)); if (outerRange == innerRange) { return; } Key left = outerRange.begin; // assume the split are even int leftSize = outerRangeSize / 3; int rightSize = leftSize; int midSize = outerRangeSize - leftSize - rightSize; serverKeys.insert(innerRange, { serverKeys[left].status, (uint64_t)midSize }); serverKeys[left].shardSize = leftSize; serverKeys[innerRange.end].shardSize = rightSize; } // split the range [a,c) with split point b. The result would be [a, b), [b, c). The // size of the new shards are randomly split from old size of [a, c) void MockStorageServer::twoWayShardSplitting(const KeyRangeRef& range, const KeyRef& splitPoint, uint64_t rangeSize) { if (splitPoint == range.begin || !range.contains(splitPoint)) { return; } Key left = range.begin; DisabledTraceEvent(SevDebug, "TwoWayShardSplitting") .detail("Range", range) .detail("SplitPoint", splitPoint) .detail("RangeSize", rangeSize); // Assume equally split the old range int leftSize = rangeSize / 2, rightSize = rangeSize - leftSize; serverKeys.rawInsert(splitPoint, { serverKeys[left].status, (uint64_t)rightSize }); serverKeys[left].shardSize = leftSize; } void MockStorageServer::removeShard(const KeyRangeRef& range) { auto rangeSize = sumRangeSize(range); usedDiskSpace -= rangeSize; serverKeys.insert(range, MockStorageServer::ShardInfo{ MockShardStatus::UNSET, 0 }); serverKeys.coalesce(range); byteSampleApplyClear(range); metrics.notifyNotReadable(range); } uint64_t MockStorageServer::sumRangeSize(const KeyRangeRef& range) const { auto ranges = serverKeys.intersectingRanges(range); uint64_t totalSize = 0; for (auto it = ranges.begin(); it != ranges.end(); ++it) { totalSize += it->cvalue().shardSize; } return totalSize; } void MockStorageServer::addActor(Future future) { actors.add(future); } void MockStorageServer::getSplitPoints(const SplitRangeRequest& req) {} Future MockStorageServer::waitMetricsTenantAware(const WaitMetricsRequest& req) { return MockStorageServerImpl::waitMetricsTenantAware(this, req); } void MockStorageServer::getStorageMetrics(const GetStorageMetricsRequest& req) { StorageBytes storageBytes( totalDiskSpace - usedDiskSpace, totalDiskSpace, usedDiskSpace, totalDiskSpace - usedDiskSpace); metrics.getStorageMetrics( req, storageBytes, counters.bytesInput.getRate(), 0, now(), 0, counters.bytesInput.getValue()); // FIXME: MockStorageServer does not support bytesDurable yet } void MockStorageServer::getSplitMetrics(const SplitMetricsRequest& req) { this->metrics.splitMetrics(req); } void MockStorageServer::getHotRangeMetrics(const ReadHotSubRangeRequest& req) { this->metrics.getReadHotRanges(req); } int64_t MockStorageServer::getHotShardsMetrics(const KeyRange& range) { return 0; } Future MockStorageServer::run() { ssi.initEndpoints(); ssi.startAcceptingRequests(); IFailureMonitor::failureMonitor().setStatus(ssi.address(), FailureStatus(false)); TraceEvent("MockStorageServerStart", ssi.id()).detail("Address", ssi.address()); auto& recruited = ssi; DUMPTOKEN(recruited.getStorageMetrics); addActor(serveStorageMetricsRequests(this, ssi)); addActor(counters.cc.traceCounters("MockStorageMetrics", ssi.id(), SERVER_KNOBS->STORAGE_LOGGING_DELAY, std::string(), [self = this](TraceEvent& te) { te.detail("CpuUsage", self->calculateCpuUsage()); te.detail("DiskUsedBytes", self->usedDiskSpace); te.detail("BytesStored", self->metrics.byteSample.getEstimate(allKeys)); })); return actors.getResult(); } void MockStorageServer::set(KeyRef const& key, int64_t bytes, int64_t oldBytes) { ++counters.mutations; ++counters.setMutations; counters.mutationBytes += bytes; counters.bytesInput += mvccStorageBytes(bytes); notifyWriteMetrics(key, bytes); byteSampleApplySet(key, bytes); auto delta = bytes - oldBytes; usedDiskSpace += delta; serverKeys[key].shardSize += delta; } void MockStorageServer::clear(KeyRef const& key, int64_t bytes) { ++counters.mutations; ++counters.clearRangeMutations; counters.mutationBytes += key.size(); counters.bytesInput += mvccStorageBytes(key.size()); notifyWriteMetrics(key, bytes); KeyRange sr = singleKeyRange(key); byteSampleApplyClear(sr); usedDiskSpace -= bytes; serverKeys[key].shardSize -= bytes; } int64_t MockStorageServer::clearRange(KeyRangeRef const& range, int64_t beginShardBytes, int64_t endShardBytes) { ++counters.mutations; ++counters.clearRangeMutations; counters.mutationBytes += range.expectedSize(); counters.bytesInput += mvccStorageBytes(range.expectedSize()); notifyWriteMetrics(range.begin, range.begin.size() + range.end.size()); byteSampleApplyClear(range); auto totalByteSize = estimateRangeTotalBytes(range, beginShardBytes, endShardBytes); usedDiskSpace -= totalByteSize; clearRangeTotalBytes(range, beginShardBytes, endShardBytes); return totalByteSize; } void MockStorageServer::get(KeyRef const& key, int64_t bytes) { ++counters.finishedQueries; counters.bytesQueried += bytes; // If the read yields no value, randomly sample the empty read. int64_t bytesReadPerKSecond = std::max(bytes, SERVER_KNOBS->EMPTY_READ_PENALTY); metrics.notifyBytesReadPerKSecond(key, bytesReadPerKSecond); } int64_t MockStorageServer::getRange(KeyRangeRef const& range, int64_t beginShardBytes, int64_t endShardBytes) { ++counters.finishedQueries; int64_t totalByteSize = estimateRangeTotalBytes(range, beginShardBytes, endShardBytes); counters.bytesQueried += totalByteSize; // For performance concerns, the cost of a range read is billed to the start key and end key of the // range. if (totalByteSize > 0) { int64_t bytesReadPerKSecond = std::max(totalByteSize, SERVER_KNOBS->EMPTY_READ_PENALTY) / 2; metrics.notifyBytesReadPerKSecond(range.begin, bytesReadPerKSecond); metrics.notifyBytesReadPerKSecond(range.end, bytesReadPerKSecond); } return totalByteSize; } int64_t MockStorageServer::estimateRangeTotalBytes(KeyRangeRef const& range, int64_t beginShardBytes, int64_t endShardBytes) { int64_t totalByteSize = 0; auto ranges = serverKeys.intersectingRanges(range); // use the beginShardBytes as partial size if (ranges.begin().begin() < range.begin) { ranges.pop_front(); totalByteSize += beginShardBytes; } // use the endShardBytes as partial size if (ranges.end().begin() < range.end) { totalByteSize += endShardBytes; } for (auto it = ranges.begin(); it != ranges.end(); ++it) { totalByteSize += it->cvalue().shardSize; } return totalByteSize; } void MockStorageServer::clearRangeTotalBytes(KeyRangeRef const& range, int64_t beginShardBytes, int64_t endShardBytes) { auto ranges = serverKeys.intersectingRanges(range); // use the beginShardBytes as partial size if (ranges.begin().begin() < range.begin) { auto delta = std::min(ranges.begin().value().shardSize, (uint64_t)beginShardBytes); ranges.begin().value().shardSize -= delta; ranges.pop_front(); } // use the endShardBytes as partial size if (ranges.end().begin() < range.end) { auto delta = std::min(ranges.end().value().shardSize, (uint64_t)endShardBytes); ranges.end().value().shardSize -= delta; } for (auto it = ranges.begin(); it != ranges.end(); ++it) { it->value().shardSize = 0; } } void MockStorageServer::notifyWriteMetrics(KeyRef const& key, int64_t size) { // update write bandwidth and iops as mock the cost of writing a mutation StorageMetrics s; // FIXME: remove the / 2 and double the related knobs. s.bytesWrittenPerKSecond = mvccStorageBytes(size) / 2; s.iosPerKSecond = 1; metrics.notify(key, s); } void MockStorageServer::signalFetchKeys(const KeyRangeRef& range, int64_t rangeTotalBytes) { if (!allShardStatusEqual(range, MockShardStatus::COMPLETED)) { actors.add(MockStorageServerImpl::waitFetchKeysFinish(this, { range, rangeTotalBytes })); } } HealthMetrics::StorageStats MockStorageServer::getStorageStats() const { HealthMetrics::StorageStats res; res.diskUsage = usedDiskSpace * 100.0 / totalDiskSpace; res.cpuUsage = calculateCpuUsage(); return res; } void MockStorageServer::byteSampleApplySet(KeyRef const& key, int64_t kvSize) { // Update byteSample in memory and notify waiting metrics ByteSampleInfo sampleInfo = isKeyValueInSample(key, kvSize); auto& byteSample = metrics.byteSample.sample; int64_t delta = 0; auto old = byteSample.find(key); if (old != byteSample.end()) delta = -byteSample.getMetric(old); if (sampleInfo.inSample) { delta += sampleInfo.sampledSize; byteSample.insert(key, sampleInfo.sampledSize); } else if (old != byteSample.end()) { byteSample.erase(old); } if (delta) metrics.notifyBytes(key, delta); } void MockStorageServer::byteSampleApplyClear(KeyRangeRef const& range) { // Update byteSample and notify waiting metrics auto& byteSample = metrics.byteSample.sample; bool any = false; if (range.begin < allKeys.end) { // NotifyBytes should not be called for keys past allKeys.end KeyRangeRef searchRange = KeyRangeRef(range.begin, std::min(range.end, allKeys.end)); auto r = metrics.waitMetricsMap.intersectingRanges(searchRange); for (auto shard = r.begin(); shard != r.end(); ++shard) { KeyRangeRef intersectingRange = shard.range() & range; int64_t bytes = byteSample.sumRange(intersectingRange.begin, intersectingRange.end); metrics.notifyBytes(shard, -bytes); any = any || bytes > 0; } } if (range.end > allKeys.end && byteSample.sumRange(std::max(allKeys.end, range.begin), range.end) > 0) any = true; if (any) { byteSample.eraseAsync(range.begin, range.end); } } double MockStorageServer::calculateCpuUsage() const { double res = counters.mutations.getRate() * write_op_cpu_multiplier + counters.finishedQueries.getRate() * read_op_cpu_multiplier + counters.mutationBytes.getRate() * write_byte_cpu_multiplier + counters.bytesQueried.getRate() * read_byte_cpu_multiplier; return std::min(100.0, res); } std::shared_ptr& MockGlobalState::g_mockState() { static std::shared_ptr res(new MockGlobalState); return res; } void MockGlobalState::initializeClusterLayout(const BasicSimulationConfig& conf) { fmt::print("MGS Cluster Layout: {} dc, {} machines, {} processes per machine.\n", conf.datacenters, conf.machine_count, conf.processes_per_machine); int mod = conf.machine_count % conf.datacenters; for (int i = 0; i < conf.datacenters; ++i) { Standalone dcId(StringRef(fmt::format("data_hall_{}", i))); clusterLayout.emplace_back(new mock::TopologyObject(mock::TopologyObject::DATA_HALL, dcId)); auto& dc = clusterLayout.back(); int machineCount = conf.machine_count / conf.datacenters + int(i < mod); for (int j = 0; j < machineCount; ++j) { Standalone mcId(StringRef(fmt::format("machine_{}_{}", i, j))); dc->children.emplace_back(new mock::TopologyObject(mock::TopologyObject::MACHINE, mcId, dc)); auto& machine = dc->children.back(); for (int k = 0; k < conf.processes_per_machine; ++k) { Standalone pid(StringRef(fmt::format("process_{}_{}_{}", i, j, k))); LocalityData localityData(pid, mcId, mcId, dcId); processes.emplace_back(new mock::Process(localityData, pid, machine)); machine->children.emplace_back(processes.back()); if (seedProcesses.size() < conf.db.storageTeamSize && j == 0 && k == 0) { seedProcesses.emplace_back(processes.back()); fmt::print("(seed) "); } fmt::print("Mock Process: {}\n", processes.back()->locality.toString()); } } } } void MockGlobalState::initializeAsEmptyDatabaseMGS(const DatabaseConfiguration& conf, uint64_t defaultDiskSpace) { ASSERT(conf.storageTeamSize > 0); ASSERT(!seedProcesses.empty()); configuration = conf; std::vector serverIds; fmt::print("Initial Team Size: {}, initial server Ids: ", conf.storageTeamSize); for (int i = 1; i <= conf.storageTeamSize; ++i) { UID id = indexToUID(i); serverIds.push_back(id); // select seed Storage Server StorageServerInterface ssi(id); auto& process = seedProcesses[(i - 1) % seedProcesses.size()]; ssi.locality = process->locality; process->ssInterfaces.push_back(ssi); fmt::print("{}, ", id.toString()); allServers[id] = makeReference(ssi, defaultDiskSpace); allServers[id]->serverKeys.insert(allKeys, { MockShardStatus::COMPLETED, 0 }); } fmt::print("\n"); shardMapping->assignRangeToTeams(allKeys, { Team(serverIds, true) }); } void MockGlobalState::addStorageServer(StorageServerInterface server, uint64_t diskSpace) { allServers[server.id()] = makeReference(server, diskSpace); } void MockGlobalState::addStoragePerProcess(uint64_t defaultDiskSpace) { for (auto p : processes) { if (p->ssInterfaces.empty()) { p->ssInterfaces.emplace_back(deterministicRandom()->randomUniqueID()); p->ssInterfaces.back().locality = p->locality; addStorageServer(p->ssInterfaces.back(), defaultDiskSpace); } } } bool MockGlobalState::serverIsSourceForShard(const UID& serverId, KeyRangeRef shard, bool inFlightShard) { if (!allServers.contains(serverId)) return false; // check serverKeys auto& mss = allServers.at(serverId); if (!mss->allShardStatusEqual(shard, MockShardStatus::COMPLETED)) { return false; } // check keyServers auto teams = shardMapping->getTeamsForFirstShard(shard); if (inFlightShard) { return std::any_of(teams.second.begin(), teams.second.end(), [&serverId](const Team& team) { return team.hasServer(serverId); }); } return std::any_of( teams.first.begin(), teams.first.end(), [&serverId](const Team& team) { return team.hasServer(serverId); }); } bool MockGlobalState::serverIsDestForShard(const UID& serverId, KeyRangeRef shard) { TraceEvent(SevDebug, "ServerIsDestForShard") .detail("ServerId", serverId) .detail("Keys", shard) .detail("Contains", allServers.contains(serverId)); if (!allServers.contains(serverId)) return false; // check serverKeys auto& mss = allServers.at(serverId); if (!mss->allShardStatusIn(shard, { MockShardStatus::INFLIGHT, MockShardStatus::COMPLETED, MockShardStatus::FETCHED })) { return false; } // check keyServers auto teams = shardMapping->getTeamsForFirstShard(shard); return !teams.second.empty() && std::any_of(teams.first.begin(), teams.first.end(), [&serverId](const Team& team) { return team.hasServer(serverId); }); } bool MockGlobalState::allShardsRemovedFromServer(const UID& serverId) { return allServers.contains(serverId) && shardMapping->getNumberOfShards(serverId) == 0; } Future, int>> MockGlobalState::waitStorageMetrics( const KeyRange& keys, const StorageMetrics& min, const StorageMetrics& max, const StorageMetrics& permittedError, int shardLimit, int expectedShardCount) { return MockGlobalStateImpl::waitStorageMetrics( this, keys, min, max, permittedError, shardLimit, expectedShardCount); } Reference buildLocationInfo(const std::vector& interfaces) { // construct the location info with the servers std::vector>> serverRefs; serverRefs.reserve(interfaces.size()); for (const auto& interf : interfaces) { serverRefs.push_back(makeReference>(interf)); } return makeReference(serverRefs); } Future MockGlobalState::getKeyLocation(TenantInfo tenant, Key key, SpanContext spanContext, Optional debugID, UseProvisionalProxies useProvisionalProxies, Reverse isBackward, Version version) { if (isBackward) { // DD never ask for backward range. UNREACHABLE(); } ASSERT(key < allKeys.end); GetKeyServerLocationsReply rep; KeyRange single = singleKeyRange(key); auto teamPair = shardMapping->getTeamsForFirstShard(single); auto& srcTeam = teamPair.second.empty() ? teamPair.first : teamPair.second; ASSERT_EQ(srcTeam.size(), 1); rep.results.emplace_back(single, extractStorageServerInterfaces(srcTeam.front().servers)); return KeyRangeLocationInfo(KeyRange(toPrefixRelativeRange(rep.results[0].first, tenant.prefix), rep.arena), buildLocationInfo(rep.results[0].second)); } Future> MockGlobalState::getKeyRangeLocations( TenantInfo tenant, KeyRange keys, int limit, Reverse reverse, SpanContext spanContext, Optional debugID, UseProvisionalProxies useProvisionalProxies, Version version) { if (reverse) { // DD never ask for backward range. ASSERT(false); } ASSERT(keys.begin < keys.end); GetKeyServerLocationsReply rep; auto ranges = shardMapping->intersectingRanges(keys); auto it = ranges.begin(); for (int count = 0; it != ranges.end() && count < limit; ++it, ++count) { auto teamPair = shardMapping->getTeamsFor(it->begin()); auto& srcTeam = teamPair.second.empty() ? teamPair.first : teamPair.second; ASSERT_EQ(srcTeam.size(), 1); rep.results.emplace_back(it->range(), extractStorageServerInterfaces(srcTeam.front().servers)); } CODE_PROBE(it != ranges.end(), "getKeyRangeLocations is limited"); std::vector results; for (int shard = 0; shard < rep.results.size(); shard++) { results.emplace_back((toPrefixRelativeRange(rep.results[shard].first, tenant.prefix) & keys), buildLocationInfo(rep.results[shard].second)); } return results; } std::vector MockGlobalState::extractStorageServerInterfaces(const std::vector& ids) const { std::vector interfaces; for (auto& id : ids) { interfaces.emplace_back(allServers.at(id)->ssi); } return interfaces; } Future>> MockGlobalState::splitStorageMetrics(const KeyRange& keys, const StorageMetrics& limit, const StorageMetrics& estimated, const Optional& minSplitBytes) { return MockGlobalStateImpl::splitStorageMetrics(this, keys, limit, estimated, minSplitBytes); } std::vector> MockGlobalState::runAllMockServers() { std::vector> futures; futures.reserve(allServers.size()); for (auto& [id, _] : allServers) { futures.emplace_back(runMockServer(id)); } return futures; } Future MockGlobalState::runMockServer(const UID& id) { return allServers.at(id)->run(); } int MockGlobalState::getRangeSize(KeyRangeRef const& range) { // FIXME: return realistic number return SERVER_KNOBS->MIN_SHARD_BYTES; } int64_t MockGlobalState::get(KeyRef const& key) { auto ids = shardMapping->getSourceServerIdsFor(key); int64_t randomBytes = 0; if (deterministicRandom()->random01() > emptyProb) { randomBytes = deterministicRandom()->randomInt64(minByteSize, maxByteSize + 1); } // randomly choose 1 server auto id = deterministicRandom()->randomChoice(ids); allServers.at(id)->get(key, randomBytes); return randomBytes; } int64_t MockGlobalState::getRange(KeyRangeRef const& range) { auto ranges = shardMapping->intersectingRanges(range); int64_t totalSize = 0; KeyRef begin, end; for (auto it = ranges.begin(); it != ranges.end(); ++it) { auto ids = shardMapping->getSourceServerIdsFor(it->begin()); if (range.begin > it->begin()) { begin = range.begin; } if (range.end < it->end()) { end = range.end; } // randomly choose 1 server auto id = deterministicRandom()->randomChoice(ids); int64_t beginSize = deterministicRandom()->randomInt64(0, SERVER_KNOBS->MIN_SHARD_BYTES), endSize = deterministicRandom()->randomInt64(0, SERVER_KNOBS->MIN_SHARD_BYTES); totalSize += allServers.at(id)->getRange(KeyRangeRef(begin, end), beginSize, endSize); } return totalSize; } int64_t MockGlobalState::set(KeyRef const& key, int valueSize, bool insert) { auto ids = shardMapping->getSourceServerIdsFor(key); int64_t oldKvBytes = 0; insert |= (deterministicRandom()->random01() < emptyProb); if (!insert) { oldKvBytes = key.size() + deterministicRandom()->randomInt64(minByteSize, maxByteSize + 1); } for (auto& id : ids) { allServers.at(id)->set(key, valueSize + key.size(), oldKvBytes); } return oldKvBytes; } int64_t MockGlobalState::clear(KeyRef const& key) { auto ids = shardMapping->getSourceServerIdsFor(key); int64_t randomBytes = 0; if (deterministicRandom()->random01() > emptyProb) { randomBytes = deterministicRandom()->randomInt64(minByteSize, maxByteSize + 1) + key.size(); } for (auto& id : ids) { allServers.at(id)->clear(key, randomBytes); } return randomBytes; } int64_t MockGlobalState::clearRange(KeyRangeRef const& range) { auto ranges = shardMapping->intersectingRanges(range); int64_t totalSize = 0; KeyRef begin, end; for (auto it = ranges.begin(); it != ranges.end(); ++it) { auto ids = shardMapping->getSourceServerIdsFor(it->begin()); if (range.begin > it->begin()) { begin = range.begin; } if (range.end < it->end()) { end = range.end; } int64_t beginSize = deterministicRandom()->randomInt64(0, SERVER_KNOBS->MIN_SHARD_BYTES), endSize = deterministicRandom()->randomInt64(0, SERVER_KNOBS->MIN_SHARD_BYTES); int64_t lastSize = -1; for (auto& id : ids) { int64_t size = allServers.at(id)->clearRange(KeyRangeRef(begin, end), beginSize, endSize); ASSERT(lastSize == size || lastSize == -1); // every server should return the same result } totalSize += lastSize; } return totalSize; } TEST_CASE("/MockGlobalState/initializeAsEmptyDatabaseMGS/SimpleThree") { BasicTestConfig testConfig; testConfig.simpleConfig = true; testConfig.minimumReplication = 3; testConfig.logAntiQuorum = 0; BasicSimulationConfig dbConfig = generateBasicSimulationConfig(testConfig); TraceEvent("UnitTestDBConfig").detail("Config", dbConfig.db.toString()); std::shared_ptr mgs = std::make_shared(); mgs->initializeClusterLayout(dbConfig); mgs->initializeAsEmptyDatabaseMGS(dbConfig.db); for (int i = 1; i <= dbConfig.db.storageTeamSize; ++i) { auto id = MockGlobalState::indexToUID(i); std::cout << "Check server " << i << "\n"; ASSERT(mgs->serverIsSourceForShard(id, allKeys)); ASSERT(mgs->allServers.at(id)->sumRangeSize(allKeys) == 0); } return Void(); } struct MockGlobalStateTester { // expectation [r0.begin, r0.end) => [r0.begin, x1), [x1, x2), [x2, r0.end) void testThreeWaySplitFirstRange(MockStorageServer& mss) { auto it = mss.serverKeys.ranges().begin(); uint64_t oldSize = deterministicRandom()->randomInt(SERVER_KNOBS->MIN_SHARD_BYTES, std::numeric_limits::max()); MockShardStatus oldStatus = it.cvalue().status; it->value().shardSize = oldSize; KeyRangeRef outerRange = it->range(); Key x1 = keyAfter(it->range().begin); Key x2 = keyAfter(x1); std::cout << "it->range.begin: " << it->range().begin.toHexString() << " size: " << oldSize << "\n"; mss.threeWayShardSplitting(outerRange, KeyRangeRef(x1, x2), oldSize); auto ranges = mss.serverKeys.containedRanges(outerRange); ASSERT(ranges.begin().range() == KeyRangeRef(outerRange.begin, x1)); ASSERT(ranges.begin().cvalue().status == oldStatus); ranges.pop_front(); ASSERT(ranges.begin().range() == KeyRangeRef(x1, x2)); ASSERT(ranges.begin().cvalue().status == oldStatus); ranges.pop_front(); ASSERT(ranges.begin().range() == KeyRangeRef(x2, outerRange.end)); ASSERT(ranges.begin().cvalue().status == oldStatus); ranges.pop_front(); ASSERT(ranges.empty()); } // expectation [r0.begin, r0.end) => [r0.begin, x1), [x1, r0.end) void testTwoWaySplitFirstRange(MockStorageServer& mss) { auto it = mss.serverKeys.nthRange(0); MockShardStatus oldStatus = it.cvalue().status; uint64_t oldSize = deterministicRandom()->randomInt(SERVER_KNOBS->MIN_SHARD_BYTES, std::numeric_limits::max()); it->value().shardSize = oldSize; KeyRangeRef outerRange = it->range(); Key x1 = keyAfter(it->range().begin); std::cout << "it->range.begin: " << it->range().begin.toHexString() << " size: " << oldSize << "\n"; mss.twoWayShardSplitting(it->range(), x1, oldSize); auto ranges = mss.serverKeys.containedRanges(outerRange); ASSERT(ranges.begin().range() == KeyRangeRef(outerRange.begin, x1)); ASSERT(ranges.begin().cvalue().status == oldStatus); ranges.pop_front(); ASSERT(ranges.begin().range() == KeyRangeRef(x1, outerRange.end)); ASSERT(ranges.begin().cvalue().status == oldStatus); ranges.pop_front(); ASSERT(ranges.empty()); } KeyRangeLocationInfo getKeyLocationInfo(KeyRef key, std::shared_ptr mgs) { return mgs ->getKeyLocation( TenantInfo(), key, SpanContext(), Optional(), UseProvisionalProxies::False, Reverse::False, 0) .get(); } std::vector getKeyRangeLocations(KeyRangeRef keys, int limit, std::shared_ptr mgs) { return mgs ->getKeyRangeLocations(TenantInfo(), keys, limit, Reverse::False, SpanContext(), Optional(), UseProvisionalProxies::False, 0) .get(); } }; TEST_CASE("/MockGlobalState/MockStorageServer/SplittingFunctions") { BasicTestConfig testConfig; testConfig.simpleConfig = true; testConfig.minimumReplication = 1; testConfig.logAntiQuorum = 0; BasicSimulationConfig dbConfig = generateBasicSimulationConfig(testConfig); TraceEvent("UnitTestDBConfig").detail("Config", dbConfig.db.toString()); std::shared_ptr mgs = std::make_shared(); mgs->initializeClusterLayout(dbConfig); mgs->initializeAsEmptyDatabaseMGS(dbConfig.db); MockGlobalStateTester tester; auto& mss = mgs->allServers.at(MockGlobalState::indexToUID(1)); std::cout << "Test 3-way splitting...\n"; tester.testThreeWaySplitFirstRange(*mss); std::cout << "Test 2-way splitting...\n"; mss->serverKeys.insert(allKeys, { MockShardStatus::COMPLETED, 0 }); // reset to empty tester.testTwoWaySplitFirstRange(*mss); return Void(); } TEST_CASE("/MockGlobalState/MockStorageServer/SetShardStatus") { BasicTestConfig testConfig; testConfig.simpleConfig = true; testConfig.minimumReplication = 1; testConfig.logAntiQuorum = 0; BasicSimulationConfig dbConfig = generateBasicSimulationConfig(testConfig); TraceEvent("UnitTestDBConfig").detail("Config", dbConfig.db.toString()); state std::shared_ptr mgs = std::make_shared(); mgs->initializeClusterLayout(dbConfig); mgs->initializeAsEmptyDatabaseMGS(dbConfig.db); auto& mss = mgs->allServers.at(MockGlobalState::indexToUID(1)); mss->serverKeys.insert(allKeys, { MockShardStatus::UNSET, 1400 }); // manually reset status // split to 3 shards [allKeys.begin, a, b, allKeys.end] KeyRange testRange(KeyRangeRef("a"_sr, "b"_sr)); mss->setShardStatus(testRange, MockShardStatus::INFLIGHT); ASSERT(mss->allShardStatusEqual(testRange, MockShardStatus::INFLIGHT)); ASSERT_EQ(mss->sumRangeSize(allKeys), 1400); ASSERT_EQ(mss->serverKeys.size(), 3); // [allKeys.begin, a, b, bc, allKeys.end] testRange = KeyRangeRef("ac"_sr, "bc"_sr); mss->setShardStatus(testRange, MockShardStatus::INFLIGHT); ASSERT(mss->allShardStatusEqual(testRange, MockShardStatus::INFLIGHT)); ASSERT_EQ(mss->sumRangeSize(allKeys), 1400); ASSERT_EQ(mss->serverKeys.size(), 4); testRange = KeyRangeRef("ab"_sr, "bb"_sr); mss->setShardStatus(testRange, MockShardStatus::INFLIGHT); ASSERT_EQ(mss->serverKeys.size(), 4); testRange = KeyRangeRef("b"_sr, "bc"_sr); // [allKeys.begin, a, b, bc, allKeys.end] mss->setShardStatus(testRange, MockShardStatus::FETCHED); ASSERT(mss->allShardStatusEqual(testRange, MockShardStatus::FETCHED)); mss->setShardStatus(testRange, MockShardStatus::COMPLETED); ASSERT(mss->allShardStatusEqual(testRange, MockShardStatus::COMPLETED)); mss->setShardStatus(testRange, MockShardStatus::FETCHED); ASSERT(mss->allShardStatusEqual(testRange, MockShardStatus::COMPLETED)); ASSERT_EQ(mss->sumRangeSize(allKeys), 1400); ASSERT_EQ(mss->serverKeys.size(), 4); testRange = KeyRangeRef("ac"_sr, allKeys.end); // [allKeys.begin, a, ac, b, bc, allKeys.end] mss->setShardStatus(testRange, MockShardStatus::FETCHED); ASSERT_EQ(mss->sumRangeSize(allKeys), 1400); ASSERT_EQ(mss->serverKeys.size(), 5); ASSERT(mss->allShardStatusEqual(KeyRangeRef("ac"_sr, "b"_sr), MockShardStatus::FETCHED)); ASSERT(mss->allShardStatusEqual(KeyRangeRef("b"_sr, "bc"_sr), MockShardStatus::COMPLETED)); ASSERT(mss->allShardStatusEqual(KeyRangeRef("bc"_sr, allKeys.end), MockShardStatus::FETCHED)); mss->setShardStatus(allKeys, MockShardStatus::INFLIGHT); mss->setShardStatus(allKeys, MockShardStatus::FETCHED); mss->setShardStatus(allKeys, MockShardStatus::COMPLETED); mss->coalesceCompletedRange(KeyRangeRef("a"_sr, "b"_sr)); ASSERT_EQ(mss->sumRangeSize(allKeys), 1400); ASSERT_EQ(mss->serverKeys.size(), 1); return Void(); } namespace { inline bool locationInfoEqualsToTeam(Reference loc, const std::vector& ids) { return loc->locations()->size() == ids.size() && std::all_of(ids.begin(), ids.end(), [loc](const UID& id) { return loc->locations()->hasInterface(id); }); } }; // namespace TEST_CASE("/MockGlobalState/MockStorageServer/GetKeyLocations") { BasicTestConfig testConfig; testConfig.simpleConfig = true; testConfig.minimumReplication = 1; testConfig.logAntiQuorum = 0; BasicSimulationConfig dbConfig = generateBasicSimulationConfig(testConfig); TraceEvent("UnitTestDBConfig").detail("Config", dbConfig.db.toString()); state std::shared_ptr mgs = std::make_shared(); mgs->initializeClusterLayout(dbConfig); mgs->initializeAsEmptyDatabaseMGS(dbConfig.db); // add one empty server mgs->addStorageServer(StorageServerInterface(mgs->indexToUID(mgs->allServers.size() + 1))); // define 3 ranges: // team 1 (UID 1,2,...,n-1):[begin, 1.0), [2.0, end) // team 2 (UID 2,3,...n-1, n): [1.0, 2.0) ShardsAffectedByTeamFailure::Team team1, team2; for (int i = 0; i < mgs->allServers.size() - 1; ++i) { UID id = mgs->indexToUID(i + 1); team1.servers.emplace_back(id); id = mgs->indexToUID(i + 2); team2.servers.emplace_back(id); } Key one = doubleToTestKey(1.0), two = doubleToTestKey(2.0); std::vector ranges{ KeyRangeRef(allKeys.begin, one), KeyRangeRef(one, two), KeyRangeRef(two, allKeys.end) }; mgs->shardMapping->assignRangeToTeams(ranges[0], { team1 }); mgs->shardMapping->assignRangeToTeams(ranges[1], { team2 }); mgs->shardMapping->assignRangeToTeams(ranges[2], { team1 }); // query key location MockGlobalStateTester tester; // -- team 1 Key testKey = doubleToTestKey(0.5); auto locInfo = tester.getKeyLocationInfo(testKey, mgs); ASSERT(locationInfoEqualsToTeam(locInfo.locations, team1.servers)); // -- team 2 testKey = doubleToTestKey(1.3); locInfo = tester.getKeyLocationInfo(testKey, mgs); ASSERT(locationInfoEqualsToTeam(locInfo.locations, team2.servers)); // query range location testKey = doubleToTestKey(3.0); // team 1,2,1 auto locInfos = tester.getKeyRangeLocations(KeyRangeRef(allKeys.begin, testKey), 100, mgs); ASSERT(locInfos.size() == 3); ASSERT(locInfos[0].range == ranges[0]); ASSERT(locationInfoEqualsToTeam(locInfos[0].locations, team1.servers)); ASSERT(locInfos[1].range == ranges[1]); ASSERT(locationInfoEqualsToTeam(locInfos[1].locations, team2.servers)); ASSERT(locInfos[2].range == KeyRangeRef(ranges[2].begin, testKey)); ASSERT(locationInfoEqualsToTeam(locInfos[2].locations, team1.servers)); // team 1,2 locInfos = tester.getKeyRangeLocations(KeyRangeRef(allKeys.begin, testKey), 2, mgs); ASSERT(locInfos.size() == 2); ASSERT(locInfos[0].range == ranges[0]); ASSERT(locationInfoEqualsToTeam(locInfos[0].locations, team1.servers)); ASSERT(locInfos[1].range == ranges[1]); ASSERT(locationInfoEqualsToTeam(locInfos[1].locations, team2.servers)); return Void(); } TEST_CASE("/MockGlobalState/MockStorageServer/WaitStorageMetricsRequest") { BasicTestConfig testConfig; testConfig.simpleConfig = true; testConfig.minimumReplication = 1; testConfig.logAntiQuorum = 0; BasicSimulationConfig dbConfig = generateBasicSimulationConfig(testConfig); TraceEvent("UnitTestDBConfig").detail("Config", dbConfig.db.toString()); state std::shared_ptr mgs = std::make_shared(); mgs->initializeClusterLayout(dbConfig); mgs->initializeAsEmptyDatabaseMGS(dbConfig.db); std::for_each(mgs->allServers.begin(), mgs->allServers.end(), [](auto& server) { server.second->metrics.byteSample.sample.insert("something"_sr, 500000); }); state Future allServerFutures = waitForAll(mgs->runAllMockServers()); KeyRange testRange = allKeys; ShardSizeBounds bounds = ShardSizeBounds::shardSizeBoundsBeforeTrack(); std::pair, int> res = wait(mgs->waitStorageMetrics(testRange, bounds.min, bounds.max, bounds.permittedError, 1, 1)); // std::cout << "get result " << res.second << "\n"; // std::cout << "get byte "<< res.first.get().bytes << "\n"; ASSERT_EQ(res.second, -1); // the valid result always return -1, strange contraction though. ASSERT_EQ(res.first.get().bytes, 500000); return Void(); } TEST_CASE("/MockGlobalState/MockStorageServer/DataOpsSet") { BasicTestConfig testConfig; testConfig.simpleConfig = true; testConfig.minimumReplication = 1; testConfig.logAntiQuorum = 0; BasicSimulationConfig dbConfig = generateBasicSimulationConfig(testConfig); TraceEvent("UnitTestDBConfig").detail("Config", dbConfig.db.toString()); state std::shared_ptr mgs = std::make_shared(); mgs->initializeClusterLayout(dbConfig); mgs->initializeAsEmptyDatabaseMGS(dbConfig.db); state Future allServerFutures = waitForAll(mgs->runAllMockServers()); // insert { mgs->set("a"_sr, 1 * SERVER_KNOBS->BYTES_WRITTEN_UNITS_PER_SAMPLE, true); mgs->set("b"_sr, 2 * SERVER_KNOBS->BYTES_WRITTEN_UNITS_PER_SAMPLE, true); mgs->set("c"_sr, 3 * SERVER_KNOBS->BYTES_WRITTEN_UNITS_PER_SAMPLE, true); for (auto& server : mgs->allServers) { ASSERT_EQ(server.second->usedDiskSpace, 3 + 6 * SERVER_KNOBS->BYTES_WRITTEN_UNITS_PER_SAMPLE); ASSERT_EQ(server.second->serverKeys[""_sr].shardSize, 3 + 6 * SERVER_KNOBS->BYTES_WRITTEN_UNITS_PER_SAMPLE); } ShardSizeBounds bounds = ShardSizeBounds::shardSizeBoundsBeforeTrack(); std::pair, int> res = wait( mgs->waitStorageMetrics(KeyRangeRef("a"_sr, "bc"_sr), bounds.min, bounds.max, bounds.permittedError, 1, 1)); int64_t testSize = 2 + 3 * SERVER_KNOBS->BYTES_WRITTEN_UNITS_PER_SAMPLE; // SOMEDAY: how to integrate with isKeyValueInSample() better? if (res.first.get().bytes > 0) { // If sampled ASSERT_EQ(res.first.get().bytes, testSize); ASSERT_GT(res.first.get().bytesWrittenPerKSecond, 0); } } return Void(); }